# Spark SQL Benchmarking

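Benchmarks a set of SQL query templates (read from `queries/queries.csv`) using Apache Spark v2.x. The queries run against every table loaded from the CSV files in `data/`, and against Parquet, JSON and ORC copies of each table.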

### Define SparkSession

In [1]:
import os
import sys
import argparse
import time
from random import randint
import json
import logging
import pandas
from inflection import underscore
from datetime import datetime, timezone
from threading import Thread
from sqlalchemy import types
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores.
# NOTE(review): with a local[*] master the executors run inside the driver JVM, so
# spark.executor.memory probably has no effect here; spark.driver.memory is the setting
# that sizes the heap in local mode — confirm before relying on the 2g value.
spark = (
    SparkSession.builder
    .appName("Spark SQL Benchmarking")
    .master("local[*]")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [2]:
# Create the benchmarking database if it does not exist yet and make it the
# session's current database.
database_name = 'spark_sql_benchmarking'
spark.sql('CREATE DATABASE IF NOT EXISTS {}'.format(database_name))
spark.catalog.setCurrentDatabase(database_name)

In [None]:
# REFERENCE
#sqlContext.tableNames('spark_sql_benchmarking')          # ['test_table_1']
#sqlContext.tables('spark_sql_benchmarking').toPandas()   # returns a pandas.DataFrame
##spark.catalog.listDatabases()  # [Database(name='default', description='default database', locationUri='file:/jupyter-vagrant/notebook/spark-warehouse'), Database(name='spark_sql_benchmarking', description='', locationUri='file:/jupyter-vagrant/notebook/spark-warehouse/spark_sql_benchmarking.db')]
##spark.catalog.currentDatabase()  # 'spark_sql_benchmarking'
##spark.catalog.listTables('spark_sql_benchmarking')  # [Table(name='test_table_1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

#sqlContext.registerDataFrameAsTable(dataframe, 'df_registertable')
#dataframe.createOrReplaceTempView('df_tempview')
#dataframe.createGlobalTempView('df_globaltempview2')
#sqlContext.createExternalTable('df_external')#, path=None, source=None, schema=None, **options)
#sqlContext.createExternalTable(tableName, "parquet", someDF.schema, Map("path" -> path))

In [None]:
# Testing
#dataframe = spark.catalog.createExternalTable('temp2_external_csv', 'data/test_table_1.csv', 'csv', header='true', inferschema='true')
#spark.table('temp2_external_csv').schema

# Alternative syntax
#sqlContext.sql('CREATE TABLE temp_external_csv USING CSV')


### Configure logging

In [3]:
# Send INFO-and-above records from the root logger to stderr, prefixed with a timestamp and level.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')

# Each execution of this cell used to attach one more StreamHandler to the root logger, so after
# re-running it every message was printed several times. Remove the handler a previous run
# installed (identified by its name) before adding a fresh one.
handler_name = 'spark_sql_benchmarking'
for existing_handler in list(logger.handlers):
    if getattr(existing_handler, 'name', None) == handler_name:
        logger.removeHandler(existing_handler)
        existing_handler.close()

handler = logging.StreamHandler()
handler.name = handler_name
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

### Timer class

In [4]:
class Timer:
    """Context manager that measures the wall-clock duration of a ``with`` block.

    After the block exits, the instance exposes ``start`` and ``end`` (raw
    ``time.perf_counter()`` readings) and ``interval`` (``end - start``, in seconds).
    Exceptions raised inside the block are not suppressed.
    """

    def __enter__(self):
        began_at = time.perf_counter()
        self.start = began_at
        return self

    def __exit__(self, *exc_info):
        finished_at = time.perf_counter()
        self.end = finished_at
        self.interval = finished_at - self.start

### Search for data files on local file system

In [5]:
# Full directory path to where the data files are stored. These will be used to create the tables and insert into database.
data_path = os.path.join(os.path.abspath(os.curdir), 'data')

# Collect the CSV files in sorted order so the tables are always loaded in the same order;
# os.listdir() makes no ordering guarantee. A missing directory is reported the same way as an
# empty one instead of surfacing as a bare FileNotFoundError traceback.
if os.path.isdir(data_path):
    data_filepath_list = sorted(os.path.join(data_path, filename)
                                for filename in os.listdir(data_path)
                                if filename.endswith(".csv"))
else:
    data_filepath_list = []
if not data_filepath_list:
    logging.error("No data files found in path:  " + str(data_path))
    sys.exit(1)
# One row per data file; the table name is the file's basename without its extension.
tables_dataframe = pandas.DataFrame({'table_name': [os.path.splitext(os.path.basename(filepath))[0]
                                                    for filepath in data_filepath_list]})
logging.info('Found the following files in path:  ' + str(data_path))
for filename in data_filepath_list:
    logging.info(filename)

2017-03-06 19:27:55,767 [INFO] Found the following files in path:  /jupyter-vagrant/notebook/data
2017-03-06 19:27:55,770 [INFO] /jupyter-vagrant/notebook/data/test_table_1.csv
2017-03-06 19:27:55,772 [INFO] /jupyter-vagrant/notebook/data/test_table_2.csv


### Load data into Spark SQL

In [6]:
# For every CSV file: load it into a Spark DataFrame and normalise its column names. Then register
# it as a temporary view named after the file. Finally save it as Parquet/JSON/ORC and register each
# copy as its own view (e.g. `test_table_1.parquet`) so the same data can be benchmarked per source type.
data_source_types = ['parquet', 'json', 'orc']

for filepath in data_filepath_list:
    logging.info("Reading data file: " + filepath)
    table_name = os.path.splitext(os.path.basename(filepath))[0]  # set table name to basename of filepath

    # Read with Spark's built-in 'csv' source. In Spark 2.x 'com.databricks.spark.csv' is kept only as a
    # backward-compatibility alias for it. 'inferSchema' is the option spelling used in the Spark documentation.
    dataframe = spark.read.format('csv').options(header='true', inferSchema='true').load(filepath)
    logging.info(str(dataframe.count()) + " rows loaded.")
    # camelCase -> snake_case -> UPPER_CASE, e.g. 'orderDate' -> 'ORDER_DATE'
    cols = [underscore(column).upper() for column in dataframe.columns]
    dataframe = dataframe.toDF(*cols)

    # Create temporary view from CSV
    logging.info("Registering the csv source type as a SQL temporary view...")
    dataframe.createOrReplaceTempView(table_name)
    logging.info("Successfully created table: " + table_name)

    # Save as alternative data source types and create temporary views
    logging.info("Saving the DataFrame into alternative data source types:")
    for source_type in data_source_types:
        table_path = table_name + "." + source_type  # relative path; presumably resolved against the working directory
        logging.info("Saving " + table_path + "...")
        dataframe.write.save(table_path, format=source_type, mode='overwrite')
        logging.info("Registering the " + source_type + " source type as a SQL temporary view...")
        temp_dataframe = spark.read.format(source_type).load(table_path)
        # Back-quote the view name: it contains a '.', which would otherwise be parsed as database.table.
        temp_dataframe.createOrReplaceTempView('`' + table_path + '`')
        logging.info("Successfully created table: " + table_path)

2017-03-06 19:27:58,232 [INFO] Reading data file: /jupyter-vagrant/notebook/data/test_table_1.csv
2017-03-06 19:28:15,325 [INFO] 460949 rows loaded.
2017-03-06 19:28:15,638 [INFO] Registering the csv source type as a SQL temporary view...
2017-03-06 19:28:15,695 [INFO] Successfully created table: test_table_1
2017-03-06 19:28:15,697 [INFO] Saving the DataFrame into alternative data source types:
2017-03-06 19:28:15,698 [INFO] Saving test_table_1.parquet...
2017-03-06 19:28:38,148 [INFO] Registering the parquet source type as a SQL temporary view...
2017-03-06 19:28:38,505 [INFO] Successfully created table: test_table_1.parquet
2017-03-06 19:28:38,507 [INFO] Saving test_table_1.json...
2017-03-06 19:28:58,013 [INFO] Registering the json source type as a SQL temporary view...
2017-03-06 19:29:10,832 [INFO] Successfully created table: test_table_1.json
2017-03-06 19:29:10,852 [INFO] Saving test_table_1.orc...
2017-03-06 19:29:37,160 [INFO] Registering the orc source type as a SQL temporar

### Query for all table names

In [7]:
# All tables/views in the benchmarking database, as catalog Table objects (temporary views included).
table_name_list = spark.catalog.listTables(database_name)

# The same listing via SQL, as a pandas DataFrame whose column names are normalised to lowercase
# snake_case (the loop below relies on a 'table_name' column).
tables_dataframe = spark.sql("SHOW TABLES").toPandas()
cols = [underscore(column).lower() for column in tables_dataframe.columns]
tables_dataframe.columns = cols
logging.info('Found the following table names:')
tables_dataframe = tables_dataframe.rename(columns={'database': 'database_name'})
for table_name in tables_dataframe['table_name']:
    logging.info(table_name)

2017-03-06 19:30:01,495 [INFO] Found the following table names:
2017-03-06 19:30:01,497 [INFO] test_table_1
2017-03-06 19:30:01,499 [INFO] test_table_1.json
2017-03-06 19:30:01,501 [INFO] test_table_1.orc
2017-03-06 19:30:01,502 [INFO] test_table_1.parquet
2017-03-06 19:30:01,519 [INFO] test_table_2
2017-03-06 19:30:01,520 [INFO] test_table_2.json
2017-03-06 19:30:01,520 [INFO] test_table_2.orc
2017-03-06 19:30:01,521 [INFO] test_table_2.parquet


### Query for the number of records in each table and categorize

In [9]:
# Count the rows behind every table/view. Names are back-quoted because some (e.g. 'test_table_1.parquet')
# contain a '.', which Spark would otherwise read as a database/table separator.
tables_dataframe['table_row_count'] = tables_dataframe['table_name'].apply(
    lambda table_name: spark.table("`" + table_name + "`").count())

# Bucket tables by row count into right-closed intervals. include_lowest=True closes the first interval
# on the left, so a table with 0 rows is labelled 'Small' instead of NaN. The open-ended last edge keeps
# tables with more than 1,000,000,000 rows in 'X-Large' rather than NaN.
bins = [0, 100000, 1000000, 10000000, float('inf')]
label_names = ['Small', 'Medium', 'Large', 'X-Large']
tables_dataframe['table_size_category'] = pandas.cut(tables_dataframe['table_row_count'], bins,
                                                     labels=label_names, include_lowest=True)
logging.info(tables_dataframe[['table_name', 'table_row_count', 'table_size_category']])

2017-03-06 19:30:24,254 [INFO]              table_name  table_row_count table_size_category
0          test_table_1           460949              Medium
1     test_table_1.json           460949              Medium
2      test_table_1.orc           460949              Medium
3  test_table_1.parquet           460949              Medium
4          test_table_2              120               Small
5     test_table_2.json              120               Small
6      test_table_2.orc              120               Small
7  test_table_2.parquet              120               Small


### Benchmarking Spark SQL

In [14]:
def _column_pools(table_name):
    """Return ``(numeric_columns, character_columns)``: Series of column names of a registered table.

    Raises:
        AssertionError: if the table has no integer/long/double column or no string column,
            since the query templates need both kinds.
    """
    fields = spark.table('`' + str(table_name) + '`').schema.jsonValue()['fields']
    datatypes_dataframe = pandas.DataFrame(fields)
    type_names = datatypes_dataframe['type'].str.lower()
    numeric_columns = datatypes_dataframe[type_names.isin(['integer', 'long', 'double'])]['name']
    character_columns = datatypes_dataframe[type_names.isin(['string'])]['name']  # non-numeric columns
    if numeric_columns.empty or character_columns.empty:
        raise AssertionError(str(table_name) + " needs to have both numeric and character columns.")
    return numeric_columns, character_columns


def _sample_column(column_names):
    """Return one randomly chosen column name from a Series of names."""
    # .iloc[0] yields the raw string; Series.to_string() formats for display and may pad the value.
    return column_names.sample().iloc[0]


def _build_query_parameters(table_name, numeric_columns, character_columns, max_rows):
    """Return the placeholder values used to fill one query template for ``table_name``.

    Columns and the row limit are drawn at random, so every call can produce a different query.
    """
    return {
        'columns': ', '.join(map(str, character_columns.sample(n=randint(1, character_columns.size)))),
        'table': '`' + table_name + '`',
        'column_1': _sample_column(character_columns),
        'column_2': _sample_column(character_columns),
        'row': str(randint(1, max_rows)),
        'order_column': _sample_column(character_columns),
        'numeric_column': _sample_column(numeric_columns),
        'column': _sample_column(character_columns),
    }


def benchmark():
    """Run every query template in ``queries_dataframe`` against every table in ``tables_dataframe``.

    Reads the notebook globals ``spark``, ``Timer``, ``tables_dataframe``, ``queries_dataframe``,
    ``rows`` and ``csv_filepath``. Each template is filled with randomly chosen columns of the table,
    executed with ``spark.sql`` and forced with ``count()`` inside a ``Timer``. The results are
    appended to ``csv_filepath``, one row per (table, query) pair. Each row holds the query's columns,
    then ``rows``, ``time`` and ``query_executed``, then the table's columns. The CSV header is written
    only when the file does not exist yet; nothing is written when there are no results.

    Raises:
        AssertionError: if a table lacks numeric or character columns.
    """
    records = []
    for _, table_row in tables_dataframe.iterrows():
        table_name = table_row['table_name']
        logging.info("Benchmarking table: " + table_name)
        # The schema does not change between queries, so look it up once per table
        # rather than once per (table, query) pair.
        numeric_columns, character_columns = _column_pools(table_name)
        for _, query_row in queries_dataframe.iterrows():
            query_builder_dict = _build_query_parameters(table_name, numeric_columns, character_columns, rows)
            sql = query_row['query_template'].format(**query_builder_dict)
            logging.debug(sql)  # logged before timing starts so it is not part of the measurement
            with Timer() as t:
                # count() forces the query's result to be computed inside the timed region.
                row_count = spark.sql(sql).count()
            logging.info("Query " + str(query_row['query_id']) + str(':  {:f} sec'.format(t.interval)))
            record = query_row.copy()  # don't write into the row object handed out by iterrows()
            record['rows'] = row_count
            record['time'] = float(t.interval)
            record['query_executed'] = sql
            records.append(pandas.concat([record, table_row]))
    if not records:
        logging.warning("No benchmark results to write to " + str(csv_filepath))
        return
    # Build the frame once: DataFrame.append in a loop is quadratic and was removed in pandas 2.0.
    benchmark_dataframe = pandas.DataFrame(records)
    benchmark_dataframe.to_csv(csv_filepath, index=False, mode='a', header=not os.path.isfile(csv_filepath))

In [15]:
# Benchmark settings; benchmark() reads csv_filepath, rows and queries_dataframe as globals.
csv_filepath = 'tempresults.csv'  # results are appended; the header is written only for a new file
rows = 10  # maximum number of rows to return from each query execution.
database = 'Spark SQL'
queries_filepath = 'queries/queries.csv'

# Load every query template, then keep only the ones written for this database.
queries_dataframe = pandas.read_csv(queries_filepath)
queries_dataframe = queries_dataframe.loc[queries_dataframe['database'] == database]

if queries_dataframe.empty:
    logging.warning("Missing " + database + " queries from " + queries_filepath)
else:
    with Timer() as t:
        benchmark()
    logging.info(database + ' benchmark time: %.07f sec' % t.interval)

2017-03-06 19:32:13,266 [INFO] Benchmarking table: test_table_1
2017-03-06 19:32:17,091 [INFO] Query 1:  3.784965 sec
2017-03-06 19:32:20,759 [INFO] Query 2:  3.597519 sec
2017-03-06 19:32:24,670 [INFO] Query 3:  3.845127 sec
2017-03-06 19:32:28,583 [INFO] Query 4:  3.866383 sec
2017-03-06 19:32:32,031 [INFO] Query 5:  3.382424 sec
2017-03-06 19:32:35,576 [INFO] Query 6:  3.495031 sec
2017-03-06 19:32:39,139 [INFO] Query 7:  3.506386 sec
2017-03-06 19:32:42,543 [INFO] Query 8:  3.279761 sec
2017-03-06 19:32:45,909 [INFO] Query 9:  3.321665 sec
2017-03-06 19:32:52,261 [INFO] Query 10:  6.301785 sec
2017-03-06 19:32:55,999 [INFO] Query 11:  3.632919 sec
2017-03-06 19:32:56,264 [INFO] Query 12:  0.215368 sec
2017-03-06 19:33:00,095 [INFO] Query 13:  3.757411 sec
2017-03-06 19:33:03,536 [INFO] Query 14:  3.395210 sec
2017-03-06 19:33:03,914 [INFO] Query 15:  0.323767 sec
2017-03-06 19:33:04,175 [INFO] Query 16:  0.210809 sec
2017-03-06 19:33:07,794 [INFO] Query 17:  3.553201 sec
2017-03-06