**First we choose the dataset : Simulate Retail Customer Data**


We confirm the Schema of the dataset

In [0]:
%sql
SHOW SCHEMAS IN databricks_simulated_retail_customer_data;


databaseName
information_schema
v01
v02


We also show the tables in this dataset

In [0]:
%sql
SHOW TABLES IN databricks_simulated_retail_customer_data.v01;


database,tableName,isTemporary
v01,customers,False
v01,sales,False
v01,sales_orders,False


**Load the Dataset**

In [0]:
df = spark.table(
    "databricks_simulated_retail_customer_data.v01.sales_orders"
)

df.printSchema()


root
 |-- clicked_items: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- number_of_line_items: long (nullable = true)
 |-- order_datetime: long (nullable = true)
 |-- order_number: long (nullable = true)
 |-- ordered_products: string (nullable = true)
 |-- promo_info: string (nullable = true)



**Lazy Evaluation (No Job Yet)**

In [0]:
df.select("customer_id", "order_number")


DataFrame[customer_id: bigint, order_number: bigint]

**Trigger Spark Job**

In [0]:
df.count()


4074

**Introduce a Shuffle**

In [0]:
customer_counts = (
    df.groupBy("customer_id")
      .count()
      .orderBy("count", ascending=False)
)

customer_counts.show()


+-----------+-----+
|customer_id|count|
+-----------+-----+
|    3037810|   69|
|   10393805|   65|
|   21532231|   65|
|   19580290|   64|
|   14939501|   63|
|   11935034|   62|
|   18558459|   60|
|    2176964|   60|
|   28447330|   59|
|   19452611|   58|
|   17382206|   58|
|    7543046|   57|
|   31375543|   57|
|   64153907|   55|
|   24774688|   54|
|   13861161|   54|
|   17728997|   54|
|   16995374|   54|
|   46949990|   53|
|    4401099|   53|
+-----------+-----+
only showing top 20 rows


**Amplify Shuffle Cost**

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", 200)

heavy_shuffle = (
    df.select("customer_id", "order_number")
      .groupBy("customer_id")
      .count()
      .orderBy("count", ascending=False)
)

heavy_shuffle.collect()

[Row(customer_id=3037810, count=69),
 Row(customer_id=10393805, count=65),
 Row(customer_id=21532231, count=65),
 Row(customer_id=19580290, count=64),
 Row(customer_id=14939501, count=63),
 Row(customer_id=11935034, count=62),
 Row(customer_id=2176964, count=60),
 Row(customer_id=18558459, count=60),
 Row(customer_id=28447330, count=59),
 Row(customer_id=17382206, count=58),
 Row(customer_id=19452611, count=58),
 Row(customer_id=7543046, count=57),
 Row(customer_id=31375543, count=57),
 Row(customer_id=64153907, count=55),
 Row(customer_id=24774688, count=54),
 Row(customer_id=17728997, count=54),
 Row(customer_id=16995374, count=54),
 Row(customer_id=13861161, count=54),
 Row(customer_id=19476252, count=53),
 Row(customer_id=4401099, count=53),
 Row(customer_id=46949990, count=53),
 Row(customer_id=11754254, count=52),
 Row(customer_id=782583, count=52),
 Row(customer_id=57719783, count=51),
 Row(customer_id=11137864, count=50),
 Row(customer_id=16819948, count=50),
 Row(customer_id=4

**Show Task Imbalance (Skew Simulation)**

In [0]:
skew_demo = (
    df.groupBy("number_of_line_items")
      .count()
      .orderBy("count", ascending=False)
)

skew_demo.show()


+--------------------+-----+
|number_of_line_items|count|
+--------------------+-----+
|                   3| 1393|
|                   2| 1357|
|                   1| 1300|
|                   4|   24|
+--------------------+-----+



**Executor Behaviour (Resource View)**

In [0]:
df.groupBy("customer_id").count().collect()

[Row(customer_id=19476252, count=53),
 Row(customer_id=4401099, count=53),
 Row(customer_id=14939501, count=63),
 Row(customer_id=3072597, count=1),
 Row(customer_id=2732808, count=1),
 Row(customer_id=3838589, count=2),
 Row(customer_id=7159905, count=1),
 Row(customer_id=13728809, count=1),
 Row(customer_id=8513182, count=1),
 Row(customer_id=1542007, count=47),
 Row(customer_id=15817754, count=1),
 Row(customer_id=12537090, count=1),
 Row(customer_id=19140462, count=1),
 Row(customer_id=11935034, count=62),
 Row(customer_id=24774688, count=54),
 Row(customer_id=13721768, count=1),
 Row(customer_id=12744134, count=1),
 Row(customer_id=22100013, count=46),
 Row(customer_id=3037810, count=69),
 Row(customer_id=13419197, count=1),
 Row(customer_id=12999178, count=49),
 Row(customer_id=4952777, count=47),
 Row(customer_id=31499930, count=1),
 Row(customer_id=17728997, count=54),
 Row(customer_id=25263091, count=1),
 Row(customer_id=16995374, count=54),
 Row(customer_id=46949990, count=53

**Optimization Preview**

In [0]:
optimized = (
    df.repartition(50, "customer_id")
      .groupBy("customer_id")
      .count()
      .orderBy("count", ascending=False)
)

optimized.collect()

[Row(customer_id=3037810, count=69),
 Row(customer_id=10393805, count=65),
 Row(customer_id=21532231, count=65),
 Row(customer_id=19580290, count=64),
 Row(customer_id=14939501, count=63),
 Row(customer_id=11935034, count=62),
 Row(customer_id=18558459, count=60),
 Row(customer_id=2176964, count=60),
 Row(customer_id=28447330, count=59),
 Row(customer_id=19452611, count=58),
 Row(customer_id=17382206, count=58),
 Row(customer_id=7543046, count=57),
 Row(customer_id=31375543, count=57),
 Row(customer_id=64153907, count=55),
 Row(customer_id=13861161, count=54),
 Row(customer_id=17728997, count=54),
 Row(customer_id=16995374, count=54),
 Row(customer_id=24774688, count=54),
 Row(customer_id=4401099, count=53),
 Row(customer_id=19476252, count=53),
 Row(customer_id=46949990, count=53),
 Row(customer_id=782583, count=52),
 Row(customer_id=11754254, count=52),
 Row(customer_id=57719783, count=51),
 Row(customer_id=4497446, count=50),
 Row(customer_id=16819948, count=50),
 Row(customer_id=17