In [2]:

from pyspark.sql import SparkSession
import os

# 1. Setup Environment
os.environ["HADOOP_USER_NAME"] = "root"

# 2. Initialize Spark with Docker Network Fixes
spark = SparkSession.builder \
    .appName("Student_Lab") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.host", "bigdata-jupyter") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .config("spark.driver.port", "20002") \
    .config("spark.blockManager.port", "20003") \
    .config("spark.executor.memory", "512m") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .getOrCreate()

print("Spark Connected!")

Spark Connected!


Step 1: Ingest (Simulate Data Upload)
In a real scenario, you would upload a file via the Jupyter interface. For this lab, we will generate a realistic CSV file directly in the notebook to ensure consistent results.


In [4]:
import pandas as pd
import numpy as np
import os

# 1. Define the dataset
data = {
    'transaction_id': range(1, 1001),
    'store_location': np.random.choice(['New York', 'London', 'Tokyo', 'Paris'], 1000),
    'product_category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Books'], 1000),
    'quantity': np.random.randint(1, 10, 1000),
    'unit_price': np.random.uniform(10.0, 500.0, 1000)
}

# 2. Create Pandas DataFrame
pdf = pd.DataFrame(data)

# 3. Save as CSV to the local container disk (simulating an uploaded file)
csv_path = "sales_data_raw.csv"
pdf.to_csv(csv_path, index=False)

print(f"✅ Data Ingested: Created {csv_path} with {len(pdf)} rows.")
print(pdf.head(3))

✅ Data Ingested: Created sales_data_raw.csv with 1000 rows.
   transaction_id store_location product_category  quantity  unit_price
0               1       New York             Home         5  201.140244
1               2          Paris             Home         3  358.504293
2               3         London         Clothing         9  420.131376


Step 2: Process (Pandas to Spark)
Now we act as a Data Engineer. We initialize the Spark engine and convert our raw "Small Data" (Pandas) into distributed "Big Data" (Spark DataFrame).

In [10]:
from pyspark.sql import SparkSession
import os

# 1. Setup Environment (Critical for Docker)
os.environ["HADOOP_USER_NAME"] = "root"

print("Initializing Spark Engine...")

# 2. Configure Spark Session
# THESE LINES MUST USE THE SAME NUMBER OF SPACES:
spark = SparkSession.builder \
    .appName("Lab1_Data_Processing") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.host", "bigdata-jupyter") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .config("spark.driver.port", "20002") \
    .config("spark.blockManager.port", "20003") \
    .config("spark.executor.memory", "512m") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .getOrCreate()

print("✅ Spark Connected!")

# 3. Convert Pandas DF to Spark DF
# This distributes the data across the cluster nodes
df_spark = spark.createDataFrame(pdf)

print("Schema inferred by Spark:")
df_spark.printSchema()

Initializing Spark Engine...
✅ Spark Connected!


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


Schema inferred by Spark:
root
 |-- transaction_id: long (nullable = true)
 |-- store_location: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: double (nullable = true)



Step 3: Store (Write to Data Lake/HDFS)
We will now save this processed data into our Data Lake (HDFS). We use Parquet format because it is column-oriented, compressed, and much faster for analytics than CSV.

In [11]:
# 1. Define HDFS Path
# We write to the namenode container's storage
hdfs_path = "hdfs://namenode:9000/user/data/sales_parquet"

print(f"Writing data to Data Lake: {hdfs_path} ...")

# 2. Write to HDFS
# mode("overwrite") replaces the file if you run this cell again
df_spark.write.mode("overwrite").parquet(hdfs_path)

print("✅ Storage Complete! Data is now safe in HDFS.")

Writing data to Data Lake: hdfs://namenode:9000/user/data/sales_parquet ...
✅ Storage Complete! Data is now safe in HDFS.


Step 4: Analyze (Read & SQL)
Now we act as a Data Analyst. We will read the optimized Parquet file from the Data Lake and run SQL queries to derive insights.

In [12]:
from pyspark.sql.functions import col, sum, avg, round

print("Reading from Data Lake...")

# 1. Read Parquet from HDFS
df_analytics = spark.read.parquet(hdfs_path)

# 2. Register as a Temporary SQL View
df_analytics.createOrReplaceTempView("sales")

print("--- SQL ANALYSIS: Total Revenue by City ---")

# 3. Run SQL Query
# Calculate Revenue (Quantity * Price) and Group by City
result = spark.sql("""
    SELECT 
        store_location, 
        ROUND(SUM(quantity * unit_price), 2) as total_revenue,
        AVG(quantity) as avg_items_per_sale
    FROM sales
    GROUP BY store_location
    ORDER BY total_revenue DESC
""")

result.show()

# 4. Stop the Engine
spark.stop()

Reading from Data Lake...
--- SQL ANALYSIS: Total Revenue by City ---
+--------------+-------------+------------------+
|store_location|total_revenue|avg_items_per_sale|
+--------------+-------------+------------------+
|         Paris|     337168.4| 5.483606557377049|
|         Tokyo|    332754.79| 4.681818181818182|
|      New York|     313012.5|  5.08298755186722|
|        London|    306234.04| 4.948207171314741|
+--------------+-------------+------------------+

