Skip to content

Commit

Permalink
separate Row and InternalRow (part 2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Jun 24, 2015
1 parent bba6699 commit 9d24350
Show file tree
Hide file tree
Showing 35 changed files with 252 additions and 215 deletions.
4 changes: 2 additions & 2 deletions sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import scala.collection.mutable.ArraySeq;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.StructType;

public abstract class BaseRow extends InternalRow {
Expand Down Expand Up @@ -162,7 +162,7 @@ public InternalRow copy() {
for (int i = 0; i < n; i++) {
arr[i] = get(i);
}
return new GenericRow(arr);
return new GenericInternalRow(arr);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ object CatalystTypeConverters {
ar(idx) = converters(idx).toCatalyst(row(idx))
idx += 1
}
new GenericRowWithSchema(ar, structType)
new GenericInternalRow(ar)

case p: Product =>
val ar = new Array[Any](structType.size)
Expand All @@ -237,7 +237,7 @@ object CatalystTypeConverters {
ar(idx) = converters(idx).toCatalyst(iter.next())
idx += 1
}
new GenericRowWithSchema(ar, structType)
new GenericInternalRow(ar)
}

override def toScala(row: InternalRow): Row = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import org.apache.spark.sql.catalyst.expressions._
* internal types.
*/
abstract class InternalRow extends Row {

override def getString(i: Int): String = {throw new UnsupportedOperationException}

// A default implementation to change the return type
override def copy(): InternalRow = this

Expand Down Expand Up @@ -93,27 +96,15 @@ abstract class InternalRow extends Row {
}

object InternalRow {
def unapplySeq(row: InternalRow): Some[Seq[Any]] = Some(row.toSeq)

/**
* This method can be used to construct an [[InternalRow]] with the given values.
*/
def apply(values: Any*): InternalRow = new GenericRow(values.toArray)
def apply(values: Any*): InternalRow = new GenericInternalRow(values.toArray)

/**
* This method can be used to construct an [[InternalRow]] from a [[Seq]] of values.
*/
def fromSeq(values: Seq[Any]): InternalRow = new GenericRow(values.toArray)

def fromTuple(tuple: Product): InternalRow = fromSeq(tuple.productIterator.toSeq)

/**
* Merge multiple rows into a single row, one after another.
*/
def merge(rows: InternalRow*): InternalRow = {
// TODO: Improve the performance of this if used in performance critical part.
new GenericRow(rows.flatMap(_.toSeq).toArray)
}
def fromSeq(values: Seq[Any]): InternalRow = new GenericInternalRow(values.toArray)

/** Returns an empty row. */
val empty = apply()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
outputArray(i) = exprArray(i).eval(input)
i += 1
}
new GenericRow(outputArray)
new GenericInternalRow(outputArray)
}

override def toString: String = s"Row => [${exprArray.mkString(",")}]"
Expand Down Expand Up @@ -135,12 +135,6 @@ class JoinedRow extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -149,7 +143,7 @@ class JoinedRow extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down Expand Up @@ -235,12 +229,6 @@ class JoinedRow2 extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -249,7 +237,7 @@ class JoinedRow2 extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down Expand Up @@ -329,12 +317,6 @@ class JoinedRow3 extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -343,7 +325,7 @@ class JoinedRow3 extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down Expand Up @@ -423,12 +405,6 @@ class JoinedRow4 extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -437,7 +413,7 @@ class JoinedRow4 extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down Expand Up @@ -517,12 +493,6 @@ class JoinedRow5 extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -531,7 +501,7 @@ class JoinedRow5 extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down Expand Up @@ -611,12 +581,6 @@ class JoinedRow6 extends InternalRow {
override def getFloat(i: Int): Float =
if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)

override def getString(i: Int): String =
if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)

override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)

override def copy(): InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
Expand All @@ -625,7 +589,7 @@ class JoinedRow6 extends InternalRow {
copiedValues(i) = apply(i)
i += 1
}
new GenericRow(copiedValues)
new GenericInternalRow(copiedValues)
}

override def toString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
i += 1
}

new GenericRow(newValues)
new GenericInternalRow(newValues)
}

override def update(ordinal: Int, value: Any) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import scala.collection.Map

import org.apache.spark.sql.catalyst
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -68,19 +68,19 @@ abstract class Generator extends Expression {
*/
case class UserDefinedGenerator(
elementTypes: Seq[(DataType, Boolean)],
function: InternalRow => TraversableOnce[InternalRow],
function: Row => TraversableOnce[InternalRow],
children: Seq[Expression])
extends Generator {

@transient private[this] var inputRow: InterpretedProjection = _
@transient private[this] var convertToScala: (InternalRow) => InternalRow = _
@transient private[this] var convertToScala: (InternalRow) => Row = _

private def initializeConverters(): Unit = {
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
CatalystTypeConverters.createToScalaConverter(inputSchema)
}.asInstanceOf[(InternalRow => InternalRow)]
}.asInstanceOf[InternalRow => Row]
}

override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
Expand Down Expand Up @@ -113,10 +113,11 @@ case class Explode(child: Expression)
child.dataType match {
case ArrayType(_, _) =>
val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v)))
if (inputArray == null) Nil else inputArray.map(v => InternalRow(v))
case MapType(_, _, _) =>
val inputMap = child.eval(input).asInstanceOf[Map[Any, Any]]
if (inputMap == null) Nil else inputMap.map { case (k, v) => new GenericRow(Array(k, v)) }
if (inputMap == null) Nil
else inputMap.map { case (k, v) => InternalRow(k, v) }
}
}

Expand Down
Loading

0 comments on commit 9d24350

Please sign in to comment.