<a href="https://colab.research.google.com/github/nickname8888/pyspark-prac/blob/main/windows_functions_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Retail_EDA").getOrCreate()

In [3]:
# dataset link: https://www.kaggle.com/datasets/mashlyn/online-retail-ii-uci

df = spark.read.csv("online_retail_II.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()
print(f"Total records: {df.count()}")

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
only showing top 5 rows

root
 |-- Invoice: string (nullable = true)
 |--

In [8]:
# getting all the null counts

from pyspark.sql.functions import col, sum

null_counts = []
for column in df.columns:
  null_count = sum(col(column).isNull().cast("int")).alias(column)
  null_counts.append(null_count)

df.select(null_counts).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|       4382|       0|          0|    0|     243007|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



In [9]:
# one liner for the same

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|       4382|       0|          0|    0|     243007|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



In [10]:
# handling null values

# fill missing values in 'Description' with 'Unknown'
df = df.fillna({"Description": "Unknown"})

# drop rows where 'Customer ID' is NULL
df = df.dropna(subset=["Customer ID"])

In [11]:
# verify if null values have been handled

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|          0|       0|          0|    0|          0|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



In [14]:
# check unique products and customers

print(f"Unique Products: {df.select('StockCode').distinct().count()}")
print(f"Unique Customers: {df.select('Customer ID').distinct().count()}")

Unique Products: 4646
Unique Customers: 5942


In [15]:
df.describe().show()

+-------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------+
|summary|           Invoice|         StockCode|         Description|          Quantity|             Price|       Customer ID|    Country|
+-------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------+
|  count|            824364|            824364|              824364|            824364|            824364|            824364|     824364|
|   mean| 537410.8855639135|28826.655026367764|                NULL|12.414574144431343|3.6767995788202636| 15324.63850435002|       NULL|
| stddev|26666.396588956923|18528.792981682243|                NULL|188.97609900976042| 70.24138768949386|1697.4644503793263|       NULL|
|    min|            489434|             10002|  DOORMAT UNION J...|            -80995|               0.0|           12346.0|  Australia|
|    max|           C581569|      

In [18]:
# some aggregation function like total revenue per country

df.groupBy("Country").agg(sum("Price").alias("Revenue Per Country")).show()

+-----------+-------------------+
|    Country|Revenue Per Country|
+-----------+-------------------+
|     Sweden|  8596.469999999992|
|  Singapore| 25481.400000000005|
|    Germany|  67564.45100000099|
|        RSA|             397.56|
|     France|  66943.34000000023|
|     Greece|  2600.730000000004|
|    Belgium| 14766.879999999928|
|    Finland|  5234.609999999995|
|      Malta|  6575.589999999998|
|Unspecified| 2910.9500000000007|
|    Nigeria|             102.48|
|      Italy|  8479.459999999961|
|       EIRE| 109679.75999999698|
|  Lithuania|  494.4099999999992|
|     Norway| 41129.909999999756|
|      Spain| 20539.470000000132|
|    Denmark| 2397.8900000000017|
|West Indies| 122.76999999999995|
|   Thailand| 227.96999999999994|
|     Israel| 1156.9300000000012|
+-----------+-------------------+
only showing top 20 rows



In [20]:
df.groupBy("Country") \
  .agg(sum("Price").alias("Revenue Per Country Sorted")) \
  .orderBy("Revenue Per Country Sorted", ascending=False) \
  .show()

+---------------+--------------------------+
|        Country|Revenue Per Country Sorted|
+---------------+--------------------------+
| United Kingdom|        2564403.8769961074|
|           EIRE|        109679.75999999698|
|        Germany|         67564.45100000099|
|         France|         66943.34000000023|
|         Norway|        41129.909999999756|
|      Singapore|        25481.400000000005|
|          Spain|        20539.470000000132|
|       Portugal|        16778.780000000064|
|    Netherlands|         15663.22999999998|
|        Belgium|        14766.879999999928|
|    Switzerland|        11804.789999999946|
|Channel Islands|         9056.799999999956|
|         Sweden|         8596.469999999992|
|          Italy|         8479.459999999961|
|      Australia|         8111.069999999958|
|          Malta|         6575.589999999998|
|         Cyprus|         6333.949999999975|
|        Finland|         5234.609999999995|
|        Austria|         4184.320000000003|
|    Unspe

In [22]:
# ranking products by sales using window functions

from pyspark.sql.window import Window
from pyspark.sql.functions import rank

new_window = Window.partitionBy("Country").orderBy(col("Quantity").desc())

df = df.withColumn("Rank", rank().over(new_window))
df.select("Description", "Quantity", "Rank").show()

+--------------------+--------+----+
|         Description|Quantity|Rank|
+--------------------+--------+----+
|MINI PAINT SET VI...|    1152|   1|
|  RABBIT NIGHT LIGHT|     960|   2|
|RED  HARMONICA IN...|     720|   3|
|ASSORTED COLOURS ...|     600|   4|
|MINI PAINT SET VI...|     576|   5|
|KEY FOB , FRONT  ...|     576|   5|
|      KEY FOB , SHED|     576|   5|
|KEY FOB , GARAGE ...|     576|   5|
|MINI PAINT SET VI...|     576|   5|
|MINI PAINT SET VI...|     576|   5|
|60 CAKE CASES DOL...|     480|  11|
|HOMEMADE JAM SCEN...|     480|  11|
|PACK OF 12 CIRCUS...|     432|  13|
|PACK OF 12 PINK P...|     432|  13|
|PACK OF 12 WOODLA...|     432|  13|
|RED TOADSTOOL LED...|     432|  13|
|  WRAP CIRCUS PARADE|     400|  17|
|  WRAP ENGLISH ROSE |     400|  17|
|    WRAP RED APPLES |     400|  17|
|JAM JAR WITH PINK...|     384|  20|
+--------------------+--------+----+
only showing top 20 rows

