## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# Where the uploaded file lives in DBFS, and which format it is in
file_location = "/FileStore/tables/retail.csv"
file_type = "csv"

# Reader settings; they only have an effect for CSV input
infer_schema = "true"
first_row_is_header = "false"
delimiter = ","

# Read the file with the options above. For other file types, these options will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# The file is read without a header row, so columns arrive as _c0, _c1, ...;
# rename them by position to meaningful names.
column_names = [
    "invoice",
    "stockcode",
    "description",
    "quantity",
    "invoicedate",
    "price",
    "customerid",
    "country",
]
for position, column_name in enumerate(column_names):
    df = df.withColumnRenamed("_c{}".format(position), column_name)
display(df)

invoice,stockcode,description,quantity,invoicedate,price,customerid,country
491100,22114,HOT WATER BOTTLE TEA AND SYMPATHY,4,2009-12-09T14:15:00.000+0000,3.95,17631.0,United Kingdom
491100,21486,PINK HEART DOTS HOT WATER BOTTLE,4,2009-12-09T14:15:00.000+0000,3.75,17631.0,United Kingdom
491100,85014D,PINK/BROWN DOTS RUFFLED UMBRELLA,3,2009-12-09T14:15:00.000+0000,5.95,17631.0,United Kingdom
491101,18097B,BLUE TALL PORCELAIN T-LIGHT HOLDER,2,2009-12-09T14:16:00.000+0000,2.55,17449.0,United Kingdom
491101,18097C,WHITE TALL PORCELAIN T-LIGHT HOLDER,1,2009-12-09T14:16:00.000+0000,2.55,17449.0,United Kingdom
491101,22178,VICTORIAN GLASS HANGING T-LIGHT,6,2009-12-09T14:16:00.000+0000,1.25,17449.0,United Kingdom
491101,22295,HEART FILIGREE DOVE LARGE,12,2009-12-09T14:16:00.000+0000,1.65,17449.0,United Kingdom
491101,21255,"FRENCH BOTTLE , LAVENDER",3,2009-12-09T14:16:00.000+0000,1.65,17449.0,United Kingdom
491101,21774,DECORATIVE CATS BATHROOM BOTTLE,1,2009-12-09T14:16:00.000+0000,1.25,17449.0,United Kingdom
491101,22044,CHRISTMAS CARD STACK OF PRESENTS,24,2009-12-09T14:16:00.000+0000,0.42,17449.0,United Kingdom


In [0]:
# Persist the DataFrame as a Parquet-backed table. Unlike a temp view (which would only be
# available to this particular notebook), a saved table persists across cluster restarts and
# lets various users across different notebooks query this data.
# NOTE: despite its name, `retail_df` holds the table *name* (a string), not a DataFrame.

retail_df = "retail_df"

# mode("overwrite") keeps this cell re-runnable: the default save mode raises an error
# when the table already exists, which breaks a second "Run All".
df.write.format("parquet").mode("overwrite").saveAsTable(retail_df)

# Total Invoice Amount Distribution

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
# Line-item value: quantity times unit price.
line_amount = F.col("quantity") * F.col("price")
df = df.withColumn("amount", line_amount)

# Sum the line items per invoice and keep only invoices whose total is positive.
invoice_amount_df = (
    df.groupBy("invoice")
    .agg(F.sum("amount").alias("total_amount"))
    .where(col("total_amount") > 0)
)

In [0]:
# 85th percentile of the per-invoice totals.
# DataFrame.show() only prints and returns None, so collect the single value instead;
# that way `percentile` actually holds the number and can be reused.
percentile = invoice_amount_df.selectExpr('percentile(total_amount, 0.85)').first()[0]
print(percentile)

In [0]:
# Summary statistics for invoices below the 85th percentile of total_amount.
# The cutoff is computed from the data instead of being a hard-coded literal
# (the previous 724.3894999999991 appears to be a pasted result of the percentile query),
# so it stays correct when the underlying data changes.
amount_cutoff = invoice_amount_df.selectExpr('percentile(total_amount, 0.85)').first()[0]
invoice_amount_df.filter(col('total_amount') < amount_cutoff).describe().show()

# Monthly Placed and Cancelled Orders

In [0]:
from pyspark.sql.functions import date_format
# Derive an integer year-month key (e.g. 200912) from the invoice timestamp.
# Lowercase 'yyyy' is the calendar year. Uppercase 'YYYY' is the week-based year, which
# mislabels dates in the days around New Year and is rejected by Spark 3's datetime formatter.
df = df.withColumn("invoiceyearmonth", date_format('invoicedate', 'yyyyMM').cast('int'))
display(df)

invoice,stockcode,description,quantity,invoicedate,price,customerid,country,amount,invoiceyearmonth
491100,22114,HOT WATER BOTTLE TEA AND SYMPATHY,4,2009-12-09T14:15:00.000+0000,3.95,17631.0,United Kingdom,15.8,200912
491100,21486,PINK HEART DOTS HOT WATER BOTTLE,4,2009-12-09T14:15:00.000+0000,3.75,17631.0,United Kingdom,15.0,200912
491100,85014D,PINK/BROWN DOTS RUFFLED UMBRELLA,3,2009-12-09T14:15:00.000+0000,5.95,17631.0,United Kingdom,17.85,200912
491101,18097B,BLUE TALL PORCELAIN T-LIGHT HOLDER,2,2009-12-09T14:16:00.000+0000,2.55,17449.0,United Kingdom,5.1,200912
491101,18097C,WHITE TALL PORCELAIN T-LIGHT HOLDER,1,2009-12-09T14:16:00.000+0000,2.55,17449.0,United Kingdom,2.55,200912
491101,22178,VICTORIAN GLASS HANGING T-LIGHT,6,2009-12-09T14:16:00.000+0000,1.25,17449.0,United Kingdom,7.5,200912
491101,22295,HEART FILIGREE DOVE LARGE,12,2009-12-09T14:16:00.000+0000,1.65,17449.0,United Kingdom,19.8,200912
491101,21255,"FRENCH BOTTLE , LAVENDER",3,2009-12-09T14:16:00.000+0000,1.65,17449.0,United Kingdom,4.949999999999999,200912
491101,21774,DECORATIVE CATS BATHROOM BOTTLE,1,2009-12-09T14:16:00.000+0000,1.25,17449.0,United Kingdom,1.25,200912
491101,22044,CHRISTMAS CARD STACK OF PRESENTS,24,2009-12-09T14:16:00.000+0000,0.42,17449.0,United Kingdom,10.08,200912


In [0]:
# Split the rows on the first character of the invoice number:
# a leading "C" goes to the canceled set, everything else to the placed set.
invoice_prefix = F.substring('invoice', 1, 1)
is_cancellation = invoice_prefix == "C"

canceled_orders_df = df.where(is_cancellation)
placed_orders_df = df.where(~is_cancellation)

In [0]:
# Distinct invoice numbers per month among the non-"C" invoices.
total_monthly_placed_orders_df = (
    placed_orders_df.groupby('invoiceyearmonth')
    .agg(F.countDistinct('invoice').alias('total_monthly'))
    .sort('invoiceyearmonth')
)
# Distinct invoice numbers per month among the "C" invoices.
monthly_canceled_orders_df = (
    canceled_orders_df.groupby('invoiceyearmonth')
    .agg(F.countDistinct('invoice').alias('monthly_canceled'))
    .sort('invoiceyearmonth')
)

# Left join so that a month without any cancellations is kept (an inner join would
# silently drop it from the report); a missing cancellation count is treated as 0.
monthly_placed_df = (
    total_monthly_placed_orders_df
    .join(monthly_canceled_orders_df, 'invoiceyearmonth', 'left')
    .fillna(0, subset=['monthly_canceled'])
    .orderBy('invoiceyearmonth', ascending=True)
)
# monthly_placed = total_monthly - monthly_canceled
monthly_placed_df = monthly_placed_df.withColumn('monthly_placed', F.col('total_monthly') - F.col('monthly_canceled'))

In [0]:
# Render the per-month total, canceled and placed order counts.
display(monthly_placed_df)

invoiceyearmonth,total_monthly,monthly_canceled,monthly_placed
200912,1929,401,1528
201001,1333,300,1033
201002,1729,240,1489
201003,1960,407,1553
201004,1588,304,1284
201005,2011,407,1604
201006,1859,357,1502
201007,1673,344,1329
201008,1604,273,1331
201009,2004,371,1633


# Monthly Sales

In [0]:
# Total of the line-item amounts for each year-month, ordered chronologically.
monthly_total = F.sum('amount').alias('total_amount')
monthly_sales_df = (
    df.groupBy('invoiceyearmonth')
    .agg(monthly_total)
    .orderBy('invoiceyearmonth')
)
display(monthly_sales_df)

invoiceyearmonth,total_amount
200912,799847.1100000201
201001,624032.8919999955
201002,533091.4260000042
201003,765848.7609999765
201004,590580.3919999823
201005,615322.8300000005
201006,679786.6099999842
201007,575236.349999999
201008,656776.3300000043
201009,853650.4309999745


# Monthly Sales Growth

In [0]:
from pyspark.sql.window import Window

# Single unpartitioned window ordered by month, so lag() reads the preceding month's row.
price_window = Window.partitionBy().orderBy("invoiceyearmonth")

# Previous month's total next to each month's total.
previous_total = F.lag("total_amount").over(price_window)
monthly_sales_df = monthly_sales_df.withColumn("prev_value", previous_total)

# Month-over-month difference; a null difference (no previous value) becomes 0.
delta = F.col("total_amount") - F.col("prev_value")
monthly_sales_df = monthly_sales_df.withColumn(
    "diff",
    F.when(delta.isNull(), 0).otherwise(delta),
)

# Relative change against the previous month's total.
monthly_sales_df = monthly_sales_df.withColumn(
    "pct_change",
    F.col("diff") / F.col("prev_value"),
)
display(monthly_sales_df.select('invoiceyearmonth', 'pct_change').orderBy('invoiceyearmonth'))

invoiceyearmonth,pct_change
200912,
201001,-0.2198097808967769
201002,-0.1457318470962776
201003,0.4366180426994347
201004,-0.2288550663333901
201005,0.0418951227219527
201006,0.1047641609526881
201007,-0.1537986457249982
201008,0.141750395294048
201009,0.2997582160123964


# Monthly Active Users

In [0]:
# Number of distinct customer ids seen in each year-month, ordered chronologically.
distinct_customers = F.countDistinct('customerid')
monthly_active_df = (
    df.groupBy('invoiceyearmonth')
    .agg(distinct_customers)
    .orderBy('invoiceyearmonth')
)
display(monthly_active_df)

invoiceyearmonth,count(DISTINCT customerid)
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


# New and Existing Users

In [0]:
# Earliest year-month in which each customer appears.
# min() replaces the previous sort() + first(): groupBy does not guarantee that the prior
# sort order is preserved within groups, so first() could return an arbitrary month.
new_ex_df = df.groupby('customerid').agg(F.min('invoiceyearmonth').alias('first_purchase_ym'))

# Attach each customer's first purchase month to every one of their rows.
first_purchase_month_df = df.join(new_ex_df, 'customerid')
display(first_purchase_month_df)

customerid,invoice,stockcode,description,quantity,invoicedate,price,country,amount,invoiceyearmonth,first_purchase_ym
12799,513796,85017C,ENVELOPE 50 CURIOUS IMAGES,12,2010-06-28T15:57:00.000+0000,0.85,Japan,10.2,201006,201006
12799,513796,85032C,CURIOUS IMAGES GIFT WRAP SET,6,2010-06-28T15:57:00.000+0000,2.1,Japan,12.6,201006,201006
12799,513796,85049D,BRIGHT BLUES RIBBONS,12,2010-06-28T15:57:00.000+0000,1.25,Japan,15.0,201006,201006
12799,513796,85049G,CHOCOLATE BOX RIBBONS,12,2010-06-28T15:57:00.000+0000,1.25,Japan,15.0,201006,201006
12799,513796,22509,SEWING BOX RETROSPOT DESIGN,1,2010-06-28T15:57:00.000+0000,16.95,Japan,16.95,201006,201006
12799,513796,85176,SEWING SUSAN 21 NEEDLE SET,12,2010-06-28T15:57:00.000+0000,0.85,Japan,10.2,201006,201006
12799,513796,85178,VICTORIAN SEWING KIT,12,2010-06-28T15:57:00.000+0000,1.25,Japan,15.0,201006,201006
12799,513796,85032B,BLOSSOM IMAGES GIFT WRAP SET,6,2010-06-28T15:57:00.000+0000,2.1,Japan,12.6,201006,201006
12799,513796,85032A,ROMANTIC IMAGES GIFT WRAP SET,6,2010-06-28T15:57:00.000+0000,2.1,Japan,12.6,201006,201006
12799,513796,85017B,ENVELOPE 50 BLOSSOM IMAGES,12,2010-06-28T15:57:00.000+0000,0.85,Japan,10.2,201006,201006


In [0]:
# A row counts toward "new" when its month equals the customer's first purchase month,
# and toward "existing" otherwise.
is_first_month = F.col('invoiceyearmonth') == F.col('first_purchase_ym')

# Distinct customers per month on rows in their first purchase month.
new_user = (
    first_purchase_month_df.where(is_first_month)
    .groupBy('invoiceyearmonth')
    .agg(F.countDistinct('customerid').alias('new_users'))
    .orderBy('invoiceyearmonth')
)

# Distinct customers per month on rows outside their first purchase month.
ex_user = (
    first_purchase_month_df.where(~is_first_month)
    .groupBy('invoiceyearmonth')
    .agg(F.countDistinct('customerid').alias('ex_users'))
    .orderBy('invoiceyearmonth')
)

In [0]:
# Render the count of new users per year-month.
display(new_user)

invoiceyearmonth,new_users
200912,1045
201001,394
201002,363
201003,436
201004,291
201005,254
201006,269
201007,183
201008,158
201009,242


In [0]:
# Render the count of existing (returning) users per year-month.
display(ex_user)

invoiceyearmonth,ex_users
201001,392
201002,444
201003,675
201004,707
201005,808
201006,826
201007,805
201008,806
201009,960
201010,1198
