Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Jun 18, 2015
1 parent 6ad2a90 commit d96929b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def takeAndPrint(time, rdd):
print(record)
if len(taken) > num:
print("...")
print()
print("")

self.foreachRDD(takeAndPrint)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,38 @@ import org.apache.spark.sql.catalyst.expressions._
*/
abstract class InternalRow extends Row {
// A default implementation to change the return type
override def copy(): InternalRow = {this}
override def copy(): InternalRow = this

// A default version (slow), used for tests
override def equals(o: Any): Boolean = o match {
case other: InternalRow =>
if (length != other.length) {
override def equals(o: Any): Boolean = {
if (!o.isInstanceOf[Row]) {
return false
}

val other = o.asInstanceOf[Row]
if (length != other.length) {
return false
}

for (i <- 0 until length) {
if (isNullAt(i) != other.isNullAt(i)) {
return false
}

for (i <- 0 until length) {
if (isNullAt(i) != other.isNullAt(i)) {
return false
}
if (!isNullAt(i)) {
val o1 = apply(i)
val o2 = other.apply(i)
if (o1.isInstanceOf[Array[Byte]]) {
val b1 = o1.asInstanceOf[Array[Byte]]
if (!o2.isInstanceOf[Array[Byte]] ||
!java.util.Arrays.equals(b1, o2.asInstanceOf[Array[Byte]])) {
return false
}
} else if (o1 != o2) {
if (!isNullAt(i)) {
val o1 = apply(i)
val o2 = other.apply(i)
if (o1.isInstanceOf[Array[Byte]]) {
// handle equality of Array[Byte]
val b1 = o1.asInstanceOf[Array[Byte]]
if (!o2.isInstanceOf[Array[Byte]] ||
!java.util.Arrays.equals(b1, o2.asInstanceOf[Array[Byte]])) {
return false
}
} else if (o1 != o2) {
return false
}
}
true
case _ => false
}
true
}

// Custom hashCode function that matches the efficient code generated version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait ExpressionEvalHelper {
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
if (actual !== expected) {
if (actual != expected) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(s"Incorrect evaluation (codegen off): $expression, " +
s"actual: $actual, " +
Expand Down Expand Up @@ -83,7 +83,7 @@ trait ExpressionEvalHelper {
}

val actual = plan(inputRow).apply(0)
if (actual !== expected) {
if (actual != expected) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
}
Expand Down

0 comments on commit d96929b

Please sign in to comment.