In [None]:
### GitHub Repository
👉 https://github.com/rajsanodiya122/Fobd-Unit-3-


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month

# Create (or reuse) the Spark session for the RDD vs DataFrame benchmark.
spark = (
    SparkSession.builder
    .appName("RDD_vs_DataFrame_Benchmark")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

# Read the retail CSV: the first row is the header and column types are
# inferred from the data.
retail_csv_path = "/content/drive/MyDrive/online_retail_II.csv"
df = spark.read.csv(retail_csv_path, header=True, inferSchema=True)

# Show the inferred schema and a small sample of rows.
df.printSchema()
df.show(5)


root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      4

In [None]:
# The space in "Customer ID" is awkward to reference, so give it a plain name.
df = df.withColumnRenamed("Customer ID", "CustomerID")

# Columns that must be non-null for a row to be kept.
required_columns = ["Invoice", "StockCode", "Quantity", "Price", "CustomerID"]

# Only rows with strictly positive quantity and price are kept.
has_positive_values = (col("Quantity") > 0) & (col("Price") > 0)

df_clean = df.dropna(subset=required_columns).filter(has_positive_values)

# Mark the cleaned frame for caching; it is reused by the cells that follow.
df_clean.cache()


DataFrame[Invoice: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, Price: double, CustomerID: double, Country: string]

In [None]:
# Import the functions module under an alias instead of
# `from pyspark.sql.functions import sum`: that form rebinds the global name
# `sum` to Spark's column aggregate and shadows Python's builtin `sum` in
# every cell executed afterwards.
from pyspark.sql import functions as F

# Revenue of a line item is Quantity * Price; total it per country and order
# the countries from highest to lowest total revenue.
df_revenue_country = df_clean.withColumn(
    "Revenue", col("Quantity") * col("Price")
).groupBy("Country").agg(
    F.sum("Revenue").alias("TotalRevenue")
).orderBy(col("TotalRevenue").desc())

# Display the ten countries with the highest total revenue.
df_revenue_country.show(10)


+--------------+--------------------+
|       Country|        TotalRevenue|
+--------------+--------------------+
|United Kingdom|1.4723147517009035E7|
|          EIRE|    621631.110000006|
|   Netherlands|   554232.3400000011|
|       Germany|  431262.46099999954|
|        France|   355257.4699999994|
|     Australia|   169968.1099999996|
|         Spain|  109178.53000000006|
|   Switzerland|  100365.34000000011|
|        Sweden|   91549.71999999986|
|       Denmark|            69862.19|
+--------------+--------------------+
only showing top 10 rows


In [None]:
# Derive calendar year and month columns from the invoice timestamp.
df_time = (
    df_clean
    .withColumn("Year", year("InvoiceDate"))
    .withColumn("Month", month("InvoiceDate"))
)

# Total line-item revenue (Quantity * Price) per (Year, Month), in
# chronological order. The aggregate column keeps Spark's default name.
df_monthly = (
    df_time
    .withColumn("Revenue", col("Quantity") * col("Price"))
    .groupBy("Year", "Month")
    .sum("Revenue")
    .orderBy("Year", "Month")
)

df_monthly.show()


+----+-----+------------------+
|Year|Month|      sum(Revenue)|
+----+-----+------------------+
|2009|   12| 686654.1599999949|
|2010|    1| 557319.0620000134|
|2010|    2|506371.06600001536|
|2010|    3| 699608.9909999889|
|2010|    4| 594609.1919999976|
|2010|    5| 599985.7900000075|
|2010|    6| 639066.5800000058|
|2010|    7| 591636.7400000024|
|2010|    8| 604242.6499999989|
|2010|    9| 831615.0009999905|
|2010|   10|1036679.9999999028|
|2010|   11|1172336.0419998672|
|2010|   12| 884591.8899999922|
|2011|    1| 569445.0400000077|
|2011|    2| 447137.3500000165|
|2011|    3|  595500.760000013|
|2011|    4| 469200.3610000132|
|2011|    5| 678594.5600000018|
|2011|    6| 661213.6900000115|
|2011|    7| 600091.0110000141|
+----+-----+------------------+
only showing top 20 rows


In [None]:
# Switch from the DataFrame API to the RDD API: `.rdd` exposes df_clean as an
# RDD whose elements the following cells read by field name
# (row.Country, row.Quantity, row.Price, r.Description).
rdd = df_clean.rdd


In [None]:
# Stage 1: emit one (country, line-item revenue) pair per record.
country_revenue_pairs = rdd.map(
    lambda rec: (rec.Country, rec.Quantity * rec.Price)
)

# Stage 2: add up the revenue of all records that share a country.
country_revenue_totals = country_revenue_pairs.reduceByKey(
    lambda left, right: left + right
)

# Stage 3: order the countries from highest to lowest total revenue.
rdd_country_revenue = country_revenue_totals.sortBy(
    lambda pair: pair[1], ascending=False
)

# Print the ten highest-revenue (country, total) pairs.
for row in rdd_country_revenue.take(10):
    print(row)


('United Kingdom', 14723147.517009035)
('EIRE', 621631.110000006)
('Netherlands', 554232.3400000011)
('Germany', 431262.46099999954)
('France', 355257.4699999994)
('Australia', 169968.1099999996)
('Spain', 109178.53000000006)
('Switzerland', 100365.34000000011)
('Sweden', 91549.71999999986)
('Denmark', 69862.19)


In [None]:
# Stage 1: emit one (product description, quantity) pair per record.
product_quantity_pairs = rdd.map(lambda rec: (rec.Description, rec.Quantity))

# Stage 2: add up the quantities of all records that share a description.
product_quantity_totals = product_quantity_pairs.reduceByKey(
    lambda left, right: left + right
)

# Stage 3: order the products from most to least units sold.
rdd_product_sales = product_quantity_totals.sortBy(
    lambda pair: pair[1], ascending=False
)

# Ten best-selling products as (description, total quantity) pairs.
rdd_product_sales.take(10)


[('WORLD WAR 2 GLIDERS ASSTD DESIGNS', 109169),
 ('WHITE HANGING HEART T-LIGHT HOLDER', 93640),
 ('PAPER CRAFT , LITTLE BIRDIE', 80995),
 ('ASSORTED COLOUR BIRD ORNAMENT', 79913),
 ('MEDIUM CERAMIC TOP STORAGE JAR', 77916),
 ('JUMBO BAG RED RETROSPOT', 75759),
 ('BROCADE RING PURSE ', 71129),
 ('PACK OF 60 PINK PAISLEY CAKE CASES', 55270),
 ('60 TEATIME FAIRY CAKE CASES', 53495),
 ('PACK OF 72 RETROSPOT CAKE CASES', 46107)]

In [None]:
# Report how many partitions back the cleaned DataFrame before repartitioning.
initial_partition_count = df_clean.rdd.getNumPartitions()
print("Initial partitions:", initial_partition_count)


Initial partitions: 2


In [None]:
# Redistribute the cleaned data across 8 partitions and mark the result for
# caching; cache() hands back the same DataFrame, so the calls can be chained.
df_repart = df_clean.repartition(8).cache()

# Run an action so the cache is actually populated.
df_repart.count()

new_partition_count = df_repart.rdd.getNumPartitions()
print("New partitions:", new_partition_count)


New partitions: 8
