# Retail Data Wrangling and Analytics

In [0]:
%scala
// Load the table into a Spark DataFrame
val df = spark.table("jarvis_workspace.default.online_retail_ii")
df.show(5)

In [0]:
%scala
//imports
import org.apache.spark.sql.functions._

In [0]:
%scala
df.printSchema()
df.describe().show()

# Total Invoice Amount Distribution

## Calculate Invoice Total

In [0]:
%scala

// Create column that displays total price of the items
val itemsTotal = df.withColumn("ItemsTotal",$"Quantity" * $"Price")

val invoiceTotal = itemsTotal
    .filter($"ItemsTotal" > 0)
    .groupBy("Invoice")
    .agg(sum("ItemsTotal").alias("ItemsTotal"))

invoiceTotal.show()

## Calculate Min, Max, Mean, Median, and Mode Values and Draw Distribution Amount

In [0]:
%scala
val minTotal = invoiceTotal.agg(min("ItemsTotal")).as[Double].first()
val maxTotal = invoiceTotal.agg(max("ItemsTotal")).as[Double].first()
val medianTotal = invoiceTotal.stat.approxQuantile("ItemsTotal", Array(0.5), 0.0)(0)
val modeTotalDF = invoiceTotal.groupBy("ItemsTotal").count().orderBy(desc("count"))
val modeTotal = modeTotalDF.first().getAs[Double]("ItemsTotal")
val meanTotal = invoiceTotal.agg(avg("ItemsTotal")).as[Double].first()

println(f"Minimum: $minTotal%.2f")
println(f"Mean: $meanTotal%.2f")
println(f"Median: $medianTotal%.2f")
println(f"Mode: $modeTotal%.2f")
println(f"Maximum: $maxTotal%.2f")

display(invoiceTotal)

Invoice,ItemsTotal
489677,192.0
491045,303.2
491658,155.05999999999997
493542,118.75
493977,275.95
494244,6711.0
494277,1335.92
495185,2507.06
495783,48.96
496171,199.3


Databricks visualization. Run in Databricks to view.

## Draw Distribution And Calculate Values for First 85 Quantiles

In [0]:
%scala
val quantileArray = (0 to 85).map(_ / 100.0).toArray
val quantiles = invoiceTotal.stat.approxQuantile("ItemsTotal", quantileArray, 0.01)

// Convert quantiles to a DataFrame for further processing
val quantileDF = spark.createDataFrame(quantiles.zipWithIndex.map { case (value, index) =>
  (index, value)
}).toDF("Quantile", "ItemsTotal")

// Calculate additional statistics
val minTotal = quantileDF.agg(min("ItemsTotal")).first().getDouble(0)
val maxTotal = quantileDF.agg(max("ItemsTotal")).first().getDouble(0)
val medianTotal = quantiles(quantiles.length / 2)
val meanTotal = invoiceTotal.agg(mean("ItemsTotal")).first().getDouble(0)
val modeTotal = invoiceTotal.groupBy("ItemsTotal")
  .count()
  .orderBy(desc("count"))
  .select("ItemsTotal")
  .first()
  .getDouble(0)

// Print the results
display(quantileDF)
println(f"Minimum: $minTotal%.2f\nMaximum: $maxTotal%.2f\nMedian: $medianTotal%.2f\nMode: $modeTotal%.2f\nMean: $meanTotal%.2f")


Quantile,ItemsTotal
0,0.19
1,0.19
2,6.45
3,14.85
4,14.85
5,22.5
6,22.5
7,36.48
8,36.48
9,52.35


Databricks visualization. Run in Databricks to view.

# Monthly Placed and Canceled Orders

## Calculate Monthly Placed and Cancelled Orders

In [0]:
%scala
// Create column to group by month
val dfWithMonth = itemsTotal.withColumn("YYYYMM", date_format(col("InvoiceDate"), "yyyyMM"))

// Calculate Total # of Orders, Cancelled, and Placed
val totalOrders = dfWithMonth.groupBy("YYYYMM").agg(countDistinct("Invoice").alias("TotalOrders"))

val cancelledOrders = dfWithMonth
  .filter(col("Invoice").startsWith("C"))
  .groupBy("YYYYMM")
  .agg(countDistinct("Invoice").alias("CancelledOrders"))

val placedOrders = totalOrders
  .join(cancelledOrders, Seq("YYYYMM"), "left_outer")
  .withColumn("PlacedOrders", col("TotalOrders") - col("CancelledOrders") * 2)
  .select("YYYYMM", "TotalOrders", "CancelledOrders", "PlacedOrders")



## Show Distribution of Placed and Cancelled Orders By Month

In [0]:
%scala

//Reshape dataframe to allow for double bar graph

val cancelledOrdersDF = placedOrders
  .select(
    col("YYYYMM"),
    col("CancelledOrders").alias("OrderCount"),
    lit("CancelledOrders").alias("OrderType")
  )

val placedOrdersDF = placedOrders
  .select(
    col("YYYYMM"),
    col("PlacedOrders").alias("OrderCount"),
    lit("PlacedOrders").alias("OrderType")
  )

// Combine both DataFrames
val combinedOrdersDF = cancelledOrdersDF.union(placedOrdersDF)

display(combinedOrdersDF)


YYYYMM,OrderCount,OrderType
201001,300,CancelledOrders
201002,240,CancelledOrders
201007,344,CancelledOrders
201004,304,CancelledOrders
200912,401,CancelledOrders
201006,357,CancelledOrders
201005,407,CancelledOrders
201003,407,CancelledOrders
201103,318,CancelledOrders
201104,240,CancelledOrders


Databricks visualization. Run in Databricks to view.

# Monthly Sales

## Calculate Monthly Sales Data

In [0]:
%scala
val monthlySales = dfWithMonth.groupBy("YYYYMM").agg(sum("ItemsTotal").alias("MonthlySalesTotal"))

## Distribution of Sales By Month

In [0]:
%scala
display(monthlySales)

YYYYMM,sum(ItemsTotal)
201001,624032.8919999956
201002,533091.4260000042
201007,575236.360000009
201004,590580.4319999823
200912,799847.1100000143
201006,679786.6099999842
201005,615322.8300000005
201003,765848.7609999765
201103,683267.0800000189
201104,493207.1210000249


Databricks visualization. Run in Databricks to view.

# Monthly Sales Growth


## Calculate Monthly Sales Percentage Growth Data

In [0]:
%scala
import org.apache.spark.sql.expressions.Window

// Define a window specification for calculating the lag
val windowSpec = Window.orderBy("YYYYMM")

// Add a column for the previous month's ItemsTotal
val monthlySalesWithLag = monthlySales
  .withColumn("PreviousItemsTotal", lag("MonthlySalesTotal", 1).over(windowSpec))

// Calculate the growth rate as a percentage change
val monthlySalesWithGrowthRate = monthlySalesWithLag
  .withColumn("GrowthRate", (col("MonthlySalesTotal") - col("PreviousItemsTotal")) / col("PreviousItemsTotal"))

YYYYMM,GrowthRate
200912,
201001,-0.2198097808967711
201002,-0.1457318470962778
201003,0.4366180426994347
201004,-0.2288550141037566
201005,0.0418950521544184
201006,0.1047641609526881
201007,-0.153798631014485
201008,0.1417503928297842
201009,0.2997581962224666


Databricks visualization. Run in Databricks to view.

## Plot Chart to Show Growth Percentage

In [0]:

%scala
// Display the result
display(monthlySalesWithGrowthRate.select("YYYYMM","GrowthRate"))

YYYYMM,GrowthRate
200912,
201001,-0.2198097808967711
201002,-0.1457318470962778
201003,0.4366180426994347
201004,-0.2288550141037566
201005,0.0418950521544184
201006,0.1047641609526881
201007,-0.153798631014485
201008,0.1417503928297842
201009,0.2997581962224666


Databricks visualization. Run in Databricks to view.

# Monthly Active Users

## Calculate Monthly Active Users

In [0]:
%scala
val monthlyUsers = dfWithMonth
  .groupBy("YYYYMM")
  .agg(countDistinct("Customer ID").alias("UniqueUsersPerMonth"))

## Plot Bar Chart For Monthly Active Users

In [0]:
%scala
display(monthlyUsers)

YYYYMM,UniqueUsersPerMonth
201103,1020
201001,786
201002,807
201010,1577
201009,1202
201104,899
201106,1051
201102,798
201110,1425
201008,964


Databricks visualization. Run in Databricks to view.

# New and Existing Users



## Merge Tables To Find New And Existing Users

In [0]:
%scala
// Find out purchase date of Each customer Id
val firstPurchase = dfWithMonth.groupBy("Customer ID").agg(min("YYYYMM").alias("FirstPurchaseDate"))

// Join the first purchase date back to the original DataFrame
val dfWithFirstPurchase = dfWithMonth
  .join(firstPurchase, Seq("Customer ID"), "left_outer")

## Find Values of New And Existing Users Per Month

In [0]:
%scala
//find new users
val newUsers = dfWithFirstPurchase
  .filter(col("YYYYMM") === col("FirstPurchaseDate"))

// Count unique new users per month
val newUsersPerMonth = newUsers
  .groupBy("YYYYMM")
  .agg(countDistinct("Customer ID").alias("NewUsers"))
  
// Step 3: Find existing users
val existingUsers = dfWithFirstPurchase
  .filter(col("YYYYMM") =!= col("FirstPurchaseDate"))

//   Count unique existing users per month
val existingUsersPerMonth = existingUsers
  .groupBy("YYYYMM")
  .agg(countDistinct("Customer ID").alias("ExistingUsers"))


## Plot Data Of New And Existing Users By Month

In [0]:
%scala
// Combine new and existing users DataFrames
val newUsersWithType = newUsersPerMonth
  .withColumnRenamed("NewUsers", "NewUserCount")

// Prepare existing users DataFrame
val existingUsersWithType = existingUsersPerMonth
  .withColumnRenamed("ExistingUsers", "ExUserCount")

// Merge new and existing users DataFrames on YYYYMM
val newEx = newUsersWithType
  .join(existingUsersWithType, Seq("YYYYMM"), "outer")

// Reshape DataFrame for side-by-side bar chart
val reshapedDF = newEx
  .select(col("YYYYMM"), 
          col("NewUserCount"), 
          col("ExUserCount"))
  .withColumn("UserType", lit("New Users"))
  .select(col("YYYYMM"), col("NewUserCount"), col("UserType"))
  .union(newEx.select(col("YYYYMM"), 
                      col("ExUserCount").alias("UserCount"), 
                      lit("Existing Users").alias("UserType")))
  .withColumnRenamed("NewUserCount", "UserCount")

display(reshapedDF)


YYYYMM,UserCount,UserType
200912,1045.0,New Users
201001,394.0,New Users
201002,363.0,New Users
201003,436.0,New Users
201004,291.0,New Users
201005,254.0,New Users
201006,269.0,New Users
201007,183.0,New Users
201008,158.0,New Users
201009,242.0,New Users


Databricks visualization. Run in Databricks to view.

## Calculating Recency, Monetary and Frequency Values

In [0]:
%scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Set today's date
val today = LocalDate.now()

// Calculating recency and monetary values
val dfX = dfWithMonth
  .groupBy("Customer ID")
  .agg(
    sum("ItemsTotal").alias("MonetaryValue"), // monetary value
    datediff(lit(today.toString), max("InvoiceDate")).alias("RecencyValue") // recency value
  )

// Calculate frequency values
// Calculate frequency values
val dfY = dfWithMonth
  .groupBy("Customer ID", "Invoice")
  .agg(sum("ItemsTotal").alias("TotalItems")) // Sum of ItemsTotal for each Invoice

val dfZ = dfY
  .groupBy("Customer ID")
  .agg(count("TotalItems").alias("FrequencyValue")) // Count of distinct invoices for each CustomerId


## Merge values into a single table

In [0]:
%scala

// Merge dfX and dfZ on 'CustomerId' to create the RFM table
val rfmTable = dfX.join(dfZ, Seq("Customer ID"), "inner")
  .select("Customer ID", "RecencyValue", "FrequencyValue", "MonetaryValue") // Selecting relevant columns


// determination of column names
val renamedRfmTable = rfmTable
  .withColumnRenamed("RecencyValue", "Recency")
  .withColumnRenamed("FrequencyValue", "Frequency")
  .withColumnRenamed("MonetaryValue", "Monetary")


## Calculate RFM Score Values

In [0]:
%scala
val recencyScore = renamedRfmTable
  .withColumn("RecencyScore", ntile(5).over(Window.orderBy("Recency")))

val frequencyScore = recencyScore
  .withColumn("FrequencyScore", ntile(5).over(Window.orderBy(col("Frequency").desc)))

val monetaryScore = frequencyScore
  .withColumn("MonetaryScore", ntile(5).over(Window.orderBy(col("Monetary").desc)))

val rfmWithScores = monetaryScore
  .withColumn("RFM_SCORE", concat_ws("", col("RecencyScore"), col("FrequencyScore"), col("MonetaryScore")))


# RFM Segmentation

In [0]:
%scala
// segmenting of customers according to RecencyScore and FrequencyScore values using map
val segMap = Map(
  "[1-2][1-2]" -> "Hibernating",
  "[1-2][3-4]" -> "At Risk",
  "[1-2]5" -> "Can't Lose",
  "3[1-2]" -> "About to Sleep",
  "33" -> "Need Attention",
  "[3-4][4-5]" -> "Loyal Customers",
  "41" -> "Promising",
  "51" -> "New Customers",
  "[4-5][2-3]" -> "Potential Loyalists",
  "5[4-5]" -> "Champions"
)

// creation of segment variable
val rfmWithSegments = rfmWithScores
  .withColumn("Segment", concat(col("RecencyScore").cast("string"), col("FrequencyScore").cast("string")))

// Replace segment values using the defined regex map
val segmentedRFMTable = segMap.foldLeft(rfmWithSegments) {
  case (df, (pattern, segmentName)) =>
    df.withColumn("Segment", when(col("Segment").rlike(pattern), segmentName).otherwise(col("Segment")))
}


Customer ID,Recency,Frequency,Monetary,RecencyScore,FrequencyScore,MonetaryScore,RFM_SCORE,Segment
18102.0,4713,153,598215.2200000002,1,1,1,111,Hibernating
14646.0,4714,164,523342.0699999989,1,1,1,111,Hibernating
14156.0,4722,202,296564.68999999965,1,1,1,111,Hibernating
14911.0,4714,510,270248.52999999974,1,1,1,111,Hibernating
17450.0,4721,61,233579.39,1,1,1,111,Hibernating
13694.0,4716,164,190825.5200000001,1,1,1,111,Hibernating
17511.0,4715,85,171885.97999999986,1,1,1,111,Hibernating
12415.0,4737,33,143269.28999999986,2,1,1,211,Hibernating
16684.0,4717,65,141502.24999999997,1,1,1,111,Hibernating
15061.0,4716,138,136391.47999999992,1,1,1,111,Hibernating
