diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcat.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcat.scala
index 16eb4cbcf8188..c7e06bb961e8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcat.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcat.scala
@@ -58,7 +58,7 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
     // The only reduction comes from merging the bitset portion of the two rows, saving 1 word.
     val sizeReduction = bitset1Words + bitset2Words - outputBitsetWords
 
-    // Copy bitset from row1: pretty straightforward
+    // --------------------- copy bitset from row 1 ----------------------- //
     val copyBitset1 = Seq.tabulate(bitset1Words) { i =>
       s"""
          |PlatformDependent.UNSAFE.putLong(buf, ${offset + i * 8},
@@ -66,6 +66,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
        """.stripMargin
     }.mkString
 
+
+    // --------------------- copy bitset from row 2 ----------------------- //
     var copyBitset2 = ""
     if (bitset1Remainder == 0) {
       copyBitset2 += Seq.tabulate(bitset2Words) { i =>
@@ -75,8 +77,6 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
          """.stripMargin
       }.mkString
     } else {
-      // Copy bitset from row2: slightly more complicated here, as we need to use the leftover
-      // space from the first bitset.
      copyBitset2 = Seq.tabulate(bitset2Words) { i =>
        s"""
           |long bs2w$i = PlatformDependent.UNSAFE.getLong(obj2, ${offset + i * 8});
@@ -105,7 +105,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
        }
      }.mkString("\n")
 
-      if (bitset2Remainder > (64 - bitset1Remainder)) {
+      if (bitset2Words > 0 &&
+          (bitset2Remainder == 0 || bitset2Remainder > (64 - bitset1Remainder))) {
        val lastWord = bitset2Words - 1
        copyBitset2 +=
          s"""
@@ -115,6 +116,38 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
      }
    }
 
+    // --------------------- copy fixed length portion from row 1 ----------------------- //
+    val copyFixedLengthRow1 = s"""
+       |PlatformDependent.UNSAFE.copyMemory(
+       |  obj1, offset1 + ${bitset1Words * 8},
+       |  buf, offset + ${outputBitsetWords * 8},
+       |  ${schema1.size * 8});
+     """.stripMargin
+
+    // --------------------- copy fixed length portion from row 2 ----------------------- //
+    val copyFixedLengthRow2 = s"""
+       |PlatformDependent.UNSAFE.copyMemory(
+       |  obj2, offset2 + ${bitset2Words * 8},
+       |  buf, offset + ${(outputBitsetWords + schema1.size) * 8},
+       |  ${schema2.size * 8});
+     """.stripMargin
+
+    // --------------------- copy variable length portion from row 1 ----------------------- //
+    val copyVariableLengthRow1 = s"""
+       |PlatformDependent.UNSAFE.copyMemory(
+       |  obj1, offset1 + ${(bitset1Words + schema1.size) * 8},
+       |  buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
+       |  row1.getSizeInBytes() - ${(bitset1Words + schema1.size) * 8});
+     """.stripMargin
+
+    // --------------------- copy variable length portion from row 2 ----------------------- //
+    val copyVariableLengthRow2 = s"""
+       |PlatformDependent.UNSAFE.copyMemory(
+       |  obj2, offset2 + ${(bitset2Words + schema2.size) * 8},
+       |  buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8} + row1.getSizeInBytes() - ${(bitset1Words + schema1.size) * 8},
+       |  row2.getSizeInBytes() - ${(bitset2Words + schema2.size) * 8});
+     """.stripMargin
+
     val code = s"""
       |public Object generate($exprType[] exprs) {
@@ -151,6 +184,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
     logDebug(s"code for GenerateRowConcat($schema1, $schema2):\n${CodeFormatter.format(code)}")
 
+    // println(CodeFormatter.format(code))
+
     val c = compile(code)
     c.generate(Array.empty).asInstanceOf[UnsafeRowConcat]
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcatSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcatSuite.scala
index 591c5763fe1d7..b2f681c0fcaeb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcatSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateRowConcatSuite.scala
@@ -37,6 +37,12 @@ class GenerateRowConcatSuite extends SparkFunSuite {
   }
 
   private def testBitsets(numFields1: Int, numFields2: Int): Unit = {
+    for (i <- 0 until 5) {
+      testBitsetsOnce(numFields1, numFields2)
+    }
+  }
+
+  private def testBitsetsOnce(numFields1: Int, numFields2: Int): Unit = {
     val schema1 = StructType(Seq.tabulate(numFields1) { i => StructField(s"a_$i", IntegerType) })
     val schema2 = StructType(Seq.tabulate(numFields2) { i => StructField(s"b_$i", IntegerType) })
@@ -64,8 +70,8 @@ class GenerateRowConcatSuite extends SparkFunSuite {
       s"""
          |input1: ${set1.mkString}
-          |input2: ${set2.mkString}
-          |output: ${out.mkString}
+         |input2: ${set2.mkString}
+         |output: ${out.mkString}
        """.stripMargin
     }
@@ -78,62 +84,63 @@ class GenerateRowConcatSuite extends SparkFunSuite {
     }
   }
 
-  test("boundary size 0, 0") {
+  test("bitset concat: boundary size 0, 0") {
     testBitsets(0, 0)
   }
 
-  test("boundary size 0, 64") {
+  test("bitset concat: boundary size 0, 64") {
     testBitsets(0, 64)
   }
 
-  test("boundary size 64, 0") {
+  test("bitset concat: boundary size 64, 0") {
     testBitsets(64, 0)
   }
 
-  test("boundary size 64, 64") {
+  test("bitset concat: boundary size 64, 64") {
     testBitsets(64, 64)
   }
 
-  test("boundary size 0, 128") {
+  test("bitset concat: boundary size 0, 128") {
     testBitsets(0, 128)
   }
 
-  test("boundary size 128, 0") {
+  test("bitset concat: boundary size 128, 0") {
     testBitsets(128, 0)
   }
 
-  test("boundary size 128, 128") {
+  test("bitset concat: boundary size 128, 128") {
     testBitsets(128, 128)
   }
 
-  test("single word bitsets") {
+  test("bitset concat: single word bitsets") {
     testBitsets(10, 5)
   }
 
-  test("first bitset larger than a word") {
+  test("bitset concat: first bitset larger than a word") {
     testBitsets(67, 5)
   }
 
-  test("second bitset larger than a word") {
+  test("bitset concat: second bitset larger than a word") {
     testBitsets(6, 67)
   }
 
-  test("no reduction in bitset size") {
+  test("bitset concat: no reduction in bitset size") {
     testBitsets(33, 34)
  }
 
-  test("two words") {
+  test("bitset concat: two words") {
     testBitsets(120, 95)
   }
 
-  test("bitset 595, 960") {
+  test("bitset concat: bitset 65, 128") {
     testBitsets(65, 128)
   }
 
-  test("randomized tests") {
+  test("bitset concat: randomized tests") {
     for (i <- 1 until 20) {
       val numFields1 = Random.nextInt(1000)
       val numFields2 = Random.nextInt(1000)
+      info(s"num fields: $numFields1 and $numFields2")
       testBitsets(numFields1, numFields2)
     }
   }
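For reference, the bitset handling in the generated Java above can be hard to read in codegen form. Below is a minimal, self-contained Scala sketch (a hypothetical helper, not part of this patch and not the generated code) of what the concatenation does: row 1's bitset words are copied as-is, and row 2's bits are shifted so they start in the leftover space of row 1's last word. The boundary between "fits in the leftover space" and "needs one extra output word" is exactly what the reworked bitset2Remainder / bitset1Remainder condition in this diff has to get right.

// Hypothetical illustration only; mirrors the word-wise generated code per-bit for clarity.
object BitsetConcatSketch {
  /** Concatenate two null bitsets, packing row 2's bits immediately after row 1's. */
  def concat(bits1: Array[Long], numBits1: Int, bits2: Array[Long], numBits2: Int): Array[Long] = {
    val out = new Array[Long]((numBits1 + numBits2 + 63) / 64)
    // Row 1's words are copied verbatim (the generated code emits one putLong per word).
    Array.copy(bits1, 0, out, 0, bits1.length)
    // Row 2's bits are offset by numBits1, landing in the unused tail of row 1's last
    // word first; the generated code does the same thing word-by-word with shifts.
    var i = 0
    while (i < numBits2) {
      if ((bits2(i / 64) & (1L << (i % 64))) != 0L) {
        val pos = numBits1 + i
        out(pos / 64) |= 1L << (pos % 64)
      }
      i += 1
    }
    out
  }
}

For example, BitsetConcatSketch.concat(Array(0x5L), 3, Array(0x3L), 2) yields a single word 0x1dL, with row 2's two bits packed at positions 3 and 4 -- the same packing the suite's randomized tests verify against BitSet.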