In [None]:
# Step 1: Data Ingestion - Load Large Datasets
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("SparkPro").getOrCreate()

# Load large transactions data.
# header=True makes Spark take the column names from the first line of the file.
# Without it the columns are named _c0, _c1, ... and the later cells, which address
# product_id / transaction_date / quantity / amount by name, cannot resolve them.
# NOTE(review): assumes the CSV files start with a header row - confirm against the data.
# Values are still read as strings; pass inferSchema=True or an explicit schema if
# typed columns are required.
transactions_df = spark.read.csv("large_transactions.csv", header=True)

# Load large inventory data
inventory_df = spark.read.json("large_inventory.json")

# Load large customer feedback data (same header handling as the transactions file)
feedback_df = spark.read.csv("large_customer_feedback.csv", header=True)

# Display a sample of each dataset
transactions_df.show(5)
inventory_df.show(5)
feedback_df.show(5)

In [None]:
# Step 2: Data Cleaning and Transformation with RDDs
import hashlib

# Drop down to the RDD API for row-level processing
transactions_rdd = transactions_df.rdd


def _is_complete(row):
    """Return True when no field of ``row`` is None."""
    for value in row:
        if value is None:
            return False
    return True


# Keep only fully populated records; a row with any missing field
# (e.g. transaction_id or amount) is treated as corrupted and dropped
cleaned_rdd = transactions_rdd.filter(_is_complete)


# Write Function to Anonymize user IDs using Hashing
def anonymize(record, salt=""):
    """Return a copy of ``record`` whose user ID is replaced by a SHA-256 hex digest.

    Args:
        record: An indexable, sliceable sequence (tuple, list, Row). The value at
            index 1 is treated as the user ID.
        salt: Optional string prepended to the ID before hashing. The default
            (empty string) yields the plain SHA-256 of the ID.

    Returns:
        A tuple with the same fields as ``record``, except that index 1 holds the
        hex digest of the user ID (or None when the user ID is None).

    Raises:
        IndexError: If ``record`` has fewer than two fields.
    """
    user_id = record[1]
    if user_id is None:
        # Keep a missing ID missing instead of hashing the literal text "None",
        # which would merge every unknown user into one pseudo-identity.
        hashed_id = None
    else:
        # str() lets non-string IDs (e.g. integers from a typed column) be hashed;
        # for a str input it is a no-op, so existing digests are unchanged.
        hashed_id = hashlib.sha256((salt + str(user_id)).encode("utf-8")).hexdigest()
    # Recreate the record with the anonymized user ID, leaving the rest as is
    return (record[0], hashed_id) + tuple(record[2:])

# Replace the user ID of every surviving record with its hash
anonymized_rdd = cleaned_rdd.map(anonymize)

# Rebuild a DataFrame, reusing the column names of the raw transactions frame
original_columns = transactions_df.columns
cleaned_transactions_df = anonymized_rdd.toDF(schema=original_columns)

# Preview the cleaned, anonymized result
cleaned_transactions_df.show(n=5)

In [None]:
# Step 3: DataFrame Operations for Cleaning and Transformation
from pyspark.sql.functions import col, lower, trim

# Clean inventory data: drop rows without a stock level and normalize product names
# (trimmed, lower-cased) so that the same product always groups together
cleaned_inventory_df = (
    inventory_df
    .dropna(subset=["stock_level"])
    .withColumn("product_name", lower(trim(col("product_name"))))
)

# Display cleaned inventory data
cleaned_inventory_df.show(5)

# Combine transactions with inventory.
# Joining on the column *name* keeps a single product_id column in the result.
# The expression form (a.product_id == b.product_id) keeps both copies, which makes
# later references to product_id ambiguous and makes writing the frame to Parquet
# fail on duplicate column names.
joined_df = cleaned_transactions_df.join(cleaned_inventory_df, on="product_id", how="inner")

# Display joined DataFrame
joined_df.show(5)

In [None]:
# Step 4: Spark SQL Queries
# `avg` previously came from the unrelated stdlib audio module `audioop` (removed in
# Python 3.13), which made this cell fail on import. The Spark function is the one
# that belongs here; the name stays bound for any code that refers to it.
from pyspark.sql.functions import avg, date_format

# Create temporary views for SQL queries
cleaned_transactions_df.createOrReplaceTempView("transactions")
cleaned_inventory_df.createOrReplaceTempView("inventory")
joined_df.createOrReplaceTempView("joined_data")

# Query: Top 10 most purchased products in the last month.
# "Last month" is the most recent month present in the data, not the calendar month.
top_products_df = spark.sql("""
    WITH recent_month AS (
        SELECT MAX(date_format(transaction_date, 'yyyy-MM')) AS max_month
        FROM joined_data
    )
    SELECT product_name, SUM(quantity) AS total_qty
    FROM joined_data
    JOIN recent_month
    ON date_format(joined_data.transaction_date, 'yyyy-MM') = recent_month.max_month
    GROUP BY product_name
    ORDER BY total_qty DESC
    LIMIT 10
""")

top_products_df.show()

# Query: Monthly revenue trends
monthly_revenue_df = spark.sql("""
    SELECT
        DATE_FORMAT(transaction_date, 'yyyy-MM') AS month,
        SUM(amount) AS total_revenue
    FROM
        joined_data
    GROUP BY
        DATE_FORMAT(transaction_date, 'yyyy-MM')
    ORDER BY
        month
""")

monthly_revenue_df.show()

# Query: Inventory turnover rates.
# joined_data already carries product_name and stock_level from the inventory join,
# so it is queried directly. Joining it to `inventory` again on product_name repeats
# each transaction once per inventory row sharing that (normalized) name and
# inflates SUM(quantity).
# NULLIF turns a zero average stock level into NULL instead of dividing by zero.
turnover_rate_df = spark.sql("""
    SELECT
        product_name,
        SUM(quantity) / NULLIF(AVG(stock_level), 0) AS turnover_rate
    FROM
        joined_data
    GROUP BY
        product_name
    ORDER BY
        turnover_rate DESC
""")
turnover_rate_df.show()

In [None]:
# Step 5: Real-Time Processing (Optional)
from pyspark.sql.functions import approx_count_distinct, col, countDistinct, window

# Upper bound (seconds) for the demo stream, so that the cells after this one still
# run under "Restart & Run All". Raise it, or stop the kernel manually, as needed.
STREAM_DEMO_TIMEOUT_SECONDS = 60

# For demonstration, create a streaming DataFrame from a sample batch dataset.
# File sources require an explicit schema, so the batch schema is reused.
# NOTE(review): no header option is set here; if the incoming files carry a header
# row, add .option("header", True) - confirm against the producer.
streaming_transactions_df = (
    spark.readStream
    .schema(transactions_df.schema)
    .csv("streaming_transactions_folder")  # Point to a folder with incoming data
)

# Compute real-time metrics: active (distinct) users per one-minute window.
# Exact distinct counts (countDistinct) are not supported on streaming aggregations,
# so the approximate variant is used.
# NOTE(review): assumes the columns are named `transaction_date` (as in the SQL
# queries) and `user_id` (second field, per the anonymize step) - confirm.
active_users = (
    streaming_transactions_df
    .withColumn("event_time", col("transaction_date").cast("timestamp"))
    # Events arriving more than 10 minutes behind the newest event time are dropped;
    # this keeps the aggregation state bounded.
    .withWatermark("event_time", "10 minutes")
    .groupBy(window(col("event_time"), "1 minute"))
    .agg(approx_count_distinct("user_id").alias("active_users"))
)

# Display active users in real-time (Note: This will print continuously if run with actual streaming data)
query = (
    active_users.writeStream
    .outputMode("update")          # emit only the windows whose count changed
    .format("console")
    .option("truncate", "false")   # show the full window struct
    .start()
)

# Keep the stream running for the demo period, then shut it down cleanly
try:
    query.awaitTermination(STREAM_DEMO_TIMEOUT_SECONDS)
finally:
    query.stop()


In [None]:
# Step 6: Performance Optimization Techniques
from pyspark.sql.functions import broadcast

# Caching DataFrames to optimize performance for multiple transformations.
# cache() is lazy: the data is materialized by the first action that touches it.
cleaned_transactions_df.cache()
cleaned_inventory_df.cache()
joined_df.cache()

# Repartition DataFrames by the join key so matching rows share a partition
transactions_df_repartitioned = cleaned_transactions_df.repartition("product_id")
inventory_df_repartitioned = cleaned_inventory_df.repartition("product_id")

# Use Broadcast Join for small DataFrames (if applicable).
# The broadcast() hint is what actually requests the broadcast strategy; without it
# this was an ordinary join. It is only appropriate while the inventory frame fits
# in executor memory - otherwise join against inventory_df_repartitioned instead.
# Joining on the column name keeps a single product_id column in the result.
joined_df_optimized = transactions_df_repartitioned.join(
    broadcast(cleaned_inventory_df), on="product_id", how="inner")

# Display the optimized joined DataFrame
joined_df_optimized.show(5)


In [None]:
#Dashboards for each of them
import matplotlib.pyplot as plt # type: ignore
import seaborn as sns # type: ignore

# top_products_df
top_products_pd = top_products_df.toPandas()

# Bar plot for top products
plt.figure(figsize=(10,6))
sns.barplot(x='product_name', y='total_qty', data=top_products_pd)
plt.title('Top 10 Most Purchased Products')
plt.xlabel('Product')
plt.ylabel('Total quantity')
plt.xticks(rotation=45)
plt.show()


#monthly_revenue_df
monthly_revenue_pd = monthly_revenue_df.toPandas()

# Line plot for monthly revenue
plt.figure(figsize=(10,6))
sns.lineplot(x='month', y='total_revenue', data=monthly_revenue_pd, marker="o")
plt.title('Monthly Revenue Trend')
plt.xlabel('Month')
plt.ylabel('Total revenue')
plt.xticks(rotation=45)
plt.show()

#turnover_rate_df
turnover_rate_pd = turnover_rate_df.toPandas()

# Heatmap for turnover rates.
# The result has one row per product and a single metric, so the heatmap matrix is a
# one-column frame indexed by product name. DataFrame.pivot is not usable here: its
# arguments are keyword-only in pandas 2.x, and pivoting on the metric itself leaves
# no values to plot. astype(float) converts Decimal/None results into plottable
# floats (None becomes NaN).
plt.figure(figsize=(10,8))
heatmap_data = turnover_rate_pd.set_index("product_name")[["turnover_rate"]].astype(float)
sns.heatmap(heatmap_data, annot=True, cmap="YlGnBu")
plt.title('Inventory Turnover Heatmap')
plt.show()


In [None]:
# Step 7: Store the Transformed Data
# Store the cleaned and transformed data in Parquet format.
# mode("overwrite") replaces output left by a previous run; the default mode raises
# an error when the target path already exists, so re-running the notebook failed here.
cleaned_transactions_df.write.mode("overwrite").parquet("cleaned_transactions.parquet")
cleaned_inventory_df.write.mode("overwrite").parquet("cleaned_inventory.parquet")
joined_df.write.mode("overwrite").parquet("joined_df.parquet")