In [35]:
//disable dynamicAllocation to prevent spark pool from scaling since the dataGen software must be installed on every executor
%%configure -f
{
    "conf" : {
        "spark.dynamicAllocation.enabled": "false"
    }
}

StatementMeta(, 69, -1, Finished, Available)

In [69]:
val benchmark: String = "TPCDS" //"TPCDS", "TPCH"
val scaleFactor = 1 // "1", "10", "100", "1000", "10000" list of scale factors to generate and import

val baseLocation = s"abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds/" // abfss path to base directory to write data to
val baseDatagenFolder = "/tmp"  // usually /tmp if enough space is available for datagen files

// Output file formats
val fileFormat = "parquet" // only parquet was tested
val shuffle = true // If true, partitions will be coalesced into a single file during generation up to spark.sql.files.maxRecordsPerFile (if set)
val overwrite = true //if to delete existing files (doesn't check if results are complete on no-overwrite)
val partitionTables = false

// Generate stats for CBO
val createTableStats = true
val createColumnStats = true

val workers: Int = spark.conf.get("spark.executor.instances").toInt
val cores: Int = spark.conf.get("spark.executor.cores").toInt

val dbSuffix = "" // set only if creating multiple DBs or source file folders with different settings, use a leading _
val TPCDSUseLegacyOptions = false // set to generate file/DB naming and table options compatible with older results

StatementMeta(SmallSparkPool, 69, 34, Finished, Available)

benchmark: String = TPCDS
scaleFactor: Int = 1
baseLocation: String = abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds/
baseDatagenFolder: String = /tmp
fileFormat: String = parquet
shuffle: Boolean = true
overwrite: Boolean = true
partitionTables: Boolean = false
createTableStats: Boolean = true
createColumnStats: Boolean = true
workers: Int = 4
cores: Int = 4
dbSuffix: String = ""
TPCDSUseLegacyOptions: Boolean = false


In [40]:
import java.io._
import scala.sys.process._

// spark-sql-perf
import com.databricks.spark.sql.perf._
import com.databricks.spark.sql.perf.tpch._
import com.databricks.spark.sql.perf.tpcds._


// Spark/Hadoop config
import org.apache.spark.deploy.SparkHadoopUtil

StatementMeta(SmallSparkPool, 69, 5, Finished, Available)

import java.io._
import scala.sys.process._
import com.databricks.spark.sql.perf._
import com.databricks.spark.sql.perf.tpch._
import com.databricks.spark.sql.perf.tpcds._
import org.apache.spark.deploy.SparkHadoopUtil


In [41]:
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Prevent very large files. 20 million records creates between 500 and 1500MB files in TPCH
spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "20000000")  // This also regulates the file coalesce

StatementMeta(SmallSparkPool, 69, 6, Finished, Available)

In [42]:
// Checks that we have the correct number of worker nodes to start the data generation
// Make sure you have set the workers variable correctly, as the datagens binaries need to be present in all nodes
val targetWorkers: Int = workers
def numWorkers: Int = sc.getExecutorMemoryStatus.size - 1
def waitForWorkers(requiredWorkers: Int, tries: Int) : Unit = {
  for (i <- 0 to (tries-1)) {
    if (numWorkers == requiredWorkers) {
      println(s"Waited ${i}s. for $numWorkers workers to be ready")
      return
    }
    if (i % 60 == 0) println(s"Waiting ${i}s. for workers to be ready, got only $numWorkers workers")
    Thread sleep 1000
  }
  throw new Exception(s"Timed out waiting for workers to be ready after ${tries}s.")
}
waitForWorkers(targetWorkers, 3600) //wait up to an hour

StatementMeta(SmallSparkPool, 69, 7, Finished, Available)

Waiting 0s. for workers to be ready, got only 3 workers
Waiting 60s. for workers to be ready, got only 3 workers
Waited 120s. for 4 workers to be ready
targetWorkers: Int = 4
numWorkers: Int
waitForWorkers: (requiredWorkers: Int, tries: Int)Unit


In [43]:
def time[R](block: => R): R = {  
    val t0 = System.currentTimeMillis() //nanoTime()
    val result = block    // call-by-name
    val t1 = System.currentTimeMillis() //nanoTime()
    println("Elapsed time: " + (t1 - t0) + "ms")
    result
}

StatementMeta(SmallSparkPool, 69, 8, Finished, Available)

time: [R](block: => R)R


In [59]:
// COMMAND ----------

// FOR INSTALLING TPCH DBGEN (with the stdout patch)
def installDBGEN(url: String = "https://github.com/databricks/tpch-dbgen.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
  // check if we want the revision which makes dbgen output to stdout
  val checkoutRevision: String = if (useStdout) "git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c" else ""
  Seq("mkdir", "-p", baseFolder).!
  val pw = new PrintWriter(new File(s"${baseFolder}/dbgen_$i.sh" ))
  pw.write(s"""
rm -rf ${baseFolder}/dbgen
rm -rf ${baseFolder}/dbgen_install_$i
mkdir ${baseFolder}/dbgen_install_$i
cd ${baseFolder}/dbgen_install_$i
git clone '$url'
cd tpch-dbgen
$checkoutRevision
make
ln -sf ${baseFolder}/dbgen_install_$i/tpch-dbgen ${baseFolder}/dbgen || echo "ln -sf failed"
test -e ${baseFolder}/dbgen/dbgen
echo "OK"
  """)
  pw.close
  Seq("chmod", "+x", s"${baseFolder}/dbgen_$i.sh").!
  Seq(s"${baseFolder}/dbgen_$i.sh").!!
}

// COMMAND ----------

// FOR INSTALLING TPCDS DSDGEN (with the stdout patch)
// Note: it assumes Debian/Ubuntu host, edit package manager if not
def installDSDGEN(url: String = "https://github.com/databricks/tpcds-kit.git", useStdout: Boolean = true, baseFolder: String = "/tmp")(i: java.lang.Long): String = {
  Seq("mkdir", "-p", baseFolder).!
  val pw = new PrintWriter(new File(s"${baseFolder}/dsdgen_$i.sh" ))
  pw.write(s"""
sudo apt-get update
sudo apt-get -y --force-yes install gcc make flex bison byacc git
rm -rf ${baseFolder}/dsdgen
rm -rf ${baseFolder}/dsdgen_install_$i
mkdir ${baseFolder}/dsdgen_install_$i
cd ${baseFolder}/dsdgen_install_$i
git clone '$url'
cd tpcds-kit/tools
make -f Makefile.suite
ln -sf ${baseFolder}/dsdgen_install_$i/tpcds-kit/tools ${baseFolder}/dsdgen || echo "ln -sf failed"
${baseFolder}/dsdgen/dsdgen -h
test -e ${baseFolder}/dsdgen/dsdgen
echo "OK"
  """)
  pw.close
  Seq("chmod", "+x", s"${baseFolder}/dsdgen_$i.sh").!
  Seq(s"${baseFolder}/dsdgen_$i.sh").!!
}

// COMMAND ----------

StatementMeta(SmallSparkPool, 69, 24, Finished, Available)

installDBGEN: (url: String, useStdout: Boolean, baseFolder: String)(i: Long)String
installDSDGEN: (url: String, useStdout: Boolean, baseFolder: String)(i: Long)String


In [60]:
val res = spark.range(0, workers, 1, workers).map(worker => benchmark match {
    case "TPCDS" => s"TPCDS worker $worker\n" + installDSDGEN(baseFolder = baseDatagenFolder)(worker)
    case "TPCH" => s"TPCH worker $worker\n" + installDBGEN(baseFolder = baseDatagenFolder)(worker)
  }).collect()

StatementMeta(SmallSparkPool, 69, 25, Finished, Available)

res: Array[String] =
Array("TPCDS worker 0
ctags address.c build_support.c date.c decimal.c dist.c driver.c error_msg.c expr.c genrand.c grammar_support.c join.c list.c load.c misc.c nulls.c parallel.c permute.c pricing.c print.c r_params.c StringBuffer.c tdef_functions.c tdefs.c text.c scd.c scaling.c release.c scd.c sparse.c porting.c validate.c dcgram.c dcomp.c grammar.c  s_brand.c s_customer_address.c scaling.c s_call_center.c s_catalog.c s_catalog_order.c s_catalog_order_lineitem.c s_catalog_page.c s_catalog_promotional_item.c s_catalog_returns.c s_category.c s_class.c s_company.c s_customer.c s_division.c s_inventory.c s_item.c s_manager.c s_manufacturer.c s_market.c s_pline.c s_product.c s_promotion.c s_purchase.c s_reason.c s_store.c s_store_promotional_item.c s_store_returns.c ...


In [76]:
def getBenchmarkData(benchmark: String, scaleFactor: Int) = benchmark match {  
  
  case "TPCH" => (
    s"tpch_sf${scaleFactor}_${fileFormat}${dbSuffix}",
    new TPCHTables(spark.sqlContext, dbgenDir = s"${baseDatagenFolder}/dbgen", scaleFactor = scaleFactor.toString, useDoubleForDecimal = false, useStringForDate = false, generatorParams = Nil),
    s"$baseLocation/tpch/sf${scaleFactor}_${fileFormat}")  
  
  case "TPCDS" if !TPCDSUseLegacyOptions => (
    s"tpcds_sf${scaleFactor}_${fileFormat}${dbSuffix}",
    new TPCDSTables(spark.sqlContext, dsdgenDir = s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor.toString, useDoubleForDecimal = false, useStringForDate = false),
    s"$baseLocation/tpcds-2.4/sf${scaleFactor}_${fileFormat}")
  
  case "TPCDS" if TPCDSUseLegacyOptions => (
    s"tpcds_sf${scaleFactor}_nodecimal_nodate_withnulls${dbSuffix}",
    new TPCDSTables(spark.sqlContext, s"${baseDatagenFolder}/dsdgen", scaleFactor = scaleFactor.toString, useDoubleForDecimal = true, useStringForDate = true),
    s"$baseLocation/tpcds/sf$scaleFactor-$fileFormat/useDecimal=false,useDate=false,filterNull=false")
}
def isPartitioned (tables: Tables, tableName: String) : Boolean = 
  util.Try(tables.tables.find(_.name == tableName).get.partitionColumns.nonEmpty).getOrElse(false)

def loadData(tables: Tables, location: String, scaleFactor: Int) = {
  val tableNames = tables.tables.map(_.name)
  tableNames.foreach { tableName =>
  // generate data
    time {
      tables.genData(
        location = location, 
        format = fileFormat, 
        overwrite = overwrite, 
        partitionTables = partitionTables, 
        // if to coallesce into a single file (only one writter for non partitioned tables = slow) 
        clusterByPartitionColumns = shuffle, //if (isPartitioned(tables, tableName)) false else true, 
        filterOutNullPartitionValues = false, 
        tableFilter = tableName,
        // this controlls parallelism on datagen and number of writers (# of files for non-partitioned)
        // in general we want many writers to S3, and smaller tasks for large scale factors to avoid OOM and shuffle errors    
        numPartitions = if (scaleFactor <= 100 || !isPartitioned(tables, tableName)) (workers * cores) else (workers * cores * 4))
    }
  }
}

// COMMAND ----------

// Create the DB, import data, create
def createExternal(location: String, dbname: String, tables: Tables, partitionTables: Boolean) = {
  tables.createExternalTables(location, fileFormat, dbname, overwrite = overwrite, discoverPartitions = if (partitionTables) {true} else {false})
}

def loadDB(dbname: String, tables: Tables, location: String, partitionTables: Boolean) = {
  val tableNames = tables.tables.map(_.name)
  time { 
    println(s"Creating external tables at $location")
    createExternal(location, dbname, tables, partitionTables) 
  }
  // Show table information and attempt to vacuum
  tableNames.foreach { tableName =>
    if (partitionTables) {println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse(s"no partitions"))} else {"no partitions"} 
    util.Try(sql(s"VACUUM $tableName RETAIN 0.0. HOURS"))getOrElse(println(s"Cannot VACUUM $tableName"))
    sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
    println
  }
}

// COMMAND ----------

def setScaleConfig(scaleFactor: Int): Unit = {
  // Avoid OOM when shuffling large scale fators
  // and errors like 2GB shuffle limit at 10TB like: Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 9640891355
  // For 10TB 16x4core nodes were needed with the config below, 8x for 1TB and below. 
  // About 24hrs. for SF 1 to 10,000.
 // if (scaleFactor >= 10000) {    
 //   spark.conf.set("spark.sql.shuffle.partitions", "20000")
 //   org.apache.spark.deploy.SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.1")
 // } 
 // else if (scaleFactor >= 1000) {
 //   spark.conf.set("spark.sql.shuffle.partitions", "2001") //one above 2000 to use HighlyCompressedMapStatus
 //   org.apache.spark.deploy.SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.3")    
 // }
 // else { 
 //   spark.conf.set("spark.sql.shuffle.partitions", "200") //default
 //   org.apache.spark.deploy.SparkHadoopUtil.get.conf.set("parquet.memory.pool.ratio", "0.5")
 // }
}

StatementMeta(SmallSparkPool, 69, 41, Finished, Available)

getBenchmarkData: (benchmark: String, scaleFactor: Int)(String, com.databricks.spark.sql.perf.Tables, String)
isPartitioned: (tables: com.databricks.spark.sql.perf.Tables, tableName: String)Boolean
loadData: (tables: com.databricks.spark.sql.perf.Tables, location: String, scaleFactor: Int)Unit
createExternal: (location: String, dbname: String, tables: com.databricks.spark.sql.perf.Tables, partitionTables: Boolean)Unit
loadDB: (dbname: String, tables: com.databricks.spark.sql.perf.Tables, location: String, partitionTables: Boolean)Unit
setScaleConfig: (scaleFactor: Int)Unit


In [77]:

// First set some config settings affecting OOMs/performance
setScaleConfig(scaleFactor)

val (dbname, tables, location) = getBenchmarkData(benchmark, scaleFactor)
// Start the actual loading
time {
  println(s"Generating data for $benchmark SF $scaleFactor at $location")
  loadData(tables = tables, location = location, scaleFactor = scaleFactor)
}
time {
  println(s"\nImporting data for $benchmark into DB $dbname from $location")
  loadDB(dbname = dbname, tables = tables, location = location, partitionTables = partitionTables)
}
if (createTableStats) time { 
  println(s"\nGenerating table statistics for DB $dbname (with analyzeColumns=$createColumnStats)")
  tables.analyzeTables(dbname, analyzeColumns = createColumnStats)
}


StatementMeta(SmallSparkPool, 69, 42, Finished, Available)

Generating data for TPCDS SF 1 at abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds//tpcds-2.4/sf1_parquet
Data has 1439935 rows clustered true for 20000000
Generating table catalog_sales in database to abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds//tpcds-2.4/sf1_parquet/catalog_sales with save mode Overwrite.
Elapsed time: 49701ms
Data has 143672 rows clustered true for 20000000
Generating table catalog_returns in database to abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds//tpcds-2.4/sf1_parquet/catalog_returns with save mode Overwrite.
Elapsed time: 8106ms
Data has 11745000 rows clustered true for 20000000
Generating table inventory in database to abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds//tpcds-2.4/sf1_parquet/inventory with save mode Overwrite.
Elapsed time: 47976ms
Data has 2879789 rows clustered true for 20000000
Generating table store_sales in database to abfss://raw@adlsacltdevwu201.dfs.core.windows.net/tpcds//tpcds-2.4/sf1_parquet/store_sa