In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
# from graphframes import GraphFrame

## Read data first 

In [0]:
# Load each TPC-H sample table from the catalog into its own DataFrame.
customerDF = spark.table("samples.tpch.customer")
lineitemDF = spark.table("samples.tpch.lineitem")
nationDF = spark.table("samples.tpch.nation")
orderDF = spark.table("samples.tpch.orders")
partDF = spark.table("samples.tpch.part")
partsuppDF = spark.table("samples.tpch.partsupp")
regionDF = spark.table("samples.tpch.region")
suppDF = spark.table("samples.tpch.supplier")

## Basic Data Information

In [0]:
# Registry of the loaded tables, keyed by the name of the variable holding each
# DataFrame, so the summary cells can iterate over all of them.
dataframes: dict = dict(
    customerDF=customerDF,
    lineitemDF=lineitemDF,
    nationDF=nationDF,
    orderDF=orderDF,
    partDF=partDF,
    partsuppDF=partsuppDF,
    regionDF=regionDF,
    suppDF=suppDF,
)

In [0]:
# Print the partition count of every registered table.
for name, frame in dataframes.items():
  partition_total = frame.rdd.getNumPartitions()
  print(f"Total number of default partitions for {name} are", partition_total)

In [0]:
# Print the row count of every registered table (one count job per table).
for name, frame in dataframes.items():
  row_total = frame.count()
  print(f"Total number of rows for {name} are", row_total)

In [0]:
# Profile the customer table: schema, row count, then per-column distinct and
# null counts collected into `val` as {column: [{"distinct_count", "null_count"}]}.
customerDF.printSchema()
print("#"*50)
print(f"Total Number of rows are: {customerDF.count()}")
print("#"*50)

# Count the nulls of every column in a single aggregation pass instead of
# launching one filter+count job per column. when() without otherwise() yields
# NULL for non-matching rows, and count() skips NULLs.
null_counts = customerDF.agg(
    *[
        f.count(f.when(f.col(col_name).isNull(), 1)).alias(col_name)
        for col_name in customerDF.columns
    ]
).first()

val = {}
for column in customerDF.columns:
    val[column] = [
      {
      # distinct() keeps NULL as its own value, so this count includes it.
      "distinct_count": customerDF.select(f.col(column)).distinct().count(),
      "null_count": null_counts[column]
      }
    ]
display(val)


In [0]:
# Profile the line-item table at its default partitioning: schema, row count,
# then per-column distinct and null counts collected into `val` as
# {column: [{"distinct_count", "null_count"}]}.
print("With default number of partitions")
lineitemDF.printSchema()
print("#"*50)
print(f"Total Number of rows are: {lineitemDF.count()}")
print("#"*50)

# Count the nulls of every column in a single aggregation pass instead of
# launching one filter+count job per column. when() without otherwise() yields
# NULL for non-matching rows, and count() skips NULLs.
null_counts = lineitemDF.agg(
    *[
        f.count(f.when(f.col(col_name).isNull(), 1)).alias(col_name)
        for col_name in lineitemDF.columns
    ]
).first()

val = {}
for column in lineitemDF.columns:
    val[column] = [
      {
      # distinct() keeps NULL as its own value, so this count includes it.
      "distinct_count": lineitemDF.select(f.col(column)).distinct().count(),
      "null_count": null_counts[column]
      }
    ]
display(val)

In [0]:
# Build a coalesced copy of lineitemDF with a target of 5 partitions, used by
# the later cells that work on lineitemDF_5_partition.
lineitemDF_5_partition = lineitemDF.coalesce(5)

In [0]:
# Check the partition count of the coalesced DataFrame.
lineitemDF_5_partition.rdd.getNumPartitions()

In [0]:
# Profile the coalesced line-item table: schema, row count, then per-column
# distinct and null counts collected into `val` as
# {column: [{"distinct_count", "null_count"}]}.
print("With 5 number of partitions")
lineitemDF_5_partition.printSchema()
print("#"*50)
print(f"Total Number of rows are: {lineitemDF_5_partition.count()}")
print("#"*50)

# Count the nulls of every column in a single aggregation pass instead of
# launching one filter+count job per column. when() without otherwise() yields
# NULL for non-matching rows, and count() skips NULLs.
null_counts = lineitemDF_5_partition.agg(
    *[
        f.count(f.when(f.col(col_name).isNull(), 1)).alias(col_name)
        for col_name in lineitemDF_5_partition.columns
    ]
).first()

val = {}
for column in lineitemDF_5_partition.columns:
    val[column] = [
      {
      # distinct() keeps NULL as its own value, so this count includes it.
      "distinct_count": lineitemDF_5_partition.select(f.col(column)).distinct().count(),
      "null_count": null_counts[column]
      }
    ]
display(val)

In [0]:
# for df in [customerDF, lineitemDF, nationDF, orderDF, partDF, partsuppDF, regionDF, suppDF]:
#   # print(f"Basic Infromation for {df.name} table is: ")
#   print(f"Total Number of rows are: {df.count()}")
#   display(df.describe())

In [0]:
# The ten customers holding the largest account balances.
display(
  customerDF
    .select('c_name', 'c_mktsegment', 'c_address', 'c_phone', 'c_acctbal')
    .orderBy(f.col('c_acctbal').desc())
    .limit(10)
)

In [0]:
from pyspark.sql.functions import sum as _sum

# Total account balance per market segment; printed largest total first.
a = (
    customerDF
    .groupBy('c_mktsegment')
    .agg(_sum('c_acctbal').alias('total_account_balance'))
)
a.orderBy(f.col('total_account_balance').desc()).show()

In [0]:
# Expose customerDF to Spark SQL under the temp-view name "customer_table".
customerDF.createOrReplaceTempView("customer_table")

In [0]:
# Per-segment account-balance totals computed with Spark SQL against the
# customer_table temp view, largest total first.
segment_balance_sql = "select c_mktsegment as mktsegment, sum(c_acctbal) as total_account_balance from customer_table group by c_mktsegment  order by total_account_balance DEsc"
spark.sql(segment_balance_sql).show()

In [0]:
# Add `rounded_acctbal`: the account balance rounded to zero decimal places.
# withColumn replaces a same-named column, so re-running this cell is harmless.
customerDF = customerDF.withColumn(
    'rounded_acctbal',
    f.round(f.col('c_acctbal'), 0),
)
display(customerDF)


In [0]:
from pyspark.sql.window import Window

In [0]:
# Window over each market segment, ordered by rounded balance, highest first.
custom_window = (
    Window
    .partitionBy('c_mktsegment')
    .orderBy(f.col('rounded_acctbal').desc())
)

# Attach both ranking flavours (dense_rank and rank) to every customer row.
result_df = customerDF.select(
    'c_name',
    'c_mktsegment',
    'c_address',
    'c_phone',
    'c_acctbal',
    'rounded_acctbal',
    f.dense_rank().over(custom_window).alias('dense_rank'),
    f.rank().over(custom_window).alias('rank'),
)

# Smallest dense_rank found in each market segment (capped at 10 rows).
display(
    result_df
    .groupBy('c_mktsegment')
    .agg(f.min('dense_rank'))
    .limit(10)
)

In [0]:
# Show the top-ranked customer(s) of every market segment.
#
# Grouping and taking first() of each column returns an arbitrary row per
# segment (first() is order-dependent after a shuffle), so the details shown
# need not belong to the rank-1 customer. Filtering on rank == 1 keeps exactly
# the top rows instead. Ranking is on rounded_acctbal, so customers tied on the
# rounded balance all appear.
# The rank / dense_rank output columns now carry their own values; the earlier
# version had the two aliases swapped.
display(
  result_df
    .filter(f.col('rank') == 1)
    .select(
      'c_mktsegment',
      f.col('c_name').alias("customer_name"),
      f.col('c_address').alias("address"),
      f.col('c_phone').alias("phone"),
      f.col('c_acctbal').alias("acctbal"),
      'rounded_acctbal',
      'rank',
      'dense_rank',
    )
    .orderBy('c_mktsegment')
)

In [0]:
# Peek at the first five rows of the coalesced line-item table.
display(lineitemDF_5_partition.take(5))

In [0]:
# For every market segment, report the customer holding the largest balance.
#
# max() over a struct compares fields left to right, so putting c_acctbal first
# selects the highest-balance row and carries that same customer's name,
# address and phone with it. Pairing max(c_acctbal) with first() of the other
# columns would show details of an arbitrary customer of the segment.
customerDF.groupby("c_mktsegment").agg(
  f.max(f.struct('c_acctbal', 'c_name', 'c_address', 'c_phone')).alias("top_customer")
).select(
  'c_mktsegment',
  f.col('top_customer.c_name').alias("customer_name"),
  f.col('top_customer.c_address').alias("address"),
  f.col('top_customer.c_phone').alias("phone"),
  # Keep the column label the plain max() aggregate produced.
  f.col('top_customer.c_acctbal').alias("max(c_acctbal)")
).show()

In [0]:
# The single customer row with the highest account balance.
display(
  customerDF
    .orderBy(f.col("c_acctbal").desc())
    .limit(1)
)

In [0]:
# Largest account balance across all customers.
customerDF.agg(f.max("c_acctbal")).show()

In [0]:
# Add cleaned-up copies of the numeric line-item columns next to the originals:
#   l_quantity_new      - quantity cast to int
#   l_extendedprice_new - extended price rounded to 2 decimals
#   l_discount_new      - discount rounded to 2 decimals, times 100, cast to int
#   l_tax_new           - tax rounded to 2 decimals
lineitemDF_5_partition = (
    lineitemDF_5_partition
    .withColumn("l_quantity_new", f.col("l_quantity").cast("int"))
    .withColumn("l_extendedprice_new", f.round(f.col("l_extendedprice"), 2))
    .withColumn("l_discount_new", (f.round(f.col("l_discount"), 2) * 100).cast("int"))
    .withColumn("l_tax_new", f.round(f.col("l_tax"), 2))
)

In [0]:
# Print the schema of lineitemDF_5_partition to inspect its column types.
lineitemDF_5_partition.printSchema()

In [0]:
# Peek at five rows of the coalesced line-item table.
display(lineitemDF_5_partition.take(5))

In [0]:
# Line items whose l_discount_new value is at least 10.
display(
  lineitemDF_5_partition.where(f.col("l_discount_new") >= 10)
)

### Joining the Schema

In [0]:
# Customer -> nation -> region inner joins with no broadcast hint.
customer_to_nation_df_withoutbroadcast = (
    customerDF
    .join(nationDF, customerDF.c_nationkey == nationDF.n_nationkey, 'inner')
    .join(regionDF, nationDF.n_regionkey == regionDF.r_regionkey, 'inner')
)

In [0]:
# Peek at ten rows of the non-broadcast join result.
display(customer_to_nation_df_withoutbroadcast.take(10))

In [0]:
# Customer -> nation -> region inner joins, with the nation and region tables
# wrapped in f.broadcast() as a broadcast-join hint.
# The projection renames every kept column to a friendlier name.
customer_to_nation_df = customerDF.join(
  f.broadcast(nationDF), 
  on=[customerDF.c_nationkey == nationDF.n_nationkey],
  how='inner'
).join(
  f.broadcast(regionDF),
  on=[nationDF.n_regionkey == regionDF.r_regionkey],
  how='inner'
).select(
  # Keys
  customerDF.c_custkey.alias("customer_key"),
  nationDF.n_nationkey.alias("nation_key"),
  regionDF.r_regionkey.alias("region_key"),
  # Customer attributes (account balance rounded to 2 decimals)
  customerDF.c_name.alias("customer_name"),
  customerDF.c_address.alias("customer_address"),
  customerDF.c_phone.alias("customer_phone"),
  f.round(customerDF.c_acctbal, 2).alias("customer_acctbal"),
  customerDF.c_mktsegment.alias("mktsegment"),
  # Geography names
  regionDF.r_name.alias("region_name"),
  nationDF.n_name.alias("nation_name"),
  # Free-text comments
  regionDF.r_comment.alias("region_comment"),
  nationDF.n_comment.alias("nation_comment"),
  customerDF.c_comment.alias("customer_comment"),
)

In [0]:
# Mark customer_to_nation_df for caching; presumably so the aggregation cells
# that follow can reuse it instead of recomputing the joins.
customer_to_nation_df.cache()

In [0]:
# Confirm that the DataFrame is flagged as cached.
customer_to_nation_df.is_cached

In [0]:
# Peek at ten rows of the customer/nation/region join.
display(customer_to_nation_df.take(10))

In [0]:
# Account-balance total for every region / nation / market-segment combination.
display(
    customer_to_nation_df
    .groupBy("region_name", "nation_name", "mktsegment")
    .agg(
        f.sum(f.round(f.col('customer_acctbal'), 2)).alias("Total_account_balance")
    )
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Account-balance total for every region / nation / market-segment combination,
# largest total first.
display(
    customer_to_nation_df
    .groupBy("region_name", "nation_name", "mktsegment")
    .agg(
        f.sum(f.round(f.col('customer_acctbal'), 2)).alias("Total_account_balance")
    )
    .orderBy(f.col("Total_account_balance").desc())
)

In [0]:
# Number of customers in each region, most populous region first.
display(
    customer_to_nation_df
    .groupBy("region_name")
    .agg(f.count(f.col('customer_key')).alias("Total_customers"))
    .orderBy(f.col("Total_customers").desc())
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Row count of the customer/nation/region join.
customer_to_nation_df.count()

In [0]:
# Peek at the first five orders.
display(orderDF.take(5))

- Total number of rows for customerDF are 750000
- Total number of rows for lineitemDF are 29999795
- Total number of rows for nationDF are 25
- Total number of rows for orderDF are 7500000
- Total number of rows for partDF are 1000000
- Total number of rows for partsuppDF are 4000000
- Total number of rows for regionDF are 5
- Total number of rows for suppDF are 50000

In [0]:
# max_view_order_table = orderDF.join(
#   f.broadcast(customer_to_nation_df),
#   on=[customer_to_nation_df.customer_key == orderDF.o_custkey],
#   how='inner'
# ).join(
#   lineitemDF,
#   on=[orderDF.o_orderkey == lineitemDF.l_orderkey],
#   how='inner'
# ).join(
#   partDF,
#   on=[lineitemDF.l_partkey == partDF.p_partkey],
#   how='inner'
# ).join(
#   f.broadcast(suppDF),
#   on=[lineitemDF.l_suppkey == suppDF.s_suppkey],
#   how='inner'
# )

In [0]:
# display(max_view_order_table.head(2))

In [0]:
# max_view_order_table1 = orderDF.join(
#   customer_to_nation_df,
#   on=[customer_to_nation_df.customer_key == orderDF.o_custkey],
#   how='inner'
# ).join(
#   lineitemDF,
#   on=[orderDF.o_orderkey == lineitemDF.l_orderkey],
#   how='inner'
# ).join(
#   partDF,
#   on=[lineitemDF.l_partkey == partDF.p_partkey],
#   how='inner'
# ).join(
#   f.broadcast(suppDF),
#   on=[lineitemDF.l_suppkey == suppDF.s_suppkey],
#   how='inner'
# )

In [0]:
# display(max_view_order_table1.head(2))

In [0]:
# max_view_order_table2 = orderDF.join(
#   customer_to_nation_df,
#   on=[customer_to_nation_df.customer_key == orderDF.o_custkey],
#   how='inner'
# ).join(
#   lineitemDF,
#   on=[orderDF.o_orderkey == lineitemDF.l_orderkey],
#   how='inner'
# ).join(
#   partDF,
#   on=[lineitemDF.l_partkey == partDF.p_partkey],
#   how='inner'
# ).join(
#   f.broadcast(suppDF),
#   on=[lineitemDF.l_suppkey == suppDF.s_suppkey],
#   how='inner'
# )

In [0]:
# display(max_view_order_table2.head(2))

In [0]:
# max_view_order_table2.explain(extended=True)

In [0]:
# Build the wide, denormalised order view: one row per line item, enriched with
# its order, customer/nation/region, part and supplier attributes.
# Only the supplier table carries a broadcast hint; the other joins are left to
# the optimizer.
max_view_order_table = orderDF.join(
  customer_to_nation_df,
  on=[customer_to_nation_df.customer_key == orderDF.o_custkey],
  how='inner'
).join(
  lineitemDF,
  on=[orderDF.o_orderkey == lineitemDF.l_orderkey],
  how='inner'
).join(
  partDF,
  on=[lineitemDF.l_partkey == partDF.p_partkey],
  how='inner'
).join(
  f.broadcast(suppDF),
  on=[lineitemDF.l_suppkey == suppDF.s_suppkey],
  how='inner'
).select(
  # All Keys 
  customer_to_nation_df.customer_key,
  customer_to_nation_df.nation_key,
  customer_to_nation_df.region_key,
  orderDF.o_orderkey.alias("order_key"),
  partDF.p_partkey.alias("part_key"),
  suppDF.s_suppkey.alias("supplier_key"),
  # Customer and geography attributes
  customer_to_nation_df.customer_name,
  customer_to_nation_df.customer_address,
  customer_to_nation_df.customer_phone,
  customer_to_nation_df.customer_acctbal,
  customer_to_nation_df.mktsegment,
  customer_to_nation_df.region_name,
  customer_to_nation_df.nation_name,
  # Order attributes (order-level values repeat on every line-item row)
  orderDF.o_orderstatus.alias("order_status"),
  f.round(orderDF.o_totalprice, 2).alias("order_billing_amount"),
  orderDF.o_orderdate.alias("order_date"),
  orderDF.o_orderpriority.alias("order_priority"),
  orderDF.o_clerk.alias("order_clerk"),
  orderDF.o_shippriority.alias("order_ship_priority"),
  orderDF.o_comment.alias("order_comment"),
  # Line-item attributes
  lineitemDF.l_linenumber.alias("order_line_item_number"),
  lineitemDF.l_quantity.alias("order_line_item_quantity"),
  f.round(lineitemDF.l_extendedprice, 2).alias("order_line_item_extended_price"),
  # Discount and tax scaled by 100 and stored as ints. Casting is a Column
  # method; pyspark.sql.functions has no cast() function.
  f.round(lineitemDF.l_discount * 100, 2).cast("int").alias("order_line_item_discount"),
  f.round(lineitemDF.l_tax * 100, 2).cast("int").alias("order_line_item_tax"),
  lineitemDF.l_returnflag.alias("order_line_item_return_flag"),
  lineitemDF.l_shipdate.alias("order_line_item_ship_date"),
  lineitemDF.l_commitdate.alias("order_line_item_commit_date"),
  lineitemDF.l_shipmode.alias("order_line_item_ship_mode"),
  # Part attributes
  partDF.p_name.alias("part_name"),
  partDF.p_mfgr.alias("part_manufacturer"),
  partDF.p_brand.alias("part_brand"),
  partDF.p_type.alias("part_type"),
  partDF.p_size.alias("part_size"),
  partDF.p_container.alias("part_container"),
  partDF.p_retailprice.alias("part_retail_price"),
  # Supplier attributes
  suppDF.s_name.alias("supplier_name"),
  suppDF.s_address.alias("supplier_address"),
  suppDF.s_phone.alias("supplier_phone"),
  suppDF.s_acctbal.alias("supplier_acctbal"),
  # All type of comments 
  customer_to_nation_df.region_comment,
  customer_to_nation_df.nation_comment,
  customer_to_nation_df.customer_comment,
  lineitemDF.l_comment.alias("order_line_item_comment"),
  suppDF.s_comment.alias("supplier_comment"),
  partDF.p_comment.alias("part_comment"),
)

In [0]:
# Peek at two rows of the wide order view.
display(max_view_order_table.take(2))

In [0]:
# max_view_order_table = None
# max_view_order_table1 = None
# max_view_order_table2 = None

In [0]:
# Release the cached customer_to_nation_df.
# NOTE(review): this runs before the max_view_order_table.count() cell, which is
# built from this DataFrame, so that count presumably cannot reuse the cache.
# Confirm the ordering is intended.
customer_to_nation_df.unpersist()

In [0]:
# Row count of the wide order view (this triggers the full multi-way join).
max_view_order_table.count()

In [0]:
# List the column names of the wide order view.
max_view_order_table.columns

In [0]:
#  rename the dataframe
# max_view_order_table = max_view_order_table3

In [0]:
# Expose the wide order view to Spark SQL under the name "max_view_order_table".
max_view_order_table.createOrReplaceTempView("max_view_order_table")

In [0]:
# Total billed amount across all orders.
#
# max_view_order_table has one row per line item, and order_billing_amount
# (the order's o_totalprice) repeats on each of them. Summing the column
# directly counts every order once per line item, so first collapse to one row
# per order.
display(
  max_view_order_table
    .select("order_key", "order_billing_amount")
    .dropDuplicates(["order_key"])
    .agg(f.sum("order_billing_amount").alias("sum_order_billing_amount"))
)

# display(spark.sql(
#   "select sum(order_billing_amount) from max_view_order_table"
# ))

In [0]:
# Average billed amount per order.
#
# max_view_order_table has one row per line item, and order_billing_amount
# repeats on each of them. Averaging the column directly weights every order by
# its number of line items, so first collapse to one row per order.
display(
  max_view_order_table
    .select("order_key", "order_billing_amount")
    .dropDuplicates(["order_key"])
    .agg(
      f.avg("order_billing_amount").alias("avg_order_billing_amount")
    )
)

In [0]:
# Billed amount of each order.
#
# order_billing_amount is an order-level value repeated on every line-item row,
# so summing it per order would multiply it by the order's line-item count.
# Every row of an order carries the same value; max() picks it deterministically.
display(
  max_view_order_table
    .groupBy("order_key")
    .agg(f.max("order_billing_amount").alias("order_billing_amount"))
)

In [0]:
# Line items shipped on or before their commit date
# (order_line_item_ship_date <= order_line_item_commit_date).
display(
  max_view_order_table.where(
    f.col('order_line_item_commit_date') >= f.col('order_line_item_ship_date')
  )
)

In [0]:
# Quantity and extended-price totals per part name.
# NOTE(review): first("region_name") keeps a single region per part_name; if a
# part is sold in several regions the value shown is whichever row Spark sees
# first. Confirm whether region should be part of the grouping key instead.
# NOTE(review): no ordering is applied here despite the "top selling" name.
top_selling_products = (
    max_view_order_table
    .groupBy('part_name')
    .agg(
        f.first("region_name").alias("region_name"),
        f.sum("order_line_item_quantity").alias("total_quantity"),
        f.sum('order_line_item_extended_price').alias("total_price"),
    )
)

In [0]:
# Render the per-part totals with the notebook's display helper.
display(top_selling_products)

In [0]:
f.per