# Loading CSV to Dataframe

In [0]:
#Access DBFS to get path 

%fs ls

In [0]:
csvFile = "dbfs:/FileStore/tables/retail.csv"

In [0]:
#Define schema
from pyspark.sql.types import *

userSchema = StructType([
    StructField("invoice_no", StringType(), False),
    StructField("stock_code", StringType(), False),
    StructField("description", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("invoice_date", TimestampType(), False),
    StructField("unit_price", FloatType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("country", StringType(), False)
])


In [0]:
retail_df = (spark.read
       .option("sep", ",")
       .option("header", "true")
       .schema(userSchema)
       .csv(csvFile)
)

In [0]:
#Rename columns
retail_df = (retail_df
       .withColumnRenamed("invoice_no", "Invoice")
       .withColumnRenamed("stock_code", "StockCode")
       .withColumnRenamed("description", "Description")
       .withColumnRenamed("quantity", "Quantity")
       .withColumnRenamed("invoice_date", "InvoiceDate")
       .withColumnRenamed("unit_price", "Price")
       .withColumnRenamed("customer_id", "CustomerID")
       .withColumnRenamed("country", "Country"))

In [0]:
retail_df.printSchema()

In [0]:
display(retail_df)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


# Total Invoice Amount Distribution

In [0]:
#Create invoice dataframe

invoice_df = (retail_df
   .select("Invoice", "Quantity", "Price")
)

#Add a column called Amount, and group by

from pyspark.sql.functions import col

invoice_df = (invoice_df
      .withColumn("Amount", invoice_df.Quantity * invoice_df.Price)
      .where(col("Amount") > 0)
      .groupby("Invoice")
      .sum("Amount")
      .withColumnRenamed("sum(Amount)", "Amount")
      .orderBy('Invoice')
)

display(invoice_df)

Invoice,Amount
489434,505.2999877929688
489435,145.79999542236328
489436,630.3299980163574
489437,310.7500014305115
489438,2286.2400302886963
489439,426.2999949455261
489440,50.39999961853027
489441,344.3400049209595
489442,382.3699998855591
489443,285.05999660491943


In [0]:
#Get 85th percentile
from pyspark.sql.functions import *

percentile = invoice_df.selectExpr('percentile(Amount, 0.85)').show()

In [0]:
#Filter invoices less than the 85th percentile

percentile_df = (invoice_df
     .where(col("Amount") <= 724.3895041018716)
)

display(percentile_df)

Invoice,Amount
489434,505.2999877929688
489435,145.79999542236328
489436,630.3299980163574
489437,310.7500014305115
489439,426.2999949455261
489440,50.39999961853027
489441,344.3400049209595
489442,382.3699998855591
489443,285.05999660491943
489444,141.0


#Monthly Placed and Cancelled Orders

In [0]:
month_df = (retail_df
         .withColumnRenamed("InvoiceDate", "InvoiceMonth")
         .withColumn("InvoiceMonth", year(col("InvoiceMonth")) * 100 +month(col("InvoiceMonth")))
)

display(month_df)

Invoice,StockCode,Description,Quantity,InvoiceMonth,Price,CustomerID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,200912,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,200912,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,200912,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,200912,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,200912,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,200912,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,200912,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,200912,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,200912,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,200912,3.75,13085.0,United Kingdom


In [0]:
#Divide total orders, and cancelled orders into separate tables

#Cancelled Orders

monthly_cancelled_orders_df = (month_df
                            .select("Invoice", "InvoiceMonth")
                            .where(col("Invoice").startswith("C"))
                            .groupby("InvoiceMonth")
                            .agg(countDistinct("Invoice"))
                            .withColumnRenamed("count(Invoice)", "Cancelled")
                            .orderBy(asc("InvoiceMonth"))
)

#Total Orders

monthly_total_orders_df = (month_df
                            .select("Invoice", "InvoiceMonth")
                            #.where(col("Invoice"))
                            .groupby("InvoiceMonth")
                            .agg(countDistinct("Invoice")) 
                            .withColumnRenamed("count(Invoice)", "Total")
                            #.cast("integer")
                            .orderBy(asc("InvoiceMonth"))
)


In [0]:
#Calculate placed orders

orders_df = (monthly_total_orders_df
            .join(monthly_cancelled_orders_df, "InvoiceMonth")
            .orderBy("InvoiceMonth")
)

orders_df = (orders_df 
      .withColumn("Placed", orders_df.Total - (2 * orders_df.Cancelled))
      .withColumn("InvoiceMonth", orders_df.InvoiceMonth.cast("string"))
)

display(orders_df)

InvoiceMonth,Total,Cancelled,Placed
200912,2330,401,1528
201001,1633,300,1033
201002,1969,240,1489
201003,2367,407,1553
201004,1892,304,1284
201005,2418,407,1604
201006,2216,357,1502
201007,2017,344,1329
201008,1877,273,1331
201009,2375,371,1633


Output can only be rendered in Databricks

# Monthly Sales

In [0]:
monthly_sales_df = (month_df
           .select("InvoiceMonth", "Quantity", "Price")
           #.withColumn("Amount", monthly_sales_df.Quantity * monthly_sales_df.Price)
           #.groupBy("InvoiceMonth")
           #.sum()
)

monthly_sales_df = (monthly_sales_df
           .withColumn("Amount", monthly_sales_df.Quantity * monthly_sales_df.Price)
           .groupBy("InvoiceMonth")
           .sum("Amount")
           .orderBy("InvoiceMonth")
           .withColumnRenamed("sum(Amount)", "Amount")
           .withColumn("InvoiceMonth", monthly_sales_df.InvoiceMonth.cast("string"))
)

display(monthly_sales_df)

InvoiceMonth,Amount
200912,799847.1075055711
201001,624032.8896515106
201002,533091.424455221
201003,765848.757825097
201004,590580.3910229017
201005,615322.8286303803
201006,679786.6077747606
201007,575236.3479464054
201008,656776.3295686841
201009,853650.4293782803


Output can only be rendered in Databricks

#Monthly Sales Growth

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lag 

win = Window.partitionBy().orderBy("InvoiceMonth")

monthly_sales_df = (monthly_sales_df 
            .withColumn("PrevValue", F.lag(monthly_sales_df.Amount).over(win))
)

monthly_sales_df = (monthly_sales_df
           .withColumn("Diff", F.when(F.isnull(monthly_sales_df.Amount - monthly_sales_df.PrevValue), 0).otherwise(monthly_sales_df.Amount - monthly_sales_df.PrevValue)))

monthly_sales_df = (monthly_sales_df
           .withColumn("PctGrowth", monthly_sales_df.Diff / monthly_sales_df.PrevValue)
)

display(monthly_sales_df)

InvoiceMonth,Amount,PrevValue,Diff,PctGrowth
200912,799847.1075055711,,0.0,
201001,624032.8896515106,799847.1075055711,-175814.21785406047,-0.2198097813997982
201002,533091.424455221,624032.8896515106,-90941.4651962896,-0.1457318463568091
201003,765848.757825097,533091.424455221,232757.3333698759,0.4366180409068411
201004,590580.3910229017,765848.757825097,-175268.3668021952,-0.2288550644123688
201005,615322.8286303803,590580.3910229017,24742.437607478583,0.0418951221265981
201006,679786.6077747606,615322.8286303803,64463.77914438024,0.104764159795383
201007,575236.3479464054,679786.6077747606,-104550.25982835516,-0.1537986459759688
201008,656776.3295686841,575236.3479464054,81539.98162227869,0.1417503986202828
201009,853650.4293782803,656776.3295686841,196874.0998095962,0.2997582143968049


Output can only be rendered in Databricks

#Monthly Active Users

In [0]:
user_df = (month_df
    .select("InvoiceMonth", "CustomerID")
    .groupBy("InvoiceMonth")
    .agg(countDistinct("CustomerID"))
    .withColumnRenamed("count(CustomerID)", "ActiveUsers")
    .orderBy("InvoiceMonth")
    .withColumn("InvoiceMonth", month_df.InvoiceMonth.cast("string"))
)

display(user_df)

InvoiceMonth,ActiveUsers
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


Output can only be rendered in Databricks

#New and Existing Users

In [0]:
#Create New User Dataframe

new_user_df = (month_df
        .select("InvoiceMonth", "CustomerID")
)



user_w = Window.partitionBy("CustomerID").orderBy("InvoiceMonth")

new_user_df = (new_user_df 
        .withColumn("FirstPurchase", F.first("InvoiceMonth").over(user_w))

)


new_user_df = (new_user_df
        .groupBy("FirstPurchase")
        .agg(countDistinct("CustomerID"))
        .orderBy("FirstPurchase")
        .withColumnRenamed("FirstPurchase", "InvoiceMonth")
        .withColumnRenamed("count(CustomerID)", "NewUsers")
)



display(new_user_df)

InvoiceMonth,NewUsers
200912,1045
201001,394
201002,363
201003,436
201004,291
201005,254
201006,269
201007,183
201008,158
201009,242


In [0]:
#Create new and existing users dataframe 

new_ex_df = (user_df
      .join(new_user_df, "InvoiceMonth")
      .orderBy("InvoiceMonth")
)

new_ex_df = (new_ex_df
      .withColumn("ExistingUsers", new_ex_df.ActiveUsers - new_ex_df.NewUsers)
)

display(new_ex_df)

InvoiceMonth,ActiveUsers,NewUsers,ExistingUsers
200912,1045,1045,0
201001,786,394,392
201002,807,363,444
201003,1111,436,675
201004,998,291,707
201005,1062,254,808
201006,1095,269,826
201007,988,183,805
201008,964,158,806
201009,1202,242,960


Output can only be rendered in Databricks