## Get the journey data.
Clones the tpcds-journey repository to get all the data and scripts that are required to exercise this journey. Normally, the data and queries are generated by running the data and query generation utilities from the TPC-DS toolkit available at http://www.tpc.org/tpcds. For ease of use, however, the data and queries have been pre-generated at the 1GB scale factor. We use the pre-generated data and queries to demonstrate how to run the TPC-DS queries against Spark.


In [1]:
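import sys.process._
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.4.jar
// Remove any previous clone, then clone the journey repository.
// Method-call syntax (.!) avoids postfix-operator parsing issues across lines.
"rm -rf tpcds-journey".!
"git --version".!
"git clone --progress https://github.com/SparkTC/tpcds-journey.git".!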

Starting download from https://brunelvis.org/jar/spark-kernel-brunel-all-2.4.jar
Finished download of spark-kernel-brunel-all-2.4.jar
git version 1.8.3.1
Cloning into 'tpcds-journey'...
remote: Counting objects: 1037, done.        
remote: Total 1037 (delta 0), reused 0 (delta 0), pack-reused 1037        
Receiving objects: 100% (1037/1037), 362.46 MiB | 10.05 MiB/s, done.
Resolving deltas: 100% (453/453), done.
Checking out files: 100% (806/806), done.
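The clone cell above discards the exit codes of the shell commands, so a failed clone only surfaces later as missing files. A minimal fail-fast sketch is shown below. `runChecked` is a hypothetical helper, not part of the journey repository, and the example assumes a POSIX environment with `git` on the PATH.

```scala
import scala.sys.process._

// Hypothetical helper (not part of the journey repo): runs a command
// directly, without a shell, and throws if it exits with a non-zero status.
def runChecked(cmd: Seq[String]): Unit = {
  val exitCode = Process(cmd).!  // output goes to the kernel's stdout/stderr
  if (exitCode != 0)
    throw new RuntimeException(
      s"Command failed with exit code $exitCode: ${cmd.mkString(" ")}")
}

// Usage (mirrors the clone cell):
// runChecked(Seq("rm", "-rf", "tpcds-journey"))
// runChecked(Seq("git", "clone", "--progress", "https://github.com/SparkTC/tpcds-journey.git"))
```

Passing the command as a `Seq` keeps each argument intact instead of relying on whitespace splitting of a single string.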


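## Set up variables.
* Sets up variables that are used in the rest of this notebook.
* The path variables are relative to the notebook's working directory, into which the repository was cloned.
* tpcdsDatabaseName is hard-coded to "TPCDS2G". This can be changed if a different database name is desired; the deleteFile1 and deleteFile2 helpers hard-code the matching warehouse directory (spark-warehouse/tpcds2g.db), so update them as well.
* The Spark session disables broadcast joins (spark.sql.autoBroadcastJoinThreshold = -1) and allows cross joins (spark.sql.crossJoin.enabled = true).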
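Rather than keeping the hard-coded `tpcds2g.db` path in sync with the database name by hand, the path could be derived from it. A minimal sketch, assuming `spark.sql.warehouse.dir` is left at its default (`spark-warehouse` in the working directory) and that the metastore stores database names in lower case; `warehouseTableDir` is a hypothetical helper.

```scala
// Hypothetical helper: builds a table's directory under the Spark warehouse.
// Assumes the default warehouse location and lower-cased database names.
def warehouseTableDir(databaseName: String, tableName: String,
                      warehouseDir: String = "spark-warehouse"): String =
  s"$warehouseDir/${databaseName.toLowerCase}.db/$tableName"

// Usage: warehouseTableDir("TPCDS2G", "store_sales")
// gives "spark-warehouse/tpcds2g.db/store_sales"
```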

In [2]:
// Removes the files inside a table's warehouse directory.
// sys.process does not expand globs such as *, so the command runs through sh.
def deleteFile1(tableName: String): Unit = {
    import sys.process._
    val commandStr = s"rm -rf spark-warehouse/tpcds2g.db/${tableName}/*"
    Seq("sh", "-c", commandStr).!
}

In [3]:
// Removes a table's warehouse directory itself.
def deleteFile2(tableName: String): Unit = {
    import sys.process._
    val commandStr2 = s"rm -rf spark-warehouse/tpcds2g.db/${tableName}"
    commandStr2.!
}

In [4]:
val tpcdsRootDir = "tpcds-journey"
val tpcdsWorkDir = "tpcds-journey/work"
val tpcdsDdlDir = s"${tpcdsRootDir}/src/ddl/individual"
val tpcdsGenDataDir = s"${tpcdsRootDir}/src/data"
val tpcdsQueriesDir = s"${tpcdsRootDir}/src/queries"
val tpcdsDatabaseName = "TPCDS2G"
var totalTime: Long = 0
println("TPCDS root directory is at : "+ tpcdsRootDir)
println("TPCDS ddl scripts directory is at: " + tpcdsDdlDir)
println("TPCDS data directory is at: "+ tpcdsGenDataDir)
println("TPCDS queries directory is at: "+ tpcdsQueriesDir)

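import org.apache.spark.sql.SparkSession

// Broadcast joins are disabled (a threshold of -1) and cross joins are
// allowed, since some TPC-DS queries join subqueries without a join condition.
val journey_spark = SparkSession.
    builder().
    config("spark.ui.showConsoleProgress", false).
    config("spark.sql.autoBroadcastJoinThreshold", -1).
    config("spark.sql.crossJoin.enabled", true).
    getOrCreate()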


journey_spark.sparkContext.setLogLevel("ERROR")

TPCDS root directory is at : tpcds-journey
TPCDS ddl scripts directory is at: tpcds-journey/src/ddl/individual
TPCDS data directory is at: tpcds-journey/src/data
TPCDS queries directory is at: tpcds-journey/src/queries


## Set up the TPC-DS schema
* Creates the database named by tpcdsDatabaseName.
* Creates all the TPC-DS tables.
* Loads data into the tables in Parquet format.
  * Since the data generated by the TPC-DS toolkit is in CSV format, the loading is done in multiple steps.
  * First, we create CSV tables whose location points to the generated data.
  * Second, we create Parquet tables with CREATE TABLE AS SELECT (CTAS) statements that convert the text data into Parquet.
  * Finally, we drop the text tables, as we no longer need them.
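The three loading steps can be sketched as the SQL statements they would produce for one table. This is a hypothetical illustration, not the journey's actual DDL scripts: the `_text` suffix, the `<table>.dat` file naming and the `|` field separator (assumed to match the toolkit's default output) are all assumptions.

```scala
// Hypothetical sketch of the three-step load for a single table.
def parquetLoadStatements(table: String, columnsDdl: String, dataDir: String): Seq[String] = {
  val textTable = s"${table}_text"
  Seq(
    // Step 1: a CSV table over the generated, pipe-delimited data file.
    s"CREATE TABLE $textTable ($columnsDdl) USING csv " +
      s"OPTIONS (path '$dataDir/$table.dat', sep '|')",
    // Step 2: CTAS converts the text data into a Parquet table.
    s"CREATE TABLE $table USING parquet AS SELECT * FROM $textTable",
    // Step 3: the text table is no longer needed.
    s"DROP TABLE $textTable"
  )
}
```

Each statement could then be executed in order, for example `parquetLoadStatements("reason", "r_reason_sk INT, r_reason_id STRING, r_reason_desc STRING", tpcdsGenDataDir).foreach(journey_spark.sql)`.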

### Utility function definitions.
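* Defines the utility functions called from later cells in the notebook: createDatabase, createTable, runQuery, forEachTable, runIndividualQuery, runAllQueries, displaySummary, displayResult and explainQuery.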
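runQuery and explainQuery both turn a `.sql` file into individual statements by dropping `--` comment lines and splitting on `;`. A minimal, hypothetical sketch of that parsing as a standalone function is shown below; it assumes `;` appears only as a statement terminator and `--` only starts full-line comments, which would not hold for SQL with semicolons inside string literals.

```scala
// Hypothetical helper: splits a SQL script into non-empty statements.
def splitSqlScript(content: String): Seq[String] =
  content.split("\r?\n")                  // handles LF and CRLF line endings
    .filterNot(_.trim.startsWith("--"))   // drop full-line comments
    .mkString(" ")
    .split(";")
    .map(_.trim)
    .filter(_.nonEmpty)                   // skip blank pieces, e.g. after the last ';'
    .toSeq
```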

In [5]:
def createDatabase(): Unit = {
    journey_spark.sql(s"DROP DATABASE IF EXISTS ${tpcdsDatabaseName} CASCADE")
    journey_spark.sql(s"CREATE DATABASE ${tpcdsDatabaseName}")
    journey_spark.sql(s"USE ${tpcdsDatabaseName}")
}

/**
 * Creates a TPC-DS table in Spark. It reads the DDL script for the
 * table and executes each statement in it.
 */
def createTable(tableName: String): Unit = {
  println(s"Creating table $tableName ..")
  journey_spark.sql(s"DROP TABLE IF EXISTS $tableName")
  // Remove any leftover warehouse files from a previous run.
  deleteFile1(tableName)
  deleteFile2(tableName)
  val (_, content) =
    journey_spark.sparkContext.wholeTextFiles(s"${tpcdsDdlDir}/$tableName.sql").collect()(0)

  // Substitute the data directory and replace every "csv" substring with the
  // fully qualified class name of Spark's built-in CSV data source.
  val sqlStmts = content.stripLineEnd
    .replace('\n', ' ')
    .replace("${TPCDS_GENDATA_DIR}", tpcdsGenDataDir)
    .replace("csv", "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .split(";")
    .map(_.trim)
    .filter(_.nonEmpty)
  sqlStmts.foreach(stmt => journey_spark.sql(stmt))
}

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

def runQuery(queryStr: String,
             individual: Boolean = true,   // not used by this implementation
             resultDir: String): Seq[(String, Double, Int, String)] = {
  val querySummary = ArrayBuffer.empty[(String, Double, Int, String)]
  val queryName = s"${tpcdsQueriesDir}/query${queryStr}.sql"
  val (_, content) = journey_spark.sparkContext.wholeTextFiles(queryName).collect()(0)
  // Drop comment lines, split the script on ';' and skip empty statements.
  val queries = content.split("\n")
    .filterNot(_.trim.startsWith("--"))
    .mkString(" ").split(";")
    .map(_.trim)
    .filter(_.nonEmpty)

  var cnt = 1
  for (query <- queries) {
    val start = System.nanoTime()
    val df = journey_spark.sql(query)
    // Execute the query once; elapsed time is truncated to whole seconds.
    val result = df.collect()
    val timeElapsed = (System.nanoTime() - start) / 1000000000
    // A file with several statements yields query<NN>-1, query<NN>-2, ...
    val name = if (queries.length > 1) {
      s"query${queryStr}-${cnt}"
    } else {
      s"query${queryStr}"
    }
    // Save the result as a single CSV file with a header row.
    // Writing runs the query again; that time is not included above.
    val resultFile = s"${resultDir}/${name}-notebook.res"
    df.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .mode("overwrite")
      .save(resultFile)
    totalTime = totalTime + timeElapsed

    querySummary += ((name, timeElapsed.toDouble, result.length, resultFile))
    cnt += 1
  }
  querySummary
}

// run function for each table in tables array
def forEachTable(tables: Array[String], f: (String) => Unit): Unit = {
  for ( table <- tables) {
    try {
      f(table)
    } catch {
      case e: Throwable => {
        println("EXCEPTION!! " + e.getMessage())
        throw e
      }
    }
  }
}

def runIndividualQuery(queryNum: Int, resultDir: String = tpcdsWorkDir ): DataFrame = {
    val queryStr = "%02d".format(queryNum) 
    val testSummary = ArrayBuffer.empty[(String, Double, Int, String)] 
    try {      
      println(s"Running TPC-DS Query : $queryStr")  
      testSummary ++= runQuery(queryStr, true, resultDir)
    } catch {
        case e: Throwable => {
            println("Error in query " + queryNum + ", msg = " + e.getMessage)
        }
    }
    testSummary.toDF("QueryName","ElapsedTime","RowsReturned", "ResultLocation")
}

def runAllQueries(resultDir: String = tpcdsWorkDir): DataFrame = {
  val testSummary = ArrayBuffer.empty[(String, Double, Int, String)]    
  var queryErrors = 0
  for (i <- 1 to 99) {
    try{
      val queryStr = "%02d".format(i)
      println(s"Running TPC-DS Query : $queryStr")   
      testSummary ++= runQuery(queryStr, false, resultDir)
    } catch {
       case e: Throwable => {
            println("Error in query " + i + ", msg = " + e.getMessage)
            queryErrors += 1
       }
    }
  }

  println("=====================================================")
  if ( queryErrors > 0) {
    println(s"Query execution failed with $queryErrors errors")
  } else {
    println("All TPC-DS queries ran successfully")
  }
  println (s"Total Elapsed Time so far: ${totalTime} seconds.")
  println("=====================================================")
  testSummary.toDF("QueryName","ElapsedTime","RowsReturned", "ResultLocation")
}

def displaySummary(summaryDF: DataFrame): Unit = {
    summaryDF.show(10000)
}

def displayResult(queryNum: Int, summaryDF: DataFrame) = {
   val queryStr = "%02d".format(queryNum)
   // Find result files for this query number. For some queries there are
   // multiple result files. 
   val  files = summaryDF.where(s"queryName like 'query${queryStr}%'").select("ResultLocation").collect()
   for (file <- files) {
       val fileName = file.getString(0)
       val df = journey_spark.read
         .format("csv")
         .option("header", "true") //reading the headers
         .option("mode", "DROPMALFORMED")
         .load(fileName)
       val numRows:Int = df.count().toInt
       df.show(numRows, truncate=false)
   }
}

def explainQuery(queryNum: Int) = {
  val queryStr = "%02d".format(queryNum)  
  val queryName = s"${tpcdsQueriesDir}/query${queryStr}.sql"   
  val (_, content) = journey_spark.sparkContext.wholeTextFiles(queryName).collect()(0)  
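  // Drop comment lines, split the script on ';' and skip empty statements.
  val queries = content.split("\n")
    .filterNot(_.trim.startsWith("--"))
    .mkString(" ").split(";")
    .map(_.trim)
    .filter(_.nonEmpty)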
    
  for (query <- queries)  {    
    journey_spark.sql(query).explain(true) 
  }
}

### Create the database and tables.
* Creates the TPC-DS database.
* For each table name in the TPC-DS schema, calls the createTable function to create the table in Spark.
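createTable substitutes `${TPCDS_GENDATA_DIR}` and then replaces every `csv` substring with the fully qualified CSV source class, which would also rewrite a data path or identifier that happens to contain `csv`. A hypothetical, more targeted variant is sketched below, assuming the DDL scripts name the data source as `USING csv`; `renderDdl` is not part of the journey code.

```scala
// Hypothetical helper: renders a DDL template into executable statements.
def renderDdl(template: String, dataDir: String): Seq[String] = {
  val csvClass = "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat"
  template
    .replace("${TPCDS_GENDATA_DIR}", dataDir)               // literal placeholder
    .replaceAll("(?i)\\bUSING\\s+csv\\b", "USING " + csvClass) // only the source name
    .split(";")
    .map(_.replace('\n', ' ').trim)
    .filter(_.nonEmpty)
    .toSeq
}
```

Here a data directory such as `/tmp/csv` is left untouched, because only `csv` directly after `USING` is rewritten.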

In [6]:
// TPC-DS table names.
val tables = Array("call_center", "catalog_sales",
                   "customer_demographics", "income_band",
                   "promotion", "store", "time_dim", "web_returns",
                   "catalog_page", "customer", "date_dim",
                   "inventory", "reason", "store_returns", "warehouse",
                   "web_sales", "catalog_returns", "customer_address",
                   "household_demographics", "item", "ship_mode", "store_sales",
                   "web_page", "web_site" )

// Create database
createDatabase()

// Create table
forEachTable(tables, table => createTable(table))


Creating table call_center ..
Creating table catalog_sales ..
Creating table customer_demographics ..
Creating table income_band ..
Creating table promotion ..
Creating table store ..
Creating table time_dim ..
Creating table web_returns ..
Creating table catalog_page ..
Creating table customer ..
Creating table date_dim ..
Creating table inventory ..
Creating table reason ..
Creating table store_returns ..
Creating table warehouse ..
Creating table web_sales ..
Creating table catalog_returns ..
Creating table customer_address ..
Creating table household_demographics ..
Creating table item ..
Creating table ship_mode ..
Creating table store_sales ..
Creating table web_page ..
Creating table web_site ..


## Verify table creation and data loading.
* Runs a simple Spark SQL count query against each table to get its number of rows.
* Verifies that the row counts match the expected counts for the 1GB scale factor.
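The check in the next cell relies on `tables` and `expectedCounts` being two arrays in the same order. A hypothetical alternative keyed by table name, so that ordering no longer matters, is sketched below; the counts used in the test come from `expectedCounts`.

```scala
// Hypothetical helper: returns (table, expected, found) for each table whose
// count differs; found is None when the table has no actual count at all.
def countMismatches(actual: Map[String, Long],
                    expected: Map[String, Long]): Seq[(String, Long, Option[Long])] =
  expected.toSeq.sortBy(_._1).collect {
    case (table, want) if !actual.get(table).contains(want) =>
      (table, want, actual.get(table))
  }

// Usage (hypothetical):
// countMismatches(tables.zip(rowCounts).toMap, tables.zip(expectedCounts.map(_.toLong)).toMap)
```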

In [7]:
// Run a count query and get the counts
val rowCounts = tables.map { table =>
    journey_spark.table(table).count()
}

// Expected row counts at the 1GB scale factor, in the same order as the tables array.
val expectedCounts = Array (
    6, 1441548, 1920800, 20, 300, 12, 86400,
    71763,  11718, 100000, 73049, 11745000, 
    35, 287514, 5, 719384, 144067, 50000, 7200,
    18000, 20, 2880404, 60, 30
)

var errorCount = 0
val zippedCountsWithIndex = rowCounts.zip(expectedCounts).zipWithIndex
for ((pair, index) <- zippedCountsWithIndex) {
    if (pair._1 != pair._2) {
        println(s"""ERROR!! Row count for ${tables(index)} does not match.
        Expected=${expectedCounts(index)} but found ${rowCounts(index)}""")
        errorCount += 1
    }
}

println("=====================================================")
if ( errorCount > 0) {
  println(s"Load verification failed with $errorCount errors")
} else {
  println("Loaded and verified the table counts successfully")
}
println("=====================================================")

Loaded and verified the table counts successfully


## Run a single query
* Given a query number from 1 to 99, runs that query against Spark.
* Displays the time taken to execute the query, the number of rows returned and the query results.
* To run a different query, change QUERY_NUM to a valid value from 1 to 99.
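runQuery builds the query file name from a zero-padded two-digit number (`query01.sql` to `query99.sql`), but nothing rejects an out-of-range QUERY_NUM before Spark tries to read a missing file. A minimal sketch of an up-front check; `queryFilePath` is a hypothetical helper.

```scala
// Hypothetical helper: validates the query number and builds the file path.
def queryFilePath(queriesDir: String, queryNum: Int): String = {
  require(queryNum >= 1 && queryNum <= 99,
    s"QUERY_NUM must be between 1 and 99, got $queryNum")
  f"$queriesDir/query$queryNum%02d.sql"   // zero-padded, e.g. query01.sql
}
```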

In [8]:
val QUERY_NUM = 1
val result = runIndividualQuery(QUERY_NUM)
displaySummary(result)
displayResult(QUERY_NUM, result)

Running TPC-DS Query : 01
single query path
+---------+-----------+------------+--------------------+
|QueryName|ElapsedTime|RowsReturned|      ResultLocation|
+---------+-----------+------------+--------------------+
|  query01|       12.0|         100|tpcds-journey/wor...|
+---------+-----------+------------+--------------------+

+----------------+
|c_customer_id   |
+----------------+
|AAAAAAAAAAABBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAAKAAAA|
|AAAAAAAAAABDAAAA|
|AAAAAAAAAABHBAAA|
|AAAAAAAAAABLAAAA|
|AAAAAAAAAABMAAAA|
|AAAAAAAAAACHAAAA|
|AAAAAAAAAACMAAAA|
|AAAAAAAAAADDAAAA|
|AAAAAAAAAADGAAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADPAAAA|
|AAAAAAAAAAEBAAAA|
|AAAAAAAAAAEFBAAA|
|AAAAAAAAAAEGBAAA|
|AAAAAAAAAAEIAAAA|
|AAAAAAAAAAEMAAAA|
|AAAAAAAAAAFAAAAA|
|AAAAAAAAAAFPAAAA|
|AAAAAAAAAAGGBAAA|
|AAAAAAAAAAGHBAAA|
|AAAAAAAAAAGJAAAA|
|AAAAAAAAAAGMAAAA|
|AAAAAAAAAAHEBAAA|
|AAAAAAAAAAHFBAAA|
|AAAAAAAAAAIEBAAA|
|AAAAAAAAAAJGBAAA|
|AAAAAAAAAAJHBAAA|
|AAAAAAAAAAKCAAAA|


## Run all the TPC-DS queries
* Runs all the queries, from 1 to 99.
* The query results are saved under tpcdsWorkDir and can be displayed by calling displayResult.
* A summary of elapsed times and returned row counts is shown at the end.
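runQuery computes elapsed time as `(System.nanoTime() - start) / 1000000000`, which is integer division on a `Long` and truncates every timing to whole seconds (the summary shows values such as 12.0). A hypothetical helper that keeps fractional seconds could look like this:

```scala
// Hypothetical timing helper: evaluates body once and returns its result
// together with the elapsed wall-clock time in fractional seconds.
def timed[T](body: => T): (T, Double) = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9   // Double division
  (result, seconds)
}

// Usage (hypothetical): val (rows, secs) = timed(journey_spark.sql(query).collect())
```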

In [None]:
val result = runAllQueries()
displaySummary(result)

Running TPC-DS Query : 01
single query path
Running TPC-DS Query : 02
single query path
Running TPC-DS Query : 03
single query path
Running TPC-DS Query : 04
single query path
Running TPC-DS Query : 05
single query path
Running TPC-DS Query : 06
single query path
Running TPC-DS Query : 07
single query path
Running TPC-DS Query : 08
single query path
Running TPC-DS Query : 09
single query path
Running TPC-DS Query : 10
single query path
Running TPC-DS Query : 11
single query path
Running TPC-DS Query : 12
single query path
Running TPC-DS Query : 13
single query path
Running TPC-DS Query : 14
multi query path =1
multi query path =2
Running TPC-DS Query : 15
single query path
Running TPC-DS Query : 16
single query path
Running TPC-DS Query : 17
single query path
Running TPC-DS Query : 18
single query path
Running TPC-DS Query : 19
single query path
Running TPC-DS Query : 20
single query path
Running TPC-DS Query : 21
single query path
Running TPC-DS Query : 22
single query path
Running TP

## Display the result for an individual query
* Reads the result file(s) saved for the given query when it was run in the previous steps.
* Certain queries (for example, query 14) have multiple result files; these are read and displayed in sequence.
* If a result file listed in the summary does not exist, Spark raises an error; if the query is not in the summary at all, nothing is displayed.
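The result locations follow runQuery's naming scheme: a query file with one statement produces `queryNN`, while a file with several statements produces `queryNN-1`, `queryNN-2` and so on, each saved as `<resultDir>/<name>-notebook.res`. A hypothetical helper reproducing that scheme:

```scala
// Hypothetical helper mirroring runQuery's result naming.
def resultLocations(resultDir: String, queryNum: Int, statementCount: Int): Seq[String] = {
  val base = f"query$queryNum%02d"
  val names =
    if (statementCount > 1) (1 to statementCount).map(i => s"$base-$i")
    else Seq(base)
  names.map(name => s"$resultDir/$name-notebook.res")
}
```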


In [None]:
displayResult(14, result)

## Display SQL Execution Plan
* Displays the parsed, analyzed, optimized and physical plans for a given query.
* Useful to developers for debugging.
* Change the argument passed to explainQuery to display the plan for a different query.

In [None]:
explainQuery(1)

In [None]:
%%brunel
data('result') bar x(QueryName) y(ElapsedTime) title("Query Execution Time in seconds", "Execution Summary":footer)