In [1]:
%%init_spark
launcher.jars = ["file:///opt/benchmark-tools/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar", "/opt/benchmark-tools/oap/oap_jars/gazelle-plugin-1.4.0-spark-3.1.1.jar"]
launcher.conf.set("spark.driver.extraClassPath", "/opt/benchmark-tools/oap/oap_jars/gazelle-plugin-1.4.0-spark-3.1.1.jar")
launcher.conf.set("spark.executor.extraClassPath", "/opt/benchmark-tools/oap/oap_jars/gazelle-plugin-1.4.0-spark-3.1.1.jar")
launcher.conf.set("spark.sql.warehouse.dir", "hdfs:///user/livy")
launcher.conf.set("spark.executorEnv.LD_LIBRARY_PATH", "/opt/benchmark-tools/oap/lib")
launcher.conf.set("spark.executor.extraLibraryPath", "/opt/benchmark-tools/oap/lib")
launcher.conf.set("spark.driver.extraLibraryPath", "/opt/benchmark-tools/oap/lib")
launcher.conf.set("spark.executorEnv.LIBARROW_DIR", "/opt/benchmark-tools/oap")
launcher.conf.set("spark.executorEnv.CC", "/opt/benchmark-tools/oap/bin/gcc")
launcher.conf.set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin")
launcher.conf.set("spark.oap.sql.columnar.preferColumnar", "true")
launcher.conf.set("spark.sql.join.preferSortMergeJoin", "false")
launcher.conf.set("spark.oap.sql.columnar.joinOptimizationLevel", "12")
launcher.conf.set("spark.sql.broadcastTimeout", "3600")
launcher.conf.set("spark.executor.memoryOverhead", "2989")
launcher.conf.set("spark.dynamicAllocation.executorIdleTimeout", "3600s")
launcher.conf.set("spark.sql.autoBroadcastJoinThreshold", "31457280")
launcher.conf.set("spark.kryoserializer.buffer.max", "256m")
launcher.conf.set("spark.network.timeout", "3600s")
launcher.conf.set("spark.memory.offHeap.enabled", "false")
launcher.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "20480")
launcher.conf.set("spark.sql.sources.useV1SourceList", "avro")
launcher.conf.set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin")
launcher.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
launcher.conf.set("spark.dynamicAllocation.executorIdleTimeout", "3600s")
launcher.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "20480")
launcher.conf.set("spark.kryoserializer.buffer", "64m")
launcher.conf.set("spark.sql.shuffle.partitions", "72")
launcher.conf.set("spark.sql.parquet.columnarReaderBatchSize", "20480")
launcher.conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
launcher.conf.set("spark.sql.columnar.codegen.hashAggregate", "false")
launcher.conf.set("spark.memory.offHeap.size", "3g")


In [None]:
!hadoop fs -mkdir -p /user/livy

In [None]:
import java.text.SimpleDateFormat;
import java.util.Date
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import com.databricks.spark.sql.perf.tpcds.TPCDS
import com.databricks.spark.sql.perf.Benchmark.ExperimentStatus
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, substring}

// ---- Benchmark parameters (edit these before running) ----
val stream_num = 2              // number of concurrent query streams to start
val scaleFactor = "1"           // data scale in GB ("1" == 1GB)
val iterations = 1              // how many times each stream runs its whole set of queries
val format = "parquet"          // source data format: "parquet" or "orc"
val storage = "hdfs"            // URI scheme of the storage backend
val bucket_name = "/user/livy"  // base path; the Scala notebook only has write permission under "hdfs:///user/livy"
val partitionTables = true      // recover partitions after the tables are registered
// Explicitly typed so the empty default is a Seq[String] rather than an inferred Seq[Nothing].
val query_filter: Seq[String] = Seq()   // empty == run all queries
//val query_filter = Seq("q1-v2.4", "q2-v2.4") // run subset of queries
val randomizeQueries = true     // run queries in a random order. Recommended for parallel runs.

// Timestamp used to give every run its own result directory.
val current_time = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date())
// Both are `var` on purpose: the Arrow setup below re-points them when use_arrow is true.
var resultLocation = s"${storage}://${bucket_name}/results/tpcds_${format}/${scaleFactor}/${current_time}"
var databaseName = s"tpcds_${format}_scale_${scaleFactor}_db"
val use_arrow = true            // must be true to run TPC-DS through the Gazelle plugin

// When running through the Gazelle plugin, switch the result location and database to
// their "arrow" variants and register every TPC-DS table as an "arrow"-source table
// over the generated data set.
//
// The setup is idempotent: the database is created only if missing, and every table
// that is not yet registered gets created, even when the database already existed
// (e.g. after an interrupted earlier run left it half-populated).
if (use_arrow) {
    val data_path = s"${storage}://${bucket_name}/datagen/tpcds_${format}/${scaleFactor}"
    resultLocation = s"${storage}://${bucket_name}/results/tpcds_arrow/${scaleFactor}/${current_time}"
    databaseName = s"tpcds_arrow_scale_${scaleFactor}_db"
    val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")

    if (spark.catalog.databaseExists(databaseName)) {
        println(s"$databaseName has exists!")
    }
    spark.sql(s"create database if not exists $databaseName")
    // tableExists/createTable below resolve unqualified names against the current database.
    spark.sql(s"use $databaseName")

    val (registered, missing) = tables.partition(name => spark.catalog.tableExists(name))
    registered.foreach(name => println(s"$name has exists!"))
    missing.foreach(name => spark.catalog.createTable(name, s"$data_path/$name", "arrow"))

    if (partitionTables) {
        // Only freshly registered tables need their partitions discovered.
        missing.foreach { name =>
            try {
                spark.sql(s"ALTER TABLE $name RECOVER PARTITIONS")
            } catch {
                // Best effort: report the failure and carry on with the remaining tables.
                case scala.util.control.NonFatal(e) => println(e)
            }
        }
    }
}

// Upper bound, in hours, that each stream waits for its experiment to finish.
val timeout = 60
// Benchmark harness bound to this session's SQLContext.
val tpcds = new TPCDS(spark.sqlContext)
// Raise the broadcast timeout for the run; good idea for Q14, Q88.
spark.conf.set("spark.sql.broadcastTimeout", "10000")
// Make the benchmark database the current one for all following queries.
spark.sql(s"use $databaseName")

// Query set for one stream: all TPC-DS 2.4 queries, or only those named in
// `query_filter` when it is non-empty. Because this is a `def`, every call
// re-evaluates it, so with `randomizeQueries` each call yields a fresh shuffle.
def queries = {
    val allQueries = tpcds.tpcds2_4Queries
    val selected =
        if (query_filter.isEmpty) allQueries
        else allQueries.filter(query => query_filter.contains(query.name))
    if (!randomizeQueries) selected else scala.util.Random.shuffle(selected)
}

/** Watches one benchmark stream: logs its start, waits for the experiment, logs its end.
  *
  * @param experiment status handle of the experiment to wait on
  * @param i          zero-based stream index, used only in the log lines
  */
class ThreadStream(experiment:ExperimentStatus, i:Int) extends Thread{
    override def run(): Unit = {
        val label = "stream_" + i
        // `timeout` is expressed in hours; waitForFinish is given seconds.
        val timeoutSeconds = timeout*60*60
        println(label + " has started...")
        println(experiment.toString)
        experiment.waitForFinish(timeoutSeconds)
        println(label + " has finished.")
    }
}

// One pool thread per stream; each thread only waits on its stream's experiment.
val threadPool: ExecutorService = Executors.newFixedThreadPool(stream_num)
// Status handle of every stream, indexed by stream number (read again when summarizing).
val experiments: Array[ExperimentStatus] = new Array[ExperimentStatus](stream_num)

try {
    (0 until stream_num).foreach { streamId =>
        // `queries` is re-evaluated per stream, so each stream gets its own query order.
        val status = tpcds.runExperiment(
            queries,
            iterations = iterations,
            resultLocation = resultLocation,
            tags = Map("runtype" -> "benchmark", "database" -> databaseName, "scale_factor" -> scaleFactor)
        )
        experiments(streamId) = status
        threadPool.execute(new ThreadStream(status, streamId))
    }
} finally {
    // Stop accepting work, then block until every submitted stream watcher has returned,
    // even if starting one of the streams threw.
    threadPool.shutdown()
    threadPool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}

// Per-stream totals: one single-row DataFrame ("stream", "sum(Runtime)") per stream.
// Runtime of a query is the sum of its parsing, analysis, optimization, planning and
// execution times divided by 1000.0 (presumably milliseconds -> seconds; confirm against
// the spark-sql-perf result schema).
// The aggregate is aliased explicitly so the column name does not depend on Spark's
// auto-generated naming. The original also derived a "Name" column that the aggregate
// immediately discarded; that dead step is dropped.
val summary_dfs: Array[DataFrame] = Array.tabulate(stream_num) { streamId =>
    experiments(streamId).getCurrentResults
        .withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
        .agg(sum("Runtime").as("sum(Runtime)"))
        .withColumn("stream", lit("stream_" + streamId))
        .select("stream", "sum(Runtime)")
}

// All streams stacked into one frame (requires at least one stream).
val per_stream_totals = summary_dfs.reduce(_ union _)
// The slowest stream's total, reported as an extra "max_stream" row.
val slowest_stream = per_stream_totals
    .agg(max("sum(Runtime)").as("sum(Runtime)"))
    .withColumn("stream", lit("max_stream"))
    .select("stream", "sum(Runtime)")

// Kept as `var` so later cells that reassign summary_df keep working.
var summary_df = per_stream_totals.union(slowest_stream)
summary_df.show()