Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

[ sql-ds-cache-121] Fix Index stuck issues #122

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ abstract class BTreeIndexRecordWriter(
@transient private lazy val genericProjector = SafeProjection.create(keySchema)
@transient protected lazy val nnkw = new NonNullKeyWriter(keySchema)

private val combiner: Int => Seq[Int] = Seq(_)
private val merger: (Seq[Int], Int) => Seq[Int] = _ :+ _
private val mergeCombiner: (Seq[Int], Seq[Int]) => Seq[Int] = _ ++ _
private val combiner: Int => ArrayBuffer[Int] = ArrayBuffer[Int](_)
private val merger: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = _ += _
private val mergeCombiner: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = _ ++= _
private val aggregator =
new Aggregator[InternalRow, Int, Seq[Int]](combiner, merger, mergeCombiner)
new Aggregator[InternalRow, Int, ArrayBuffer[Int]](combiner, merger, mergeCombiner)
private val externalSorter = {
val taskContext = TaskContext.get()
val sorter = new OapExternalSorter[InternalRow, Int, Seq[Int]](
val sorter = new OapExternalSorter[InternalRow, Int, ArrayBuffer[Int]](
taskContext, Some(aggregator), Some(ordering))
taskContext.addTaskCompletionListener[Unit](_ => sorter.stop())
sorter
Expand Down Expand Up @@ -260,7 +260,7 @@ abstract class BTreeIndexRecordWriter(
* @return BTreeNodeMetaData
*/
private def serializeNode(
uniqueKeys: Array[Product2[InternalRow, Seq[Int]]],
uniqueKeys: Array[Product2[InternalRow, ArrayBuffer[Int]]],
initRowPos: Int,
rowIdListWriter: IndexFileWriter,
rowIdListBuffer: ByteArrayOutputStream): BTreeNodeMetaData = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private[oap] class PartByValueStatisticsWriter(schema: StructType, conf: Configu

// This should provide the same function to get the metas as buildPartMeta().
// And this will be used when using the oapExternalSorter data
def buildMetas(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
def buildMetas(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
var kv: Product2[Key, Seq[Int]] = null
if (keyArray != null && keyArray.size != 0) {
keyArray.foreach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private[oap] class SampleBasedStatisticsWriter(schema: StructType, conf: Configu
}
}

def buildSampleArray(keyArray: Array[Product2[Key, Seq[Int]]], isLast: Boolean): Unit = {
def buildSampleArray(keyArray: Array[Product2[Key, ArrayBuffer[Int]]], isLast: Boolean): Unit = {
keyArray.foreach(
value => {
value._2.foreach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testPartByValueWriter = new TestPartByValueWriter(schema)
testPartByValueWriter.initParams(product2Keys.size)
Expand Down Expand Up @@ -213,7 +213,7 @@ class PartByValueStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testPartByValueWriter = new TestPartByValueWriter(schema)
testPartByValueWriter.initParams(product2Keys.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testSampleWriter = new TestSampleWriter(schema)
testSampleWriter.initParams(product2Keys.size)
Expand All @@ -192,7 +192,7 @@ class SampleBasedStatisticsSuite extends StatisticsTest {
val keys = (1 to 300).map(i => rowGen(i)).toArray

val product2Keys = keys.map(v => (v, Seq(1)))
.asInstanceOf[Array[Product2[Key, Seq[Int]]]]
.asInstanceOf[Array[Product2[Key, ArrayBuffer[Int]]]]

val testSampleWriter = new TestSampleWriter(schema)
testSampleWriter.initParams(product2Keys.size)
Expand Down