In [0]:
# Load the "retail_csv" table through the notebook-provided Spark session.
dataframe = spark.read.table(tableName="retail_csv")

# Preview the loaded rows (20 rows, long values truncated: the show() defaults).
dataframe.show(n=20, truncate=True)

#Total Invoice Amount

In [0]:
from pyspark.sql.functions import *

# Total amount per invoice.
# Note: `sum` and `round` below are the pyspark.sql.functions versions pulled in
# by the star import, not the Python builtins.

# Line-level amount = quantity * unit_price, keeping only the columns needed.
invoice_lines = (
    dataframe
    .withColumn('Amount', col('quantity') * col('unit_price'))
    .select('invoice_no', 'Amount')
)

# Sum the lines of each invoice and keep only invoices with a positive total.
invoice_sums = (
    invoice_lines
    .groupBy('invoice_no')
    .agg(sum('Amount'))
    .orderBy('invoice_no')
    .withColumnRenamed('sum(Amount)', 'Amount')
    .filter(col('Amount') > 0)
)

# Round the totals to two decimals for display.
total_invoice = invoice_sums.withColumn('Amount', round(invoice_sums['Amount'], 2))
total_invoice.show()

#Monthly Placed and Cancelled Orders

In [0]:
# Get placed orders: number of distinct invoices per month.
# 'YYYYMM' is the year part and the month part of invoice_date glued together;
# this assumes invoice_date renders as 'YYYY-MM...' -- TODO confirm the column format.
monthly_placed_orders = (
    dataframe
    .withColumn('YYYYMM', concat(col('invoice_date').substr(0, 4), col('invoice_date').substr(6, 2)))
    # First pass collapses invoice lines to one row per (month, invoice) ...
    .groupBy('YYYYMM', 'invoice_no').count()
    # ... second pass counts those invoices per month.
    .groupBy('YYYYMM').count()
    # Spark names the aggregate column 'count' (lower case); using the exact name
    # keeps the rename working even when spark.sql.caseSensitive is enabled.
    .withColumnRenamed('count', 'PlacedBefore')
)
monthly_placed_orders.show()

In [0]:
# Get cancelled orders: number of distinct invoices per month whose invoice_no
# starts with "C".
monthly_cancelled_orders = (
    dataframe
    .withColumn('YYYYMM', concat(col('invoice_date').substr(0, 4), col('invoice_date').substr(6, 2)))
    .filter(col('invoice_no').startswith("C"))
    # One row per (invoice, month), then count invoices per month.
    .groupBy(col('invoice_no'), col('YYYYMM')).count()
    .groupBy('YYYYMM').count()
    .orderBy(col('YYYYMM'))
    # Spark names the aggregate column 'count' (lower case); using the exact name
    # keeps the rename working even when spark.sql.caseSensitive is enabled.
    .withColumnRenamed('count', 'Cancelled')
)
monthly_cancelled_orders.show()

In [0]:
# Placed = PlacedBefore - 2 * Cancelled, per month.
# NOTE(review): the factor 2 presumably assumes every "C" invoice has a matching
# original invoice that is also counted in PlacedBefore -- confirm against the data.
monthly_orders = (
    monthly_placed_orders
    # Left join on the shared key: a month without any cancelled invoice must
    # still be reported (an inner join would silently drop it). Joining on the
    # column name also yields a single 'YYYYMM' column, so nothing has to be dropped.
    .join(monthly_cancelled_orders, on='YYYYMM', how='left')
    # Months with no match on the right side have zero cancellations.
    .fillna(0, subset=['Cancelled'])
    .withColumn('Placed', col('PlacedBefore') - (2 * col('Cancelled')))
    .select('YYYYMM', 'Placed', 'Cancelled')
)
monthly_orders.show()

#Monthly Sales

In [0]:
# Monthly sales: sum of quantity * unit_price over all invoice lines of a month.
# Note: `round` below is the pyspark.sql.functions version, not the Python builtin.

# Tag every line with its month key and its line amount.
sales_lines = (
    dataframe
    .withColumn('YYYYMM', concat(col('invoice_date').substr(0,4), col('invoice_date').substr(6,2)))
    .withColumn('Amount', col('quantity') * col('unit_price'))
)

# Aggregate per month, ordered chronologically by the month key.
sales_by_month = (
    sales_lines
    .groupBy('YYYYMM')
    .sum('Amount')
    .orderBy('YYYYMM')
    .withColumnRenamed('sum(Amount)', 'Amount')
)

# Round to two decimals for display.
monthly_sales = sales_by_month.withColumn('Amount', round(sales_by_month['Amount'], 2))
monthly_sales.show()

#Monthly Active Users

In [0]:
# Monthly active users: how many distinct customer_id values appear in each month.
# NOTE(review): rows with a null customer_id are not filtered out here, so they
# presumably end up as one extra "user" per month -- confirm this is intended.

# One row per (month, customer) pair.
customer_months = (
    dataframe
    .withColumn('YYYYMM', concat(col('invoice_date').substr(0,4), col('invoice_date').substr(6,2)))
    .groupBy('YYYYMM', 'customer_id')
    .count()
)

# Count the pairs per month.
monthly_active_users = (
    customer_months
    .groupBy('YYYYMM')
    .count()
    .withColumnRenamed('count', 'active_users')
)

monthly_active_users.show()

#New and Existing Users

In [0]:
# New vs. existing users per month.
# Requires `monthly_active_users` (columns 'YYYYMM', 'active_users') from the
# "Monthly Active Users" cell.

# Integer month key so that min() picks the chronologically first month.
with_yyyymm = dataframe.withColumn('YYYYMM', concat(col('invoice_date').substr(0,4), col('invoice_date').substr(6,2)).cast("Integer"))

# A customer is "new" in the first month they appear; count such customers per month.
monthly_new_users = (
    with_yyyymm
    .groupBy('customer_id').min('YYYYMM')
    .groupBy('min(YYYYMM)').count()
    .orderBy('min(YYYYMM)')
    .withColumnRenamed('count', 'new_users')
)

monthly_users = (
    monthly_active_users
    # Left join: a month in which nobody was a first-time customer must still be
    # reported (an inner join would drop it together with its existing users).
    # The active-users key is a string, so cast it explicitly to match the
    # integer first-month key instead of relying on implicit coercion.
    .join(monthly_new_users, col('YYYYMM').cast("Integer") == col('min(YYYYMM)'), "left")
    # No match on the right side means zero new users that month.
    .fillna(0, subset=['new_users'])
    .withColumn('existing_users', col('active_users') - col('new_users'))
    .select('YYYYMM', 'new_users', 'existing_users')
    .orderBy('YYYYMM')
)

monthly_users.show()

#RFM Segmentation

In [0]:
# RFM segmentation: one row per customer with
#   recency   - whole months between the customer's last purchase month and the reference month
#   frequency - number of distinct invoices
#   monetary  - total of quantity * unit_price, rounded to 2 decimals
# Rows without a customer_id are excluded.

# Reference month the recency is measured against, encoded as YYYYMM.
RFM_REFERENCE_YYYYMM = 202110
reference_year, reference_month = divmod(RFM_REFERENCE_YYYYMM, 100)

# Frequency: collapse invoice lines to one row per (customer, invoice), then count invoices.
frequency = (
    dataframe
    .groupBy('customer_id', 'invoice_no').count()
    .groupBy('customer_id').count()
    .where(col('customer_id').isNotNull())
    .withColumnRenamed('count', 'Frequency')
)

# Monetary: total spend per customer.
monetary = (
    dataframe
    .withColumn('Amount', col('unit_price') * col('quantity'))
    .groupBy('customer_id').sum('Amount')
    .withColumnRenamed('sum(Amount)', 'monetary')
)
monetary = monetary.withColumn('monetary', round(monetary['monetary'], 2)).where(col('customer_id').isNotNull())

# Recency: latest purchase month per customer as an integer YYYYMM.
recency = dataframe.withColumn('YYYYMM', concat(col('invoice_date').substr(0,4), col('invoice_date').substr(6,2)).cast("Integer")).groupBy('customer_id').max('YYYYMM')
# Subtracting raw YYYYMM integers is not a month count across year boundaries
# (202110 - 202012 = 98), so convert both sides to "months since year 0" first.
last_purchase_yyyymm = col('max(YYYYMM)')
last_purchase_months = floor(last_purchase_yyyymm / 100) * 12 + last_purchase_yyyymm % 100
recency = recency.withColumn('recency', (reference_year * 12 + reference_month) - last_purchase_months)

# Merge recency, frequency and monetary.
# Joining on the column name keeps a single 'customer_id' column and avoids
# comparing columns of frames that were all derived from the same parent frame.
# The result is assigned back: DataFrame transformations return a new frame, so
# the rename/select only takes effect when its result is kept.
rfm = (
    recency.drop('max(YYYYMM)')
    .join(frequency, on='customer_id', how='inner')
    .join(monetary, on='customer_id', how='inner')
    .withColumnRenamed('Frequency', 'frequency')
    .select('customer_id', 'recency', 'frequency', 'monetary')
)

rfm.show()