# PySpark Advanced Examples
## Master-Worker Cluster Setup

This notebook demonstrates:
1. **Joining & Grouping** - 3 elaborative examples
2. **Multidimensional DataFrames** - 3 elaborative examples
3. **Nesting Columns** - 3 elaborative examples

Running on a Spark cluster with 1 master and 2 worker nodes.

In [1]:
# Initialize SparkSession connected to the cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("PySpark Advanced Examples") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Master URL: {spark.sparkContext.master}")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")

Spark version: 3.5.0
Master URL: spark://spark-master:7077
Application ID: app-20251015150014-0005
Default parallelism: 2


---
# Section 1: Joining & Grouping Examples

This section demonstrates three comprehensive examples of joining and grouping operations in PySpark.

## Example 1.1: E-Commerce Order Analysis with Multiple Joins and Aggregations

Scenario: Analyze customer orders, products, and categories with complex joins and grouping.

In [2]:
# Create sample data for e-commerce analysis
customers_data = [
    (1, "Alice Johnson", "alice@email.com", "Premium"),
    (2, "Bob Smith", "bob@email.com", "Standard"),
    (3, "Carol White", "carol@email.com", "Premium"),
    (4, "David Brown", "david@email.com", "Standard"),
    (5, "Eve Davis", "eve@email.com", "Premium")
]

orders_data = [
    (101, 1, "2024-01-15", 250.00),
    (102, 1, "2024-01-20", 180.00),
    (103, 2, "2024-01-18", 420.00),
    (104, 3, "2024-01-22", 310.00),
    (105, 2, "2024-01-25", 150.00),
    (106, 4, "2024-01-28", 275.00),
    (107, 1, "2024-02-01", 190.00),
    (108, 5, "2024-02-03", 500.00)
]

order_items_data = [
    (1, 101, 1001, 2, 50.00),
    (2, 101, 1002, 1, 150.00),
    (3, 102, 1003, 3, 60.00),
    (4, 103, 1001, 4, 50.00),
    (5, 103, 1004, 2, 110.00),
    (6, 104, 1002, 2, 155.00),
    (7, 105, 1005, 1, 150.00),
    (8, 106, 1003, 5, 55.00),
    (9, 107, 1004, 1, 190.00),
    (10, 108, 1001, 10, 50.00)
]

products_data = [
    (1001, "Wireless Mouse", 101),
    (1002, "Mechanical Keyboard", 101),
    (1003, "USB-C Cable", 102),
    (1004, "Monitor Stand", 103),
    (1005, "Laptop Bag", 104)
]

categories_data = [
    (101, "Computer Accessories"),
    (102, "Cables & Adapters"),
    (103, "Furniture"),
    (104, "Bags & Cases")
]

# Create DataFrames
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "email", "membership_tier"])
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "order_date", "total_amount"])
order_items_df = spark.createDataFrame(order_items_data, ["item_id", "order_id", "product_id", "quantity", "unit_price"])
products_df = spark.createDataFrame(products_data, ["product_id", "product_name", "category_id"])
categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

print("\n=== Customers ===")
customers_df.show()

print("\n=== Orders ===")
orders_df.show()

print("\n=== Order Items ===")
order_items_df.show()


=== Customers ===
+-----------+-------------+---------------+---------------+
|customer_id|         name|          email|membership_tier|
+-----------+-------------+---------------+---------------+
|          1|Alice Johnson|alice@email.com|        Premium|
|          2|    Bob Smith|  bob@email.com|       Standard|
|          3|  Carol White|carol@email.com|        Premium|
|          4|  David Brown|david@email.com|       Standard|
|          5|    Eve Davis|  eve@email.com|        Premium|
+-----------+-------------+---------------+---------------+


=== Orders ===
+--------+-----------+----------+------------+
|order_id|customer_id|order_date|total_amount|
+--------+-----------+----------+------------+
|     101|          1|2024-01-15|       250.0|
|     102|          1|2024-01-20|       180.0|
|     103|          2|2024-01-18|       420.0|
|     104|          3|2024-01-22|       310.0|
|     105|          2|2024-01-25|       150.0|
|     106|          4|2024-01-28|       275.0|
|

In [3]:
# Perform complex joins and aggregations
# Join all tables to get complete order information
complete_orders = order_items_df \
    .join(orders_df, "order_id") \
    .join(customers_df, "customer_id") \
    .join(products_df, "product_id") \
    .join(categories_df, "category_id")

print("\n=== Complete Order Details ===")
complete_orders.select(
    "order_id", "name", "membership_tier", 
    "product_name", "category_name", "quantity", "unit_price"
).show(truncate=False)

# Group by customer and membership tier with aggregations
customer_analytics = complete_orders.groupBy("customer_id", "name", "membership_tier") \
    .agg(
        count("order_id").alias("total_orders"),
        sum(col("quantity") * col("unit_price")).alias("total_spent"),
        avg(col("quantity") * col("unit_price")).alias("avg_order_value"),
        countDistinct("category_name").alias("categories_purchased")
    ) \
    .orderBy(desc("total_spent"))

print("\n=== Customer Analytics ===")
customer_analytics.show()

# Group by category with membership tier analysis
category_analytics = complete_orders.groupBy("category_name", "membership_tier") \
    .agg(
        sum("quantity").alias("total_quantity_sold"),
        sum(col("quantity") * col("unit_price")).alias("total_revenue"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .orderBy("category_name", "membership_tier")

print("\n=== Category Analytics by Membership Tier ===")
category_analytics.show(truncate=False)


=== Complete Order Details ===
+--------+-------------+---------------+-------------------+--------------------+--------+----------+
|order_id|name         |membership_tier|product_name       |category_name       |quantity|unit_price|
+--------+-------------+---------------+-------------------+--------------------+--------+----------+
|103     |Bob Smith    |Standard       |Monitor Stand      |Furniture           |2       |110.0     |
|107     |Alice Johnson|Premium        |Monitor Stand      |Furniture           |1       |190.0     |
|105     |Bob Smith    |Standard       |Laptop Bag         |Bags & Cases        |1       |150.0     |
|104     |Carol White  |Premium        |Mechanical Keyboard|Computer Accessories|2       |155.0     |
|101     |Alice Johnson|Premium        |Mechanical Keyboard|Computer Accessories|1       |150.0     |
|108     |Eve Davis    |Premium        |Wireless Mouse     |Computer Accessories|10      |50.0      |
|103     |Bob Smith    |Standard       |Wireless M

## Example 1.2: Employee Department Analysis with Self-Joins and Window Functions

Scenario: Analyze employee hierarchy, salaries, and department performance using self-joins.

In [4]:
# Create employee and department data
employees_data = [
    (1, "John CEO", None, 1, 250000, "2020-01-01"),
    (2, "Sarah VP-Eng", 1, 1, 180000, "2020-02-01"),
    (3, "Mike VP-Sales", 1, 2, 175000, "2020-02-15"),
    (4, "Emma Manager", 2, 1, 120000, "2020-03-01"),
    (5, "David Engineer", 4, 1, 95000, "2021-01-15"),
    (6, "Lisa Engineer", 4, 1, 92000, "2021-02-01"),
    (7, "Tom Sales Rep", 3, 2, 75000, "2021-03-01"),
    (8, "Anna Sales Rep", 3, 2, 78000, "2021-04-01"),
    (9, "Chris Engineer", 4, 1, 98000, "2021-05-01"),
    (10, "Nina HR Manager", 1, 3, 110000, "2020-06-01")
]

departments_data = [
    (1, "Engineering", "Building great products"),
    (2, "Sales", "Revenue generation"),
    (3, "Human Resources", "People management")
]

employees_df = spark.createDataFrame(
    employees_data, 
    ["emp_id", "emp_name", "manager_id", "dept_id", "salary", "hire_date"]
)

departments_df = spark.createDataFrame(
    departments_data, 
    ["dept_id", "dept_name", "dept_description"]
)

print("\n=== Employees ===")
employees_df.show()

print("\n=== Departments ===")
departments_df.show(truncate=False)


=== Employees ===
+------+---------------+----------+-------+------+----------+
|emp_id|       emp_name|manager_id|dept_id|salary| hire_date|
+------+---------------+----------+-------+------+----------+
|     1|       John CEO|      NULL|      1|250000|2020-01-01|
|     2|   Sarah VP-Eng|         1|      1|180000|2020-02-01|
|     3|  Mike VP-Sales|         1|      2|175000|2020-02-15|
|     4|   Emma Manager|         2|      1|120000|2020-03-01|
|     5| David Engineer|         4|      1| 95000|2021-01-15|
|     6|  Lisa Engineer|         4|      1| 92000|2021-02-01|
|     7|  Tom Sales Rep|         3|      2| 75000|2021-03-01|
|     8| Anna Sales Rep|         3|      2| 78000|2021-04-01|
|     9| Chris Engineer|         4|      1| 98000|2021-05-01|
|    10|Nina HR Manager|         1|      3|110000|2020-06-01|
+------+---------------+----------+-------+------+----------+


=== Departments ===
+-------+---------------+-----------------------+
|dept_id|dept_name      |dept_description

In [5]:
# Self-join to get employee-manager relationship
emp_with_manager = employees_df.alias("emp") \
    .join(
        employees_df.alias("mgr"),
        col("emp.manager_id") == col("mgr.emp_id"),
        "left"
    ) \
    .select(
        col("emp.emp_id"),
        col("emp.emp_name"),
        col("mgr.emp_name").alias("manager_name"),
        col("emp.dept_id"),
        col("emp.salary")
    )

print("\n=== Employee-Manager Hierarchy ===")
emp_with_manager.show(truncate=False)

# Join with departments and calculate department statistics
dept_analytics = employees_df.join(departments_df, "dept_id") \
    .groupBy("dept_id", "dept_name") \
    .agg(
        count("emp_id").alias("employee_count"),
        sum("salary").alias("total_salary_cost"),
        avg("salary").alias("avg_salary"),
        min("salary").alias("min_salary"),
        max("salary").alias("max_salary")
    ) \
    .orderBy(desc("total_salary_cost"))

print("\n=== Department Analytics ===")
dept_analytics.show(truncate=False)

# Use window functions with grouping for salary ranking within departments
window_spec = Window.partitionBy("dept_id").orderBy(desc("salary"))

salary_rankings = employees_df.join(departments_df, "dept_id") \
    .withColumn("rank_in_dept", rank().over(window_spec)) \
    .withColumn("dense_rank_in_dept", dense_rank().over(window_spec)) \
    .withColumn("salary_percentile", percent_rank().over(window_spec)) \
    .select(
        "dept_name", "emp_name", "salary", 
        "rank_in_dept", "dense_rank_in_dept", 
        round("salary_percentile", 2).alias("salary_percentile")
    ) \
    .orderBy("dept_name", "rank_in_dept")

print("\n=== Salary Rankings Within Departments ===")
salary_rankings.show(truncate=False)


=== Employee-Manager Hierarchy ===
+------+---------------+-------------+-------+------+
|emp_id|emp_name       |manager_name |dept_id|salary|
+------+---------------+-------------+-------+------+
|3     |Mike VP-Sales  |John CEO     |2      |175000|
|4     |Emma Manager   |Sarah VP-Eng |1      |120000|
|10    |Nina HR Manager|John CEO     |3      |110000|
|7     |Tom Sales Rep  |Mike VP-Sales|2      |75000 |
|8     |Anna Sales Rep |Mike VP-Sales|2      |78000 |
|9     |Chris Engineer |Emma Manager |1      |98000 |
|1     |John CEO       |NULL         |1      |250000|
|2     |Sarah VP-Eng   |John CEO     |1      |180000|
|5     |David Engineer |Emma Manager |1      |95000 |
|6     |Lisa Engineer  |Emma Manager |1      |92000 |
+------+---------------+-------------+-------+------+


=== Department Analytics ===
+-------+---------------+--------------+-----------------+------------------+----------+----------+
|dept_id|dept_name      |employee_count|total_salary_cost|avg_salary        |

## Example 1.3: Multi-Table Join with Complex Grouping - Student Course Enrollment

Scenario: Analyze student enrollments, course performance, and instructor effectiveness.

In [6]:
# Create student, course, and enrollment data
students_data = [
    (1, "Alice", "Computer Science", "Junior", 3.8),
    (2, "Bob", "Mathematics", "Senior", 3.5),
    (3, "Charlie", "Computer Science", "Sophomore", 3.2),
    (4, "Diana", "Physics", "Senior", 3.9),
    (5, "Eve", "Mathematics", "Junior", 3.6)
]

courses_data = [
    (101, "Data Structures", "CS", 4, 201),
    (102, "Algorithms", "CS", 4, 201),
    (103, "Calculus II", "MATH", 3, 202),
    (104, "Linear Algebra", "MATH", 3, 202),
    (105, "Quantum Mechanics", "PHYS", 4, 203),
    (106, "Database Systems", "CS", 3, 204)
]

instructors_data = [
    (201, "Dr. Smith", "Computer Science", 15),
    (202, "Dr. Johnson", "Mathematics", 20),
    (203, "Dr. Williams", "Physics", 18),
    (204, "Dr. Brown", "Computer Science", 10)
]

enrollments_data = [
    (1, 101, "Fall 2024", 95, "A"),
    (1, 102, "Fall 2024", 88, "B+"),
    (1, 106, "Fall 2024", 92, "A-"),
    (2, 103, "Fall 2024", 85, "B"),
    (2, 104, "Fall 2024", 90, "A-"),
    (3, 101, "Fall 2024", 78, "C+"),
    (3, 102, "Fall 2024", 82, "B-"),
    (4, 105, "Fall 2024", 96, "A"),
    (4, 103, "Fall 2024", 93, "A"),
    (5, 104, "Fall 2024", 87, "B+"),
    (5, 103, "Fall 2024", 91, "A-")
]

students_df = spark.createDataFrame(
    students_data, 
    ["student_id", "student_name", "major", "year", "gpa"]
)

courses_df = spark.createDataFrame(
    courses_data, 
    ["course_id", "course_name", "department", "credits", "instructor_id"]
)

instructors_df = spark.createDataFrame(
    instructors_data, 
    ["instructor_id", "instructor_name", "department", "years_experience"]
)

enrollments_df = spark.createDataFrame(
    enrollments_data, 
    ["student_id", "course_id", "semester", "score", "grade"]
)

print("\n=== Students ===")
students_df.show()

print("\n=== Courses ===")
courses_df.show(truncate=False)

print("\n=== Enrollments ===")
enrollments_df.show()


=== Students ===
+----------+------------+----------------+---------+---+
|student_id|student_name|           major|     year|gpa|
+----------+------------+----------------+---------+---+
|         1|       Alice|Computer Science|   Junior|3.8|
|         2|         Bob|     Mathematics|   Senior|3.5|
|         3|     Charlie|Computer Science|Sophomore|3.2|
|         4|       Diana|         Physics|   Senior|3.9|
|         5|         Eve|     Mathematics|   Junior|3.6|
+----------+------------+----------------+---------+---+


=== Courses ===
+---------+-----------------+----------+-------+-------------+
|course_id|course_name      |department|credits|instructor_id|
+---------+-----------------+----------+-------+-------------+
|101      |Data Structures  |CS        |4      |201          |
|102      |Algorithms       |CS        |4      |201          |
|103      |Calculus II      |MATH      |3      |202          |
|104      |Linear Algebra   |MATH      |3      |202          |
|105      

In [7]:
# Complex multi-table join with aliasing to avoid ambiguous columns
complete_enrollments = enrollments_df \
    .join(students_df, "student_id") \
    .join(courses_df.alias("c"), "course_id") \
    .join(instructors_df.alias("i"), col("c.instructor_id") == col("i.instructor_id"))

print("\n=== Complete Enrollment Information ===")
complete_enrollments.select(
    "student_name", "major", "course_name", 
    "instructor_name", "score", "grade"
).show(truncate=False)

# Group by student with performance metrics - using course department
student_performance = complete_enrollments.groupBy(
    "student_id", "student_name", "major", "year", "gpa"
) \
    .agg(
        count("course_id").alias("courses_taken"),
        sum("credits").alias("total_credits"),
        avg("score").alias("avg_score"),
        countDistinct(col("c.department")).alias("departments_explored")
    ) \
    .orderBy(desc("avg_score"))

print("\n=== Student Performance Summary ===")
student_performance.show(truncate=False)

# Group by instructor with teaching analytics - using instructor department
instructor_analytics = complete_enrollments.groupBy(
    col("i.instructor_id"), "instructor_name", col("i.department"), "years_experience"
) \
    .agg(
        countDistinct("course_id").alias("courses_taught"),
        count("student_id").alias("total_students"),
        avg("score").alias("avg_student_score"),
        sum(when(col("grade").isin(["A", "A-", "A+"]), 1).otherwise(0)).alias("a_grades_given")
    ) \
    .withColumn("a_grade_percentage", round(col("a_grades_given") / col("total_students") * 100, 2)) \
    .orderBy(desc("avg_student_score"))

print("\n=== Instructor Teaching Analytics ===")
instructor_analytics.show(truncate=False)

# Group by department and major combination - using course department
cross_dept_analysis = complete_enrollments.groupBy("major", col("c.department")) \
    .agg(
        count("student_id").alias("enrollment_count"),
        avg("score").alias("avg_score"),
        sum("credits").alias("total_credits")
    ) \
    .orderBy("major", col("c.department"))

print("\n=== Cross-Department Analysis by Major ===")
cross_dept_analysis.show(truncate=False)


=== Complete Enrollment Information ===
+------------+----------------+-----------------+---------------+-----+-----+
|student_name|major           |course_name      |instructor_name|score|grade|
+------------+----------------+-----------------+---------------+-----+-----+
|Charlie     |Computer Science|Data Structures  |Dr. Smith      |78   |C+   |
|Alice       |Computer Science|Data Structures  |Dr. Smith      |95   |A    |
|Charlie     |Computer Science|Algorithms       |Dr. Smith      |82   |B-   |
|Alice       |Computer Science|Algorithms       |Dr. Smith      |88   |B+   |
|Eve         |Mathematics     |Calculus II      |Dr. Johnson    |91   |A-   |
|Diana       |Physics         |Calculus II      |Dr. Johnson    |93   |A    |
|Bob         |Mathematics     |Calculus II      |Dr. Johnson    |85   |B    |
|Eve         |Mathematics     |Linear Algebra   |Dr. Johnson    |87   |B+   |
|Bob         |Mathematics     |Linear Algebra   |Dr. Johnson    |90   |A-   |
|Diana       |Physics  

---
# Section 2: Multidimensional DataFrames Examples

This section demonstrates three comprehensive examples of working with multidimensional data structures.

## Example 2.1: Sales Data Cube - Multi-Dimensional Analysis

Scenario: Create a sales data cube with dimensions: product, region, time, and channel.

In [8]:
# Create multidimensional sales data
sales_data = [
    ("2024-01", "Laptop", "Electronics", "North", "Online", 150000, 50),
    ("2024-01", "Laptop", "Electronics", "South", "Store", 120000, 40),
    ("2024-01", "Phone", "Electronics", "North", "Online", 200000, 100),
    ("2024-01", "Shirt", "Clothing", "East", "Store", 30000, 150),
    ("2024-02", "Laptop", "Electronics", "North", "Online", 180000, 60),
    ("2024-02", "Phone", "Electronics", "South", "Online", 220000, 110),
    ("2024-02", "Shirt", "Clothing", "East", "Store", 35000, 175),
    ("2024-02", "Pants", "Clothing", "West", "Store", 45000, 90),
    ("2024-03", "Laptop", "Electronics", "North", "Store", 160000, 55),
    ("2024-03", "Phone", "Electronics", "East", "Online", 240000, 120),
    ("2024-03", "Shirt", "Clothing", "South", "Online", 32000, 160),
    ("2024-03", "Pants", "Clothing", "West", "Store", 48000, 96)
]

sales_df = spark.createDataFrame(
    sales_data,
    ["month", "product", "category", "region", "channel", "revenue", "units_sold"]
)

print("\n=== Sales Data ===")
sales_df.show(truncate=False)


=== Sales Data ===
+-------+-------+-----------+------+-------+-------+----------+
|month  |product|category   |region|channel|revenue|units_sold|
+-------+-------+-----------+------+-------+-------+----------+
|2024-01|Laptop |Electronics|North |Online |150000 |50        |
|2024-01|Laptop |Electronics|South |Store  |120000 |40        |
|2024-01|Phone  |Electronics|North |Online |200000 |100       |
|2024-01|Shirt  |Clothing   |East  |Store  |30000  |150       |
|2024-02|Laptop |Electronics|North |Online |180000 |60        |
|2024-02|Phone  |Electronics|South |Online |220000 |110       |
|2024-02|Shirt  |Clothing   |East  |Store  |35000  |175       |
|2024-02|Pants  |Clothing   |West  |Store  |45000  |90        |
|2024-03|Laptop |Electronics|North |Store  |160000 |55        |
|2024-03|Phone  |Electronics|East  |Online |240000 |120       |
|2024-03|Shirt  |Clothing   |South |Online |32000  |160       |
|2024-03|Pants  |Clothing   |West  |Store  |48000  |96        |
+-------+-------+---

In [9]:
# Dimension 1: Time-based analysis with rollup
time_rollup = sales_df.rollup("month", "category") \
    .agg(
        sum("revenue").alias("total_revenue"),
        sum("units_sold").alias("total_units"),
        count("*").alias("transaction_count")
    ) \
    .orderBy("month", "category")

print("\n=== Time-based Rollup Analysis ===")
print("Shows aggregates at: (month, category), (month), and (grand total) levels")
time_rollup.show(truncate=False)

# Dimension 2: Geographic and Channel cube analysis
geo_channel_cube = sales_df.cube("region", "channel", "category") \
    .agg(
        sum("revenue").alias("total_revenue"),
        avg("revenue").alias("avg_revenue"),
        sum("units_sold").alias("total_units")
    ) \
    .orderBy("region", "channel", "category")

print("\n=== Geographic-Channel Cube Analysis ===")
print("Shows all possible combinations of region, channel, and category aggregations")
geo_channel_cube.show(50, truncate=False)

# Dimension 3: Pivot table - Products across regions
product_region_pivot = sales_df.groupBy("product", "category") \
    .pivot("region", ["North", "South", "East", "West"]) \
    .agg(sum("revenue").alias("revenue"))

print("\n=== Product-Region Pivot Table ===")
product_region_pivot.show(truncate=False)

# Dimension 4: Multi-level pivot with time and channel
time_channel_pivot = sales_df.groupBy("month") \
    .pivot("channel", ["Online", "Store"]) \
    .agg(
        sum("revenue").alias("revenue"),
        sum("units_sold").alias("units")
    )

print("\n=== Time-Channel Multi-level Pivot ===")
time_channel_pivot.show(truncate=False)


=== Time-based Rollup Analysis ===
Shows aggregates at: (month, category), (month), and (grand total) levels
+-------+-----------+-------------+-----------+-----------------+
|month  |category   |total_revenue|total_units|transaction_count|
+-------+-----------+-------------+-----------+-----------------+
|NULL   |NULL       |1460000      |1206       |12               |
|2024-01|NULL       |500000       |340        |4                |
|2024-01|Clothing   |30000        |150        |1                |
|2024-01|Electronics|470000       |190        |3                |
|2024-02|NULL       |480000       |435        |4                |
|2024-02|Clothing   |80000        |265        |2                |
|2024-02|Electronics|400000       |170        |2                |
|2024-03|NULL       |480000       |431        |4                |
|2024-03|Clothing   |80000        |256        |2                |
|2024-03|Electronics|400000       |175        |2                |
+-------+-----------+-----------

## Example 2.2: Multi-Dimensional Weather Data Analysis

Scenario: Analyze weather patterns across multiple dimensions: location, time, and weather metrics.

In [10]:
# Create multi-dimensional weather data
weather_data = [
    ("2024-01-01", "New York", "Northeast", 32, 65, 0.1, "Cloudy", "Winter"),
    ("2024-01-01", "Miami", "Southeast", 75, 80, 0.0, "Sunny", "Winter"),
    ("2024-01-01", "Seattle", "Northwest", 45, 85, 0.5, "Rainy", "Winter"),
    ("2024-01-01", "Phoenix", "Southwest", 68, 30, 0.0, "Sunny", "Winter"),
    ("2024-04-01", "New York", "Northeast", 58, 55, 0.2, "Rainy", "Spring"),
    ("2024-04-01", "Miami", "Southeast", 82, 75, 0.0, "Sunny", "Spring"),
    ("2024-04-01", "Seattle", "Northwest", 52, 80, 0.4, "Cloudy", "Spring"),
    ("2024-04-01", "Phoenix", "Southwest", 85, 25, 0.0, "Sunny", "Spring"),
    ("2024-07-01", "New York", "Northeast", 85, 70, 0.0, "Sunny", "Summer"),
    ("2024-07-01", "Miami", "Southeast", 92, 85, 0.1, "Humid", "Summer"),
    ("2024-07-01", "Seattle", "Northwest", 72, 60, 0.1, "Cloudy", "Summer"),
    ("2024-07-01", "Phoenix", "Southwest", 108, 15, 0.0, "Sunny", "Summer"),
    ("2024-10-01", "New York", "Northeast", 62, 60, 0.0, "Sunny", "Fall"),
    ("2024-10-01", "Miami", "Southeast", 80, 78, 0.0, "Sunny", "Fall"),
    ("2024-10-01", "Seattle", "Northwest", 55, 75, 0.3, "Rainy", "Fall"),
    ("2024-10-01", "Phoenix", "Southwest", 88, 28, 0.0, "Sunny", "Fall")
]

weather_df = spark.createDataFrame(
    weather_data,
    ["date", "city", "region", "temperature", "humidity", "precipitation", "condition", "season"]
)

print("\n=== Weather Data ===")
weather_df.show(truncate=False)


=== Weather Data ===
+----------+--------+---------+-----------+--------+-------------+---------+------+
|date      |city    |region   |temperature|humidity|precipitation|condition|season|
+----------+--------+---------+-----------+--------+-------------+---------+------+
|2024-01-01|New York|Northeast|32         |65      |0.1          |Cloudy   |Winter|
|2024-01-01|Miami   |Southeast|75         |80      |0.0          |Sunny    |Winter|
|2024-01-01|Seattle |Northwest|45         |85      |0.5          |Rainy    |Winter|
|2024-01-01|Phoenix |Southwest|68         |30      |0.0          |Sunny    |Winter|
|2024-04-01|New York|Northeast|58         |55      |0.2          |Rainy    |Spring|
|2024-04-01|Miami   |Southeast|82         |75      |0.0          |Sunny    |Spring|
|2024-04-01|Seattle |Northwest|52         |80      |0.4          |Cloudy   |Spring|
|2024-04-01|Phoenix |Southwest|85         |25      |0.0          |Sunny    |Spring|
|2024-07-01|New York|Northeast|85         |70      |0.

In [11]:
# Multi-dimensional analysis 1: Season and Region Cube
season_region_cube = weather_df.cube("season", "region", "condition") \
    .agg(
        avg("temperature").alias("avg_temp"),
        avg("humidity").alias("avg_humidity"),
        sum("precipitation").alias("total_precipitation"),
        count("*").alias("observation_count")
    ) \
    .orderBy("season", "region", "condition")

print("\n=== Season-Region-Condition Cube Analysis ===")
season_region_cube.show(50, truncate=False)

# Multi-dimensional analysis 2: Temperature ranges across dimensions
weather_with_ranges = weather_df.withColumn(
    "temp_range",
    when(col("temperature") < 40, "Cold")
    .when(col("temperature") < 70, "Moderate")
    .when(col("temperature") < 90, "Warm")
    .otherwise("Hot")
)

temp_range_analysis = weather_with_ranges.rollup("season", "region", "temp_range") \
    .agg(
        count("*").alias("days_count"),
        avg("humidity").alias("avg_humidity"),
        sum("precipitation").alias("total_precip")
    ) \
    .orderBy("season", "region", "temp_range")

print("\n=== Temperature Range Multi-dimensional Analysis ===")
temp_range_analysis.show(50, truncate=False)

# Multi-dimensional analysis 3: Pivot tables for different views
city_season_pivot = weather_df.groupBy("city") \
    .pivot("season", ["Winter", "Spring", "Summer", "Fall"]) \
    .agg(
        round(avg("temperature"), 1).alias("avg_temp"),
        round(avg("humidity"), 1).alias("avg_humidity")
    )

print("\n=== City-Season Pivot: Temperature and Humidity ===")
city_season_pivot.show(truncate=False)

# Create a comprehensive pivot with regions and conditions
region_condition_pivot = weather_df.groupBy("region") \
    .pivot("condition") \
    .agg(
        count("*").alias("count"),
        round(avg("temperature"), 1).alias("avg_temp")
    )

print("\n=== Region-Condition Pivot Analysis ===")
region_condition_pivot.show(truncate=False)


=== Season-Region-Condition Cube Analysis ===
+------+---------+---------+------------------+------------------+-------------------+-----------------+
|season|region   |condition|avg_temp          |avg_humidity      |total_precipitation|observation_count|
+------+---------+---------+------------------+------------------+-------------------+-----------------+
|NULL  |NULL     |NULL     |71.1875           |60.375            |1.7000000000000002 |16               |
|NULL  |NULL     |Cloudy   |52.0              |68.33333333333333 |0.6000000000000001 |3                |
|NULL  |NULL     |Humid    |92.0              |85.0              |0.1                |1                |
|NULL  |NULL     |Rainy    |52.666666666666664|71.66666666666667 |1.0                |3                |
|NULL  |NULL     |Sunny    |81.44444444444444 |51.22222222222222 |0.0                |9                |
|NULL  |Northeast|NULL     |59.25             |62.5              |0.30000000000000004|4                |
|NULL  |

## Example 2.3: Financial Portfolio Multi-Dimensional Analysis

Scenario: Analyze investment portfolio across asset types, sectors, time periods, and risk levels.

In [12]:
# Create multi-dimensional financial portfolio data
portfolio_data = [
    ("2024-Q1", "AAPL", "Stock", "Technology", 150000, 12.5, "Medium", "Large Cap"),
    ("2024-Q1", "GOOGL", "Stock", "Technology", 120000, 10.2, "Medium", "Large Cap"),
    ("2024-Q1", "JNJ", "Stock", "Healthcare", 80000, 5.8, "Low", "Large Cap"),
    ("2024-Q1", "BOND-1", "Bond", "Government", 50000, 3.2, "Low", "AAA"),
    ("2024-Q1", "REIT-1", "REIT", "Real Estate", 60000, 8.5, "Medium", "Diversified"),
    ("2024-Q2", "AAPL", "Stock", "Technology", 165000, 15.3, "Medium", "Large Cap"),
    ("2024-Q2", "GOOGL", "Stock", "Technology", 135000, 13.7, "Medium", "Large Cap"),
    ("2024-Q2", "JNJ", "Stock", "Healthcare", 85000, 7.2, "Low", "Large Cap"),
    ("2024-Q2", "BOND-1", "Bond", "Government", 51500, 3.5, "Low", "AAA"),
    ("2024-Q2", "REIT-1", "REIT", "Real Estate", 63000, 9.1, "Medium", "Diversified"),
    ("2024-Q3", "AAPL", "Stock", "Technology", 158000, -2.8, "Medium", "Large Cap"),
    ("2024-Q3", "GOOGL", "Stock", "Technology", 142000, 8.5, "Medium", "Large Cap"),
    ("2024-Q3", "JNJ", "Stock", "Healthcare", 87000, 6.5, "Low", "Large Cap"),
    ("2024-Q3", "BOND-1", "Bond", "Government", 52000, 2.8, "Low", "AAA"),
    ("2024-Q3", "REIT-1", "REIT", "Real Estate", 65000, 7.8, "Medium", "Diversified"),
    ("2024-Q3", "CRYPTO-1", "Crypto", "Digital Assets", 30000, 45.2, "High", "Volatile")
]

portfolio_df = spark.createDataFrame(
    portfolio_data,
    ["quarter", "asset", "asset_type", "sector", "value", "return_pct", "risk_level", "category"]
)

print("\n=== Portfolio Data ===")
portfolio_df.show(truncate=False)


=== Portfolio Data ===
+-------+--------+----------+--------------+------+----------+----------+-----------+
|quarter|asset   |asset_type|sector        |value |return_pct|risk_level|category   |
+-------+--------+----------+--------------+------+----------+----------+-----------+
|2024-Q1|AAPL    |Stock     |Technology    |150000|12.5      |Medium    |Large Cap  |
|2024-Q1|GOOGL   |Stock     |Technology    |120000|10.2      |Medium    |Large Cap  |
|2024-Q1|JNJ     |Stock     |Healthcare    |80000 |5.8       |Low       |Large Cap  |
|2024-Q1|BOND-1  |Bond      |Government    |50000 |3.2       |Low       |AAA        |
|2024-Q1|REIT-1  |REIT      |Real Estate   |60000 |8.5       |Medium    |Diversified|
|2024-Q2|AAPL    |Stock     |Technology    |165000|15.3      |Medium    |Large Cap  |
|2024-Q2|GOOGL   |Stock     |Technology    |135000|13.7      |Medium    |Large Cap  |
|2024-Q2|JNJ     |Stock     |Healthcare    |85000 |7.2       |Low       |Large Cap  |
|2024-Q2|BOND-1  |Bond      |G

In [13]:
# Multi-dimensional analysis 1: Complete Cube across all dimensions
complete_cube = portfolio_df.cube("quarter", "asset_type", "sector", "risk_level") \
    .agg(
        sum("value").alias("total_value"),
        avg("return_pct").alias("avg_return"),
        count("asset").alias("asset_count"),
        stddev("return_pct").alias("return_volatility")
    ) \
    .orderBy("quarter", "asset_type", "sector", "risk_level")

print("\n=== Complete Multi-Dimensional Cube ===")
print("Shows aggregations for all combinations of quarter, asset_type, sector, and risk_level")
complete_cube.show(100, truncate=False)

# Multi-dimensional analysis 2: Hierarchical Rollup
hierarchical_rollup = portfolio_df.rollup("quarter", "asset_type", "sector") \
    .agg(
        sum("value").alias("total_value"),
        round(avg("return_pct"), 2).alias("avg_return"),
        round(stddev("return_pct"), 2).alias("std_dev"),
        count("*").alias("holdings_count")
    ) \
    .orderBy("quarter", "asset_type", "sector")

print("\n=== Hierarchical Rollup: Quarter -> Asset Type -> Sector ===")
hierarchical_rollup.show(50, truncate=False)

# Multi-dimensional analysis 3: Risk-Return Matrix Pivot
risk_return_pivot = portfolio_df.groupBy("risk_level", "asset_type") \
    .pivot("quarter") \
    .agg(
        round(avg("return_pct"), 2).alias("avg_return"),
        sum("value").alias("total_value")
    ) \
    .orderBy("risk_level", "asset_type")

print("\n=== Risk-Return Matrix Across Quarters ===")
risk_return_pivot.show(truncate=False)

# Multi-dimensional analysis 4: Performance classification pivot
performance_classification = portfolio_df.withColumn(
    "performance",
    when(col("return_pct") < 0, "Loss")
    .when(col("return_pct") < 5, "Low Growth")
    .when(col("return_pct") < 15, "Medium Growth")
    .otherwise("High Growth")
)

performance_pivot = performance_classification.groupBy("sector") \
    .pivot("performance", ["Loss", "Low Growth", "Medium Growth", "High Growth"]) \
    .agg(
        count("asset").alias("count"),
        sum("value").alias("total_value")
    )

print("\n=== Performance Classification by Sector ===")
performance_pivot.show(truncate=False)

# Calculate portfolio metrics across multiple dimensions
portfolio_metrics = portfolio_df.groupBy("quarter") \
    .agg(
        sum("value").alias("total_portfolio_value"),
        round(avg("return_pct"), 2).alias("weighted_avg_return"),
        countDistinct("asset_type").alias("asset_type_diversity"),
        countDistinct("sector").alias("sector_diversity"),
        round(stddev("return_pct"), 2).alias("portfolio_volatility")
    ) \
    .orderBy("quarter")

print("\n=== Overall Portfolio Metrics by Quarter ===")
portfolio_metrics.show(truncate=False)


=== Complete Multi-Dimensional Cube ===
Shows aggregations for all combinations of quarter, asset_type, sector, and risk_level
+-------+----------+--------------+----------+-----------+------------------+-----------+------------------+
|quarter|asset_type|sector        |risk_level|total_value|avg_return        |asset_count|return_volatility |
+-------+----------+--------------+----------+-----------+------------------+-----------+------------------+
|NULL   |NULL      |NULL          |NULL      |1493500    |9.8125            |16         |10.451786131247296|
|NULL   |NULL      |NULL          |High      |30000      |45.2              |1          |NULL              |
|NULL   |NULL      |NULL          |Low       |405500     |4.833333333333333 |6          |1.891736415747888 |
|NULL   |NULL      |NULL          |Medium    |1058000    |9.2               |9          |5.199759609828131 |
|NULL   |NULL      |Digital Assets|NULL      |30000      |45.2              |1          |NULL              |


---
# Section 3: Nested Columns Examples

This section demonstrates three comprehensive examples of working with nested and complex data structures.

## Example 3.1: E-Commerce Orders with Nested Structures

Scenario: Work with deeply nested order data including customer info, shipping address, and order items.

In [14]:
# Define explicit schema for nested e-commerce data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

# Define the schema explicitly
order_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("order_date", StringType(), False),
    StructField("customer", StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("email", StringType(), False),
        StructField("membership", StructType([
            StructField("tier", StringType(), False),
            StructField("since", StringType(), False),
            StructField("points", IntegerType(), False)
        ]), False)
    ]), False),
    StructField("shipping_address", StructType([
        StructField("street", StringType(), False),
        StructField("city", StringType(), False),
        StructField("state", StringType(), False),
        StructField("zip", StringType(), False),
        StructField("coordinates", StructType([
            StructField("lat", DoubleType(), False),
            StructField("lon", DoubleType(), False)
        ]), False)
    ]), False),
    StructField("items", ArrayType(StructType([
        StructField("product_id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("quantity", IntegerType(), False),
        StructField("price", DoubleType(), False),
        StructField("category", StringType(), False)
    ])), False),
    StructField("payment", StructType([
        StructField("method", StringType(), False),
        StructField("last_four", StringType(), True),
        StructField("amount", DoubleType(), False)
    ]), False)
])

# Create nested order data
orders_nested_data = [
    {
        "order_id": 1001,
        "order_date": "2024-01-15",
        "customer": {
            "customer_id": 1,
            "name": "John Doe",
            "email": "john@email.com",
            "membership": {
                "tier": "Gold",
                "since": "2020-01-01",
                "points": 1500
            }
        },
        "shipping_address": {
            "street": "123 Main St",
            "city": "New York",
            "state": "NY",
            "zip": "10001",
            "coordinates": {"lat": 40.7128, "lon": -74.0060}
        },
        "items": [
            {"product_id": 101, "name": "Laptop", "quantity": 1, "price": 1200.00, "category": "Electronics"},
            {"product_id": 102, "name": "Mouse", "quantity": 2, "price": 25.00, "category": "Accessories"}
        ],
        "payment": {
            "method": "Credit Card",
            "last_four": "4242",
            "amount": 1250.00
        }
    },
    {
        "order_id": 1002,
        "order_date": "2024-01-16",
        "customer": {
            "customer_id": 2,
            "name": "Jane Smith",
            "email": "jane@email.com",
            "membership": {
                "tier": "Silver",
                "since": "2021-06-15",
                "points": 750
            }
        },
        "shipping_address": {
            "street": "456 Oak Ave",
            "city": "Los Angeles",
            "state": "CA",
            "zip": "90001",
            "coordinates": {"lat": 34.0522, "lon": -118.2437}
        },
        "items": [
            {"product_id": 201, "name": "Desk Chair", "quantity": 1, "price": 350.00, "category": "Furniture"},
            {"product_id": 202, "name": "Desk Lamp", "quantity": 1, "price": 45.00, "category": "Lighting"},
            {"product_id": 203, "name": "Notebook", "quantity": 5, "price": 3.00, "category": "Stationery"}
        ],
        "payment": {
            "method": "PayPal",
            "last_four": None,
            "amount": 410.00
        }
    },
    {
        "order_id": 1003,
        "order_date": "2024-01-17",
        "customer": {
            "customer_id": 3,
            "name": "Bob Johnson",
            "email": "bob@email.com",
            "membership": {
                "tier": "Platinum",
                "since": "2018-03-20",
                "points": 3200
            }
        },
        "shipping_address": {
            "street": "789 Pine Rd",
            "city": "Chicago",
            "state": "IL",
            "zip": "60601",
            "coordinates": {"lat": 41.8781, "lon": -87.6298}
        },
        "items": [
            {"product_id": 301, "name": "Monitor", "quantity": 2, "price": 400.00, "category": "Electronics"},
            {"product_id": 302, "name": "Keyboard", "quantity": 2, "price": 120.00, "category": "Electronics"},
            {"product_id": 303, "name": "Webcam", "quantity": 1, "price": 85.00, "category": "Electronics"}
        ],
        "payment": {
            "method": "Credit Card",
            "last_four": "8888",
            "amount": 1125.00
        }
    }
]

# Create DataFrame with explicit schema
orders_nested_df = spark.createDataFrame(orders_nested_data, schema=order_schema)

print("\n=== Nested Orders Schema ===")
orders_nested_df.printSchema()

print("\n=== Nested Orders Data ===")
orders_nested_df.show(truncate=False)


=== Nested Orders Schema ===
root
 |-- order_id: integer (nullable = false)
 |-- order_date: string (nullable = false)
 |-- customer: struct (nullable = false)
 |    |-- customer_id: integer (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- email: string (nullable = false)
 |    |-- membership: struct (nullable = false)
 |    |    |-- tier: string (nullable = false)
 |    |    |-- since: string (nullable = false)
 |    |    |-- points: integer (nullable = false)
 |-- shipping_address: struct (nullable = false)
 |    |-- street: string (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- state: string (nullable = false)
 |    |-- zip: string (nullable = false)
 |    |-- coordinates: struct (nullable = false)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- lon: double (nullable = false)
 |-- items: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = false)
 |    |    |

In [15]:
# Working with nested columns - Example 1: Extract nested fields
orders_flattened = orders_nested_df.select(
    "order_id",
    "order_date",
    col("customer.customer_id").alias("customer_id"),
    col("customer.name").alias("customer_name"),
    col("customer.membership.tier").alias("membership_tier"),
    col("customer.membership.points").alias("loyalty_points"),
    col("shipping_address.city").alias("city"),
    col("shipping_address.state").alias("state"),
    col("payment.method").alias("payment_method"),
    col("payment.amount").alias("total_amount")
)

print("\n=== Flattened Order Data ===")
orders_flattened.show(truncate=False)

# Working with nested arrays - Example 2: Explode items array
orders_with_items = orders_nested_df.select(
    "order_id",
    "order_date",
    col("customer.name").alias("customer_name"),
    explode("items").alias("item")
)

items_detailed = orders_with_items.select(
    "order_id",
    "customer_name",
    col("item.product_id").alias("product_id"),
    col("item.name").alias("product_name"),
    col("item.category").alias("category"),
    col("item.quantity").alias("quantity"),
    col("item.price").alias("unit_price"),
    (col("item.quantity") * col("item.price")).alias("line_total")
)

print("\n=== Exploded Item Details ===")
items_detailed.show(truncate=False)

# Aggregate operations on nested data - Example 3
customer_summary = items_detailed.groupBy("customer_name") \
    .agg(
        countDistinct("order_id").alias("total_orders"),
        sum("quantity").alias("total_items"),
        sum("line_total").alias("total_spent"),
        countDistinct("category").alias("categories_purchased"),
        collect_list("product_name").alias("all_products")
    ) \
    .orderBy(desc("total_spent"))

print("\n=== Customer Summary with Aggregated Nested Data ===")
customer_summary.show(truncate=False)

# Create new nested structures - Example 4
order_summary_nested = items_detailed.groupBy("order_id", "customer_name") \
    .agg(
        struct(
            sum("quantity").alias("total_items"),
            sum("line_total").alias("total_amount"),
            countDistinct("category").alias("category_count")
        ).alias("order_summary"),
        collect_list(
            struct(
                col("product_name"),
                col("category"),
                col("quantity"),
                col("line_total")
            )
        ).alias("items")
    )

print("\n=== Re-nested Order Summary ===")
order_summary_nested.printSchema()
order_summary_nested.show(truncate=False)


=== Flattened Order Data ===
+--------+----------+-----------+-------------+---------------+--------------+-----------+-----+--------------+------------+
|order_id|order_date|customer_id|customer_name|membership_tier|loyalty_points|city       |state|payment_method|total_amount|
+--------+----------+-----------+-------------+---------------+--------------+-----------+-----+--------------+------------+
|1001    |2024-01-15|1          |John Doe     |Gold           |1500          |New York   |NY   |Credit Card   |1250.0      |
|1002    |2024-01-16|2          |Jane Smith   |Silver         |750           |Los Angeles|CA   |PayPal        |410.0       |
|1003    |2024-01-17|3          |Bob Johnson  |Platinum       |3200          |Chicago    |IL   |Credit Card   |1125.0      |
+--------+----------+-----------+-------------+---------------+--------------+-----------+-----+--------------+------------+


=== Exploded Item Details ===
+--------+-------------+----------+------------+-----------+---

## Example 3.2: Social Media Posts with Complex Nested Structures

Scenario: Analyze social media posts with nested user info, comments, reactions, and hashtags.

In [16]:
# Create nested social media data with explicit schema
from pyspark.sql.types import BooleanType

# Define explicit schema for social media posts
social_media_schema = StructType([
    StructField("post_id", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("author", StructType([
        StructField("user_id", StringType(), False),
        StructField("username", StringType(), False),
        StructField("display_name", StringType(), False),
        StructField("followers", IntegerType(), False),
        StructField("verified", BooleanType(), False),
        StructField("profile", StructType([
            StructField("bio", StringType(), False),
            StructField("location", StringType(), False),
            StructField("joined_date", StringType(), False)
        ]), False)
    ]), False),
    StructField("content", StructType([
        StructField("text", StringType(), False),
        StructField("media", ArrayType(StructType([
            StructField("type", StringType(), False),
            StructField("url", StringType(), False),
            StructField("width", IntegerType(), True),
            StructField("height", IntegerType(), True),
            StructField("duration", IntegerType(), True)
        ])), False),
        StructField("hashtags", ArrayType(StringType()), False)
    ]), False),
    StructField("engagement", StructType([
        StructField("likes", IntegerType(), False),
        StructField("shares", IntegerType(), False),
        StructField("views", IntegerType(), False),
        StructField("reactions", ArrayType(StructType([
            StructField("type", StringType(), False),
            StructField("count", IntegerType(), False)
        ])), False)
    ]), False),
    StructField("comments", ArrayType(StructType([
        StructField("comment_id", StringType(), False),
        StructField("user", StructType([
            StructField("user_id", StringType(), False),
            StructField("username", StringType(), False)
        ]), False),
        StructField("text", StringType(), False),
        StructField("likes", IntegerType(), False),
        StructField("replies", ArrayType(StructType([
            StructField("user_id", StringType(), False),
            StructField("text", StringType(), False)
        ])), False)
    ])), False)
])

social_posts_data = [
    {
        "post_id": "P001",
        "timestamp": "2024-01-15 10:30:00",
        "author": {
            "user_id": "U001",
            "username": "tech_guru",
            "display_name": "Tech Guru",
            "followers": 15000,
            "verified": True,
            "profile": {
                "bio": "Technology enthusiast",
                "location": "San Francisco",
                "joined_date": "2020-01-01"
            }
        },
        "content": {
            "text": "Just launched my new app!",
            "media": [
                {"type": "image", "url": "img1.jpg", "width": 1920, "height": 1080, "duration": None},
                {"type": "image", "url": "img2.jpg", "width": 1920, "height": 1080, "duration": None}
            ],
            "hashtags": ["#tech", "#app", "#launch"]
        },
        "engagement": {
            "likes": 250,
            "shares": 45,
            "views": 5000,
            "reactions": [
                {"type": "like", "count": 200},
                {"type": "love", "count": 30},
                {"type": "wow", "count": 20}
            ]
        },
        "comments": [
            {
                "comment_id": "C001",
                "user": {"user_id": "U002", "username": "fan_user"},
                "text": "Looks amazing!",
                "likes": 10,
                "replies": [
                    {"user_id": "U001", "text": "Thank you!"}
                ]
            },
            {
                "comment_id": "C002",
                "user": {"user_id": "U003", "username": "developer"},
                "text": "What tech stack did you use?",
                "likes": 5,
                "replies": []
            }
        ]
    },
    {
        "post_id": "P002",
        "timestamp": "2024-01-16 14:20:00",
        "author": {
            "user_id": "U004",
            "username": "food_blogger",
            "display_name": "Foodie Adventures",
            "followers": 28000,
            "verified": True,
            "profile": {
                "bio": "Exploring culinary delights",
                "location": "New York",
                "joined_date": "2019-05-15"
            }
        },
        "content": {
            "text": "Best pasta in the city!",
            "media": [
                {"type": "image", "url": "food1.jpg", "width": 1080, "height": 1080, "duration": None}
            ],
            "hashtags": ["#food", "#pasta", "#foodie", "#restaurant"]
        },
        "engagement": {
            "likes": 580,
            "shares": 92,
            "views": 12000,
            "reactions": [
                {"type": "like", "count": 400},
                {"type": "love", "count": 150},
                {"type": "yum", "count": 30}
            ]
        },
        "comments": [
            {
                "comment_id": "C003",
                "user": {"user_id": "U005", "username": "food_lover"},
                "text": "Where is this place?",
                "likes": 25,
                "replies": [
                    {"user_id": "U004", "text": "Little Italy, NYC!"}
                ]
            }
        ]
    },
    {
        "post_id": "P003",
        "timestamp": "2024-01-17 09:00:00",
        "author": {
            "user_id": "U006",
            "username": "travel_pro",
            "display_name": "Travel Professional",
            "followers": 42000,
            "verified": True,
            "profile": {
                "bio": "Exploring the world one country at a time",
                "location": "Global",
                "joined_date": "2018-03-20"
            }
        },
        "content": {
            "text": "Sunrise at Santorini is breathtaking!",
            "media": [
                {"type": "image", "url": "santorini1.jpg", "width": 2560, "height": 1440, "duration": None},
                {"type": "image", "url": "santorini2.jpg", "width": 2560, "height": 1440, "duration": None},
                {"type": "video", "url": "santorini.mp4", "width": None, "height": None, "duration": 45}
            ],
            "hashtags": ["#travel", "#santorini", "#greece", "#sunrise", "#wanderlust"]
        },
        "engagement": {
            "likes": 1250,
            "shares": 210,
            "views": 35000,
            "reactions": [
                {"type": "like", "count": 800},
                {"type": "love", "count": 350},
                {"type": "wow", "count": 100}
            ]
        },
        "comments": [
            {
                "comment_id": "C004",
                "user": {"user_id": "U007", "username": "traveler123"},
                "text": "Adding this to my bucket list!",
                "likes": 45,
                "replies": [
                    {"user_id": "U006", "text": "You won't regret it!"}
                ]
            },
            {
                "comment_id": "C005",
                "user": {"user_id": "U008", "username": "photographer"},
                "text": "Great shots! What camera?",
                "likes": 30,
                "replies": []
            },
            {
                "comment_id": "C006",
                "user": {"user_id": "U009", "username": "travel_agent"},
                "text": "I can help you plan trips here!",
                "likes": 8,
                "replies": []
            }
        ]
    }
]

# Create DataFrame with explicit schema
social_posts_df = spark.createDataFrame(social_posts_data, schema=social_media_schema)

print("\n=== Social Media Posts Schema ===")
social_posts_df.printSchema()

print("\n=== Social Media Posts ===")
social_posts_df.show(truncate=False)


=== Social Media Posts Schema ===
root
 |-- post_id: string (nullable = false)
 |-- timestamp: string (nullable = false)
 |-- author: struct (nullable = false)
 |    |-- user_id: string (nullable = false)
 |    |-- username: string (nullable = false)
 |    |-- display_name: string (nullable = false)
 |    |-- followers: integer (nullable = false)
 |    |-- verified: boolean (nullable = false)
 |    |-- profile: struct (nullable = false)
 |    |    |-- bio: string (nullable = false)
 |    |    |-- location: string (nullable = false)
 |    |    |-- joined_date: string (nullable = false)
 |-- content: struct (nullable = false)
 |    |-- text: string (nullable = false)
 |    |-- media: array (nullable = false)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: string (nullable = false)
 |    |    |    |-- url: string (nullable = false)
 |    |    |    |-- width: integer (nullable = true)
 |    |    |    |-- height: integer (nullable = true)
 |    |    |    |-- 

In [17]:
# Extract and analyze nested social media data - Part 1: User analytics
author_stats = social_posts_df.select(
    "post_id",
    col("author.username").alias("username"),
    col("author.display_name").alias("display_name"),
    col("author.followers").alias("followers"),
    col("author.verified").alias("verified"),
    col("author.profile.location").alias("location"),
    col("engagement.likes").alias("likes"),
    col("engagement.shares").alias("shares"),
    col("engagement.views").alias("views")
).withColumn(
    "engagement_rate",
    round((col("likes") + col("shares")) / col("views") * 100, 2)
)

print("\n=== Author Statistics ===")
author_stats.show(truncate=False)

# Explode and analyze hashtags - Part 2
hashtag_analysis = social_posts_df.select(
    "post_id",
    col("author.username").alias("username"),
    col("engagement.views").alias("views"),
    explode("content.hashtags").alias("hashtag")
)

hashtag_performance = hashtag_analysis.groupBy("hashtag") \
    .agg(
        count("post_id").alias("usage_count"),
        sum("views").alias("total_views"),
        avg("views").alias("avg_views_per_post"),
        collect_set("username").alias("users")
    ) \
    .orderBy(desc("total_views"))

print("\n=== Hashtag Performance Analysis ===")
hashtag_performance.show(truncate=False)

# Explode and analyze reactions - Part 3
reactions_exploded = social_posts_df.select(
    "post_id",
    col("author.username").alias("username"),
    explode("engagement.reactions").alias("reaction")
).select(
    "post_id",
    "username",
    col("reaction.type").alias("reaction_type"),
    col("reaction.count").alias("reaction_count")
)

print("\n=== Exploded Reactions ===")
reactions_exploded.show(truncate=False)

reaction_summary = reactions_exploded.groupBy("username") \
    .pivot("reaction_type") \
    .sum("reaction_count") \
    .na.fill(0)

print("\n=== Reaction Summary by User (Pivot) ===")
reaction_summary.show(truncate=False)


=== Author Statistics ===
+-------+------------+-------------------+---------+--------+-------------+-----+------+-----+---------------+
|post_id|username    |display_name       |followers|verified|location     |likes|shares|views|engagement_rate|
+-------+------------+-------------------+---------+--------+-------------+-----+------+-----+---------------+
|P001   |tech_guru   |Tech Guru          |15000    |true    |San Francisco|250  |45    |5000 |5.9            |
|P002   |food_blogger|Foodie Adventures  |28000    |true    |New York     |580  |92    |12000|5.6            |
|P003   |travel_pro  |Travel Professional|42000    |true    |Global       |1250 |210   |35000|4.17           |
+-------+------------+-------------------+---------+--------+-------------+-----+------+-----+---------------+


=== Hashtag Performance Analysis ===
+-----------+-----------+-----------+------------------+--------------+
|hashtag    |usage_count|total_views|avg_views_per_post|users         |
+-----------+

In [18]:
# Complex nested analysis - Part 4: Comments and engagement
comments_exploded = social_posts_df.select(
    "post_id",
    col("author.username").alias("post_author"),
    col("engagement.likes").alias("post_likes"),
    explode("comments").alias("comment")
).select(
    "post_id",
    "post_author",
    "post_likes",
    col("comment.comment_id").alias("comment_id"),
    col("comment.user.username").alias("commenter"),
    col("comment.text").alias("comment_text"),
    col("comment.likes").alias("comment_likes"),
    size(col("comment.replies")).alias("reply_count")
)

print("\n=== Exploded Comments with Details ===")
comments_exploded.show(truncate=False)

# Aggregate comment statistics per post
comment_stats = comments_exploded.groupBy("post_id", "post_author") \
    .agg(
        count("comment_id").alias("total_comments"),
        sum("comment_likes").alias("total_comment_likes"),
        sum("reply_count").alias("total_replies"),
        avg("comment_likes").alias("avg_likes_per_comment"),
        collect_list("commenter").alias("commenters")
    )

print("\n=== Comment Statistics per Post ===")
comment_stats.show(truncate=False)

# Media analysis - Part 5
media_exploded = social_posts_df.select(
    "post_id",
    col("author.username").alias("username"),
    col("engagement.views").alias("views"),
    explode("content.media").alias("media")
).select(
    "post_id",
    "username",
    "views",
    col("media.type").alias("media_type"),
    col("media.url").alias("url")
)

media_analysis = media_exploded.groupBy("username", "media_type") \
    .agg(
        count("*").alias("media_count"),
        sum("views").alias("total_views"),
        avg("views").alias("avg_views")
    ) \
    .orderBy("username", "media_type")

print("\n=== Media Type Analysis ===")
media_analysis.show(truncate=False)

# Create comprehensive nested summary
post_summary = social_posts_df.select(
    "post_id",
    col("author.username").alias("username")
).join(
    comment_stats,
    ["post_id"],
    "left"
).withColumn(
    "engagement_summary",
    struct(
        col("total_comments"),
        col("total_comment_likes"),
        col("total_replies"),
        col("commenters")
    )
).select("post_id", "username", "engagement_summary")

print("\n=== Post Summary with Nested Engagement ===")
post_summary.printSchema()
post_summary.show(truncate=False)


=== Exploded Comments with Details ===
+-------+------------+----------+----------+------------+-------------------------------+-------------+-----------+
|post_id|post_author |post_likes|comment_id|commenter   |comment_text                   |comment_likes|reply_count|
+-------+------------+----------+----------+------------+-------------------------------+-------------+-----------+
|P001   |tech_guru   |250       |C001      |fan_user    |Looks amazing!                 |10           |1          |
|P001   |tech_guru   |250       |C002      |developer   |What tech stack did you use?   |5            |0          |
|P002   |food_blogger|580       |C003      |food_lover  |Where is this place?           |25           |1          |
|P003   |travel_pro  |1250      |C004      |traveler123 |Adding this to my bucket list! |45           |1          |
|P003   |travel_pro  |1250      |C005      |photographer|Great shots! What camera?      |30           |0          |
|P003   |travel_pro  |1250      

## Example 3.3: IoT Sensor Data with Complex Nested Structures

Scenario: Process IoT sensor data with nested device info, multiple sensor readings, and event logs.

In [19]:
# Create nested IoT sensor data with explicit schema

# Define explicit schema for IoT data
iot_schema = StructType([
    StructField("device_id", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("device_info", StructType([
        StructField("name", StringType(), False),
        StructField("type", StringType(), False),
        StructField("manufacturer", StringType(), False),
        StructField("model", StringType(), False),
        StructField("firmware_version", StringType(), False),
        StructField("location", StructType([
            StructField("building", StringType(), False),
            StructField("floor", IntegerType(), False),
            StructField("room", StringType(), False),
            StructField("coordinates", StructType([
                StructField("x", DoubleType(), False),
                StructField("y", DoubleType(), False)
            ]), False)
        ]), False)
    ]), False),
    StructField("sensors", ArrayType(StructType([
        StructField("sensor_id", StringType(), False),
        StructField("type", StringType(), False),
        StructField("value", DoubleType(), False),
        StructField("unit", StringType(), False),
        StructField("status", StringType(), False)
    ])), False),
    StructField("metrics", StructType([
        StructField("power_consumption", DoubleType(), False),
        StructField("uptime_hours", IntegerType(), False),
        StructField("signal_strength", IntegerType(), False),
        StructField("battery_level", IntegerType(), True)
    ]), False),
    StructField("events", ArrayType(StructType([
        StructField("time", StringType(), False),
        StructField("type", StringType(), False),
        StructField("description", StringType(), False)
    ])), False)
])

iot_data = [
    {
        "device_id": "D001",
        "timestamp": "2024-01-15 10:00:00",
        "device_info": {
            "name": "Smart Thermostat Living Room",
            "type": "thermostat",
            "manufacturer": "SmartHome Inc",
            "model": "TH-2000",
            "firmware_version": "2.1.5",
            "location": {
                "building": "Main Building",
                "floor": 1,
                "room": "Living Room",
                "coordinates": {"x": 10.5, "y": 20.3}
            }
        },
        "sensors": [
            {"sensor_id": "S001", "type": "temperature", "value": 72.5, "unit": "F", "status": "normal"},
            {"sensor_id": "S002", "type": "humidity", "value": 45.2, "unit": "%", "status": "normal"},
            {"sensor_id": "S003", "type": "air_quality", "value": 85.0, "unit": "AQI", "status": "good"}
        ],
        "metrics": {
            "power_consumption": 0.15,
            "uptime_hours": 720,
            "signal_strength": -45,
            "battery_level": None
        },
        "events": [
            {"time": "10:00:00", "type": "reading", "description": "Normal operation"},
            {"time": "10:15:00", "type": "adjustment", "description": "Temperature set to 73F"}
        ]
    },
    {
        "device_id": "D002",
        "timestamp": "2024-01-15 10:00:00",
        "device_info": {
            "name": "Motion Sensor Entry",
            "type": "motion_sensor",
            "manufacturer": "SecureHome",
            "model": "MS-100",
            "firmware_version": "1.8.2",
            "location": {
                "building": "Main Building",
                "floor": 1,
                "room": "Entry Hall",
                "coordinates": {"x": 5.2, "y": 15.8}
            }
        },
        "sensors": [
            {"sensor_id": "S004", "type": "motion", "value": 1.0, "unit": "binary", "status": "detected"},
            {"sensor_id": "S005", "type": "light", "value": 450.0, "unit": "lux", "status": "normal"}
        ],
        "metrics": {
            "power_consumption": 0.05,
            "uptime_hours": 1440,
            "signal_strength": -52,
            "battery_level": 85
        },
        "events": [
            {"time": "09:45:00", "type": "motion_detected", "description": "Movement detected"},
            {"time": "10:00:00", "type": "reading", "description": "Normal operation"},
            {"time": "10:05:00", "type": "motion_detected", "description": "Movement detected"}
        ]
    },
    {
        "device_id": "D003",
        "timestamp": "2024-01-15 10:00:00",
        "device_info": {
            "name": "Smart Meter Kitchen",
            "type": "energy_monitor",
            "manufacturer": "EnergyTech",
            "model": "EM-500",
            "firmware_version": "3.2.1",
            "location": {
                "building": "Main Building",
                "floor": 1,
                "room": "Kitchen",
                "coordinates": {"x": 15.7, "y": 22.1}
            }
        },
        "sensors": [
            {"sensor_id": "S006", "type": "power", "value": 2.45, "unit": "kW", "status": "normal"},
            {"sensor_id": "S007", "type": "voltage", "value": 120.2, "unit": "V", "status": "normal"},
            {"sensor_id": "S008", "type": "current", "value": 20.4, "unit": "A", "status": "normal"},
            {"sensor_id": "S009", "type": "frequency", "value": 60.0, "unit": "Hz", "status": "normal"}
        ],
        "metrics": {
            "power_consumption": 0.02,
            "uptime_hours": 2160,
            "signal_strength": -38,
            "battery_level": None
        },
        "events": [
            {"time": "08:00:00", "type": "high_usage", "description": "Power spike detected"},
            {"time": "10:00:00", "type": "reading", "description": "Normal operation"}
        ]
    },
    {
        "device_id": "D004",
        "timestamp": "2024-01-15 10:00:00",
        "device_info": {
            "name": "Air Quality Monitor Bedroom",
            "type": "air_quality",
            "manufacturer": "AirSense",
            "model": "AQ-300",
            "firmware_version": "2.0.8",
            "location": {
                "building": "Main Building",
                "floor": 2,
                "room": "Master Bedroom",
                "coordinates": {"x": 12.3, "y": 25.6}
            }
        },
        "sensors": [
            {"sensor_id": "S010", "type": "co2", "value": 450.0, "unit": "ppm", "status": "normal"},
            {"sensor_id": "S011", "type": "pm25", "value": 12.0, "unit": "μg/m³", "status": "good"},
            {"sensor_id": "S012", "type": "voc", "value": 0.3, "unit": "mg/m³", "status": "normal"},
            {"sensor_id": "S013", "type": "temperature", "value": 68.5, "unit": "F", "status": "normal"},
            {"sensor_id": "S014", "type": "humidity", "value": 50.1, "unit": "%", "status": "normal"}
        ],
        "metrics": {
            "power_consumption": 0.08,
            "uptime_hours": 1800,
            "signal_strength": -48,
            "battery_level": None
        },
        "events": [
            {"time": "10:00:00", "type": "reading", "description": "All sensors normal"}
        ]
    }
]

# Create DataFrame with explicit schema
iot_df = spark.createDataFrame(iot_data, schema=iot_schema)

print("\n=== IoT Sensor Data Schema ===")
iot_df.printSchema()

print("\n=== IoT Sensor Data ===")
iot_df.show(truncate=False)


=== IoT Sensor Data Schema ===
root
 |-- device_id: string (nullable = false)
 |-- timestamp: string (nullable = false)
 |-- device_info: struct (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- type: string (nullable = false)
 |    |-- manufacturer: string (nullable = false)
 |    |-- model: string (nullable = false)
 |    |-- firmware_version: string (nullable = false)
 |    |-- location: struct (nullable = false)
 |    |    |-- building: string (nullable = false)
 |    |    |-- floor: integer (nullable = false)
 |    |    |-- room: string (nullable = false)
 |    |    |-- coordinates: struct (nullable = false)
 |    |    |    |-- x: double (nullable = false)
 |    |    |    |-- y: double (nullable = false)
 |-- sensors: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sensor_id: string (nullable = false)
 |    |    |-- type: string (nullable = false)
 |    |    |-- value: double (nullable = false)
 |    |    |-- unit: stri

In [20]:
# Extract and analyze nested IoT data - Part 1: Device overview
device_overview = iot_df.select(
    "device_id",
    "timestamp",
    col("device_info.name").alias("device_name"),
    col("device_info.type").alias("device_type"),
    col("device_info.manufacturer").alias("manufacturer"),
    col("device_info.location.floor").alias("floor"),
    col("device_info.location.room").alias("room"),
    col("metrics.power_consumption").alias("power_kw"),
    col("metrics.uptime_hours").alias("uptime_hours"),
    col("metrics.signal_strength").alias("signal_dbm"),
    size("sensors").alias("sensor_count"),
    size("events").alias("event_count")
)

print("\n=== Device Overview ===")
device_overview.show(truncate=False)

# Explode sensors and analyze readings - Part 2
sensor_readings = iot_df.select(
    "device_id",
    col("device_info.type").alias("device_type"),
    col("device_info.location.room").alias("room"),
    explode("sensors").alias("sensor")
).select(
    "device_id",
    "device_type",
    "room",
    col("sensor.sensor_id").alias("sensor_id"),
    col("sensor.type").alias("sensor_type"),
    col("sensor.value").alias("value"),
    col("sensor.unit").alias("unit"),
    col("sensor.status").alias("status")
)

print("\n=== Exploded Sensor Readings ===")
sensor_readings.show(truncate=False)

# Aggregate sensor statistics - Part 3
sensor_stats = sensor_readings.groupBy("sensor_type", "unit") \
    .agg(
        count("*").alias("reading_count"),
        round(avg("value"), 2).alias("avg_value"),
        round(min("value"), 2).alias("min_value"),
        round(max("value"), 2).alias("max_value"),
        collect_set("device_id").alias("devices"),
        collect_set("status").alias("statuses")
    ) \
    .orderBy("sensor_type")

print("\n=== Sensor Statistics by Type ===")
sensor_stats.show(truncate=False)

# Room-level aggregation with nested sensors - Part 4
room_analysis = sensor_readings.groupBy("room") \
    .agg(
        countDistinct("device_id").alias("device_count"),
        countDistinct("sensor_type").alias("sensor_type_count"),
        count("*").alias("total_readings"),
        collect_list(
            struct(
                col("sensor_type"),
                col("value"),
                col("unit")
            )
        ).alias("all_readings")
    ) \
    .orderBy("room")

print("\n=== Room-Level Analysis with Nested Readings ===")
room_analysis.printSchema()
room_analysis.show(truncate=False)


=== Device Overview ===
+---------+-------------------+----------------------------+--------------+-------------+-----+--------------+--------+------------+----------+------------+-----------+
|device_id|timestamp          |device_name                 |device_type   |manufacturer |floor|room          |power_kw|uptime_hours|signal_dbm|sensor_count|event_count|
+---------+-------------------+----------------------------+--------------+-------------+-----+--------------+--------+------------+----------+------------+-----------+
|D001     |2024-01-15 10:00:00|Smart Thermostat Living Room|thermostat    |SmartHome Inc|1    |Living Room   |0.15    |720         |-45       |3           |2          |
|D002     |2024-01-15 10:00:00|Motion Sensor Entry         |motion_sensor |SecureHome   |1    |Entry Hall    |0.05    |1440        |-52       |2           |3          |
|D003     |2024-01-15 10:00:00|Smart Meter Kitchen         |energy_monitor|EnergyTech   |1    |Kitchen       |0.02    |2160       

In [21]:
# Event analysis - Part 5
events_exploded = iot_df.select(
    "device_id",
    col("device_info.name").alias("device_name"),
    col("device_info.type").alias("device_type"),
    "timestamp",
    explode("events").alias("event")
).select(
    "device_id",
    "device_name",
    "device_type",
    col("event.time").alias("event_time"),
    col("event.type").alias("event_type"),
    col("event.description").alias("description")
)

print("\n=== Exploded Events ===")
events_exploded.show(truncate=False)

# Event type analysis
event_summary = events_exploded.groupBy("device_type", "event_type") \
    .agg(
        count("*").alias("event_count"),
        countDistinct("device_id").alias("affected_devices"),
        collect_list("description").alias("descriptions")
    ) \
    .orderBy("device_type", desc("event_count"))

print("\n=== Event Summary by Device and Event Type ===")
event_summary.show(truncate=False)

# Create comprehensive device health report with nested structures - Part 6
device_health = iot_df.select(
    "device_id",
    col("device_info.name").alias("name"),
    col("device_info.type").alias("type")
)

# Join with sensor stats per device
device_sensor_stats = sensor_readings.groupBy("device_id") \
    .agg(
        count("*").alias("sensor_count"),
        collect_list(
            struct(
                col("sensor_type"),
                col("value"),
                col("status")
            )
        ).alias("sensor_data")
    )

# Join with event stats per device
device_event_stats = events_exploded.groupBy("device_id") \
    .agg(
        count("*").alias("event_count"),
        collect_list(
            struct(
                col("event_type"),
                col("description")
            )
        ).alias("recent_events")
    )

# Create final nested health report
health_report = device_health \
    .join(device_sensor_stats, "device_id", "left") \
    .join(device_event_stats, "device_id", "left") \
    .withColumn(
        "health_report",
        struct(
            col("sensor_count"),
            col("sensor_data"),
            col("event_count"),
            col("recent_events")
        )
    ) \
    .select("device_id", "name", "type", "health_report")

print("\n=== Comprehensive Device Health Report ===")
health_report.printSchema()
health_report.show(truncate=False)

# Temperature analysis with pivot
temp_sensors = sensor_readings.filter(col("sensor_type") == "temperature")
temp_pivot = temp_sensors.groupBy("device_type") \
    .pivot("room") \
    .agg(round(avg("value"), 1))

print("\n=== Temperature Analysis by Device Type and Room ===")
temp_pivot.show(truncate=False)


=== Exploded Events ===
+---------+----------------------------+--------------+----------+---------------+----------------------+
|device_id|device_name                 |device_type   |event_time|event_type     |description           |
+---------+----------------------------+--------------+----------+---------------+----------------------+
|D001     |Smart Thermostat Living Room|thermostat    |10:00:00  |reading        |Normal operation      |
|D001     |Smart Thermostat Living Room|thermostat    |10:15:00  |adjustment     |Temperature set to 73F|
|D002     |Motion Sensor Entry         |motion_sensor |09:45:00  |motion_detected|Movement detected     |
|D002     |Motion Sensor Entry         |motion_sensor |10:00:00  |reading        |Normal operation      |
|D002     |Motion Sensor Entry         |motion_sensor |10:05:00  |motion_detected|Movement detected     |
|D003     |Smart Meter Kitchen         |energy_monitor|08:00:00  |high_usage     |Power spike detected  |
|D003     |Smart Mete

---
# Cleanup and Summary

Stop the Spark session when done.

In [22]:
# Display final summary
print("\n" + "="*80)
print("SUMMARY OF EXAMPLES")
print("="*80)
print("\nSection 1: Joining & Grouping")
print("  - Example 1.1: E-Commerce multi-table joins with aggregations")
print("  - Example 1.2: Employee hierarchy with self-joins and window functions")
print("  - Example 1.3: Student enrollment with complex grouping")
print("\nSection 2: Multidimensional DataFrames")
print("  - Example 2.1: Sales data cube with rollup and pivot")
print("  - Example 2.2: Weather data multi-dimensional analysis")
print("  - Example 2.3: Financial portfolio with complete cube operations")
print("\nSection 3: Nested Columns")
print("  - Example 3.1: E-Commerce orders with deep nesting and array operations")
print("  - Example 3.2: Social media posts with complex nested structures")
print("  - Example 3.3: IoT sensor data with nested device info and events")
print("\n" + "="*80)
print(f"\nSpark Application UI: http://localhost:4040")
print(f"Spark Master UI: http://localhost:8080")
print("="*80)

# Uncomment to stop the Spark session
# spark.stop()
# print("\nSpark session stopped.")


SUMMARY OF EXAMPLES

Section 1: Joining & Grouping
  - Example 1.1: E-Commerce multi-table joins with aggregations
  - Example 1.2: Employee hierarchy with self-joins and window functions
  - Example 1.3: Student enrollment with complex grouping

Section 2: Multidimensional DataFrames
  - Example 2.1: Sales data cube with rollup and pivot
  - Example 2.2: Weather data multi-dimensional analysis
  - Example 2.3: Financial portfolio with complete cube operations

Section 3: Nested Columns
  - Example 3.1: E-Commerce orders with deep nesting and array operations
  - Example 3.2: Social media posts with complex nested structures
  - Example 3.3: IoT sensor data with nested device info and events


Spark Application UI: http://localhost:4040
Spark Master UI: http://localhost:8080
