Skip to content

Commit

Permalink
Fixed a bug .
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 1, 2015
1 parent 0f89716 commit 6269f96
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
// The only reduction comes from merging the bitset portion of the two rows, saving 1 word.
val sizeReduction = bitset1Words + bitset2Words - outputBitsetWords

// Copy bitset from row1: pretty straightforward
// --------------------- copy bitset from row 1 ----------------------- //
val copyBitset1 = Seq.tabulate(bitset1Words) { i =>
s"""
|PlatformDependent.UNSAFE.putLong(buf, ${offset + i * 8},
| PlatformDependent.UNSAFE.getLong(obj1, ${offset + i * 8}));
""".stripMargin
}.mkString


// --------------------- copy bitset from row 2 ----------------------- //
var copyBitset2 = ""
if (bitset1Remainder == 0) {
copyBitset2 += Seq.tabulate(bitset2Words) { i =>
Expand All @@ -75,8 +77,6 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
""".stripMargin
}.mkString
} else {
// Copy bitset from row2: slightly more complicated here, as we need to use the leftover
// space from the first bitset.
copyBitset2 = Seq.tabulate(bitset2Words) { i =>
s"""
|long bs2w$i = PlatformDependent.UNSAFE.getLong(obj2, ${offset + i * 8});
Expand Down Expand Up @@ -105,7 +105,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
}
}.mkString("\n")

if (bitset2Remainder > (64 - bitset1Remainder)) {
if (bitset2Words > 0 &&
(bitset2Remainder == 0 || bitset2Remainder > (64 - bitset1Remainder))) {
val lastWord = bitset2Words - 1
copyBitset2 +=
s"""
Expand All @@ -115,6 +116,38 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
}
}

// --------------------- copy fixed length portion from row 1 ----------------------- //
val copyFixedLengthRow1 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${bitset1Words * 8},
| buf, offset + ${outputBitsetWords * 8},
| ${schema1.size * 8});
""".stripMargin

// --------------------- copy fixed length portion from row 2 ----------------------- //
val copyFixedLengthRow2 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj2, offset2 + ${bitset1Words * 8},
| buf, offset + ${(outputBitsetWords + schema1.size) * 8},
| ${schema2.size * 8});
""".stripMargin

// --------------------- copy variable length portion from row 1 ----------------------- //
val copyVariableLengthRow1 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${(bitset1Words + schema1.size) * 8},
| buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| row1.getSizeInBytes() - ${(bitset1Words + schema1.size) * 8});
""".stripMargin

// --------------------- copy variable length portion from row 2 ----------------------- //
val copyVariableLengthRow2 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| ${schema1.size * 8});
""".stripMargin


val code = s"""
|public Object generate($exprType[] exprs) {
Expand Down Expand Up @@ -151,6 +184,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR

logDebug(s"code for GenerateRowConcat($schema1, $schema2):\n${CodeFormatter.format(code)}")

// println(CodeFormatter.format(code))

val c = compile(code)
c.generate(Array.empty).asInstanceOf[UnsafeRowConcat]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class GenerateRowConcatSuite extends SparkFunSuite {
}

private def testBitsets(numFields1: Int, numFields2: Int): Unit = {
for (i <- 0 until 5) {
testBitsetsOnce(numFields1, numFields2)
}
}

private def testBitsetsOnce(numFields1: Int, numFields2: Int): Unit = {
val schema1 = StructType(Seq.tabulate(numFields1) { i => StructField(s"a_$i", IntegerType) })
val schema2 = StructType(Seq.tabulate(numFields2) { i => StructField(s"b_$i", IntegerType) })

Expand Down Expand Up @@ -64,8 +70,8 @@ class GenerateRowConcatSuite extends SparkFunSuite {

s"""
|input1: ${set1.mkString}
|input2: ${set2.mkString}
|output: ${out.mkString}
|input2: ${set2.mkString}
|output: ${out.mkString}
""".stripMargin
}

Expand All @@ -78,62 +84,63 @@ class GenerateRowConcatSuite extends SparkFunSuite {
}
}

test("boundary size 0, 0") {
test("bitset concat: boundary size 0, 0") {
testBitsets(0, 0)
}

test("boundary size 0, 64") {
test("bitset concat: boundary size 0, 64") {
testBitsets(0, 64)
}

test("boundary size 64, 0") {
test("bitset concat: boundary size 64, 0") {
testBitsets(64, 0)
}

test("boundary size 64, 64") {
test("bitset concat: boundary size 64, 64") {
testBitsets(64, 64)
}

test("boundary size 0, 128") {
test("bitset concat: boundary size 0, 128") {
testBitsets(0, 128)
}

test("boundary size 128, 0") {
test("bitset concat: boundary size 128, 0") {
testBitsets(128, 0)
}

test("boundary size 128, 128") {
test("bitset concat: boundary size 128, 128") {
testBitsets(128, 128)
}

test("single word bitsets") {
test("bitset concat: single word bitsets") {
testBitsets(10, 5)
}

test("first bitset larger than a word") {
test("bitset concat: first bitset larger than a word") {
testBitsets(67, 5)
}

test("second bitset larger than a word") {
test("bitset concat: second bitset larger than a word") {
testBitsets(6, 67)
}

test("no reduction in bitset size") {
test("bitset concat: no reduction in bitset size") {
testBitsets(33, 34)
}

test("two words") {
test("bitset concat: two words") {
testBitsets(120, 95)
}

test("bitset 595, 960") {
test("bitset concat: bitset 65, 128") {
testBitsets(65, 128)
}

test("randomized tests") {
test("bitset concat: randomized tests") {
for (i <- 1 until 20) {
val numFields1 = Random.nextInt(1000)
val numFields2 = Random.nextInt(1000)
info(s"num fields: $numFields1 and $numFields2")
testBitsets(numFields1, numFields2)
}
}
Expand Down

0 comments on commit 6269f96

Please sign in to comment.