In [None]:
import os

import pyspark
from delta import *

# Build a SparkSession with Delta Lake's SQL extension and catalog registered.

# Maven coordinates of any extra Spark packages to load alongside Delta Lake,
# e.g. ["org.apache.hadoop:hadoop-aws:3.3.4"]. Leave empty to load Delta only.
# Defined here so the cell runs on a fresh kernel.
EXTRA_PACKAGES = []

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip (from the `delta` package) adds the Delta Lake
# jars for the pip-installed delta-spark version, plus extra_packages, to the
# builder before the session is created.
spark = configure_spark_with_delta_pip(builder, extra_packages=EXTRA_PACKAGES).getOrCreate()

# Uncomment for verbose driver logs ("TRACE" is even more verbose); useful if
# there are connection issues. See
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html
# spark.sparkContext.setLogLevel("DEBUG")

spark

In [None]:

# Location of the TPC-H dbgen output (*.tbl files). Set the TPCH_DATA_PATH
# environment variable to point at it instead of editing this cell.
data_path = os.environ.get("TPCH_DATA_PATH", "path_to_tpcH_dataset")


def read_tbl(table: str, *columns: str):
    """Load ``<data_path>/<table>.tbl`` as a '|'-delimited file and name its columns.

    Args:
        table: Base file name without the ``.tbl`` extension, e.g. ``"customer"``.
        *columns: Column names in file order, one per field.

    Returns:
        A Spark DataFrame. No schema or ``inferSchema`` option is passed, so
        Spark's CSV reader defaults apply (columns are read as strings).
    """
    return spark.read.option("delimiter", "|").csv(f"{data_path}/{table}.tbl").toDF(*columns)


# Each column list ends with a "*_dummy" name. Presumably it absorbs the empty
# field after the trailing "|" on every .tbl row; verify against your dbgen output.
df_customers2 = read_tbl(
    "customer",
    "c_custkey", "c_name", "c_address", "c_nationkey", "c_phone", "c_acctbal", "c_mktsegment", "c_comment", "c_dummy",
)
df_lineitem2 = read_tbl(
    "lineitem",
    "l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity", "l_extendedprice",
    "l_discount", "l_tax", "l_returnflag", "l_linestatus", "l_shipdate", "l_commitdate",
    "l_receiptdate", "l_shipinstruct", "l_shipmode", "l_comment", "l_dummy",
)
df_part2 = read_tbl(
    "part",
    "p_partkey", "p_name", "p_mfgr", "p_brand", "p_type", "p_size", "p_container", "p_retailprice", "p_comment", "p_dummy",
)
df_region2 = read_tbl(
    "region",
    "r_regionkey", "r_name", "r_comment", "r_dummy",
)
df_supplier2 = read_tbl(
    "supplier",
    "s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment", "s_dummy",
)
df_partsupp2 = read_tbl(
    "partsupp",
    "ps_partkey", "ps_suppkey", "ps_availqty", "ps_supplycost", "ps_comment", "ps_dummy",
)
df_orders2 = read_tbl(
    "orders",
    "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", "o_orderdate", "o_orderpriority",
    "o_clerk", "o_shippriority", "o_comment", "o_dummy",
)
df_nation2 = read_tbl(
    "nation",
    "n_nationkey", "n_name", "n_regionkey", "n_comment", "n_dummy",
)

In [None]:
# Save every TPC-H DataFrame as a managed table in CSV format.
# The CSV, Delta and ORC cells all write the same table names with
# mode("overwrite"), so they are alternatives: whichever runs last decides the
# format the tables end up in. Storage location follows the session's
# warehouse settings (HDFS in this setup; not configured in this cell).
_csv_tables = {
    "customer": df_customers2,
    "lineitem": df_lineitem2,
    "part": df_part2,
    "region": df_region2,
    "supplier": df_supplier2,
    "partsupp": df_partsupp2,
    "orders": df_orders2,
    "nation": df_nation2,
}
for _table_name, _frame in _csv_tables.items():
    _frame.write.format("csv").mode("overwrite").saveAsTable(_table_name)

In [None]:
# Save every TPC-H DataFrame as a managed table in Delta format.
# The CSV, Delta and ORC cells all write the same table names with
# mode("overwrite"), so they are alternatives: whichever runs last decides the
# format the tables end up in. Storage location follows the session's
# warehouse settings (HDFS in this setup; not configured in this cell).
_delta_tables = {
    "customer": df_customers2,
    "lineitem": df_lineitem2,
    "part": df_part2,
    "region": df_region2,
    "supplier": df_supplier2,
    "partsupp": df_partsupp2,
    "orders": df_orders2,
    "nation": df_nation2,
}
for _table_name, _frame in _delta_tables.items():
    _frame.write.format("delta").mode("overwrite").saveAsTable(_table_name)

In [None]:
# Save every TPC-H DataFrame as a managed table in ORC format.
# The CSV, Delta and ORC cells all write the same table names with
# mode("overwrite"), so they are alternatives: whichever runs last decides the
# format the tables end up in. Storage location follows the session's
# warehouse settings (HDFS in this setup; not configured in this cell).
_orc_tables = {
    "customer": df_customers2,
    "lineitem": df_lineitem2,
    "part": df_part2,
    "region": df_region2,
    "supplier": df_supplier2,
    "partsupp": df_partsupp2,
    "orders": df_orders2,
    "nation": df_nation2,
}
for _table_name, _frame in _orc_tables.items():
    _frame.write.format("orc").mode("overwrite").saveAsTable(_table_name)