In [0]:
#Read data from PSQL via a JDBC connection
my_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://104.195.233.166:5432/postgres") \
    .option("dbtable", "public.retail") \
    .option("user", "postgres") \
    .option("password", "password") \
    .load()

#Persist table to Hive Metastore
my_df.write.saveAsTable("retail")

In [0]:
retail_df = spark.sql("SELECT * FROM retail")
display(retail_df)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
545300,21155,RED RETROSPOT PEG BAG,6,2011-03-01T12:24:00Z,2.55,15092.0,United Kingdom
545300,47566,PARTY BUNTING,4,2011-03-01T12:24:00Z,4.95,15092.0,United Kingdom
545300,47566B,TEA TIME PARTY BUNTING,4,2011-03-01T12:24:00Z,4.95,15092.0,United Kingdom
545300,22896,PEG BAG APPLES DESIGN,6,2011-03-01T12:24:00Z,2.55,15092.0,United Kingdom
545300,22895,SET OF 2 TEA TOWELS APPLE AND PEARS,6,2011-03-01T12:24:00Z,2.95,15092.0,United Kingdom
545300,21340,CLASSIC METAL BIRDCAGE PLANT HOLDER,1,2011-03-01T12:24:00Z,12.75,15092.0,United Kingdom
545301,POST,POSTAGE,4,2011-03-01T12:26:00Z,18.0,12679.0,France
545301,22089,PAPER BUNTING VINTAGE PAISLEY,6,2011-03-01T12:26:00Z,2.95,12679.0,France
545301,22090,PAPER BUNTING RETROSPOT,6,2011-03-01T12:26:00Z,2.95,12679.0,France
545301,22432,WATERING CAN PINK BUNNY,8,2011-03-01T12:26:00Z,1.95,12679.0,France


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum
invoice_total_df = retail_df\
    .withColumn('total_amount', col('quantity') * col('unit_price'))\
    .filter(col('total_amount') > 0)\
    .groupby(col('invoice_no'))\
    .agg(sum('total_amount'))

display(invoice_total_df)

invoice_no,sum(total_amount)
545583,317.24999713897705
547122,298.1100025177002
548542,198.20000076293945
550469,178.4099978208542
550617,375.9999942779541
550831,171.19999504089355
551990,68.0
552172,640.0
552191,58.89999723434448
552238,185.3099946975708


In [0]:
from pyspark.sql.functions import month, year

cancels_per_month_df = retail_df\
    .dropDuplicates(['invoice_no'])\
    .filter(col('invoice_no').contains('C'))\
    .groupby(year(col('invoice_date')).alias('year'), month(col('invoice_date')).alias('month'))\
    .count()\
    .withColumnRenamed("count", "cancellations")

orders_per_month_df = retail_df\
    .dropDuplicates(['invoice_no'])\
    .groupby(year(col('invoice_date')).alias('year'), month(col('invoice_date')).alias('month'))\
    .count()\
    .withColumnRenamed("count", "orders_placed")\
    .join(cancels_per_month_df, ['year', 'month'])\
    .withColumn('orders_placed', col('orders_placed') - 2 * col('cancellations'))\
    .orderBy('year', 'month')\

display(orders_per_month_df)

year,month,orders_placed,cancellations
2009,12,1528,401
2010,1,1033,300
2010,2,1489,240
2010,3,1553,407
2010,4,1284,304
2010,5,1604,407
2010,6,1502,357
2010,7,1329,344
2010,8,1331,273
2010,9,1633,371


In [0]:
from pyspark.sql.functions import month, year, sum

monthly_sales_df = retail_df\
    .withColumn('total_amount', col('quantity') * col('unit_price'))\
    .filter(col('total_amount') > 0)\
    .groupby(year(col('invoice_date')).alias('year'), month(col('invoice_date')).alias('month'))\
    .agg(sum('total_amount').alias('monthly_sales'))\
    .orderBy('year', 'month')

display(monthly_sales_df)

year,month,monthly_sales
2009,12,825685.7573565356
2010,1,652708.4992080962
2010,2,553713.3044831112
2010,3,833570.1277497764
2010,4,681528.9893587043
2010,5,659858.8580619022
2010,6,752270.1375174187
2010,7,650712.9387189075
2010,8,697274.908798039
2010,9,924333.0085303576


In [0]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# Calculate monthly sales growth by percent
window_spec = Window.orderBy('year', 'month')
monthly_sales_growth_df = monthly_sales_df\
    .withColumn('previous_month_sales', lag('monthly_sales').over(window_spec))\
    .withColumn('sales_growth_percent', (col('monthly_sales') / col('previous_month_sales')))\
    .filter(col('previous_month_sales').isNotNull())\
    .orderBy('year', 'month')

display(monthly_sales_growth_df)

year,month,monthly_sales,previous_month_sales,sales_growth_percent
2010,1,652708.4992080962,825685.7573565356,0.7905047330570013
2010,2,553713.3044831112,652708.4992080962,0.8483316904175575
2010,3,833570.1277497764,553713.3044831112,1.505418275126891
2010,4,681528.9893587043,833570.1277497764,0.8176024627927739
2010,5,659858.8580619022,681528.9893587043,0.968203654378381
2010,6,752270.1375174187,659858.8580619022,1.1400470393425366
2010,7,650712.9387189075,752270.1375174187,0.8649990292933041
2010,8,697274.908798039,650712.9387189075,1.0715553161902702
2010,9,924333.0085303576,697274.908798039,1.3256364123638424
2010,10,1165483.9084469676,924333.0085303576,1.2608917973188336


In [0]:
from pyspark.sql.functions import countDistinct

monthly_active_users_df = retail_df\
    .groupby(year(col('invoice_date')).alias('year'), month(col('invoice_date')).alias('month'))\
    .agg(countDistinct('customer_id').alias('monthly_active_users'))\
    .orderBy('year', 'month')

display(monthly_active_users_df)

year,month,monthly_active_users
2009,12,1045
2010,1,786
2010,2,807
2010,3,1111
2010,4,998
2010,5,1062
2010,6,1095
2010,7,988
2010,8,964
2010,9,1202


In [0]:
from pyspark.sql.functions import min, when

# Get the first purchase date for each customer
first_purchase_df = retail_df\
    .groupby('customer_id')\
    .agg(min('invoice_date').alias('first_purchase_date'))

# Join with the original dataframe to get the first purchase year and month
retail_with_first_purchase_df = retail_df\
    .join(first_purchase_df, on='customer_id')\
    .withColumn('first_purchase_year', year(col('first_purchase_date')))\
    .withColumn('first_purchase_month', month(col('first_purchase_date')))\
    .withColumn('purchase_year', year(col('invoice_date')))\
    .withColumn('purchase_month', month(col('invoice_date')))

# Calculate new and existing users
new_existing_users_df = retail_with_first_purchase_df\
    .groupby('purchase_year', 'purchase_month')\
    .agg(
        countDistinct(
            when(
                (col('first_purchase_year') == col('purchase_year')) & 
                (col('first_purchase_month') == col('purchase_month')), 
                col('customer_id')
            )
        ).alias('new_users'),
        countDistinct(
            when(
                (col('first_purchase_year') < col('purchase_year')) | 
                ((col('first_purchase_year') == col('purchase_year')) & 
                 (col('first_purchase_month') < col('purchase_month'))), 
                col('customer_id')
            )
        ).alias('existing_users')
    )\
    .orderBy('purchase_year', 'purchase_month')

display(new_existing_users_df)

purchase_year,purchase_month,new_users,existing_users
2009,12,1045,0
2010,1,394,392
2010,2,363,444
2010,3,436,675
2010,4,291,707
2010,5,254,808
2010,6,269,826
2010,7,183,805
2010,8,158,806
2010,9,242,960


In [0]:
from pyspark.sql.functions import col, max, datediff, lit, sum as spark_sum

# Calculate Recency, Frequency, and Monetary value for each customer
rfm_df = retail_df\
    .groupby('customer_id')\
    .agg(
        max('invoice_date').alias('last_purchase_date'),
        countDistinct('invoice_no').alias('frequency'),
        spark_sum(col('quantity') * col('unit_price')).alias('monetary')
    )

# Calculate Recency
max_date = retail_df.agg(max('invoice_date')).collect()[0][0]
rfm_df = rfm_df.withColumn('recency', datediff(lit(max_date), col('last_purchase_date')))

# Define RFM score thresholds
quantiles = rfm_df.approxQuantile(['recency', 'frequency', 'monetary'], [0.25, 0.5, 0.75], 0.01)
recency_thresholds = quantiles[0]
frequency_thresholds = quantiles[1]
monetary_thresholds = quantiles[2]

# Assign RFM scores
rfm_df = rfm_df\
    .withColumn('recency_score', 
                when(col('recency') <= recency_thresholds[0], 4)
                .when(col('recency') <= recency_thresholds[1], 3)
                .when(col('recency') <= recency_thresholds[2], 2)
                .otherwise(1))\
    .withColumn('frequency_score', 
                when(col('frequency') <= frequency_thresholds[0], 1)
                .when(col('frequency') <= frequency_thresholds[1], 2)
                .when(col('frequency') <= frequency_thresholds[2], 3)
                .otherwise(4))\
    .withColumn('monetary_score', 
                when(col('monetary') <= monetary_thresholds[0], 1)
                .when(col('monetary') <= monetary_thresholds[1], 2)
                .when(col('monetary') <= monetary_thresholds[2], 3)
                .otherwise(4))

# Combine RFM scores into a single score
rfm_df = rfm_df.withColumn('RFM_score', 
                           col('recency_score') + col('frequency_score') + col('monetary_score'))

display(rfm_df)

customer_id,last_purchase_date,frequency,monetary,recency,recency_score,frequency_score,monetary_score,RFM_score
17323.0,2011-11-24T17:29:00Z,14,1553.510007083416,15,4,4,3,11
12714.0,2011-11-30T15:43:00Z,12,14093.709978342056,9,4,4,4,12
12636.0,2009-12-01T09:55:00Z,1,141.0,738,1,1,1,3
17854.0,2011-06-07T16:21:00Z,6,1071.449997395277,185,2,3,3,8
13999.0,2011-11-29T13:40:00Z,36,9438.009954929352,10,4,4,4,12
13268.0,2011-11-22T11:51:00Z,15,3167.639995932579,17,4,4,4,12
13159.0,2011-12-01T11:20:00Z,5,1379.7500064373016,8,4,3,3,10
13883.0,2011-12-08T19:20:00Z,26,6342.740051031113,1,4,4,4,12
16156.0,2011-12-01T12:42:00Z,26,5933.590007066727,8,4,4,4,12
12467.0,2010-11-18T15:53:00Z,2,0.0,386,1,2,1,4
