<a href="https://colab.research.google.com/github/xaknight/data-science-engineering/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import psutil

# Function to get CPU information
def get_cpu_info():
    """Print the number of logical CPUs reported by the operating system.

    ``os.cpu_count()`` returns ``None`` when the count cannot be determined;
    in that case "unknown" is printed instead of the literal ``None``.
    This is the CPU count of the whole system, which can be larger than the
    number of CPUs the current process is allowed to use.
    """
    cpu_count = os.cpu_count()
    print(f"Number of CPUs: {cpu_count if cpu_count is not None else 'unknown'}")

# Function to get RAM information
def get_ram_info():
    """Print total and currently available system memory, in GB (1 GB = 1024**3 bytes)."""
    bytes_per_gb = 1024 ** 3
    memory = psutil.virtual_memory()
    total_ram, available_ram = memory.total / bytes_per_gb, memory.available / bytes_per_gb
    print(f"Total RAM: {total_ram:.2f} GB")
    print(f"Available RAM: {available_ram:.2f} GB")

# Function to get disk information
def get_disk_info():
    """Print total and free space of the filesystem mounted at '/', in GB (1 GB = 1024**3 bytes)."""
    bytes_per_gb = 1024 ** 3
    usage = psutil.disk_usage('/')
    total_disk, free_disk = usage.total / bytes_per_gb, usage.free / bytes_per_gb
    print(f"Total Disk Space: {total_disk:.2f} GB")
    print(f"Free Disk Space: {free_disk:.2f} GB")

# Print a short hardware summary of the current runtime (CPU, then RAM, then disk).
print("System Information in Google Colab:")
print("----------------------------------")
for show_info in (get_cpu_info, get_ram_info, get_disk_info):
    show_info()

System Information in Google Colab:
----------------------------------
Number of CPUs: 2
Total RAM: 12.67 GB
Available RAM: 10.96 GB
Total Disk Space: 107.72 GB
Free Disk Space: 72.69 GB


In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
# Start (or reuse, if one already exists) a local Spark session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("EcommerceAnalysis")
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import rand, expr, col
import random

# Generate a reproducible synthetic transactions table.
# Each rand() call gets its own seed: unseeded rand() picks a new seed on every run,
# and sharing one seed between columns would make those columns perfectly correlated.
from pyspark.sql.functions import when

SEED = 42
N_TRANSACTIONS = 100_000
DUPLICATE_FRACTION = 0.05  # share of rows appended a second time as exact duplicates
NULL_FRACTION = 0.1        # share of rows whose final_amount is set to null

data = (
    spark.range(N_TRANSACTIONS)
    .select(
        col("id").alias("transaction_id"),
        (rand(SEED) * 10000).cast("int").alias("customer_id"),
        expr(f"array('Electronics', 'Clothing', 'Books', 'Home')[int(rand({SEED + 1}) * 4)]").alias("category"),
        (rand(SEED + 2) * 1000).cast("float").alias("amount"),
        expr(f"array('Mumbai', 'Delhi', 'Bangalore', 'Pune')[int(rand({SEED + 3}) * 4)]").alias("region"),
        (rand(SEED + 4) * 0.2 - 0.1).alias("discount"),  # in [-0.1, 0.1): negative values act as a surcharge
    )
    # Derived in a separate step rather than by referring to the `amount` and `discount`
    # aliases inside the same select, which relies on lateral column aliases (Spark 3.4+).
    .withColumn(
        "final_amount",
        # when() without otherwise() yields null for the remaining NULL_FRACTION of rows.
        when(rand(SEED + 5) >= NULL_FRACTION, col("amount") * (1 - col("discount"))),
    )
)

# Append a sample of existing rows so the cleaning step has exact duplicates to remove.
# NOTE: no outliers are injected; every generated amount lies in [0, 1000).
data_with_noise = data.union(data.sample(fraction=DUPLICATE_FRACTION, seed=SEED))
data_with_noise.write.mode("overwrite").parquet("transactions.parquet")  # Save to disk
df = spark.read.parquet("transactions.parquet")
df.show(10)

+--------------+-----------+-----------+---------+---------+--------------------+------------------+
|transaction_id|customer_id|   category|   amount|   region|            discount|      final_amount|
+--------------+-----------+-----------+---------+---------+--------------------+------------------+
|         50000|       1644|Electronics| 736.2237|Bangalore|0.018013626419986495|  722.961635265142|
|         50001|       5195|      Books| 593.2392|    Delhi|-0.04274419532888...| 618.5967288811426|
|         50002|       4625|Electronics| 282.7866|     Pune| 0.01714273140612163| 277.9388560086717|
|         50003|       6943|       Home|216.02968|     Pune|-0.03806353523521...| 224.2525316182528|
|         50004|       2896|      Books|273.79163|    Delhi| 0.06022357157652536| 257.3029163925097|
|         50005|       6640|      Books|827.41724|   Mumbai|-0.05611531561395...| 873.8480156891038|
|         50006|       1132|Electronics|  592.954|Bangalore|-0.04745861913578...| 621.09475

In [None]:
from pyspark.sql.functions import coalesce, col, mean, when
from pyspark.sql.window import Window

OUTLIER_AMOUNT_THRESHOLD = 5000  # rows with amount above this are treated as outliers

# Remove duplicate transactions introduced by the noise step.
df_cleaned = df.dropDuplicates(["transaction_id", "customer_id", "amount"])

# Drop outliers *before* imputing, so extreme amounts cannot skew the category means
# used to fill missing values. Rows with a null amount are dropped too (the comparison is null).
df_inliers = df_cleaned.filter(col("amount") <= OUTLIER_AMOUNT_THRESHOLD)

# Fill null final_amount with the mean final_amount of the row's category (mean ignores nulls).
# A window aggregate avoids a join and keeps the original column order.
category_window = Window.partitionBy("category")
df_final = df_inliers.withColumn(
    "final_amount",
    coalesce(col("final_amount"), mean("final_amount").over(category_window)),
)
df_final.show(10)

+-----------+--------------+-----------+---------+---------+--------------------+------------------+
|   category|transaction_id|customer_id|   amount|   region|            discount|      final_amount|
+-----------+--------------+-----------+---------+---------+--------------------+------------------+
|       Home|             0|       7753|171.48724|     Pune|-0.08708886050721805|186.42187229355002|
|Electronics|             1|       9949|21.219477|Bangalore|0.043336644906418226| 20.29989577298859|
|      Books|             2|       5540| 577.2987|   Mumbai|-0.08881754852713303| 628.5729618943508|
|      Books|             4|        959|168.20488|Bangalore| 0.03364352494906048|162.54587469195792|
|      Books|             5|       5299|188.01472|     Pune| 0.07088463180508173|  174.687370194923|
|Electronics|             7|       9196|  976.126|Bangalore|-0.02119646003401...| 996.8163918128753|
|Electronics|            11|       6951| 700.7263|   Mumbai|0.012337023976774095| 692.08144

In [None]:
# Aggregate sales per (region, category).
# Explicit aliases avoid depending on Spark's auto-generated names such as
# "sum(final_amount)"; the column order matches the previously produced output.
from pyspark.sql.functions import count, sum as sum_

sales_trends = df_final.groupBy("region", "category").agg(
    count("transaction_id").alias("transaction_count"),
    sum_("final_amount").alias("total_sales"),
)

# `ascending=False` would apply to every sort column (reversing regions too);
# sort regions A-Z and, within each region, the best-selling category first.
sales_trends.orderBy(col("region").asc(), col("total_sales").desc()).show()

# Use Spark SQL for a different view
df_final.createOrReplaceTempView("transactions")
spark.sql("""
    SELECT region, category, ROUND(SUM(final_amount), 2) as total_sales
    FROM transactions
    GROUP BY region, category
    ORDER BY total_sales DESC
""").show()

+---------+-----------+-----------------+------------------+
|   region|   category|transaction_count|       total_sales|
+---------+-----------+-----------------+------------------+
|     Pune|   Clothing|             6320|  3173964.68105173|
|     Pune|Electronics|             6344| 3169371.548943664|
|     Pune|       Home|             6209|3112125.2852559783|
|     Pune|      Books|             6256|3102030.5211500167|
|   Mumbai|Electronics|             6321|3187738.4410494855|
|   Mumbai|   Clothing|             6303|3162128.4813304814|
|   Mumbai|       Home|             6150| 3095768.244115215|
|   Mumbai|      Books|             6214|  3090104.10231376|
|    Delhi|      Books|             6339|3178326.9381144457|
|    Delhi|Electronics|             6362|3156056.6066346867|
|    Delhi|       Home|             6243| 3108341.318252579|
|    Delhi|   Clothing|             6195| 3081282.144769721|
|Bangalore|      Books|             6292|3148400.6215088554|
|Bangalore|   Clothing| 

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as sum_

# Within each region, order customers by their total spend, highest first.
windowSpec = Window.partitionBy("region").orderBy(col("total_spent").desc())

# Total (imputed) final_amount per customer, computed separately for each region.
customer_spending = (
    df_final
    .groupBy("region", "customer_id")
    .agg(sum_("final_amount").alias("total_spent"))
)

# Keep each region's top 5 by rank(); tied customers share a rank, so a region
# can contribute more than five rows.
top_customers = (
    customer_spending
    .withColumn("rank", rank().over(windowSpec))
    .where(col("rank") <= 5)
    .orderBy("region", "rank")
)

top_customers.show()

+---------+-----------+------------------+----+
|   region|customer_id|       total_spent|rank|
+---------+-----------+------------------+----+
|Bangalore|       5839| 5685.310971342755|   1|
|Bangalore|       8733| 5322.331025792927|   2|
|Bangalore|        311| 5299.272872508585|   3|
|Bangalore|       5288| 5231.011926201888|   4|
|Bangalore|       8565|5138.0618893409755|   5|
|    Delhi|       3684| 5551.204706996249|   1|
|    Delhi|       2862|5479.2944667859865|   2|
|    Delhi|       2427| 5389.773814958524|   3|
|    Delhi|       1513| 5344.972533386377|   4|
|    Delhi|       7005| 5344.279685344902|   5|
|   Mumbai|       6174| 6719.172965303682|   1|
|   Mumbai|       1077| 5746.126345760734|   2|
|   Mumbai|       4216| 5366.283952614199|   3|
|   Mumbai|       6484| 5341.762108601681|   4|
|   Mumbai|       1433|5276.4591886204125|   5|
|     Pune|       2459| 5886.707844481927|   1|
|     Pune|       3990| 5819.454588338279|   2|
|     Pune|       8982| 5584.89031398295

In [None]:
# Split the rows into batches to simulate incremental processing.
from pyspark.sql.functions import floor, current_timestamp

NUM_BATCHES = 5

# The batch is derived from transaction_id instead of rand(): df_final is not cached,
# so each filter below re-executes the whole plan, and Spark documents rand() as
# non-deterministic in general (its per-row values after a shuffle can differ between
# executions) - rows could land in several batches or in none. transaction_id is
# unique per row, so this modulo puts every row in exactly one stable batch.
df_with_time = df_final.withColumn("batch_id", col("transaction_id") % NUM_BATCHES)

# Process each batch
for batch in range(NUM_BATCHES):
    batch_data = df_with_time.filter(col("batch_id") == batch)
    print(f"Processing Batch {batch}:")
    batch_data.groupBy("region").agg({"final_amount": "sum"}).show()

Processing Batch 0:
+---------+------------------+
|   region| sum(final_amount)|
+---------+------------------+
|Bangalore| 2454550.812971227|
|   Mumbai|2498190.9703327343|
|     Pune| 2501915.098417806|
|    Delhi| 2533161.533821532|
+---------+------------------+

Processing Batch 1:
+---------+------------------+
|   region| sum(final_amount)|
+---------+------------------+
|Bangalore| 2424294.401773586|
|   Mumbai| 2529680.653171329|
|     Pune|2508099.3846215564|
|    Delhi| 2493725.149581561|
+---------+------------------+

Processing Batch 2:
+---------+------------------+
|   region| sum(final_amount)|
+---------+------------------+
|Bangalore|2486755.5404315954|
|   Mumbai| 2524343.406739462|
|     Pune|2455642.9802046944|
|    Delhi|2496585.9586052266|
+---------+------------------+

Processing Batch 3:
+---------+------------------+
|   region| sum(final_amount)|
+---------+------------------+
|Bangalore|2504070.8089441517|
|   Mumbai|2507069.3094978277|
|     Pune| 249782

In [None]:
# Save as CSV (Colab will store in /content/).
# Spark writes each "*.csv" path as a *directory* of part files. coalesce(1) puts all
# rows into one part file so nothing is left behind, and header=True keeps column names.
sales_trends.coalesce(1).write.mode("overwrite").option("header", True).csv("sales_trends.csv")
top_customers.coalesce(1).write.mode("overwrite").option("header", True).csv("top_customers.csv")

# Download from Colab
from google.colab import files
import glob

# Download every part file of each output directory, not just part-00000.
for output_dir in ("sales_trends.csv", "top_customers.csv"):
    for file_path in sorted(glob.glob(f"{output_dir}/part-*.csv")):
        files.download(file_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>