[SPARK-7199] [SQL] Add date and timestamp support to UnsafeRow
JIRA: https://issues.apache.org/jira/browse/SPARK-7199

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#5984 from viirya/add_date_timestamp and squashes the following commits:

7f21ce9 [Liang-Chi Hsieh] For comment.
0b89698 [Liang-Chi Hsieh] Add timestamp to settableFieldTypes.
c30d490 [Liang-Chi Hsieh] Use default IntUnsafeColumnWriter and LongUnsafeColumnWriter.
672ef17 [Liang-Chi Hsieh] Remove getter/setter for Date and Timestamp and use Int and Long for them.
9f3e577 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into add_date_timestamp
281e844 [Liang-Chi Hsieh] Fix scala style.
fb532b5 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into add_date_timestamp
80af342 [Liang-Chi Hsieh] Fix compiling error.
f4f5de6 [Liang-Chi Hsieh] Fix scala style.
a463e83 [Liang-Chi Hsieh] Use Long to store timestamp for rows.
635388a [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into add_date_timestamp
46946c6 [Liang-Chi Hsieh] Adapt for moved DateUtils.
b16994e [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into add_date_timestamp
752251f [Liang-Chi Hsieh] Support setDate. Fix failed test.
fcf8db9 [Liang-Chi Hsieh] Add functions for Date and Timestamp to SpecificRow.
e42a809 [Liang-Chi Hsieh] Fix style.
4c07b57 [Liang-Chi Hsieh] Add date and timestamp support to UnsafeRow.
viirya authored and Davies Liu committed Jun 17, 2015
1 parent c13da20 commit 104f30c
Showing 4 changed files with 39 additions and 4 deletions.
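
In short, the patch maps DateType onto UnsafeRow's existing Int slots and TimestampType onto its Long slots, so no new getters, setters, or column writers are needed. A minimal sketch (not part of the patch) of the resulting round trip, using the DateUtils helpers exercised by the new test; unsafeRow and ordinal below are placeholder names:

import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.util.DateUtils

// Internally a date is an Int (days since the Unix epoch) and a timestamp is a Long,
// so both reuse the plain Int/Long column writers and accessors.
val dateAsInt: Int = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
val tsAsLong: Long = DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25"))

// Reading back out of an UnsafeRow goes through the same primitive getters:
//   DateUtils.toJavaDate(unsafeRow.getInt(ordinal))
//   DateUtils.toJavaTimestamp(unsafeRow.getLong(ordinal))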
@@ -103,7 +103,9 @@ public static int calculateBitSetWidthInBytes(int numFields) {
IntegerType,
LongType,
FloatType,
DoubleType
DoubleType,
DateType,
TimestampType
})));

// We support get() on a superset of the types for which we support set():
@@ -331,8 +333,6 @@ public String getString(int i) {
return getUTF8String(i).toString();
}



@Override
public InternalRow copy() {
throw new UnsupportedOperationException();
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -120,6 +122,8 @@ private object UnsafeColumnWriter {
case FloatType => FloatUnsafeColumnWriter
case DoubleType => DoubleUnsafeColumnWriter
case StringType => StringUnsafeColumnWriter
case DateType => IntUnsafeColumnWriter
case TimestampType => LongUnsafeColumnWriter
case t =>
throw new UnsupportedOperationException(s"Do not know how to write columns of type $t")
}
@@ -197,9 +197,10 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value }
override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value }
override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value }
override def setString(ordinal: Int, value: String) {
override def setString(ordinal: Int, value: String): Unit = {
values(ordinal) = UTF8String.fromString(value)
}

override def setNullAt(i: Int): Unit = { values(i) = null }

override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value }
@@ -17,12 +17,14 @@

package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.util.Arrays

import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods

@@ -74,6 +76,34 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
unsafeRow.getString(2) should be ("World")
}

test("basic conversion with primitive, string, date and timestamp types") {
val fieldTypes: Array[DataType] = Array(LongType, StringType, DateType, TimestampType)
val converter = new UnsafeRowConverter(fieldTypes)

val row = new SpecificMutableRow(fieldTypes)
row.setLong(0, 0)
row.setString(1, "Hello")
row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))

val sizeRequired: Int = converter.getSizeRequirement(row)
sizeRequired should be (8 + (8 * 4) +
ByteArrayMethods.roundNumberOfBytesToNearestWord("Hello".getBytes.length + 8))
val buffer: Array[Long] = new Array[Long](sizeRequired / 8)
val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET)
numBytesWritten should be (sizeRequired)

val unsafeRow = new UnsafeRow()
unsafeRow.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
unsafeRow.getLong(0) should be (0)
unsafeRow.getString(1) should be ("Hello")
// Date is represented as Int in unsafeRow
DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
// Timestamp is represented as Long in unsafeRow
DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
(Timestamp.valueOf("2015-05-08 08:10:25"))
}

test("null handling") {
val fieldTypes: Array[DataType] = Array(
NullType,
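A closing note on the size assertion in the new test: the expected value is the 8-byte null-tracking bit set, plus an 8-byte slot for each of the four fields (the date and timestamp occupy ordinary Int and Long slots), plus the variable-length region for "Hello" rounded up to a whole word. A rough sketch of that arithmetic, assuming the extra 8 bytes in the string term are length/offset bookkeeping:

import org.apache.spark.unsafe.array.ByteArrayMethods

val bitSetBytes = 8                 // null-tracking bit set: one 8-byte word covers these 4 fields
val fixedBytes = 8 * 4              // one 8-byte slot per field
val stringBytes =                   // 5 UTF-8 bytes for "Hello" + 8 assumed bookkeeping bytes, word-aligned
  ByteArrayMethods.roundNumberOfBytesToNearestWord("Hello".getBytes.length + 8)
val expectedSize = bitSetBytes + fixedBytes + stringBytes   // 8 + 32 + 16 = 56 bytes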