In [19]:
# Standard library
import os
import warnings
from functools import reduce

# findspark must run before pyspark is imported so that a SPARK_HOME-based
# pyspark install is on sys.path on a fresh kernel.
import findspark
findspark.init()

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f

warnings.filterwarnings("ignore")

In [11]:
# --- Configuration -----------------------------------------------------------
# SQL Server JDBC driver jar placed on the driver classpath. Override with the
# MSSQL_JDBC_JAR environment variable instead of editing this absolute path.
MSSQL_JDBC_JAR = os.environ.get(
    "MSSQL_JDBC_JAR",
    r"C:\Drivers\sqljdbc_12.10.0.0_enu\sqljdbc_12.10\enu\jars\mssql-jdbc-12.10.0.jre11.jar",
)
# Directory holding the prepared CSV extracts.
DATA_DIR = "../final data"

# NOTE: getOrCreate() returns an already-running session if there is one.
# JVM-level settings (driver memory, driver classpath) only take effect when the
# JVM is first launched, so restart the kernel after changing them.
spark = (
    SparkSession.builder
    .appName("Analysis 2")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.extraClassPath", MSSQL_JDBC_JAR)
    .getOrCreate()
)

# Read CSV extracts; column types are inferred from the data.
customers = spark.read.csv(f"{DATA_DIR}/customer_features.csv", header=True, inferSchema=True)
orders = spark.read.csv(f"{DATA_DIR}/orders_facts.csv", header=True, inferSchema=True)

In [12]:
# Add calendar parts of `order_date` as their own columns, so later cells can
# filter and group by year/month directly.
# withColumn replaces a same-named column, so re-running this cell is safe.
date_part_extractors = {
    "order_year": f.year,
    "order_month": f.month,
}
for part_name, extract_part in date_part_extractors.items():
    orders = orders.withColumn(part_name, extract_part(f.col("order_date")))

# Customer Lifetime Value

Customer lifetime value (CLV) is the total revenue or profit a business can expect to earn from a customer over the whole time that customer does business with it.

* **Historical lifetime value**
    - The actual profit earned from a customer so far, based on past purchases only, minus the cost of acquiring that customer.
    - If a customer's historical lifetime value is trending down — called *value migration* — this can be an early warning that the customer is about to unsubscribe or stop buying.
    - Beyond value migration, customers may change their spending habits in other important ways. For example, a customer who made one large purchase last year may be making smaller, more frequent purchases this year. Their lifetime value has not changed, but your marketing approach and goals for them should. (Source: *Predictive Marketing: Easy Ways Every Marketer Can Use Customer Analytics and Big Data*.)
    - In the cells below, historical value is approximated by yearly revenue (the sum of `total_price` per customer); no cost or acquisition data is used.

In [23]:
# Get unique years in sorted order
years = sorted(set(row['order_year'] for row in orders.select("order_year").collect()))

# Build yearly CLV DataFrames
yearly_clv_dfs = []

for year in years:
    yearly_df = orders.filter(
        f.col("order_year") == year
    ).groupBy(
        "customer_id"
    ).agg(
        f.round(f.sum("total_price")).alias(f"{year}_revenue"), 
        f.countDistinct("order_id").alias(f"no_of_orders_{year}")
    ).withColumn(
        f"order_value_{year}", 
        f.round(f.col(f"{year}_revenue") / f.col(f"no_of_orders_{year}"))
    )
    
    yearly_clv_dfs.append(yearly_df)

# Join all yearly DataFrames
customer_clv = reduce(lambda df1, df2: df1.join(df2, on="customer_id", how="outer"), yearly_clv_dfs)

# Build column order: group by metric
revenue_cols = [f"{year}_revenue" for year in years]
orders_cols = [f"no_of_orders_{year}" for year in years]
value_cols = [f"order_value_{year}" for year in years]

# Final column order
final_columns = ["customer_id"] + revenue_cols + orders_cols + value_cols

# Reorder columns
customer_clv = customer_clv.select(*final_columns)

customer_clv.show()


+-----------+------------+------------+------------+------------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+----------------+----------------+
|customer_id|2010_revenue|2011_revenue|2012_revenue|2013_revenue|no_of_orders_2010|no_of_orders_2011|no_of_orders_2012|no_of_orders_2013|order_value_2010|order_value_2011|order_value_2012|order_value_2013|
+-----------+------------+------------+------------+------------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+----------------+----------------+
|          2|        NULL|        NULL|         6.0|        NULL|             NULL|             NULL|                1|             NULL|            NULL|            NULL|             6.0|            NULL|
|          3|        11.0|      3601.0|        NULL|      1335.0|                1|                2|             NULL|                1|            11.0|          1801.0|     

In [29]:
customer_df = spark.read.csv("../final data/customer_dim.csv", header=True, inferSchema=True)

# Value migration compares the two most recent years, so it needs at least two;
# with fewer, this cell produces no output.
if len(years) >= 2:
    prev_year, last_year = years[-2], years[-1]
    prev_revenue_col = f"{prev_year}_revenue"
    last_revenue_col = f"{last_year}_revenue"

    # A year with no purchases is NULL after the outer join and is treated as 0.
    # So a customer who bought in prev_year but not in last_year is flagged too.
    revenue_dropped = (
        f.coalesce(f.col(last_revenue_col), f.lit(0))
        < f.coalesce(f.col(prev_revenue_col), f.lit(0))
    )
    customer_clv_filled = customer_clv.withColumn(
        "value_migration_flag",
        f.when(revenue_dropped, 1).otherwise(0),
    )

    # Only current customer-dimension rows, restricted to the columns needed for the join.
    current_customers = (
        customer_df
        .filter(f.col("is_current") == True)
        .select("customer_id", "customer_group")
    )

    migration_view = customer_clv_filled.select(
        "customer_id",
        prev_revenue_col,
        last_revenue_col,
        "value_migration_flag",
    )
    migration_view.join(current_customers, "customer_id").show()


+-----------+------------+------------+--------------------+--------------+
|customer_id|2012_revenue|2013_revenue|value_migration_flag|customer_group|
+-----------+------------+------------+--------------------+--------------+
|          2|         6.0|        NULL|                   1|           B2B|
|          3|        NULL|      1335.0|                   0|           B2B|
|          5|       107.0|        NULL|                   1|           B2C|
|          6|      1963.0|        NULL|                   1|           B2C|
|          7|       222.0|        NULL|                   1|           B2C|
|          8|       649.0|        NULL|                   1|           B2C|
|          9|      2065.0|        NULL|                   1|           B2C|
|         10|        NULL|        NULL|                   0|           B2C|
|         11|        NULL|        NULL|                   0|           B2C|
|         12|        84.0|        NULL|                   1|           B2C|
|         14