In [0]:
from pyspark.sql import functions as F

#### import the Online_Retail dataset

In [0]:
df = spark.read.csv('dbfs:/FileStore/Online_Retail.csv',header= True)
display(df.limit(10))
df.printSchema()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



#### 1. Define Schema Manually. Don't use inferschema.

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, DateType,FloatType,DoubleType

In [0]:
new_schema = StructType(fields=[
        StructField("InvoiceNo", StringType()),
        StructField("StockCode", StringType()),
        StructField("Description", StringType()),
        StructField("Quantity", IntegerType()),
        StructField("InvoiceDate", StringType()),
        StructField("UnitPrice", FloatType()),
        StructField("CustomerID", IntegerType()),
        StructField("Country", StringType())
])

In [0]:
df_retail = spark.read.option("header",True).schema(new_schema).csv("dbfs:/FileStore/Online_Retail.csv")
df_retail.limit(10).display()
df_retail.printSchema()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



#### 2. Create a new column that represents the total price for each item

In [0]:
df_retail = df_retail.withColumn('Total_Price', F.col('Quantity') * F.col('UnitPrice'))\
                     .withColumn('Total_Price', F.round('Total_Price',2))
df_retail.limit(10).display()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,Total_Price
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom,15.3
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom,20.34
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom,22.0
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom,20.34
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom,20.34
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom,15.3
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom,25.5
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom,11.1
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom,11.1
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom,54.08


#### 3. Convert the InvoiceDate from string to a proper datetime format and extract a separate Date and Time columns

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy" ,"LEGACY")

In [0]:
df_retail = df_retail.withColumn("InvoiceDate", F.to_timestamp(F.col("InvoiceDate"), "MM/dd/yyyy HH:mm"))\
                    .withColumn("Date",F.date_format(F.col("InvoiceDate"),"dd-MM-yyyy"))\
                    .withColumn("Time",F.date_format(F.col("InvoiceDate"),"HH:mm"))\
                    .withColumn("Year",F.year(F.col("InvoiceDate")))\
                    .withColumn("Month",F.month(F.col("InvoiceDate")))\
                    .withColumn("Day",F.dayofmonth(F.col("InvoiceDate")))
                    
df_retail.select('Invoicedate','Date','Time','Day','Month','year').show(10)

+-------------------+----------+-----+---+-----+----+
|        Invoicedate|      Date| Time|Day|Month|year|
+-------------------+----------+-----+---+-----+----+
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:26:00|01-12-2010|08:26|  1|   12|2010|
|2010-12-01 08:28:00|01-12-2010|08:28|  1|   12|2010|
|2010-12-01 08:28:00|01-12-2010|08:28|  1|   12|2010|
|2010-12-01 08:34:00|01-12-2010|08:34|  1|   12|2010|
+-------------------+----------+-----+---+-----+----+
only showing top 10 rows



#### 4. Calculate the total revenue per day and per country

In [0]:

df_revenue = df_retail.groupBy('Country','Date').agg(F.sum("Total_Price").alias("TotalRevenue")).sort('country')

df_revenue = df_revenue.withColumn('TotalRevenue',F.round('TotalRevenue',2))\
                       .withColumn('Date_format',F.to_timestamp(F.col('Date'),"dd-MM-yyyy"))\
                       .withColumn('Year',F.year(F.col('Date_format')))\
                       .withColumn("Month",F.month(F.col("Date_format")))\
                       .withColumn("Day",F.dayofmonth(F.col("Date_format"))).sort('country','year','month','Day', ascending=[True,True,True,True]).drop('Date_format','Year','Month','Day')
df_revenue.limit(10).display()

Country,Date,TotalRevenue
Australia,01-12-2010,358.25
Australia,08-12-2010,258.9
Australia,17-12-2010,415.7
Australia,06-01-2011,7154.38
Australia,10-01-2011,81.6
Australia,11-01-2011,462.5
Australia,14-01-2011,177.0
Australia,17-01-2011,431.3
Australia,19-01-2011,238.5
Australia,20-01-2011,87.75


#### 5. What country do most purchases come from ?

In [0]:
most_purchase = df_revenue.groupBy('country').agg(F.sum('TotalRevenue').alias('Total_Revenue')).sort('Total_Revenue', ascending = False).limit(1)
most_purchase.display()

country,Total_Revenue
United Kingdom,9025222.06


#### 6. Top Selling Products per country

In [0]:
from pyspark.sql.window import Window

df_product = df_retail.groupBy('description','country').agg(F.count('Description').alias('Num_of_Products'))

window_data = Window.partitionBy('country').orderBy(F.col('Num_of_Products').desc())
Top_product = df_product.withColumn('Top_rank',F.dense_rank().over(window_data))

Top_product.limit(10).display()

Top_product.select('country','description').where(Top_product.Top_rank == 1).limit(10).display()

description,country,Num_of_Products,Top_rank
SET OF 3 CAKE TINS PANTRY DESIGN,Australia,9,1
RED TOADSTOOL LED NIGHT LIGHT,Australia,9,1
LUNCH BAG SPACEBOY DESIGN,Australia,8,2
PARTY BUNTING,Australia,8,2
LUNCH BAG RED RETROSPOT,Australia,8,2
BAKING SET SPACEBOY DESIGN,Australia,8,2
ROSES REGENCY TEACUP AND SAUCER,Australia,8,2
PAPER BUNTING RETROSPOT,Australia,7,3
SET OF 6 SOLDIER SKITTLES,Australia,7,3
RED HARMONICA IN BOX,Australia,7,3


country,description
Australia,SET OF 3 CAKE TINS PANTRY DESIGN
Australia,RED TOADSTOOL LED NIGHT LIGHT
Austria,POSTAGE
Bahrain,OCEAN SCENT CANDLE IN JEWELLED BOX
Bahrain,NOVELTY BISCUITS CAKE STAND 3 TIER
Belgium,POSTAGE
Brazil,ROSES REGENCY TEACUP AND SAUCER
Brazil,EDWARDIAN PARASOL NATURAL
Brazil,EDWARDIAN PARASOL BLACK
Brazil,PHARMACIE FIRST AID TIN


In [0]:
df_retail.createOrReplaceTempView('Retail')

In [0]:
%sql
with cte1 as (
    select Description, Country, count(Description) as Num_of_Products
    from Retail
    group by Description, country),
cte2 as (
  Select *,dense_rank() over (Partition by country order by Num_of_Products desc) as rank
  from cte1)

select Country, Description as Product from cte2
where rank = 1
limit 10;

Country,Product
Australia,SET OF 3 CAKE TINS PANTRY DESIGN
Australia,RED TOADSTOOL LED NIGHT LIGHT
Austria,POSTAGE
Bahrain,OCEAN SCENT CANDLE IN JEWELLED BOX
Bahrain,NOVELTY BISCUITS CAKE STAND 3 TIER
Belgium,POSTAGE
Brazil,ROSES REGENCY TEACUP AND SAUCER
Brazil,EDWARDIAN PARASOL NATURAL
Brazil,EDWARDIAN PARASOL BLACK
Brazil,PHARMACIE FIRST AID TIN


#### 7. Analyze the purchase frequency by hour of day and identify peak shopping hours

In [0]:
df_hour = df_retail.withColumn('Hour',F.date_format(F.col('InvoiceDate'),"HH"))
shopping_hour = df_hour.groupBy('Hour').agg(F.count('Hour').alias('Hour_Frequency')).sort('Hour_Frequency',ascending=False)
shopping_hour.display()

Hour,Hour_Frequency
12,77230
15,76432
13,71075
14,66266
11,56312
16,53451
10,47895
9,33755
17,27635
8,8805


Databricks visualization. Run in Databricks to view.

#### 8. When was the earliest purchase made by a customer on the e-commerce platform

In [0]:
%sql

select distinct customerid,invoiceDate from retail
where InvoiceDate in (
  select min(invoicedate) from retail
)

customerid,invoiceDate
17850,2010-12-01T08:26:00.000+0000


In [0]:
early_customer = df_retail.select('customerid',df_retail['invoicedate'].alias('early_date')).sort('invoicedate').limit(1)
early_customer.display()

customerid,early_date
17850,2010-12-01T08:26:00.000+0000


#### 9. Find the total amount spent by each customer

In [0]:
cstmr_expense = df_retail.groupBy('customerid').agg(F.sum('Total_Price').alias("Total_amount"))
cstmr_expense = cstmr_expense.withColumn('Total_amount',F.round(F.col('Total_amount'))).sort('Total_amount',ascending = False)

cstmr_expense.limit(10).display()

customerid,Total_amount
,1755277.0
14646.0,280206.0
18102.0,259657.0
17450.0,194551.0
16446.0,168472.0
14911.0,143825.0
12415.0,124915.0
14156.0,117380.0
17511.0,91062.0
16029.0,81025.0
