diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java index 6a2356f1f9c6f..9bff403005200 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java @@ -26,7 +26,7 @@ import scala.collection.mutable.ArraySeq; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.types.StructType; public abstract class BaseRow extends InternalRow { @@ -162,7 +162,7 @@ public InternalRow copy() { for (int i = 0; i < n; i++) { arr[i] = get(i); } - return new GenericRow(arr); + return new GenericInternalRow(arr); } @Override diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 429fc4077be9a..d720a9aaa1d9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -227,7 +227,7 @@ object CatalystTypeConverters { ar(idx) = converters(idx).toCatalyst(row(idx)) idx += 1 } - new GenericRowWithSchema(ar, structType) + new GenericInternalRow(ar) case p: Product => val ar = new Array[Any](structType.size) @@ -237,7 +237,7 @@ object CatalystTypeConverters { ar(idx) = converters(idx).toCatalyst(iter.next()) idx += 1 } - new GenericRowWithSchema(ar, structType) + new GenericInternalRow(ar) } override def toScala(row: InternalRow): Row = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala index d7b537a9fe3bc..8faadad2c98ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala @@ -25,6 +25,9 @@ import org.apache.spark.sql.catalyst.expressions._ * internal types. */ abstract class InternalRow extends Row { + + override def getString(i: Int): String = {throw new UnsupportedOperationException} + // A default implementation to change the return type override def copy(): InternalRow = this @@ -93,27 +96,15 @@ abstract class InternalRow extends Row { } object InternalRow { - def unapplySeq(row: InternalRow): Some[Seq[Any]] = Some(row.toSeq) - /** * This method can be used to construct a [[Row]] with the given values. */ - def apply(values: Any*): InternalRow = new GenericRow(values.toArray) + def apply(values: Any*): InternalRow = new GenericInternalRow(values.toArray) /** * This method can be used to construct a [[Row]] from a [[Seq]] of values. */ - def fromSeq(values: Seq[Any]): InternalRow = new GenericRow(values.toArray) - - def fromTuple(tuple: Product): InternalRow = fromSeq(tuple.productIterator.toSeq) - - /** - * Merge multiple rows into a single row, one after another. - */ - def merge(rows: InternalRow*): InternalRow = { - // TODO: Improve the performance of this if used in performance critical part. - new GenericRow(rows.flatMap(_.toSeq).toArray) - } + def fromSeq(values: Seq[Any]): InternalRow = new GenericInternalRow(values.toArray) /** Returns an empty row. */ val empty = apply() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index d5967438ccb5a..fcfe83ceb863a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -36,7 +36,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection { outputArray(i) = exprArray(i).eval(input) i += 1 } - new GenericRow(outputArray) + new GenericInternalRow(outputArray) } override def toString: String = s"Row => [${exprArray.mkString(",")}]" @@ -135,12 +135,6 @@ class JoinedRow extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -149,7 +143,7 @@ class JoinedRow extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { @@ -235,12 +229,6 @@ class JoinedRow2 extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -249,7 +237,7 @@ class JoinedRow2 extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { @@ -329,12 +317,6 @@ class JoinedRow3 extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -343,7 +325,7 @@ class JoinedRow3 extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { @@ -423,12 +405,6 @@ class JoinedRow4 extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -437,7 +413,7 @@ class JoinedRow4 extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { @@ -517,12 +493,6 @@ class JoinedRow5 extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -531,7 +501,7 @@ class JoinedRow5 extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { @@ -611,12 +581,6 @@ class JoinedRow6 extends InternalRow { override def getFloat(i: Int): Float = if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length) - override def getString(i: Int): String = - if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length) - - override def getAs[T](i: Int): T = - if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length) - override def copy(): InternalRow = { val totalSize = row1.length + row2.length val copiedValues = new Array[Any](totalSize) @@ -625,7 +589,7 @@ class JoinedRow6 extends InternalRow { copiedValues(i) = apply(i) i += 1 } - new GenericRow(copiedValues) + new GenericInternalRow(copiedValues) } override def toString: String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 05aab34559985..53fedb531cfb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -230,7 +230,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR i += 1 } - new GenericRow(newValues) + new GenericInternalRow(newValues) } override def update(ordinal: Int, value: Any) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index f30cb42d12b83..cec5594e4324e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import scala.collection.Map -import org.apache.spark.sql.catalyst +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees} import org.apache.spark.sql.types._ @@ -68,19 +68,19 @@ abstract class Generator extends Expression { */ case class UserDefinedGenerator( elementTypes: Seq[(DataType, Boolean)], - function: InternalRow => TraversableOnce[InternalRow], + function: Row => TraversableOnce[InternalRow], children: Seq[Expression]) extends Generator { @transient private[this] var inputRow: InterpretedProjection = _ - @transient private[this] var convertToScala: (InternalRow) => InternalRow = _ + @transient private[this] var convertToScala: (InternalRow) => Row = _ private def initializeConverters(): Unit = { inputRow = new InterpretedProjection(children) convertToScala = { val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true))) CatalystTypeConverters.createToScalaConverter(inputSchema) - }.asInstanceOf[(InternalRow => InternalRow)] + }.asInstanceOf[InternalRow => Row] } override def eval(input: InternalRow): TraversableOnce[InternalRow] = { @@ -113,10 +113,11 @@ case class Explode(child: Expression) child.dataType match { case ArrayType(_, _) => val inputArray = child.eval(input).asInstanceOf[Seq[Any]] - if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v))) + if (inputArray == null) Nil else inputArray.map(v => InternalRow(v)) case MapType(_, _, _) => val inputMap = child.eval(input).asInstanceOf[Map[Any, Any]] - if (inputMap == null) Nil else inputMap.map { case (k, v) => new GenericRow(Array(k, v)) } + if (inputMap == null) Nil + else inputMap.map { case (k, v) => InternalRow(k, v) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index 0d4c9ace5e124..f9afcc6a6f9b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DataType, StructType, AtomicType} import org.apache.spark.unsafe.types.UTF8String @@ -64,7 +65,7 @@ object EmptyRow extends InternalRow { * the array is not copied, and thus could technically be mutated after creation, this is not * allowed. */ -class GenericRow(protected[sql] val values: Array[Any]) extends InternalRow { +class GenericRow(protected[sql] val values: Array[Any]) extends Row { /** No-arg constructor for serialization. */ protected def this() = this(null) @@ -114,14 +115,31 @@ class GenericRow(protected[sql] val values: Array[Any]) extends InternalRow { } override def getString(i: Int): String = { - values(i) match { - case null => null - case s: String => s - case utf8: UTF8String => utf8.toString - } + values(i).asInstanceOf[String] } - override def copy(): InternalRow = this + // This is used by test or outside + override def equals(o: Any): Boolean = o match { + case other: Row if other.length == length => + var i = 0 + while (i < length) { + if (isNullAt(i) != other.isNullAt(i)) { + return false + } + val equal = (apply(i), other.apply(i)) match { + case (a: Array[Byte], b: Array[Byte]) => java.util.Arrays.equals(a, b) + case (a, b) => a == b + } + if (!equal) { + return false + } + i += 1 + } + true + case _ => false + } + + override def copy(): Row = this } class GenericRowWithSchema(values: Array[Any], override val schema: StructType) @@ -133,7 +151,68 @@ class GenericRowWithSchema(values: Array[Any], override val schema: StructType) override def fieldIndex(name: String): Int = schema.fieldIndex(name) } -class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow { +/** + * A internal row implementation that uses an array of objects as the underlying storage. + * Note that, while the array is not copied, and thus could technically be mutated after creation, + * this is not allowed. + */ +class GenericInternalRow(protected[sql] val values: Array[Any]) extends InternalRow { + /** No-arg constructor for serialization. */ + protected def this() = this(null) + + def this(size: Int) = this(new Array[Any](size)) + + override def toSeq: Seq[Any] = values.toSeq + + override def length: Int = values.length + + override def apply(i: Int): Any = values(i) + + override def isNullAt(i: Int): Boolean = values(i) == null + + override def getInt(i: Int): Int = { + if (values(i) == null) sys.error("Failed to check null bit for primitive int value.") + values(i).asInstanceOf[Int] + } + + override def getLong(i: Int): Long = { + if (values(i) == null) sys.error("Failed to check null bit for primitive long value.") + values(i).asInstanceOf[Long] + } + + override def getDouble(i: Int): Double = { + if (values(i) == null) sys.error("Failed to check null bit for primitive double value.") + values(i).asInstanceOf[Double] + } + + override def getFloat(i: Int): Float = { + if (values(i) == null) sys.error("Failed to check null bit for primitive float value.") + values(i).asInstanceOf[Float] + } + + override def getBoolean(i: Int): Boolean = { + if (values(i) == null) sys.error("Failed to check null bit for primitive boolean value.") + values(i).asInstanceOf[Boolean] + } + + override def getShort(i: Int): Short = { + if (values(i) == null) sys.error("Failed to check null bit for primitive short value.") + values(i).asInstanceOf[Short] + } + + override def getByte(i: Int): Byte = { + if (values(i) == null) sys.error("Failed to check null bit for primitive byte value.") + values(i).asInstanceOf[Byte] + } + + override def getString(i: Int): String = { + values(i).asInstanceOf[UTF8String].toString + } + + override def copy(): InternalRow = this +} + +class GenericMutableRow(v: Array[Any]) extends GenericInternalRow(v) with MutableRow { /** No-arg constructor for serialization. */ protected def this() = this(null) @@ -155,7 +234,7 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow { override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value } - override def copy(): InternalRow = new GenericRow(values.clone()) + override def copy(): InternalRow = new GenericInternalRow(values.clone()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index df0f04563edcf..1c51dee2fae3b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -40,6 +40,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite { val convertToScala = CatalystTypeConverters.createToScalaConverter(schema) val scalaRow = Row.fromSeq(Seq.fill(simpleTypes.length)(null)) + println(s"${scalaRow.getClass} ${convertToScala(convertToCatalyst(scalaRow)).getClass} ") assert(convertToScala(convertToCatalyst(scalaRow)) === scalaRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 158f54af13802..7d95ef7f710af 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -33,7 +33,7 @@ trait ExpressionEvalHelper { self: SparkFunSuite => protected def create_row(values: Any*): InternalRow = { - new GenericRow(values.map(CatalystTypeConverters.convertToCatalyst).toArray) + InternalRow.fromSeq(values.map(CatalystTypeConverters.convertToCatalyst)) } protected def checkEvaluation( @@ -122,7 +122,7 @@ trait ExpressionEvalHelper { } val actual = plan(inputRow) - val expectedRow = new GenericRow(Array[Any](expected)) + val expectedRow = InternalRow(expected) if (actual.hashCode() != expectedRow.hashCode()) { fail( s""" diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala index 7aae2bbd8a0b8..3095ccb77761b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala @@ -37,7 +37,7 @@ class UnsafeFixedWidthAggregationMapSuite private val groupKeySchema = StructType(StructField("product", StringType) :: Nil) private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil) - private def emptyAggregationBuffer: InternalRow = new GenericRow(Array[Any](0)) + private def emptyAggregationBuffer: InternalRow = InternalRow(0) private var memoryManager: TaskMemoryManager = null @@ -84,7 +84,7 @@ class UnsafeFixedWidthAggregationMapSuite 1024, // initial capacity false // disable perf metrics ) - val groupKey = new GenericRow(Array[Any](UTF8String.fromString("cats"))) + val groupKey = InternalRow(UTF8String.fromString("cats")) // Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts) map.getAggregationBuffer(groupKey) @@ -113,7 +113,7 @@ class UnsafeFixedWidthAggregationMapSuite val rand = new Random(42) val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet groupKeys.foreach { keyString => - map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String.fromString(keyString)))) + map.getAggregationBuffer(InternalRow(UTF8String.fromString(keyString))) } val seenKeys: Set[String] = map.iterator().asScala.map { entry => entry.key.getString(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f3f0f5305318e..bb830b6f8f7c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1499,8 +1499,8 @@ class DataFrame private[sql]( */ protected[sql] def javaToPython: JavaRDD[Array[Byte]] = { val fieldTypes = schema.fields.map(_.dataType) - val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD() - SerDeUtil.javaToPython(jrdd) + val rdd = queryExecution.executedPlan.execute().map(EvaluatePython.rowToArray(_, fieldTypes)) + SerDeUtil.javaToPython(rdd.toJavaRDD()) } //////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 04fc798bf3738..7f321a5d0d215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _} import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils /** @@ -377,10 +378,11 @@ class SQLContext(@transient val sparkContext: SparkContext) val row = new SpecificMutableRow(dataType :: Nil) iter.map { v => row.setInt(0, v) - row: Row + row: InternalRow } } - DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) + DataFrameHolder( + self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) } /** @@ -393,10 +395,11 @@ class SQLContext(@transient val sparkContext: SparkContext) val row = new SpecificMutableRow(dataType :: Nil) iter.map { v => row.setLong(0, v) - row: Row + row: InternalRow } } - DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) + DataFrameHolder( + self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) } /** @@ -408,11 +411,12 @@ class SQLContext(@transient val sparkContext: SparkContext) val rows = data.mapPartitions { iter => val row = new SpecificMutableRow(dataType :: Nil) iter.map { v => - row.setString(0, v) - row: Row + row.update(0, UTF8String.fromString(v)) + row: InternalRow } } - DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) + DataFrameHolder( + self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil))) } } @@ -559,9 +563,9 @@ class SQLContext(@transient val sparkContext: SparkContext) (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType)) } iter.map { row => - new GenericRow( + new GenericInternalRow( methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any] - ) : InternalRow + ): InternalRow } } DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this)) @@ -1065,7 +1069,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } val rowRdd = convertedRdd.mapPartitions { iter => - iter.map { m => new GenericRow(m): InternalRow} + iter.map { m => new GenericInternalRow(m): InternalRow} } DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 8e21020917768..8bf2151e4de68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.MutableRow import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.types._ @@ -63,7 +63,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( * Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this * method to avoid boxing/unboxing costs whenever possible. */ - def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { append(getField(row, ordinal), buffer) } @@ -71,13 +71,13 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( * Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable * length types such as byte arrays and strings. */ - def actualSize(row: Row, ordinal: Int): Int = defaultSize + def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize /** * Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs * whenever possible. */ - def getField(row: Row, ordinal: Int): JvmType + def getField(row: InternalRow, ordinal: Int): JvmType /** * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing @@ -89,7 +89,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid * boxing/unboxing costs whenever possible. */ - def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { to(toOrdinal) = from(fromOrdinal) } @@ -118,7 +118,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { buffer.putInt(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.putInt(row.getInt(ordinal)) } @@ -134,9 +134,9 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { row.setInt(ordinal, value) } - override def getField(row: Row, ordinal: Int): Int = row.getInt(ordinal) + override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setInt(toOrdinal, from.getInt(fromOrdinal)) } } @@ -146,7 +146,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { buffer.putLong(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.putLong(row.getLong(ordinal)) } @@ -162,9 +162,9 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { row.setLong(ordinal, value) } - override def getField(row: Row, ordinal: Int): Long = row.getLong(ordinal) + override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setLong(toOrdinal, from.getLong(fromOrdinal)) } } @@ -174,7 +174,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { buffer.putFloat(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.putFloat(row.getFloat(ordinal)) } @@ -190,9 +190,9 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { row.setFloat(ordinal, value) } - override def getField(row: Row, ordinal: Int): Float = row.getFloat(ordinal) + override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) } } @@ -202,7 +202,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { buffer.putDouble(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.putDouble(row.getDouble(ordinal)) } @@ -218,9 +218,9 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { row.setDouble(ordinal, value) } - override def getField(row: Row, ordinal: Int): Double = row.getDouble(ordinal) + override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) } } @@ -230,7 +230,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { buffer.put(if (v) 1: Byte else 0: Byte) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte) } @@ -244,9 +244,9 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { row.setBoolean(ordinal, value) } - override def getField(row: Row, ordinal: Int): Boolean = row.getBoolean(ordinal) + override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) } } @@ -256,7 +256,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { buffer.put(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.put(row.getByte(ordinal)) } @@ -272,9 +272,9 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { row.setByte(ordinal, value) } - override def getField(row: Row, ordinal: Int): Byte = row.getByte(ordinal) + override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setByte(toOrdinal, from.getByte(fromOrdinal)) } } @@ -284,7 +284,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { buffer.putShort(v) } - override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = { + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { buffer.putShort(row.getShort(ordinal)) } @@ -300,15 +300,15 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { row.setShort(ordinal, value) } - override def getField(row: Row, ordinal: Int): Short = row.getShort(ordinal) + override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal) - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.setShort(toOrdinal, from.getShort(fromOrdinal)) } } private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { - override def actualSize(row: Row, ordinal: Int): Int = { + override def actualSize(row: InternalRow, ordinal: Int): Int = { row.getString(ordinal).getBytes("utf-8").length + 4 } @@ -328,11 +328,11 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { row.update(ordinal, value) } - override def getField(row: Row, ordinal: Int): UTF8String = { + override def getField(row: InternalRow, ordinal: Int): UTF8String = { row(ordinal).asInstanceOf[UTF8String] } - override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { to.update(toOrdinal, from(fromOrdinal)) } } @@ -346,7 +346,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { buffer.putInt(v) } - override def getField(row: Row, ordinal: Int): Int = { + override def getField(row: InternalRow, ordinal: Int): Int = { row(ordinal).asInstanceOf[Int] } @@ -364,7 +364,7 @@ private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) { buffer.putLong(v) } - override def getField(row: Row, ordinal: Int): Long = { + override def getField(row: InternalRow, ordinal: Int): Long = { row(ordinal).asInstanceOf[Long] } @@ -387,7 +387,7 @@ private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int) buffer.putLong(v.toUnscaledLong) } - override def getField(row: Row, ordinal: Int): Decimal = { + override def getField(row: InternalRow, ordinal: Int): Decimal = { row(ordinal).asInstanceOf[Decimal] } @@ -405,7 +405,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType]( defaultSize: Int) extends ColumnType[T, Array[Byte]](typeId, defaultSize) { - override def actualSize(row: Row, ordinal: Int): Int = { + override def actualSize(row: InternalRow, ordinal: Int): Int = { getField(row, ordinal).length + 4 } @@ -426,7 +426,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](11, 16) row(ordinal) = value } - override def getField(row: Row, ordinal: Int): Array[Byte] = { + override def getField(row: InternalRow, ordinal: Int): Array[Byte] = { row(ordinal).asInstanceOf[Array[Byte]] } } @@ -439,7 +439,7 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](12, 16) { row(ordinal) = SparkSqlSerializer.deserialize[Any](value) } - override def getField(row: Row, ordinal: Int): Array[Byte] = { + override def getField(row: InternalRow, ordinal: Int): Array[Byte] = { SparkSqlSerializer.serialize(row(ordinal)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 761f427b8cd0d..cb1fd4947fdbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -146,7 +146,8 @@ private[sql] case class InMemoryRelation( rowCount += 1 } - val stats = InternalRow.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*) + val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) + .flatMap(_.toSeq)) batchStats += stats CachedBatch(columnBuilders.map(_.build().array()), stats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index eea15aff5dbcf..b19ad4f1c563e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -20,22 +20,20 @@ package org.apache.spark.sql.execution import java.nio.ByteBuffer import java.util.{HashMap => JavaHashMap} -import org.apache.spark.sql.types.Decimal - import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.esotericsoftware.kryo.io.{Input, Output} -import com.esotericsoftware.kryo.{Serializer, Kryo} +import com.esotericsoftware.kryo.{Kryo, Serializer} import com.twitter.chill.ResourcePool -import org.apache.spark.{SparkEnv, SparkConf} -import org.apache.spark.serializer.{SerializerInstance, KryoSerializer} -import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.util.collection.OpenHashSet -import org.apache.spark.util.MutablePair - +import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHashSet} +import org.apache.spark.sql.types.Decimal +import org.apache.spark.util.MutablePair +import org.apache.spark.util.collection.OpenHashSet +import org.apache.spark.{SparkConf, SparkEnv} private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { override def newKryo(): Kryo = { @@ -43,6 +41,7 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co kryo.setRegistrationRequired(false) kryo.register(classOf[MutablePair[_, _]]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) + kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog], new HyperLogLogSerializer) @@ -139,7 +138,7 @@ private[sql] class OpenHashSetSerializer extends Serializer[OpenHashSet[_]] { val iterator = hs.iterator while(iterator.hasNext) { val row = iterator.next() - rowSerializer.write(kryo, output, row.asInstanceOf[GenericRow].values) + rowSerializer.write(kryo, output, row.asInstanceOf[GenericInternalRow].values) } } @@ -150,7 +149,7 @@ private[sql] class OpenHashSetSerializer extends Serializer[OpenHashSet[_]] { var i = 0 while (i < numItems) { val row = - new GenericRow(rowSerializer.read( + new GenericInternalRow(rowSerializer.read( kryo, input, classOf[Array[Any]].asInstanceOf[Class[Any]]).asInstanceOf[Array[Any]]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 15b6936acd59b..74a22353b1d27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -26,7 +26,8 @@ import scala.reflect.ClassTag import org.apache.spark.Logging import org.apache.spark.serializer._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -329,7 +330,7 @@ private[sql] object SparkSqlSerializer2 { */ def createDeserializationFunction( schema: Array[DataType], - in: DataInputStream): (MutableRow) => Row = { + in: DataInputStream): (MutableRow) => InternalRow = { if (schema == null) { (mutableRow: MutableRow) => null } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1ff1cc224de8c..d6cccd240d198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -210,8 +210,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - protected lazy val singleRowRdd = - sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1) + protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1) object TakeOrdered extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index bce0e8d70a57b..e41538ec1fc1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -71,8 +71,8 @@ case class HashOuterJoin( @transient private[this] lazy val DUMMY_LIST = Seq[InternalRow](null) @transient private[this] lazy val EMPTY_LIST = Seq.empty[InternalRow] - @transient private[this] lazy val leftNullRow = new GenericRow(left.output.length) - @transient private[this] lazy val rightNullRow = new GenericRow(right.output.length) + @transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length) + @transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length) @transient private[this] lazy val boundCondition = condition.map( newPredicate(_, left.output ++ right.output)).getOrElse((row: InternalRow) => true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 6db551c543a9c..e1abfe9e16121 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -157,9 +157,9 @@ object EvaluatePython { } /** - * Convert Row into Java Array (for pickled into Python) + * Convert InternalRow into Java Array (for pickled into Python) */ - def rowToArray(row: Row, fields: Seq[DataType]): Array[Any] = { + def rowToArray(row: InternalRow, fields: Seq[DataType]): Array[Any] = { // TODO: this is slow! row.toSeq.zip(fields).map {case (obj, dt) => toJava(obj, dt)}.toArray } @@ -183,9 +183,9 @@ object EvaluatePython { }.toMap case (c, StructType(fields)) if c.getClass.isArray => - new GenericRow(c.asInstanceOf[Array[_]].zip(fields).map { + new GenericInternalRow(c.asInstanceOf[Array[_]].zip(fields).map { case (e, f) => fromJava(e, f.dataType) - }): Row + }) case (c: java.util.Calendar, DateType) => DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 93383e5a62f11..84e2988b7768d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Cast} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String private[sql] object StatFunctions extends Logging { @@ -123,7 +124,7 @@ private[sql] object StatFunctions extends Logging { countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2)) } // the value of col1 is the first value, the rest are the counts - countsRow.setString(0, col1Item.toString) + countsRow.update(0, UTF8String.fromString(col1Item.toString)) countsRow }.toSeq val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 8b4276b2c364c..30c5f4ca3e1b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -417,7 +417,7 @@ private[sql] class JDBCRDD( case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos)) case LongConversion => mutableRow.setLong(i, rs.getLong(pos)) // TODO(davies): use getBytes for better performance, if the encoding is UTF-8 - case StringConversion => mutableRow.setString(i, rs.getString(pos)) + case StringConversion => mutableRow.update(i, UTF8String.fromString(rs.getString(pos))) case TimestampConversion => val t = rs.getTimestamp(pos) if (t != null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index cf7aa44e4cd55..ae7cbf0624dc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -318,7 +318,7 @@ private[parquet] class CatalystGroupConverter( // Note: this will ever only be called in the root converter when the record has been // fully processed. Therefore it will be difficult to use mutable rows instead, since // any non-root converter never would be sure when it would be safe to re-use the buffer. - new GenericRow(current.toArray) + new GenericInternalRow(current.toArray) } override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) @@ -342,8 +342,8 @@ private[parquet] class CatalystGroupConverter( override def end(): Unit = { if (!isRootConverter) { assert(current != null) // there should be no empty groups - buffer.append(new GenericRow(current.toArray)) - parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]])) + buffer.append(new GenericInternalRow(current.toArray)) + parent.updateField(index, new GenericInternalRow(buffer.toArray.asInstanceOf[Array[Any]])) } } } @@ -788,7 +788,7 @@ private[parquet] class CatalystStructConverter( // here we need to make sure to use StructScalaType // Note: we need to actually make a copy of the array since we // may be in a nested field - parent.updateField(index, new GenericRow(current.toArray)) + parent.updateField(index, new GenericInternalRow(current.toArray)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index fb6173f58ece6..de44ad6ed3998 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -44,7 +44,7 @@ private[sql] case class InsertIntoDataSource( overwrite: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] val data = DataFrame(sqlContext, query) // Apply the schema of the existing table to the new data. @@ -54,7 +54,7 @@ private[sql] case class InsertIntoDataSource( // Invalidate the cache. sqlContext.cacheManager.invalidateCache(logicalRelation) - Seq.empty[InternalRow] + Seq.empty[Row] } } @@ -86,7 +86,7 @@ private[sql] case class InsertIntoHadoopFsRelation( mode: SaveMode) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { require( relation.paths.length == 1, s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index ece3d6fdf2af5..4cb5ba2f0d5eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql import java.sql.{Date, Timestamp} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions._ case class ReflectData( stringField: String, @@ -128,16 +127,16 @@ class ScalaReflectionRelationSuite extends SparkFunSuite { Seq(data).toDF().registerTempTable("reflectComplexData") assert(ctx.sql("SELECT * FROM reflectComplexData").collect().head === - new GenericRow(Array[Any]( + Row( Seq(1, 2, 3), Seq(1, 2, null), Map(1 -> 10L, 2 -> 20L), Map(1 -> 10L, 2 -> 20L, 3 -> null), - new GenericRow(Array[Any]( + Row( Seq(10, 20, 30), Seq(10, 20, null), Map(10 -> 100L, 20 -> 200L), Map(10 -> 100L, 20 -> 200L, 30 -> null), - new GenericRow(Array[Any](null, "abc"))))))) + Row(null, "abc")))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala index 5fc53f7012994..54e1efb6e36e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -62,7 +62,7 @@ case class SimpleDDLScan(from: Int, to: Int, table: String)(@transient val sqlCo override def buildScan(): RDD[Row] = { sqlContext.sparkContext.parallelize(from to to).map { e => - InternalRow(UTF8String.fromString(s"people$e"), e * 2) + InternalRow(UTF8String.fromString(s"people$e"), e * 2): Row } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 79eac930e54f7..c3e0ec16aac99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -90,9 +90,9 @@ case class AllDataTypesScan( Seq(Map(UTF8String.fromString(s"str_$i") -> InternalRow(i.toLong))), Map(i -> i.toString), Map(Map(UTF8String.fromString(s"str_$i") -> i.toFloat) -> InternalRow(i.toLong)), - Row(i, i.toString), - Row(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")), - InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1)))))) + InternalRow(i, i.toString), + InternalRow(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")), + InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1)))))): Row } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 864c888ab073d..a6b8ead577fb5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -336,9 +336,8 @@ private[hive] trait HiveInspectors { // currently, hive doesn't provide the ConstantStructObjectInspector case si: StructObjectInspector => val allRefs = si.getAllStructFieldRefs - new GenericRow( - allRefs.map(r => - unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector)).toArray) + InternalRow.fromSeq( + allRefs.map(r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector))) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 00e61e35d4354..b251a9523bed6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -34,6 +34,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -356,7 +357,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) case oi: HiveVarcharObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => - row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue) + row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue)) case oi: HiveDecimalObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 0e4a2427a9c15..84358cb73c9e3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -17,13 +17,11 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.{AnalysisException, SQLContext} -import org.apache.spark.sql.catalyst.expressions.InternalRow import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.hive.client.{HiveTable, HiveColumn} -import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, HiveMetastoreTypes} +import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} +import org.apache.spark.sql.{AnalysisException, Row, SQLContext} /** * Create table and insert the query result into it. @@ -42,11 +40,11 @@ case class CreateTableAsSelect( def database: String = tableDesc.database def tableName: String = tableDesc.name - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] lazy val metastoreRelation: MetastoreRelation = { - import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextInputFormat @@ -89,7 +87,7 @@ case class CreateTableAsSelect( hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd } - Seq.empty[InternalRow] + Seq.empty[Row] } override def argString: String = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index a89381000ad5f..5f0ed5393d191 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -21,10 +21,10 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.MetastoreRelation +import org.apache.spark.sql.{Row, SQLContext} /** * Implementation for "describe [extended] table". @@ -35,7 +35,7 @@ case class DescribeHiveTableCommand( override val output: Seq[Attribute], isExtended: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { // Trying to mimic the format of Hive's output. But not exactly the same. var results: Seq[(String, String, String)] = Nil @@ -57,7 +57,7 @@ case class DescribeHiveTableCommand( } results.map { case (name, dataType, comment) => - InternalRow(name, dataType, comment) + Row(name, dataType, comment) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala index 87f8e3f7fcfcc..41b645b2c9c93 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, InternalRow} +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.{Row, SQLContext} private[hive] case class HiveNativeCommand(sql: String) extends RunnableCommand { @@ -29,6 +29,6 @@ case class HiveNativeCommand(sql: String) extends RunnableCommand { override def output: Seq[AttributeReference] = Seq(AttributeReference("result", StringType, nullable = false)()) - override def run(sqlContext: SQLContext): Seq[InternalRow] = - sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(InternalRow(_)) + override def run(sqlContext: SQLContext): Seq[Row] = + sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 1f5e4af2e4746..f4c8c9a7e8a68 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -123,7 +123,7 @@ case class HiveTableScan( // Only partitioned values are needed here, since the predicate has already been bound to // partition key attribute references. - val row = new GenericRow(castedValues.toArray) + val row = InternalRow.fromSeq(castedValues) shouldKeep.eval(row).asInstanceOf[Boolean] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 9d8872aa47d1f..c48c6a62a7990 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -129,11 +129,11 @@ case class ScriptTransformation( val prevLine = curLine curLine = reader.readLine() if (!ioschema.schemaLess) { - new GenericRow(CatalystTypeConverters.convertToCatalyst( + new GenericInternalRow(CatalystTypeConverters.convertToCatalyst( prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))) .asInstanceOf[Array[Any]]) } else { - new GenericRow(CatalystTypeConverters.convertToCatalyst( + new GenericInternalRow(CatalystTypeConverters.convertToCatalyst( prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)) .asInstanceOf[Array[Any]]) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index aad58bfa2e6e0..71fa3e9c33ad9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -17,15 +17,14 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} -import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -39,9 +38,9 @@ import org.apache.spark.util.Utils private[hive] case class AnalyzeTable(tableName: String) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { sqlContext.asInstanceOf[HiveContext].analyze(tableName) - Seq.empty[InternalRow] + Seq.empty[Row] } } @@ -53,7 +52,7 @@ case class DropTable( tableName: String, ifExists: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" try { @@ -70,7 +69,7 @@ case class DropTable( hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(Seq(tableName)) - Seq.empty[InternalRow] + Seq.empty[Row] } } @@ -83,7 +82,7 @@ case class AddJar(path: String) extends RunnableCommand { schema.toAttributes } - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] val currentClassLoader = Utils.getContextOrSparkClassLoader @@ -105,18 +104,18 @@ case class AddJar(path: String) extends RunnableCommand { // Add jar to executors hiveContext.sparkContext.addJar(path) - Seq(InternalRow(0)) + Seq(Row(0)) } } private[hive] case class AddFile(path: String) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] hiveContext.runSqlHive(s"ADD FILE $path") hiveContext.sparkContext.addFile(path) - Seq.empty[InternalRow] + Seq.empty[Row] } } @@ -129,12 +128,12 @@ case class CreateMetastoreDataSource( allowExisting: Boolean, managedIfNoPath: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] if (hiveContext.catalog.tableExists(tableName :: Nil)) { if (allowExisting) { - return Seq.empty[InternalRow] + return Seq.empty[Row] } else { throw new AnalysisException(s"Table $tableName already exists.") } @@ -157,7 +156,7 @@ case class CreateMetastoreDataSource( optionsWithPath, isExternal) - Seq.empty[InternalRow] + Seq.empty[Row] } } @@ -170,7 +169,7 @@ case class CreateMetastoreDataSourceAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[InternalRow] = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] var createMetastoreTable = false var isExternal = true @@ -194,7 +193,7 @@ case class CreateMetastoreDataSourceAsSelect( s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.") case SaveMode.Ignore => // Since the table already exists and the save mode is Ignore, we will just return. - return Seq.empty[InternalRow] + return Seq.empty[Row] case SaveMode.Append => // Check if the specified data source match the data source of the existing table. val resolved = ResolvedDataSource( @@ -259,6 +258,6 @@ case class CreateMetastoreDataSourceAsSelect( // Refresh the cache of the table in the catalog. hiveContext.refreshTable(tableName) - Seq.empty[InternalRow] + Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 705f48f1cd9f0..bab08f22e820e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -190,7 +190,7 @@ private[sql] class OrcRelation( filters: Array[Filter], inputPaths: Array[FileStatus]): RDD[Row] = { val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes - OrcTableScan(output, this, filters, inputPaths).execute() + OrcTableScan(output, this, filters, inputPaths).execute().map(_.asInstanceOf[Row]) } override def prepareJobForWrite(job: Job): OutputWriterFactory = { @@ -224,13 +224,13 @@ private[orc] case class OrcTableScan( HiveShim.appendReadColumns(conf, sortedIds, sortedNames) } - // Transform all given raw `Writable`s into `Row`s. + // Transform all given raw `Writable`s into `InternalRow`s. private def fillObject( path: String, conf: Configuration, iterator: Iterator[Writable], nonPartitionKeyAttrs: Seq[(Attribute, Int)], - mutableRow: MutableRow): Iterator[Row] = { + mutableRow: MutableRow): Iterator[InternalRow] = { val deserializer = new OrcSerde val soi = OrcFileOperator.getObjectInspector(path, Some(conf)) val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { @@ -251,11 +251,11 @@ private[orc] case class OrcTableScan( } i += 1 } - mutableRow: Row + mutableRow: InternalRow } } - def execute(): RDD[Row] = { + def execute(): RDD[InternalRow] = { val job = new Job(sqlContext.sparkContext.hadoopConfiguration) val conf = job.getConfiguration