In [47]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import opendatasets as od
import os

### Donwload and save dataset from Kaggle. 
#### Note: You need to have kaggle.json in the folder with this notebook or enter your credentials.

In [2]:
dataset_url = "https://www.kaggle.com/datasets/min02yam/ecommerse"
od.download(dataset_url)

Skipping, found downloaded files in "./ecommerse" (use force=True to force download)


### Getting dataset internal path and initializing PySpark

In [3]:
dataset_file_path = os.path.join("ecommerse", os.listdir("ecommerse")[0])

In [4]:
spark = SparkSession.builder.appName("Parsing ecommerse dataset").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/20 23:08:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/20 23:08:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Preprocessing the data
- #### Adding a new column with Date (without time)
- #### Calculate the total spent per item per invoice (UntiPrice * Quantity)

In [41]:
df = spark.read.csv(dataset_file_path, header=True)
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')
df = df.withColumn("DateOnly", to_date(col("Date"), "MM/dd/yyyy HH:mm"))
df = df.withColumn("TotalPerItem", round(col("UnitPrice")*col("Quantity")))

### The number of transactions (inque Invoices) per week

In [34]:
df.groupBy(year("DateOnly").alias("Year"), weekofyear("DateOnly").alias("Week")).agg(countDistinct("InvoiceNo").alias("Invoices")).orderBy("Year", "Week").show()

+----+----+--------+
|Year|Week|Invoices|
+----+----+--------+
|2010|  48|     348|
|2010|  49|     449|
|2010|  50|     381|
|2010|  51|     105|
|2011|   1|     221|
|2011|   2|     220|
|2011|   3|     199|
|2011|   4|     270|
|2011|   5|     257|
|2011|   6|     205|
|2011|   7|     256|
|2011|   8|     248|
|2011|   9|     256|
|2011|  10|     251|
|2011|  11|     279|
|2011|  12|     281|
|2011|  13|     299|
|2011|  14|     282|
|2011|  15|     324|
|2011|  16|     248|
+----+----+--------+
only showing top 20 rows



### The most active users by number of Invoices

In [44]:
df.groupBy(col("CustomerID").alias("User")).agg(countDistinct("InvoiceNo").alias("Invoices")).orderBy(desc("Invoices")).show()

+-----+--------+
| User|Invoices|
+-----+--------+
|14911|     206|
|12748|     173|
|17841|     136|
|15311|     107|
|14606|     105|
|13089|      90|
|14646|      64|
|14527|      61|
|14156|      55|
|12971|      52|
|13408|      49|
|13694|      46|
|15039|      45|
|13798|      44|
|16029|      43|
|18102|      41|
|16422|      40|
|17511|      39|
|17811|      39|
|12921|      37|
+-----+--------+
only showing top 20 rows



### The most valuable users by total spending

In [55]:
df.groupBy(col("CustomerID").alias("User")).agg(sum("TotalPerItem").alias("TotalySpent")).orderBy(desc("TotalySpent")).show()

+-----+-----------+
| User|TotalySpent|
+-----+-----------+
|14646|    71371.0|
|18102|    58466.0|
|17450|    42392.0|
|15098|    39917.0|
|12415|    33578.0|
|14911|    29448.0|
|14156|    26878.0|
|17949|    19870.0|
|17511|    19352.0|
|16684|    16785.0|
|14096|    15961.0|
|13694|    14454.0|
|13089|    14337.0|
|14298|    13222.0|
|15311|    13147.0|
|14088|    13022.0|
|15061|    12683.0|
|15769|    11589.0|
|17841|     9674.0|
|15838|     9627.0|
+-----+-----------+
only showing top 20 rows



### Top 10 the most popul (buyable) product per week

In [73]:
df2 = df.groupBy(year("DateOnly").alias("Year"), weekofyear("DateOnly").alias("Week"), col("ProductNo").alias("Product")).agg(sum("Quantity").alias("SoldNo"))
w = Window.partitionBy(df2["Year"], df2["Week"]).orderBy(df2["SoldNo"].desc())
df2.select("*", rank().over(w).alias("Rank")).filter(col("Rank") <= 10).show()

+----+----+-------+------+----+
|Year|Week|Product|SoldNo|Rank|
+----+----+-------+------+----+
|2010|  48|  84077|3216.0|   1|
|2010|  48| 17084R|1440.0|   2|
|2010|  48|  22616| 676.0|   3|
|2010|  48|  21137| 480.0|   4|
|2010|  48|  21731| 458.0|   5|
|2010|  48|  84879| 429.0|   6|
|2010|  48| 85099B| 336.0|   7|
|2010|  48|  84347| 311.0|   8|
|2010|  48|  22865| 285.0|   9|
|2010|  48|  21212| 246.0|  10|
|2010|  49|  17096|1728.0|   1|
|2010|  49|  22328|1494.0|   2|
|2010|  49| 85099B| 631.0|   3|
|2010|  49|  21623| 600.0|   4|
|2010|  49|  22469| 481.0|   5|
|2010|  49|  22867| 455.0|   6|
|2010|  49|  22470| 432.0|   7|
|2010|  49|  84945| 420.0|   8|
|2010|  49|  22659| 339.0|   9|
|2010|  49|  20668| 312.0|  10|
+----+----+-------+------+----+
only showing top 20 rows

