Commit cfa6bdf

Merge branch 'master' into utf8string-java

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
rxin committed Jun 11, 2015
2 parents a3b124d + c8d551d commit cfa6bdf
Showing 35 changed files with 402 additions and 180 deletions.
38 changes: 38 additions & 0 deletions dev/create-release/known_translations
@@ -91,3 +91,41 @@ zapletal-martin - Martin Zapletal
zuxqoj - Shekhar Bansal
mingyukim - Mingyu Kim
sigmoidanalytics - Mayur Rustagi
AiHe - Ai He
BenFradet - Ben Fradet
FavioVazquez - Favio Vazquez
JaysonSunshine - Jayson Sunshine
Liuchang0812 - Liu Chang
Sephiroth-Lin - Sephiroth Lin
dobashim - Masaru Dobashi
ehnalis - Zoltan Zvara
emres - Emre Sevinc
gchen - Guancheng Chen
haiyangsea - Haiyang Sea
hlin09 - Hao Lin
hqzizania - Qian Huang
jeanlyn - Jean Lyn
jerluc - Jeremy A. Lucas
jrabary - Jaonary Rabarisoa
judynash - Judy Nash
kaka1992 - Chen Song
ksonj - Kalle Jepsen
kuromatsu-nobuyuki - Nobuyuki Kuromatsu
lazyman500 - Dong Xu
leahmcguire - Leah McGuire
mbittmann - Mark Bittmann
mbonaci - Marko Bonaci
meawoppl - Matthew Goodman
nyaapa - Arsenii Krasikov
phatak-dev - Madhukara Phatak
prabeesh - Prabeesh K
rakeshchalasani - Rakesh Chalasani
rekhajoshm - Rekha Joshi
sisihj - June He
szheng79 - Shuai Zheng
texasmichelle - Michelle Casbon
vinodkc - Vinod KC
yongtang - Yong Tang
ypcat - Pei-Lun Lee
zhichao-li - Zhichao Li
zzcclp - Zhichao Zhang
4 changes: 2 additions & 2 deletions ec2/spark_ec2.py
@@ -51,7 +51,7 @@
raw_input = input
xrange = range

SPARK_EC2_VERSION = "1.3.1"
SPARK_EC2_VERSION = "1.4.0"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
@@ -89,7 +89,7 @@

# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.3"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.4"


def setup_external_libs(libs):
2 changes: 1 addition & 1 deletion pom.xml
@@ -1244,7 +1244,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<argLine>-Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<environmentVariables>
<!--
Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -516,7 +516,7 @@ object TestSettings {
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions in Test += "-ea",
javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
.split(" ").toSeq,
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
32 changes: 32 additions & 0 deletions python/pyspark/sql/tests.py
@@ -26,6 +26,7 @@
import tempfile
import pickle
import functools
import time
import datetime

import py4j
@@ -47,6 +48,20 @@
from pyspark.sql.window import Window


class UTC(datetime.tzinfo):
"""UTC"""
ZERO = datetime.timedelta(0)

def utcoffset(self, dt):
return self.ZERO

def tzname(self, dt):
return "UTC"

def dst(self, dt):
return self.ZERO


class ExamplePointUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ def test_filter_with_datetime(self):
self.assertEqual(0, df.filter(df.date > date).count())
self.assertEqual(0, df.filter(df.time > time).count())

def test_time_with_timezone(self):
day = datetime.date.today()
now = datetime.datetime.now()
ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
# class in __main__ is not serializable
from pyspark.sql.tests import UTC
utc = UTC()
utcnow = datetime.datetime.fromtimestamp(ts, utc)
df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
day1, now1, utcnow1 = df.first()
# Pyrolite serialize java.sql.Date as datetime, will be fixed in new version
self.assertEqual(day1.date(), day)
# Pyrolite does not support microsecond, the error should be
# less than 1 millisecond
self.assertTrue(now - now1 < datetime.timedelta(0.001))
self.assertTrue(now - utcnow1 < datetime.timedelta(0.001))

def test_dropna(self):
schema = StructType([
StructField("name", StringType(), True),
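For reference, the hand-rolled UTC tzinfo above is the Python 2-compatible way to obtain a fixed zero-offset timezone. On Python 3.2+ the standard library offers an equivalent; a small comparison sketch, not part of this patch:

```python
from datetime import datetime, timedelta, timezone

# timezone.utc behaves like the UTC class defined in the test module above
aware = datetime.now(timezone.utc)
assert aware.utcoffset() == timedelta(0)
assert aware.tzname() == "UTC"
```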
27 changes: 18 additions & 9 deletions python/pyspark/sql/types.py
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
elif isinstance(dataType, TimestampType):
elif isinstance(dataType, (DateType, TimestampType)):
return True
else:
return False


EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()


def _python_to_sql_converter(dataType):
"""
Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def converter(obj):
return tuple(c(d.get(n)) for n, c in zip(names, converters))
else:
return tuple(c(v) for c, v in zip(converters, obj))
else:
elif obj is not None:
raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
return converter
elif isinstance(dataType, ArrayType):
element_converter = _python_to_sql_converter(dataType.elementType)
return lambda a: [element_converter(v) for v in a]
return lambda a: a and [element_converter(v) for v in a]
elif isinstance(dataType, MapType):
key_converter = _python_to_sql_converter(dataType.keyType)
value_converter = _python_to_sql_converter(dataType.valueType)
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])

elif isinstance(dataType, UserDefinedType):
return lambda obj: dataType.serialize(obj)
return lambda obj: obj and dataType.serialize(obj)

elif isinstance(dataType, DateType):
return lambda d: d and d.toordinal() - EPOCH_ORDINAL

elif isinstance(dataType, TimestampType):

def to_posix_timstamp(dt):
if dt.tzinfo is None:
return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
else:
return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
if dt:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
return int(seconds * 1e7 + dt.microsecond * 10)
return to_posix_timstamp

else:
raise ValueError("Unexpected type %r" % dataType)

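Taken together, the types.py changes above send dates to the JVM as days since the Unix epoch and timestamps as 100-nanosecond ticks, while None is propagated through the `x and f(x)` idiom. A standalone sketch of that conversion logic, illustrative only (the function names here are not pyspark API):

```python
import calendar
import datetime
import time

EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()

def date_to_days(d):
    # None stays None; otherwise days since 1970-01-01
    return d and d.toordinal() - EPOCH_ORDINAL

def timestamp_to_ticks(dt):
    # 100ns units; naive datetimes are read in the local zone,
    # timezone-aware ones through their UTC offset
    if dt:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds * 1e7 + dt.microsecond * 10)

print(date_to_days(datetime.date(1970, 1, 11)))  # 10
print(date_to_days(None))                        # None
print(timestamp_to_ticks(None))                  # None
```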
21 changes: 21 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
@@ -154,6 +154,27 @@ public int fieldIndex(String name) {
throw new UnsupportedOperationException();
}

/**
* A generic version of Row.equals(Row), which is used for tests.
*/
@Override
public boolean equals(Object other) {
if (other instanceof Row) {
Row row = (Row) other;
int n = size();
if (n != row.size()) {
return false;
}
for (int i = 0; i < n; i ++) {
if (isNullAt(i) != row.isNullAt(i) || (!isNullAt(i) && !get(i).equals(row.get(i)))) {
return false;
}
}
return true;
}
return false;
}

@Override
public Row copy() {
final int n = size();
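The equals method added above compares two rows position by position: every position must be null in both rows, or non-null and equal in both. A rough Python analogue of that check, for illustration only (not a Spark API):

```python
def rows_equal(r1, r2):
    # mirrors BaseRow.equals: same arity, and every position is either
    # null (None) in both rows, or non-null and equal in both
    if len(r1) != len(r2):
        return False
    return all((a is None) == (b is None) and (a is None or a == b)
               for a, b in zip(r1, r2))

print(rows_equal((1, None, "x"), (1, None, "x")))  # True
print(rows_equal((1, None), (1, 0)))               # False
```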
@@ -172,8 +172,8 @@ class Analyzer(
* expressions which equal GroupBy expressions with Literal(null), if those expressions
* are not set for this grouping set (according to the bit mask).
*/
private[this] def expand(g: GroupingSets): Seq[GroupExpression] = {
val result = new scala.collection.mutable.ArrayBuffer[GroupExpression]
private[this] def expand(g: GroupingSets): Seq[Seq[Expression]] = {
val result = new scala.collection.mutable.ArrayBuffer[Seq[Expression]]

g.bitmasks.foreach { bitmask =>
// get the non selected grouping attributes according to the bit mask
@@ -194,7 +194,7 @@
Literal.create(bitmask, IntegerType)
})

result += GroupExpression(substitution)
result += substitution
}

result.toSeq
@@ -89,14 +89,10 @@ object FunctionRegistry {
expression[CreateArray]("array"),
expression[Coalesce]("coalesce"),
expression[Explode]("explode"),
expression[Lower]("lower"),
expression[Substring]("substr"),
expression[Substring]("substring"),
expression[Rand]("rand"),
expression[Randn]("randn"),
expression[CreateStruct]("struct"),
expression[Sqrt]("sqrt"),
expression[Upper]("upper"),

// Math functions
expression[Acos]("acos"),
@@ -115,6 +111,7 @@
expression[Log10]("log10"),
expression[Log1p]("log1p"),
expression[Pi]("pi"),
expression[Log2]("log2"),
expression[Pow]("pow"),
expression[Rint]("rint"),
expression[Signum]("signum"),
@@ -132,7 +129,14 @@
expression[Last]("last"),
expression[Max]("max"),
expression[Min]("min"),
expression[Sum]("sum")
expression[Sum]("sum"),

// string functions
expression[Lower]("lower"),
expression[StringLength]("length"),
expression[Substring]("substr"),
expression[Substring]("substring"),
expression[Upper]("upper")
)

val builtin: FunctionRegistry = {
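With this registration change, log2 and the relocated string functions resolve by name in SQL. A hypothetical pyspark usage sketch; it assumes an existing SQLContext named sqlCtx, as in the tests above:

```python
# sqlCtx: an existing pyspark SQLContext (assumed, not constructed here)
df = sqlCtx.sql("SELECT length('spark') AS len, log2(8.0) AS lg, upper('a') AS up")
print(df.first())  # expected roughly: Row(len=5, lg=3.0, up=u'A')
```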
@@ -142,7 +142,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, _ != 0)
case DecimalType() =>
buildCast[Decimal](_, _ != 0)
buildCast[Decimal](_, _ != Decimal(0))
case DoubleType =>
buildCast[Double](_, _ != 0)
case FloatType =>
@@ -455,7 +455,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (BooleanType, dt: NumericType) =>
defineCodeGen(ctx, ev, c => s"(${ctx.javaType(dt)})($c ? 1 : 0)")
case (dt: DecimalType, BooleanType) =>
defineCodeGen(ctx, ev, c => s"$c.isZero()")
defineCodeGen(ctx, ev, c => s"!$c.isZero()")
case (dt: NumericType, BooleanType) =>
defineCodeGen(ctx, ev, c => s"$c != 0")

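The two Cast changes above make a decimal cast to boolean true exactly when the value is nonzero: the interpreted path now compares against Decimal(0) and the generated code negates isZero(). A minimal plain-Python sketch of that semantics (not Catalyst code):

```python
from decimal import Decimal

def decimal_to_boolean(d):
    # true iff the value is nonzero, matching the corrected Cast behaviour
    return d != Decimal(0)

print(decimal_to_boolean(Decimal("0.00")))  # False
print(decimal_to_boolean(Decimal("2.5")))   # True
```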
@@ -212,6 +212,9 @@ abstract class LeafExpression extends Expression with trees.LeafNode[Expression]
abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] {
self: Product =>

override def foldable: Boolean = child.foldable
override def nullable: Boolean = child.nullable

/**
* Called by unary expressions to generate a code block that returns null if its parent returns
* null, and if not not null, use `f` to generate the expression.
@@ -239,18 +242,6 @@ abstract class UnaryExpression extends Expression with trees.UnaryNode[Expressio
}
}

// TODO Semantically we probably not need GroupExpression
// All we need is holding the Seq[Expression], and ONLY used in doing the
// expressions transformation correctly. Probably will be removed since it's
// not like a real expressions.
case class GroupExpression(children: Seq[Expression]) extends Expression {
self: Product =>
override def eval(input: Row): Any = throw new UnsupportedOperationException
override def nullable: Boolean = false
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException
}

/**
* Expressions that require a specific `DataType` as input should implement this trait
* so that the proper type conversions can be performed in the analyzer.
@@ -163,15 +163,23 @@ class CodeGenContext {
}

/**
* Returns a function to generate equal expression in Java
* Generate code for equal expression in Java
*/
def equalFunc(dataType: DataType): ((String, String) => String) = dataType match {
case BinaryType => { case (eval1, eval2) =>
s"java.util.Arrays.equals($eval1, $eval2)" }
case IntegerType | BooleanType | LongType | DoubleType | FloatType | ShortType | ByteType =>
{ case (eval1, eval2) => s"$eval1 == $eval2" }
case other =>
{ case (eval1, eval2) => s"$eval1.equals($eval2)" }
def genEqual(dataType: DataType, c1: String, c2: String): String = dataType match {
case BinaryType => s"java.util.Arrays.equals($c1, $c2)"
case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
case other => s"$c1.equals($c2)"
}

/**
* Generate code for compare expression in Java
*/
def genComp(dataType: DataType, c1: String, c2: String): String = dataType match {
// Use signum() to keep any small difference between float/double
case FloatType | DoubleType => s"(int)java.lang.Math.signum($c1 - $c2)"
case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 - $c2)"
case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)"
case other => s"$c1.compare($c2)"
}

/**
@@ -184,6 +192,16 @@
* Returns true if the data type has a special accessor and setter in [[Row]].
*/
def isNativeType(dt: DataType): Boolean = nativeTypes.contains(dt)

/**
* List of data types whose Java type is primitive type
*/
val primitiveTypes = nativeTypes ++ Seq(DateType, TimestampType)

/**
* Returns true if the Java type is primitive type
*/
def isPrimitiveType(dt: DataType): Boolean = primitiveTypes.contains(dt)
}

/**
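The comment on genComp above explains why float and double comparisons go through signum: truncating the raw difference to an int would collapse a tiny difference to 0 and report the values as equal. A short Python illustration of that pitfall:

```python
def signum(x):
    # -1, 0, or 1 according to the sign of x
    return (x > 0) - (x < 0)

a, b = 1.0000001, 1.0
print(int(a - b))     # 0: casting the small difference to int loses the ordering
print(signum(a - b))  # 1: the sign survives, so a correctly compares greater than b
```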
@@ -82,7 +82,6 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
}
"""


logDebug(s"code for ${expressions.mkString(",")}:\n$code")

val c = compile(code)
(Diffs for the remaining changed files in this commit are not shown here.)
