In [1]:
## Helper to display filesize
def readable_size(num, suffix='B'):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    then printed with one decimal place, followed by the unit letter and
    `suffix` (for example 24304318 -> "23.2MB").

    Args:
        num: Size to format; negative values keep their sign.
        suffix: Text appended after the unit letter (default 'B').

    Returns:
        The formatted string, e.g. "1.0KB".
    """
    for unit in ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    # Anything that is still >= 1024 after 'Z' is reported in the last unit.
    # Use the bare letter 'Y' so it matches the other labels (K, M, G, ...).
    return "%.1f%s%s" % (num, 'Y', suffix)
  
# Helper to get total distributed filesize
def sizeof(path):
  """Add up the `size` attribute of every entry that dbutils.fs.ls(path) returns."""
  total = 0
  for entry in dbutils.fs.ls(path):
    total += entry.size
  return total

# Query
def nationkey_query(df):
  """Aggregate the sum of the `nationkey` column of `df` and collect the result."""
  summed = df.agg({"nationkey": "sum"})
  return summed.collect()

def measure_performance(df):
  """Return the seconds (time.perf_counter) that one nationkey_query(df) call takes."""
  from time import perf_counter
  began = perf_counter()
  nationkey_query(df)
  return perf_counter() - began


In [2]:
# Step 1
# Import from tbl file: pipe-separated, first line read as the header,
# column types inferred from the data.
df = spark.read.csv(
    "/FileStore/tables/customer.tbl",
    sep="|",
    header=True,
    inferSchema=True,
    # Exact mode name without the trailing space, so the option matches one of
    # the documented parse modes instead of relying on a fallback.
    mode="PERMISSIVE",
)


In [3]:
# Export as csv
csv_name = "/FileStore/tables/customer.csv"
# Remove any previous export (recursively) before writing to the same path.
dbutils.fs.rm(csv_name, True)
df.write.csv(path=csv_name, header=True)


In [4]:
# Use ls to check filesize (24304318 bytes or approx. 23 MB)
csv_size = readable_size(sizeof(csv_name))
print("CSV Size: %s" % csv_size)


In [5]:
# Measure performance once
# The result is stored as csv_time rather than `time`, so the name of the
# stdlib `time` module is not rebound to a float for later cells.
csv_time = measure_performance(spark.read.csv(csv_name, inferSchema=True, header=True))
print("%.2gs" % csv_time)


In [6]:
# Step 2: Store as parquet
pq_name = "/FileStore/tables/customer.pq"
# Remove any previous export (recursively) before writing to the same path.
dbutils.fs.rm(pq_name, True)
df.write.parquet(path=pq_name)

In [7]:
# Use ls to check filesize (12515560 bytes or approx. 12 MB in the recorded run)
pq_size = readable_size(sizeof(pq_name))
print("Parquet Size: %s" % pq_size)

In [8]:
# How is the file stored: list the entries under the parquet output path
pq_listing = dbutils.fs.ls(pq_name)
display(pq_listing)

path,name,size
dbfs:/FileStore/tables/customer.pq/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/customer.pq/_committed_1911199852581842293,_committed_1911199852581842293,624
dbfs:/FileStore/tables/customer.pq/_started_1911199852581842293,_started_1911199852581842293,0
dbfs:/FileStore/tables/customer.pq/part-00000-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-567-1-c000.snappy.parquet,part-00000-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-567-1-c000.snappy.parquet,2165843
dbfs:/FileStore/tables/customer.pq/part-00001-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-568-1-c000.snappy.parquet,part-00001-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-568-1-c000.snappy.parquet,2157026
dbfs:/FileStore/tables/customer.pq/part-00002-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-569-1-c000.snappy.parquet,part-00002-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-569-1-c000.snappy.parquet,2156774
dbfs:/FileStore/tables/customer.pq/part-00003-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-570-1-c000.snappy.parquet,part-00003-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-570-1-c000.snappy.parquet,2158091
dbfs:/FileStore/tables/customer.pq/part-00004-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-571-1-c000.snappy.parquet,part-00004-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-571-1-c000.snappy.parquet,2149026
dbfs:/FileStore/tables/customer.pq/part-00005-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-572-1-c000.snappy.parquet,part-00005-tid-1911199852581842293-0f1cc739-b67a-4e55-9517-52b3bf77723b-572-1-c000.snappy.parquet,1728176


In [9]:
# Measure performance once
# The result is stored as pq_time rather than `time`, so the name of the
# stdlib `time` module is not rebound to a float for later cells.
pq_time = measure_performance(spark.read.parquet(pq_name))
print("%.2gs" % pq_time)

In [10]:
# Step 3
# Run the nationkey query ten times against each format and keep every timing.

df_csv = spark.read.csv(csv_name, header=True, inferSchema=True)
csv_timings = [measure_performance(df_csv) for _ in range(10)]

df_pq = spark.read.parquet(pq_name)
pq_timings = [measure_performance(df_pq) for _ in range(10)]

In [11]:
# Import only the type classes used below instead of a wildcard import, so it
# stays visible where each name comes from.
from pyspark.sql.types import DoubleType, StructField, StructType

# One-column schema: each row holds a single measured execution time.
schema = StructType([StructField("executionTime", DoubleType())])
df_csv_perf = spark.createDataFrame([(val,) for val in csv_timings], schema=schema)
df_pq_perf = spark.createDataFrame([(val,) for val in pq_timings], schema=schema)

In [12]:
# Summary statistics of the CSV timings
csv_stats = df_csv_perf.describe("executionTime")
csv_stats.show()

In [13]:
# Summary statistics of the Parquet timings
pq_stats = df_pq_perf.describe("executionTime")
pq_stats.show()

In [14]:
# plot
import pandas as pd

# One column per storage format, one row per timed run.
d1 = dict(csv=csv_timings, pq=pq_timings)
df1 = pd.DataFrame(d1)
display(df1)

csv,pq
0.9522031300002708,0.7203724800056079
0.8506693800009089,0.913615094992565
0.7230256689945236,0.7275070559990127
0.6919168140011607,0.7240587859996594
0.7049983789911494,0.6063944229972549
0.7547447359975195,0.6825149730138946
0.8725909189961385,0.7291782480024267
0.8624752629984869,0.6606658320088172
1.476315196006908,0.8727490029996261
0.9730383040005108,0.6202711439982522


In [15]:
import random
from numpy import ndarray
import numpy as np

# helpers
def range_query(df):
  """Aggregate the sum of the `c1` column of `df` and collect the result."""
  summed = df.agg({"c1": "sum"})
  return summed.collect()

def measure_range_performance(df):
  """Return the seconds (time.perf_counter) that one range_query(df) call takes."""
  from time import perf_counter
  began = perf_counter()
  range_query(df)
  return perf_counter() - began

# generate random dataframe
# Column names c1 .. c10.
header = ["c" + str(i + 1) for i in range(10)]

# Seed NumPy's global generator so a re-run produces the same matrix and the
# size/timing numbers stay comparable between runs.
np.random.seed(42)
# Keep the pandas intermediate under its own name; `df` is only ever bound to
# the Spark DataFrame that the following cells write out.
random_pdf = pd.DataFrame(np.random.random(size=(1000000, 10)), columns=header)
df = spark.createDataFrame(random_pdf)

In [16]:
# step1
# export as csv
csv_name = "/FileStore/tables/random.csv"
dbutils.fs.rm(csv_name, True)
df.write.csv(csv_name, header=True)

# get csv size
csv_size = readable_size(sizeof(csv_name))
print("CSV Size: " + csv_size)

# measure performance
# Stored as csv_time rather than `time`, so the name of the stdlib `time`
# module is not rebound to a float for later cells.
csv_time = measure_range_performance(spark.read.csv(csv_name, inferSchema=True, header=True))
print("%.2gs" % csv_time)


In [17]:
# step2

# export as parquet
pq_name = "/FileStore/tables/random.pq"
dbutils.fs.rm(pq_name, True)
df.write.parquet(pq_name)

# get parquet size
pq_size = readable_size(sizeof(pq_name))
print("Parquet Size: " + pq_size)

# measure performance
# Stored as pq_time rather than `time`, so the name of the stdlib `time`
# module is not rebound to a float for later cells.
pq_time = measure_range_performance(spark.read.parquet(pq_name))
print("%.2gs" % pq_time)

In [18]:
# Business Data

# CSV Size: 23.2MB
# Parquet Size: 11.9MB
# CSV mean over execution time: 0.8861977789987577
# Parquet mean over execution time: 0.7257327040017116

# Scientific Data

# CSV Size: 183.8MB
# 4.4s
# Parquet Size: 76.3MB
# 0.99s

# Size difference for business data: Parquet is 51.3% of the CSV size
# Query-time difference for business data: Parquet needs only 81.9% of the CSV query time

# Size difference for scientific data: Parquet is 41.5% of the CSV size
# Query-time difference for scientific data: Parquet needs only 22.5% of the CSV query time

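# Parquet's query-time advantage over CSV grows significantly with the larger data set
# (183.8MB vs. 23.2MB of CSV, i.e. roughly a factor of 8 rather than 10).
# Note: the scientific timings are single runs, the business timings are means over 10 runs.
# The measurements agree with the expected behaviour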


In [19]:
import time

# Exercise 1
# Load the customer table (pipe-separated, first line read as header) and
# store it in Delta format. The parse mode is spelled without a trailing space.
df = spark.read.csv("/FileStore/tables/customer.tbl", header=True, mode="PERMISSIVE", sep="|", inferSchema=True)
name = "/FileStore/tables/customer"
dbutils.fs.rm(name, True)
df.write.format("delta").save(name)
spark.sql("CREATE TABLE IF NOT EXISTS customer USING DELTA LOCATION '/FileStore/tables/customer'")

# spark.sql() only returns a lazy DataFrame for a SELECT; collect() forces the
# query to run, so the timer covers the execution and not just the planning.
start = time.perf_counter()
spark.sql("SELECT SUM(nationkey) FROM customer").collect()
end = time.perf_counter()
duration = end - start
print("%.2gs" % duration)

# Exercise 2
# Column names c1 .. c10.
header = ["c" + str(i + 1) for i in range(10)]

# Random 1,000,000 x 10 matrix, converted to a Spark DataFrame.
df = pd.DataFrame(np.random.random(size=(1000000, 10)), columns=header)
df = spark.createDataFrame(df)

name = "/FileStore/tables/scientific"
dbutils.fs.rm(name, True)
df.write.format("delta").save(name)

spark.sql("CREATE TABLE IF NOT EXISTS scientific USING DELTA LOCATION '/FileStore/tables/scientific'")

# spark.sql() only returns a lazy DataFrame for a SELECT; collect() forces the
# query to run, so the timer covers the execution and not just the planning.
start = time.perf_counter()
spark.sql("SELECT SUM(c1) FROM scientific").collect()
end = time.perf_counter()
duration = end - start
print("%.2gs" % duration)

# -> Original conclusion: delta tables are even more efficient.
#    NOTE(review): that conclusion was drawn from timings taken without
#    executing the query; re-verify it with timings that include execution.