Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Fix Index stuck issues (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhixingheyi-tian committed May 24, 2021
1 parent 264bb1e commit 4622640
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ abstract class BTreeIndexRecordWriter(
@transient private lazy val genericProjector = SafeProjection.create(keySchema)
@transient protected lazy val nnkw = new NonNullKeyWriter(keySchema)

private val combiner: Int => Seq[Int] = Seq(_)
private val merger: (Seq[Int], Int) => Seq[Int] = _ :+ _
private val mergeCombiner: (Seq[Int], Seq[Int]) => Seq[Int] = _ ++ _
private val combiner: Int => ArrayBuffer[Int] = ArrayBuffer[Int](_)
private val merger: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = _ += _
private val mergeCombiner: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = _ ++= _
private val aggregator =
new Aggregator[InternalRow, Int, Seq[Int]](combiner, merger, mergeCombiner)
new Aggregator[InternalRow, Int, ArrayBuffer[Int]](combiner, merger, mergeCombiner)
private val externalSorter = {
val taskContext = TaskContext.get()
val sorter = new OapExternalSorter[InternalRow, Int, Seq[Int]](
val sorter = new OapExternalSorter[InternalRow, Int, ArrayBuffer[Int]](
taskContext, Some(aggregator), Some(ordering))
taskContext.addTaskCompletionListener[Unit](_ => sorter.stop())
sorter
Expand Down Expand Up @@ -260,7 +260,7 @@ abstract class BTreeIndexRecordWriter(
* @return BTreeNodeMetaData
*/
private def serializeNode(
uniqueKeys: Array[Product2[InternalRow, Seq[Int]]],
uniqueKeys: Array[Product2[InternalRow, ArrayBuffer[Int]]],
initRowPos: Int,
rowIdListWriter: IndexFileWriter,
rowIdListBuffer: ByteArrayOutputStream): BTreeNodeMetaData = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private[oap] class PartByValueStatisticsWriter(schema: StructType, conf: Configu

// This should provide the same function to get the metas as buildPartMeta().
// And this will be used when using the oapExternalSorter data
def buildMetas(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
def buildMetas(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
var kv: Product2[Key, Seq[Int]] = null
if (keyArray != null && keyArray.size != 0) {
keyArray.foreach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private[oap] class SampleBasedStatisticsWriter(schema: StructType, conf: Configu
}
}

def buildSampleArray(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
def buildSampleArray(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
keyArray.foreach(
value => {
value._2.foreach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testPartByValueWriter = new TestPartByValueWriter(schema)
testPartByValueWriter.initParams(product2Keys.size)
Expand Down Expand Up @@ -213,7 +213,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testPartByValueWriter = new TestPartByValueWriter(schema)
testPartByValueWriter.initParams(product2Keys.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testSampleWriter = new TestSampleWriter(schema)
testSampleWriter.initParams(product2Keys.size)
Expand All @@ -192,7 +192,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testSampleWriter = new TestSampleWriter(schema)
testSampleWriter.initParams(product2Keys.size)
Expand Down

0 comments on commit 4622640

Please sign in to comment.