In [0]:
from pyspark.sql.types import *

# Schema of a single element of the nested `order_items` array.
order_item_schema = (
    StructType()
    .add("order_item_id", LongType(), True)
    .add("order_item_product_id", LongType(), True)
    .add("order_item_product_price", DecimalType(10, 2), True)
    .add("order_item_quantity", LongType(), True)
    .add("order_item_subtotal", DecimalType(10, 2), True)
)

# Explicit schema for the orders JSON: order and customer columns at the top
# level, plus a nullable array of order-item structs. Every field is nullable.
orders_schema = (
    StructType()
    .add("order_id", LongType(), True)
    .add("customer_id", LongType(), True)
    .add("customer_fname", StringType(), True)
    .add("customer_lname", StringType(), True)
    .add("customer_city", StringType(), True)
    .add("customer_state", StringType(), True)
    .add("customer_pincode", LongType(), True)
    .add("order_items", ArrayType(order_item_schema), True)
)


In [0]:
# Load the orders JSON from DBFS, applying `orders_schema` to the reader.
orders_df = (
    spark.read
    .format("json")
    .schema(orders_schema)
    .option("path", "dbfs:/FileStore/merge.json")
    .load()
)

In [0]:
# Register the loaded DataFrame as the temp view `orders` so it can be queried via spark.sql.
orders_df.createOrReplaceTempView(name="orders")

In [0]:
# Preview the raw `orders` view; `order_items` is still a nested array at this stage.
orders_preview = spark.sql("select * from orders ")
orders_preview.show()

+--------+-----------+--------------+--------------+--------------+--------------+----------------+--------------------+
|order_id|customer_id|customer_fname|customer_lname| customer_city|customer_state|customer_pincode|         order_items|
+--------+-----------+--------------+--------------+--------------+--------------+----------------+--------------------+
|       8|       2911|          Mary|         Smith|        Caguas|            PR|             725|[{8, 692, 257.23,...|
|      12|       1837|          Mary|          Vega|        Caguas|            PR|             725|[{12, 411, 19.86,...|
|      14|       9842|          Mary|         Smith|        Caguas|            PR|             725|[{14, 492, 182.13...|
|      16|       7276|        Pamela|         Smith|        Caguas|            PR|             725|[{16, 721, 271.54...|
|      23|       4367|      Danielle|         Moran|   Springfield|            MO|           65807|[{23, 577, 294.20...|
|      24|      11441|          

In [0]:
# Produce one row per element of `order_items`: the order/customer columns are
# repeated and the array element is exposed as the struct column `lines`.
# NOTE(review): explode() emits no row for a null or empty array, so orders
# without items disappear here; explode_outer would keep them - confirm intent.
explode_items_query = """select order_id ,
          customer_id,
          customer_fname ,
          customer_lname,
          customer_city,
          customer_state,
          customer_pincode,explode(order_items) lines from orders"""
exploded_orders = spark.sql(explode_items_query)

In [0]:
# Make the exploded rows queryable from SQL under the name `exploded_orders`.
exploded_orders.createOrReplaceTempView("exploded_orders")

# Preview the exploded rows; `lines` holds one order-item struct per row.
exploded_orders.show()

+--------+-----------+--------------+--------------+--------------+--------------+----------------+--------------------+
|order_id|customer_id|customer_fname|customer_lname| customer_city|customer_state|customer_pincode|               lines|
+--------+-----------+--------------+--------------+--------------+--------------+----------------+--------------------+
|       8|       2911|          Mary|         Smith|        Caguas|            PR|             725|{8, 692, 257.23, ...|
|      12|       1837|          Mary|          Vega|        Caguas|            PR|             725|{12, 411, 19.86, ...|
|      14|       9842|          Mary|         Smith|        Caguas|            PR|             725|{14, 492, 182.13,...|
|      16|       7276|        Pamela|         Smith|        Caguas|            PR|             725|{16, 721, 271.54,...|
|      23|       4367|      Danielle|         Moran|   Springfield|            MO|           65807|{23, 577, 294.20,...|
|      24|      11441|          

In [0]:
# Promote the fields of the `lines` struct to top-level columns with shorter
# names (item_id, product_id, price, quantity, subtotal), keeping the
# order/customer columns unchanged.
flatten_items_query = """select order_id ,
          customer_id,
          customer_fname ,
          customer_lname,
          customer_city,
          customer_state,
          customer_pincode, 
          lines.order_item_id as item_id,
          lines.order_item_product_id as product_id,
          lines.order_item_product_price as price,
          lines.order_item_quantity as quantity,
          lines.order_item_subtotal as subtotal
          from exploded_orders """
flattened_orders = spark.sql(flatten_items_query)

In [0]:
# Preview the flattened result: one row per order item, with no nested columns left.
flattened_orders.show()

+--------+-----------+--------------+--------------+--------------+--------------+----------------+-------+----------+------+--------+--------+
|order_id|customer_id|customer_fname|customer_lname| customer_city|customer_state|customer_pincode|item_id|product_id| price|quantity|subtotal|
+--------+-----------+--------------+--------------+--------------+--------------+----------------+-------+----------+------+--------+--------+
|       8|       2911|          Mary|         Smith|        Caguas|            PR|             725|      8|       692|257.23|       4| 1028.92|
|      12|       1837|          Mary|          Vega|        Caguas|            PR|             725|     12|       411| 19.86|       4|   79.44|
|      14|       9842|          Mary|         Smith|        Caguas|            PR|             725|     14|       492|182.13|       2|  364.26|
|      16|       7276|        Pamela|         Smith|        Caguas|            PR|             725|     16|       721|271.54|       1|  

In [0]:
# Register the flattened rows as the temp view `flattened_orders` for the aggregation query.
flattened_orders.createOrReplaceTempView(name="flattened_orders")

In [0]:
# Per-customer summary over the flattened order items:
#   orders_placed     - number of distinct order_id values
#   product_purchased - number of rows with a non-null item_id
#   amount_spent      - sum of the item subtotals
# NOTE(review): product_purchased counts line items; it does not take
# `quantity` or distinct product_id into account - confirm that is intended.
# The adjacent literals below concatenate to the exact original query text.
aggregation_query = (
    "select customer_id ,"
    "count(distinct(order_id)) as orders_placed ,"
    "count(item_id) as product_purchased,"
    "sum(subtotal) as amount_spent "
    "from flattened_orders "
    "group by customer_id "
)
aggregated_orders = spark.sql(aggregation_query)


In [0]:
# Expose the per-customer summary to SQL as `orders_aggregated`.
aggregated_orders.createOrReplaceTempView(name="orders_aggregated")

In [0]:
# Spot-check the aggregation for a single customer (customer_id 25).
customer_25_summary = spark.sql("select * from orders_aggregated where customer_id = 25")
customer_25_summary.show()

+-----------+-------------+-----------------+------------+
|customer_id|orders_placed|product_purchased|amount_spent|
+-----------+-------------+-----------------+------------+
|         25|            4|                4|     3358.24|
+-----------+-------------+-----------------+------------+



In [0]:
# Create the DBFS output directory that the aggregated CSV is written under.
processed_dir = "dbfs:/FileStore/processed/"
dbutils.fs.mkdirs(processed_dir)

Out[41]: True

In [0]:
# Write the per-customer summary as CSV with a header row, replacing any
# previous output. The data is brought into one partition first, so the
# target path (a directory, despite the `.csv` suffix) gets a single part file.
(
    aggregated_orders
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "True")
    .option("path", "dbfs:/FileStore/processed/aggregated_orders.csv")
    .save()
)

In [0]:
# Print the column names, types and nullability of the aggregated result.
aggregated_orders.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- orders_placed: long (nullable = false)
 |-- product_purchased: long (nullable = false)
 |-- amount_spent: decimal(20,2) (nullable = true)



In [0]:
# List what the CSV write produced; the path is a directory holding the
# part file alongside Spark's commit marker files.
aggregated_output_path = "dbfs:/FileStore/processed/aggregated_orders.csv"
dbutils.fs.ls(aggregated_output_path)

Out[47]: [FileInfo(path='dbfs:/FileStore/processed/aggregated_orders.csv/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1751731466000),
 FileInfo(path='dbfs:/FileStore/processed/aggregated_orders.csv/_committed_4176033198140089800', name='_committed_4176033198140089800', size=112, modificationTime=1751731466000),
 FileInfo(path='dbfs:/FileStore/processed/aggregated_orders.csv/_started_4176033198140089800', name='_started_4176033198140089800', size=0, modificationTime=1751731465000),
 FileInfo(path='dbfs:/FileStore/processed/aggregated_orders.csv/part-00000-tid-4176033198140089800-8cf78507-e777-485f-9241-229b665369f4-39-1-c000.csv', name='part-00000-tid-4176033198140089800-8cf78507-e777-485f-9241-229b665369f4-39-1-c000.csv', size=212834, modificationTime=1751731466000)]