<a href="https://colab.research.google.com/github/nandini-gangrade/Data-Engineering-Hexaware/blob/Project/Week3/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#  Week 3: Real-Time Data Processing with Apache Spark and PySpark

In [None]:
# Step 1
# Set Up Spark
!pip install pyspark

In [22]:
# Import Necessary Libraries

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import col, sum, count
import pandas as pd
import random
import os
import time
from datetime import datetime

In [16]:
# Step 2
# Create Spark session
spark = SparkSession.builder \
    .appName("EcommerceRealTime") \
    .getOrCreate()

In [48]:
# Step 3: Create a directory for streaming data
streaming_data_path = os.makedirs('/content/streaming_data', exist_ok=True)

# Step 4: Define schema for order data
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("order_amount", FloatType(), True),
    StructField("order_date", TimestampType(), True)
])

# Step 5: Simulate initial streaming data with new values
initial_order_data = [
    (2001, 1, 3001, 2, 149.99, datetime.now()),  # Order 1
    (2002, 2, 3002, 3, 199.99, datetime.now()),  # Order 2
    (2003, 3, 3003, 1, 299.99, datetime.now()),  # Order 3
    (2004, 4, 3001, 4, 399.99, datetime.now()),  # Order 4
    (2005, 5, 3002, 2, 599.99, datetime.now()),  # Order 5
]

# Create a DataFrame from the list of initial orders
df_orders = spark.createDataFrame(initial_order_data, schema)

# Show the initial data
print("Initial Orders:")
df_orders.show()

# Step 6: Process Data Using PySpark
# Group the data by product_id and calculate the total sales (sum of order_amount) per product
product_sales = df_orders.groupBy("product_id").agg(
    sum("order_amount").alias("total_sales"),
    count("order_id").alias("order_count")
)

# Show the result
print("Initial Product Sales:")
product_sales.show()

# Step 7: Real-Time Streaming Simulation
# Simulating appending new data in real-time with updated values
new_order_data = [
    (1008, 2, 3004, 2, 249.99, datetime.now()),  # New Order 1
    (1009, 4, 3001, 3, 359.95, datetime.now()),  # New Order 2
    (1010, 5, 3002, 1, 599.99, datetime.now()),  # New Order 3
    (1011, 1, 3003, 4, 149.99, datetime.now()),  # New Order 4
    (1012, 3, 3004, 2, 389.50, datetime.now()),  # New Order 5
]

# Create a new DataFrame for the new batch of orders with the correct timestamp format
new_df_orders = spark.createDataFrame(new_order_data, schema)

# Append the new data to the original DataFrame
df_orders = df_orders.union(new_df_orders)

# Perform the same aggregation again with the updated data
updated_product_sales = df_orders.groupBy("product_id").agg(
    sum("order_amount").alias("total_sales"),
    count("order_id").alias("order_count")
)

# Show the updated result
print("Updated Product Sales After New Orders:")
updated_product_sales.show()

# Step 8: Write the results to a CSV file
updated_product_sales.write.csv("/content/product_sales.csv", header=True, mode='overwrite')

Initial Orders:
+--------+----------+-----------+--------+------------+--------------------+
|order_id|product_id|customer_id|quantity|order_amount|          order_date|
+--------+----------+-----------+--------+------------+--------------------+
|    2001|         1|       3001|       2|      149.99|2024-09-27 09:45:...|
|    2002|         2|       3002|       3|      199.99|2024-09-27 09:45:...|
|    2003|         3|       3003|       1|      299.99|2024-09-27 09:45:...|
|    2004|         4|       3001|       4|      399.99|2024-09-27 09:45:...|
|    2005|         5|       3002|       2|      599.99|2024-09-27 09:45:...|
+--------+----------+-----------+--------+------------+--------------------+

Initial Product Sales:
+----------+------------------+-----------+
|product_id|       total_sales|order_count|
+----------+------------------+-----------+
|         1|149.99000549316406|          1|
|         2|199.99000549316406|          1|
|         3|  299.989990234375|          1|
|  