This repository has been archived by the owner on Sep 18, 2023. It is now read-only.
[OAP-2408][oap-native-sql] Support SMJ in WholeStageCodeGen (#2049)
* [oap-native-sql] (scala) Support SMJ in WSCG

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* [oap-native-sql] (CPP) Support SMJ in WSCG

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* [oap-native-sql] Fixes some small issues in SMJ WSCG

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* [oap-native-sql] fix to fall back existence and full-outer joins in SMJ

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
xuechendi committed Dec 17, 2020
1 parent e8b63d8 commit 9104398
Showing 29 changed files with 3,990 additions and 605 deletions.
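
For orientation before the per-file hunks: the changes below repeatedly implement a small set of whole-stage-codegen hooks on the columnar operators. The sketch that follows only collects the method signatures as they appear in this diff; the trait/case-class shape, the field names of ColumnarCodegenContext, and the import locations are assumptions, not the repository's exact definitions.

import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.gandiva.expression.TreeNode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

// Assumed shape: the diff constructs it as
// ColumnarCodegenContext(inputSchema, outputSchema, kernelFunction).
case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode)

trait ColumnarCodegenSupport {
  // RDDs feeding the fused kernel; operators delegate this down the streamed side
  def inputRDDs(): Seq[RDD[ColumnarBatch]]

  // plans whose dependent kernels must be built before streaming starts;
  // renamed from getHashBuildPlans in this commit, presumably because SMJ
  // build sides are sorted rather than hashed
  def getBuildPlans: Seq[SparkPlan]

  // bottom-most codegen-capable plan on the streamed side
  def getStreamedLeafPlan: SparkPlan

  // Arrow input/output schemas plus the kernel expression used when this plan
  // is a dependency of another whole-stage kernel
  def dependentPlanCtx: ColumnarCodegenContext

  def supportColumnarCodegen: Boolean
  def doCodeGen: ColumnarCodegenContext
}
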
@@ -24,11 +24,27 @@ import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.adaptive.{
BroadcastQueryStageExec,
ColumnarCustomShuffleReaderExec,
CustomShuffleReaderExec,
ShuffleQueryStageExec
}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, ShuffledHashJoinExec, SortMergeJoinExec, _}
import org.apache.spark.sql.execution.exchange.{
BroadcastExchangeExec,
ReusedExchangeExec,
ShuffleExchangeExec
}
import org.apache.spark.sql.execution.joins.{
BroadcastHashJoinExec,
BuildLeft,
BuildRight,
ShuffledHashJoinExec,
SortMergeJoinExec,
_
}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

@@ -320,7 +336,11 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
}
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
try {
return new ColumnarWindowExec(plan.windowExpression, plan.partitionSpec, plan.orderSpec, child)
return new ColumnarWindowExec(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
child)
} catch {
case _: Throwable =>
logInfo("Columnar Window: Falling back to regular Window...")
@@ -544,7 +564,7 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
}
case _ =>
replaceWithColumnarPlan(child)
})
})
}
}
}
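
The last commit-message bullet mentions falling back for existence and full-outer joins in SMJ; the corresponding planner hunk is not among those shown. The snippet below is therefore only a hypothetical illustration of such a support check, and the join types it accepts beyond the two excluded ones are an assumption, not something this diff states.

import org.apache.spark.sql.catalyst.plans._

// Hypothetical guard: keep the row-based SortMergeJoinExec (i.e. fall back)
// for existence and full-outer joins; assume the native SMJ kernel handles
// the remaining common join types.
def smjSupportedByNativeKernel(joinType: JoinType): Boolean = joinType match {
  case Inner | LeftOuter | RightOuter | LeftSemi | LeftAnti => true
  case FullOuter | _: ExistenceJoin => false
  case _ => false
}
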
@@ -77,7 +77,6 @@ case class CoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
var numRowsTotal: Long = _
val resultStructType =
StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
System.out.println(s"Coalecse schema is ${resultStructType}")

SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit] { _ =>
if (numBatchesTotal > 0) {
@@ -92,13 +92,20 @@ case class ColumnarConditionProjectExec(
Seq(child.executeColumnar())
}

override def getHashBuildPlans: Seq[SparkPlan] = child match {
override def getBuildPlans: Seq[SparkPlan] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getHashBuildPlans
c.getBuildPlans
case _ =>
Seq()
}

override def getStreamedLeafPlan: SparkPlan = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getStreamedLeafPlan
case _ =>
this
}

override def supportColumnarCodegen: Boolean = true

override def canEqual(that: Any): Boolean = false
@@ -111,14 +111,21 @@ case class ColumnarBroadcastHashJoinExec(
case _ =>
Seq(streamedPlan.executeColumnar())
}
override def getHashBuildPlans: Seq[SparkPlan] = streamedPlan match {
override def getBuildPlans: Seq[SparkPlan] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getHashBuildPlans
val childPlans = c.getBuildPlans
childPlans :+ this
case _ =>
Seq(this)
}

override def getStreamedLeafPlan: SparkPlan = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getStreamedLeafPlan
case _ =>
this
}

override def dependentPlanCtx: ColumnarCodegenContext = {
val inputSchema = ConverterUtils.toArrowSchema(buildPlan.output)
ColumnarCodegenContext(
@@ -110,14 +110,21 @@ case class ColumnarShuffledHashJoinExec(
case _ =>
Seq(streamedPlan.executeColumnar())
}
override def getHashBuildPlans: Seq[SparkPlan] = streamedPlan match {
override def getBuildPlans: Seq[SparkPlan] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getHashBuildPlans
val childPlans = c.getBuildPlans
childPlans :+ this
case _ =>
Seq(this)
}

override def getStreamedLeafPlan: SparkPlan = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getStreamedLeafPlan
case _ =>
this
}

override def dependentPlanCtx: ColumnarCodegenContext = {
val inputSchema = ConverterUtils.toArrowSchema(buildPlan.output)
ColumnarCodegenContext(
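
The project and join hunks above all add the same two delegating overrides. A minimal sketch of how a whole-stage-codegen driver might consume them, reusing the ColumnarCodegenSupport sketch given after the file list; the driver function itself is assumed and is not part of this diff.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

def planWholeStage(root: SparkPlan with ColumnarCodegenSupport): Unit = {
  // every plan that must materialize a dependent (build-side) kernel first
  val buildPlans: Seq[SparkPlan] = root.getBuildPlans
  // the operator whose executeColumnar() output streams through the fused kernel
  val streamedLeaf: SparkPlan = root.getStreamedLeafPlan
  // input batches are pulled from the streamed leaf, not from the root itself
  val inputs: Seq[RDD[ColumnarBatch]] = root.inputRDDs()
  // a real driver would now build kernels for buildPlans and fuse root..streamedLeaf
}
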
@@ -20,15 +20,25 @@ package com.intel.oap.execution
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized._
import com.google.common.collect.Lists
import java.util.concurrent.TimeUnit._

import org.apache.arrow.vector.ipc.message.ArrowFieldNode
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.evaluator._

import org.apache.spark.{SparkContext, SparkEnv, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution._
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager}
@@ -38,23 +48,34 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
/**
* Columnar Based SortExec.
*/
class ColumnarSortExec(
case class ColumnarSortExec(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
extends SortExec(sortOrder, global, child, testSpillFrequency) {
extends UnaryExecNode
with ColumnarCodegenSupport {

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
override def supportsColumnar = true
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(s"ColumnarSortExec doesn't support doExecute")
}

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = sortOrder

// Disable code generation
override def supportCodegen: Boolean = false
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override lazy val metrics = Map(
"totalSortTime" -> SQLMetrics
.createTimingMetric(sparkContext, "totaltime_sort"),
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in cache all data"),
"sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in sort process"),
"shuffleTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in shuffle process"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -66,14 +87,50 @@ class ColumnarSortExec(
val numOutputRows = longMetric("numOutputRows")
val numOutputBatches = longMetric("numOutputBatches")

/***************** WSCG related function ******************/
override def inputRDDs(): Seq[RDD[ColumnarBatch]] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.inputRDDs
case _ =>
Seq(child.executeColumnar())
}

override def supportColumnarCodegen: Boolean = true

override def getBuildPlans: Seq[SparkPlan] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
childPlans :+ this
case _ =>
Seq(this)
}

override def getStreamedLeafPlan: SparkPlan = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getStreamedLeafPlan
case _ =>
this
}

override def dependentPlanCtx: ColumnarCodegenContext = {
val inputSchema = ConverterUtils.toArrowSchema(child.output)
val outSchema = ConverterUtils.toArrowSchema(output)
ColumnarCodegenContext(
inputSchema,
outSchema,
ColumnarSorter.prepareKernelFunction(sortOrder, child.output, sparkConf, 1))
}

override def doCodeGen: ColumnarCodegenContext = null

/***********************************************************/
def getCodeGenSignature =
if (!sortOrder
.filter(
expr => bindReference(expr.child, child.output, true).isInstanceOf[BoundReference])
.isEmpty) {
.filter(
expr => bindReference(expr.child, child.output, true).isInstanceOf[BoundReference])
.isEmpty) {
ColumnarSorter.prebuild(
sortOrder,
true,
child.output,
sortTime,
numOutputBatches,
@@ -122,7 +179,6 @@ class ColumnarSortExec(
})
val sorter = ColumnarSorter.create(
sortOrder,
true,
child.output,
jarList,
sortTime,
@@ -132,8 +188,8 @@ class ColumnarSortExec(
elapse,
sparkConf)
SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
sorter.close()
})
sorter.close()
})
new CloseableColumnBatchIterator(sorter.createColumnarIterator(iter))
}
res
(The remaining changed files in this commit are not shown here.)
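
Taken together, the ColumnarSortExec hunks make sort a codegen-capable operator whose dependentPlanCtx exposes Arrow schemas plus a sort kernel expression. A small sketch of how one sorted input of a whole-stage SMJ might be prepared with it; the package imports and the driver-side wiring are assumptions, while the constructor and dependentPlanCtx signature come from the hunks above.

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.SparkPlan
import com.intel.oap.execution.{ColumnarCodegenContext, ColumnarSortExec}

// Wrap one join input in ColumnarSortExec and hand its dependent context to
// the native kernel builder, so the generated SMJ consumes an already-sorted stream.
def sortedSideCtx(keys: Seq[SortOrder], child: SparkPlan): ColumnarCodegenContext = {
  val sort = ColumnarSortExec(keys, false, child) // global = false, default spill frequency
  sort.dependentPlanCtx // Arrow schemas + ColumnarSorter sort kernel for this side
}
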