In [2]:
%conda install openjdk -y

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.

Retrieving notices: ...working... done

Note: you may need to restart the kernel to use updated packages.


In [3]:
%pip install pyspark==3.2.0

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m23.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [27]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)

3.2.0


In [11]:
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import time
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import datetime as DT
from datetime import date, timedelta as td

### 1. CP Data

In [51]:

df_cp = spark.read.parquet('s3a://prd-use1-datascientists-sc-fp-data/prd/aero/AERO_ORDER_PLACED_WITH_CP_DATA/')

df_cp = df_cp.withColumn("PRODUCT_PART_NUMBER_IN_CP", F.col("PRODUCT_PART_NUMBER")) \
            .withColumn("ORDER_PLACED_DTTM_CP", F.col("ORDER_PLACED_DTTM"))\
			.withColumn("ORDER_PLACED_YEAR_CP", F.year(F.col("ORDER_PLACED_DTTM"))) \
			.withColumn("ORDER_PLACED_MONTH_CP", F.month(F.col("ORDER_PLACED_DTTM")))

df_cp = df_cp.select("CUSTOMER_ID", "ORDER_ID", "ORDER_LINE_ID",
                               "PRODUCT_PART_NUMBER_IN_CP",
                               "ORDER_PLACED_DTTM_CP",
                               "ORDER_PLACED_YEAR_CP",
                               "ORDER_PLACED_MONTH_CP",
                               "LOCATION_CODE",
                               "ORDER_LINE_QUANTITY",
                               "ORDER_STATUS",
                               "IS_AUTO_SHIP",
                               "IB_COST","OB_COST", "COGS", "AVERAGE_PRICE")

df_cp.persist()

DataFrame[CUSTOMER_ID: decimal(38,0), ORDER_ID: decimal(38,0), ORDER_LINE_ID: decimal(38,0), PRODUCT_PART_NUMBER_IN_CP: string, ORDER_PLACED_DTTM_CP: timestamp, ORDER_PLACED_YEAR_CP: int, ORDER_PLACED_MONTH_CP: int, LOCATION_CODE: string, ORDER_LINE_QUANTITY: decimal(15,5), ORDER_STATUS: string, IS_AUTO_SHIP: string, IB_COST: double, OB_COST: double, COGS: string, AVERAGE_PRICE: double]

### 2. Clickstream Data

In [52]:
# 2.1. Input SKU, Year, Month

product_part_number_for_sample = 177863
year_value_for_sample = 2022
month_value_for_sample = 2

# 2.2. Complete clickstream data
df_clickstream_data = spark.read.parquet('s3a://prd-use1-datascientists-sc-fp-data/prd/aero/AERO_CLICK_STREAM_CUSTOMER/')

df_clickstream_data = df_clickstream_data.withColumn("SESSION_YEAR",F.year("SESSION_EST_TIMESTAMP"))\
                               .withColumn("SESSION_MONTH",F.month("SESSION_EST_TIMESTAMP"))

# 2.3. Get the slice of data that corresponds to the inputted monthly sample info
df_clickstream_data_slice = df_clickstream_data.where((F.col("PRODUCT_PART_NUMBER")== str(product_part_number_for_sample))\
                                            & (F.col("SESSION_YEAR") == year_value_for_sample) \
                                            & (F.col("SESSION_MONTH") == month_value_for_sample))

# 2.4. Convert the slice into pandas dataframe
print("==========Converting clickstream data into Pandas data frame==========")
start_time = time.time()
pd_clickstream_data_slice = df_clickstream_data_slice.toPandas()
print("The number of glance views for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(len(pd_clickstream_data_slice)))
print("--- %s seconds ---" % (time.time() - start_time))

The number of glance views for SKU 177863 during Year 2022 Month 2 is 4935
--- 10.033287525177002 seconds ---


### 3. Calculate AS and non-AS shares and Cofficients (Clickstream)

In [53]:
# Instock gv counts

instock_gv_counts = sum(pd_clickstream_data_slice["STATUS"] == "IN STOCK")

print("The number of instock glance views for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(instock_gv_counts))

# Units sold
# Data
df_cp_sku_month = df_cp.filter( (F.col("PRODUCT_PART_NUMBER_IN_CP") == product_part_number_for_sample) & \
						(F.col("ORDER_PLACED_YEAR_CP") == year_value_for_sample) & \
						(F.col("ORDER_PLACED_MONTH_CP") == month_value_for_sample))
df_cp_sku_month.persist()

# As & non-AS Units Sold
non_as_units_sold = df_cp_sku_month.filter(F.col("IS_AUTO_SHIP") == "NON AUTO SHIP").select(F.sum("ORDER_LINE_QUANTITY")).collect()[0][0]
as_units_sold = df_cp_sku_month.filter(F.col("IS_AUTO_SHIP") == "AUTO SHIP").select(F.sum("ORDER_LINE_QUANTITY")).collect()[0][0]

# Shares
non_as_share = non_AS_units_sold/(non_AS_units_sold+AS_units_sold)
as_share =  1 - as_share

print("AS demand share for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(as_share))

# Coefficients: GV counts per unit of non-AS demand
gv_coef = instock_gv_counts / non_AS_units_sold

print("GV counts per unit of non-AS demand for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(gv_coef))

The number of instock glance views for SKU 177863 during Year 2022 Month 2 is 799
AS demand share for SKU 177863 during Year 2022 Month 2 is 0.1310166705799483446818501996
GV counts per unit of non-AS demand for SKU 177863 during Year 2022 Month 2 is 1.431899641577060931899641577


### 4. Cutoff Point Data (Clickstream)

In [54]:
# 4.1. Dummy variables that indicate IS and OOS

pd_clickstream_data_slice["IS"] = np.where(pd_clickstream_data_slice["STATUS"] == "IN STOCK", 1, 0)
pd_clickstream_data_slice["OOS"] = np.where(pd_clickstream_data_slice["STATUS"] == "OUT OF STOCK", 1, 0)

pd_is_oos_counts_by_date = pd_clickstream_data_slice.groupby(["SESSTION_DATE"])[["IS", "OOS"]].sum().reset_index()

# 4.2. Generate status switching variable

N = len(pd_is_oos_counts_by_date)

for row in range(N):
    
    if row == 0:
        
        temp = pd_is_oos_counts_by_date.iloc[row:(row+1),].reset_index()
        is_counts = temp["IS"][0]
        oos_counts = temp["OOS"][0]
    
        # initialization for the first date in this monthly sample 
        if (is_counts == 0) & (oos_counts == 0):
            temp_status = "IS"
        elif (is_counts > 0) & (oos_counts == 0):
            temp_status = "IS"
        elif (is_counts == 0) & (oos_counts > 0):
            temp_status = "OOS"
        elif (is_counts > 0) & (oos_counts > 0):
            temp_status = "SWITCHING TO IS"

        temp["SWITCHING_STATUS"] = temp_status

        pd_cutoff_point_detection = temp
        
    else:
        
        temp_t_minus_1 = pd_cutoff_point_detection.iloc[(row-1):row,].reset_index()
        switching_status_t_minus_1 = temp_t_minus_1["SWITCHING_STATUS"][0]
        
        temp_t = pd_is_oos_counts_by_date.iloc[row:(row+1),].reset_index()
        is_gv_counts = temp_t["IS"][0]
        oos_gv_counts = temp_t["OOS"][0]
        
        if (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "SWITCHING TO IS"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
         
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "SWITCHING TO IS"    
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS"
            
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "OOS"    
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "OOS"

        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "IS"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "OOS"    
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
            
        else:
            switching_status_t = np.nan
            print("There is NULL value in the table.")
        
        temp_t["SWITCHING_STATUS"] = switching_status_t
        
        pd_cutoff_point_detection = pd_cutoff_point_detection.append(temp_t)

# 4.3. The data that stores all cutoff point dates (section 5 will find exact timestamp of cutoff points)
pd_cutoff_points = pd_cutoff_point_detection[pd_cutoff_point_detection["SWITCHING_STATUS"].isin(["SWITCHING TO IS", "SWITCHING TO OOS"])][["SESSTION_DATE","SWITCHING_STATUS"]]


### 5. RDD Sample (Clickstream)

In [56]:
# 5.1. parameters

sufficiency_ratio = 0.5

# 5.2. initialize the set of RDD samples (there may be multiple cutoff points within this month)
 
clickstream_rdd_sample_collection = {}

# 5.3. RDD sample generation
for k in range(len(pd_cutoff_points)):
    
    cutoff_point = pd_cutoff_points.iloc[k,]["SESSTION_DATE"]
    
    cutoff_point_name = pd_cutoff_points.iloc[k,]["SWITCHING_STATUS"]
    
    window_length = 7
        
    # 5.3.1. We look at T days prior to the cutoff date and T days after the cutoff date
    while window_length > 0:

        # Window start date
        window_left_end = max(cutoff_point - DT.timedelta(days=window_length), min(pd_cutoff_point_detection["SESSTION_DATE"]))
        # Window end date
        window_right_end = min(cutoff_point + DT.timedelta(days=(window_length+1)), max(pd_cutoff_point_detection["SESSTION_DATE"])+ DT.timedelta(days=1))

        pd_cutoff_point_detection_rdd_sample_temp = pd_cutoff_point_detection[(pd_cutoff_point_detection["SESSTION_DATE"] >= window_left_end) & (pd_cutoff_point_detection["SESSTION_DATE"] < window_right_end) ]
        
        # [1] We want to make sure this sample only has the cutoff point that is under inspection
        if np.sum(pd_cutoff_point_detection_rdd_sample_temp["SWITCHING_STATUS"].isin(["SWITCHING TO IS", "SWITCHING TO OOS"]))>1:
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========MULTIPLICITY OF CUTOFF POINTS CHECKING==========")
            print("MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!")
            window_length = window_length - 1
            if window_length == 0:
                print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
            continue
        else:
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========MULTIPLICITY OF CUTOFF POINTS CHECKING==========")
            print("NO OTHER CUTOFF POINT IS DETECTED. CHECKING DATA SUFFICIENCY")

            
        # Get RDD sample (clickstream data)
        clickstream_rdd_sample_temp = pd_clickstream_data_slice[ (pd_clickstream_data_slice["SESSTION_DATE"]>=window_left_end) & (pd_clickstream_data_slice["SESSTION_DATE"]< window_right_end)]
        
        # Keep the latest clickstream data point coming from the same customer
        clickstream_rdd_sample_temp["rank"] = clickstream_rdd_sample_temp.groupby("CUSTOMER_ID")["SESSION_EST_TIMESTAMP"].rank(method="dense", ascending=False).astype(int)
        clickstream_rdd_sample_cleaned = clickstream_rdd_sample_temp[clickstream_rdd_sample_temp["rank"]==1]
        
        # [2] Ensure at least 50% of days on both sides of the cutoff point need to have data 
        # Summarize daily data points
        clickstream_counts_distribution_across_days = clickstream_rdd_sample_cleaned.groupby(["SESSTION_DATE"])["CUSTOMER_ID"].count().reset_index()
        
        if (clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] < cutoff_point]["SESSTION_DATE"].nunique() >= window_length*sufficiency_ratio) & \
         (clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] > cutoff_point]["SESSTION_DATE"].nunique() >= window_length*sufficiency_ratio):
            
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========DATA SUFFICIENCY CHECKING==========")
            print("LEFT SIDE HAS " + str(clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] < cutoff_point]["SESSTION_DATE"].nunique()) + " DAYS OF DATA")
            print("RIGHT SIDE HAS " + str(clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] > cutoff_point]["SESSTION_DATE"].nunique()) + " DAYS OF DATA")
            print("WINDOW LENGTH: " + str(window_length))
            print("PASSED!")
            
            
            print("FOR CUTOFF POINT: " + str(cutoff_point) + " WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION")
            print("THE LEFT END DATE IS: " + str(window_left_end))
            print("THE RIGHT END DATE IS: " + str(window_right_end))
            break
            
        else:

            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========DATA SUFFICIENCY CHECKING==========")
            print("LEFT SIDE HAS " + str(clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] < cutoff_point]["SESSTION_DATE"].nunique()) + " DAYS OF DATA")
            print("RIGHT SIDE HAS " + str(clickstream_counts_distribution_across_days[clickstream_counts_distribution_across_days["SESSTION_DATE"] > cutoff_point]["SESSTION_DATE"].nunique()) + " DAYS OF DATA")
            print("WINDOW LENGTH: " + str(window_length))
            window_length = window_length - 1
            if window_length == 0:
                print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
            continue
        
    
    if window_length == 0:
        print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
        continue
        
    else:
        print("FOR CUTOFF POINT: " + str(cutoff_point) + " WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION")
        print("THE LEFT END DATE IS: " + str(window_left_end))
        print("THE RIGHT END DATE IS: " + str(window_right_end))
               
        # 5.3.2. Feature Engineering
        
        # [1] The precise timestamp of the cutoff point
        
        clickstream_rdd_sample_cleaned["STATUS_RANK"] = clickstream_rdd_sample_cleaned.groupby("STATUS")["SESSION_EST_TIMESTAMP"].rank(method="dense", ascending=False).astype(int)
        
        if cutoff_point_name == "SWITCHING TO IS":
        
            cutoff_point_timestamp = clickstream_rdd_sample_cleaned[clickstream_rdd_sample_cleaned["STATUS_RANK"]==1][clickstream_rdd_sample_cleaned["STATUS"] == "OUT OF STOCK"].reset_index().iloc[0]["SESSION_EST_TIMESTAMP"]
        
        else:
            
            cutoff_point_timestamp = clickstream_rdd_sample_cleaned[clickstream_rdd_sample_cleaned["STATUS_RANK"]==1][clickstream_rdd_sample_cleaned["STATUS"] == "IN STOCK"].reset_index().iloc[0]["SESSION_EST_TIMESTAMP"]
            
        # [2] generate the hour difference between the timestamp and the cutoff point
        
        clickstream_rdd_sample_cleaned["RUNNING_VARIABLE"] = (clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP"] - cutoff_point_timestamp).dt.total_seconds()/3600
                
        # [3] treatment variable generation
        
        if cutoff_point_name == "SWITCHING TO IS":
            
            clickstream_rdd_sample_cleaned["TREATMENT_VARIABLE"] = np.where(clickstream_rdd_sample_cleaned["STATUS"] == "IN STOCK", 1, 0)
        
        else:
            
            clickstream_rdd_sample_cleaned["TREATMENT_VARIABLE"] = np.where(clickstream_rdd_sample_cleaned["STATUS"] == "OUT OF STOCK", 1, 0)
        
        # [4] Outcome Variable Construction

        customer_id_list_in_clickstream_rdd_sample = clickstream_rdd_sample_cleaned[["CUSTOMER_ID"]].drop_duplicates()["CUSTOMER_ID"].to_list()

        # Add filtering variables
        clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP_UB"] = clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP"] + pd.Timedelta(hours=24)
        clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP_END"] = clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP"] + pd.Timedelta(hours=24*365)
        
        # Max timestamp possible for CP calculation
        max_timestamp = np.max(clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP_END"])
        # Min timestamp possible for CP calculation
        min_timestamp = np.min(clickstream_rdd_sample_cleaned["SESSION_EST_TIMESTAMP"])
        
        # Focus on customers in clickstream data & Focus on time period that is relevant
        df_cp_clickstream = df_cp.filter( (F.col("CUSTOMER_ID").isin(customer_id_list_in_clickstream_rdd_sample)) & \
                                        (F.col("ORDER_PLACED_DTTM") >= min_timestamp) & \
                                        (F.col("ORDER_PLACED_DTTM") <= max_timestamp) )
        df_cp_clickstream = df_cp_clickstream.na.fill(value=0,subset=["IB_COST", "OB_COST", "COGS", "AVERAGE_PRICE"])
        df_cp_clickstream = df_cp_clickstream.withColumn("NET_MARGIN", (F.col("AVERAGE_PRICE")-F.col("IB_COST")-F.col("OB_COST")-F.col("COGS"))*F.col("ORDER_LINE_QUANTITY") )

        df_cp_clickstream.persist()
        
        # Transform the small cp data into pandas dataframe
        
        print("==========Converting CP data into Pandas data frame==========")
        start_time = time.time()
        pd_cp_clickstream = df_cp_clickstream.toPandas()
        print("--- %s seconds ---" % (time.time() - start_time))
        
        # Columns we need from clickstream data
        clickstream_relevant_columns = ["CUSTOMER_ID", "SESSION_EST_TIMESTAMP", "SESSION_EST_TIMESTAMP_UB", "SESSION_EST_TIMESTAMP_END",
                        "PRODUCT_PART_NUMBER", 
                        "STATUS", 
                        "DEVICE_CATEGORY", 
                        "SESSION_YEAR", "SESSION_MONTH", 
                        "RUNNING_VARIABLE", "TREATMENT_VARIABLE"]
        clickstream_rdd_sample_cleaned_adj = clickstream_rdd_sample_cleaned[clickstream_relevant_columns]
	 
	
        start_time_0 = time.time()
        
        
        # alternative way
        for row in range(len(clickstream_rdd_sample_cleaned_adj)):
        

            
            print("==========NOW GENERATING CP FOR DATA POINT: " + str(row) + " ==========")
            
            start_time = time.time()

            # Loop through each clickstream data point

            clickstream_dp = clickstream_rdd_sample_cleaned_adj.iloc[row:(row+1),:].reset_index()

            # info needed to be added to CP data
            customer_id = clickstream_dp["CUSTOMER_ID"][0]
            product_part_number = clickstream_dp["PRODUCT_PART_NUMBER"][0]
            timestamp_lb = clickstream_dp["SESSION_EST_TIMESTAMP"][0]
            timestamp_ub = clickstream_dp["SESSION_EST_TIMESTAMP_UB"][0]
            timestamp_end = clickstream_dp["SESSION_EST_TIMESTAMP_END"][0]
            instock_status = clickstream_dp["STATUS"][0]
            running_variable = clickstream_dp["RUNNING_VARIABLE"][0]
            treatment_variable = clickstream_dp["TREATMENT_VARIABLE"][0]

            # get CP data that corresponds to the customer id of the clickstream dp
            pd_cp_clickstream_dp = pd_cp_clickstream[pd_cp_clickstream["CUSTOMER_ID"] == customer_id]

            # add the info from the clickstream dp into CP data
            pd_cp_clickstream_dp["PRODUCT_PART_NUMBER"] = product_part_number
            pd_cp_clickstream_dp["SESSION_EST_TIMESTAMP"] = timestamp_lb
            pd_cp_clickstream_dp["SESSION_EST_TIMESTAMP_UB"] = timestamp_ub
            pd_cp_clickstream_dp["SESSION_EST_TIMESTAMP_END"] = timestamp_end
            pd_cp_clickstream_dp["STATUS"] = instock_status
            pd_cp_clickstream_dp["RUNNING_VARIABLE"] = running_variable
            pd_cp_clickstream_dp["TREATMENT_VARIABLE"] = treatment_variable
            
            if len(pd_cp_clickstream_dp) == 0:

                print("THIS DATA POINT HAS NO CP RECORD")


            else:

                print("THIS DATA POINT HAS CP RECORD: " + str(len(pd_cp_clickstream_dp)))
            
            
            pd_cp_clickstream_dp["CP_INCLUDED"] = np.where(
             
                ((pd_cp_clickstream_dp['PRODUCT_PART_NUMBER_IN_CP'] != pd_cp_clickstream_dp['PRODUCT_PART_NUMBER']) & (pd_cp_clickstream_dp['ORDER_PLACED_DTTM_CP'] >= pd_cp_clickstream_dp['SESSION_EST_TIMESTAMP']) & (pd_cp_clickstream_dp['ORDER_PLACED_DTTM_CP'] < pd_cp_clickstream_dp['SESSION_EST_TIMESTAMP_UB']))\
                | \

                ((pd_cp_clickstream_dp['ORDER_PLACED_DTTM_CP'] >= pd_cp_clickstream_dp['SESSION_EST_TIMESTAMP_UB']) & (pd_cp_clickstream_dp['ORDER_PLACED_DTTM_CP'] < pd_cp_clickstream_dp['SESSION_EST_TIMESTAMP_END']))
                
                , 1, 0)
            
            pd_cp_clickstream_dp["CP"] = pd_cp_clickstream_dp["CP_INCLUDED"] * pd_cp_clickstream_dp["NET_MARGIN"]

            groupVars = ["PRODUCT_PART_NUMBER", "CUSTOMER_ID", "SESSION_EST_TIMESTAMP", "SESSION_EST_TIMESTAMP_UB", "SESSION_EST_TIMESTAMP_END", "STATUS", "RUNNING_VARIABLE", "TREATMENT_VARIABLE"]

            pd_agg_cp_clickstream_dp = pd_cp_clickstream_dp.groupby(groupVars)["CP"].sum().reset_index()

            if row == 0:

                pd_clickstream_cp_outcome =  pd_agg_cp_clickstream_dp

            else:
                
                pd_clickstream_cp_outcome = pd_clickstream_cp_outcome.append(pd_agg_cp_clickstream_dp)

            print("==========GENERATION IS COMPLETED FOR DATA POINT: " + str(row) + " ==========")
            print("--- %s seconds ---" % (time.time() - start_time))


        print("--- TOTAL --- %s seconds ---" % (time.time() - start_time_0))


    clickstream_rdd_sample_cleaned["CUSTOMER_ID"] = clickstream_rdd_sample_cleaned["CUSTOMER_ID"].astype(str)
    pd_clickstream_cp_outcome["CUSTOMER_ID"] = pd_clickstream_cp_outcome["CUSTOMER_ID"].astype(str)
    
    
    rdd_sample = pd.merge(clickstream_rdd_sample_cleaned, pd_clickstream_cp_outcome, on = groupVars, how = "left")
    
    rdd_sample = rdd_sample.fillna({"CP":0})
    
    clickstream_rdd_sample_collection[cutoff_point] = rdd_sample

MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-01
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-01
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-02
UNFORTUNATELY, WE CAN

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

--- 0.2119450569152832 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.01592254638671875 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.017068862915039062 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.013969182968139648 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.01469111442565918 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.013646841049194336 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.014545917510986328 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.013826131820678711 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.01471853256225586 seconds ---
THIS DATA POINT HAS CP RECORD: 8
--- 0.013779878616333008 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.014567852020263672 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.014043092727661133 seconds ---
THIS DATA POINT HAS CP RECORD: 8
--- 0.013936042785644531 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.01393580436706543 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.013915777206420898 s

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

THIS DATA POINT HAS NO CP RECORD
--- 0.01793980598449707 seconds ---
THIS DATA POINT HAS CP RECORD: 2
--- 0.017240524291992188 seconds ---
THIS DATA POINT HAS CP RECORD: 14
--- 0.015657901763916016 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.015465497970581055 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.01406550407409668 seconds ---
THIS DATA POINT HAS CP RECORD: 7
--- 0.015329122543334961 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.01375269889831543 seconds ---
THIS DATA POINT HAS CP RECORD: 13
--- 0.014444589614868164 seconds ---
THIS DATA POINT HAS CP RECORD: 11
--- 0.014473915100097656 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.014845132827758789 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.015431642532348633 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.013702154159545898 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.015043020248413086 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.0198972225189209 seconds ---
THIS DATA POINT HAS CP

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
NO OTHER CUTOFF POINT IS DETECTED. CHECKING DATA SUFFICIENCY
LEFT SIDE HAS 1 DAYS OF DATA
RIGHT SIDE HAS 1 DAYS OF DATA
WINDOW LENGTH: 1
PASSED!
FOR CUTOFF POINT: 2022-02-17 WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION
THE LEFT END DATE IS: 2022-02-16
THE RIGHT END DATE IS: 2022-02-19
FOR CUTOFF POINT: 2022-02-17 WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION
THE LEFT END DATE IS: 2022-02-16
THE RIGHT END DATE IS: 2022-02-19
--- 12.516529083251953 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.016763925552368164 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.014443159103393555 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.015944957733154297 seconds ---
THIS DATA POINT HAS CP RECORD: 1
--- 0.015011787414550781 seconds ---
THIS DATA POINT HAS NO CP RECORD
--- 0.015340328

### 7. Scheduled AS orders data 

In [57]:
# 2.1. Input SKU, Year, Month

product_part_number_for_sample = 177863
year_value_for_sample = 2022
month_value_for_sample = 2

# 2.2. Complete clickstream data
df_as_orders_data = spark.read.parquet("s3a://prd-use1-datascientists-sc-fp-data/prd/aero/AERO_AS_SALES/")

df_as_orders_data = df_as_orders_data.withColumn("ORDER_PLACED_DATE",F.to_date("ORDER_PLACED_DTTM"))\
                                      .withColumn("ORDER_PLACED_YEAR",F.year("ORDER_PLACED_DTTM"))\
                                      .withColumn("ORDER_PLACED_MONTH",F.month("ORDER_PLACED_DTTM"))

# 2.3. Get the slice of data that corresponds to the inputted monthly sample info
df_as_orders_data_slice = df_as_orders_data.where((F.col("PRODUCT_PART_NUMBER")== str(product_part_number_for_sample))\
                                            & (F.col("ORDER_PLACED_YEAR") == year_value_for_sample) \
                                            & (F.col("ORDER_PLACED_MONTH") == month_value_for_sample))

# 2.4. Convert the slice into pandas dataframe
print("==========Converting as orders data into Pandas dataframe==========")
start_time = time.time()
pd_as_orders_data_slice = df_as_orders_data_slice.toPandas()
print("The number of glance views for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(len(pd_clickstream_data_slice)))
print("--- %s seconds ---" % (time.time() - start_time))

The number of glance views for SKU 177863 during Year 2022 Month 2 is 4935
--- 17.047402143478394 seconds ---


### 8. Calculate AS and non-AS shares and Cofficients (AS Orders)

In [59]:
# Instock gv counts

instock_as_orders_counts = sum(pd_as_orders_data_slice["ORDER_ITEM_BACKORDER_FLAG"] == False)

print("The number of instock scheduled AS orders for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(instock_as_orders_counts))

# Units sold
# Data
df_cp_sku_month = df_cp.filter( (F.col("PRODUCT_PART_NUMBER_IN_CP") == product_part_number_for_sample) & \
						(F.col("ORDER_PLACED_YEAR_CP") == year_value_for_sample) & \
						(F.col("ORDER_PLACED_MONTH_CP") == month_value_for_sample))
df_cp_sku_month.persist()

# As & non-AS Units Sold
non_as_units_sold = df_cp_sku_month.filter(F.col("IS_AUTO_SHIP") == "NON AUTO SHIP").select(F.sum("ORDER_LINE_QUANTITY")).collect()[0][0]
as_units_sold = df_cp_sku_month.filter(F.col("IS_AUTO_SHIP") == "AUTO SHIP").select(F.sum("ORDER_LINE_QUANTITY")).collect()[0][0]

# Shares
as_share = as_units_sold/(non_as_units_sold+as_units_sold)
non_as_share =  1 - as_share

print("AS demand share for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(as_share))

# Coefficients: GV counts per unit of non-AS demand
as_orders_coef = instock_as_orders_counts / as_units_sold

print("Scheduled AS orders per unit of AS demand for SKU " + str(product_part_number_for_sample)+ " during Year " + str(year_value_for_sample) + " Month " + str(month_value_for_sample) + " is " + str(as_orders_coef))

The number of instock scheduled AS orders for SKU 177863 during Year 2022 Month 2 is 2809
AS demand share for SKU 177863 during Year 2022 Month 2 is 0.8689833294200516553181498004
Scheduled AS orders per unit of AS demand for SKU 177863 during Year 2022 Month 2 is 0.7589840583626047014320453931


### 9. Cutoff Point Data (AS Orders)

In [60]:
# 9.1. Dummy variables that indicate IS and OOS

pd_as_orders_data_slice["IS"] = np.where(pd_as_orders_data_slice["ORDER_ITEM_BACKORDER_FLAG"] == False, 1, 0)
pd_as_orders_data_slice["OOS"] = np.where(pd_as_orders_data_slice["ORDER_ITEM_BACKORDER_FLAG"] == True, 1, 0)

pd_as_orders_data_is_oos_counts_by_date = pd_as_orders_data_slice.groupby(["ORDER_PLACED_DATE"])[["IS", "OOS"]].sum().reset_index()

# 9.2. Generate status switching variable

N = len(pd_as_orders_data_is_oos_counts_by_date)

for row in range(N):
    
    if row == 0:
        
        temp = pd_as_orders_data_is_oos_counts_by_date.iloc[row:(row+1),].reset_index()
        is_counts = temp["IS"][0]
        oos_counts = temp["OOS"][0]
    
        # initialization for the first date in this monthly sample 
        if (is_counts == 0) & (oos_counts == 0):
            temp_status = "IS"
        elif (is_counts > 0) & (oos_counts == 0):
            temp_status = "IS"
        elif (is_counts == 0) & (oos_counts > 0):
            temp_status = "OOS"
        elif (is_counts > 0) & (oos_counts > 0):
            temp_status = "SWITCHING TO IS"

        temp["SWITCHING_STATUS"] = temp_status

        pd_as_orders_data_cutoff_point_detection = temp
        
    else:
        
        temp_t_minus_1 = pd_as_orders_data_cutoff_point_detection.iloc[(row-1):row,].reset_index()
        switching_status_t_minus_1 = temp_t_minus_1["SWITCHING_STATUS"][0]
        
        temp_t = pd_as_orders_data_is_oos_counts_by_date.iloc[row:(row+1),].reset_index()
        is_gv_counts = temp_t["IS"][0]
        oos_gv_counts = temp_t["OOS"][0]
        
        if (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "SWITCHING TO IS"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
        elif (is_gv_counts > 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
         
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "SWITCHING TO IS"    
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO IS"
        elif (is_gv_counts > 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS"
            
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "OOS"    
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "OOS"
        elif (is_gv_counts == 0) & (oos_gv_counts > 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "OOS"

        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "IS"):
            switching_status_t = "IS"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "OOS"):
            switching_status_t = "OOS"    
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO IS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO IS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
        elif (is_gv_counts == 0) & (oos_gv_counts == 0) & (switching_status_t_minus_1 == "SWITCHING TO OOS (NON-CUTOFF)"):
            switching_status_t = "SWITCHING TO OOS (NON-CUTOFF)"
            
        else:
            switching_status_t = np.nan
            print("There is NULL value in the table.")
        
        temp_t["SWITCHING_STATUS"] = switching_status_t
        
        pd_as_orders_data_cutoff_point_detection = pd_as_orders_data_cutoff_point_detection.append(temp_t)

# 9.3. The data that stores all cutoff point dates (section 5 will find exact timestamp of cutoff points)
pd_as_orders_data_cutoff_points = pd_as_orders_data_cutoff_point_detection[pd_as_orders_data_cutoff_point_detection["SWITCHING_STATUS"].isin(["SWITCHING TO IS", "SWITCHING TO OOS"])][["ORDER_PLACED_DATE","SWITCHING_STATUS"]]


### 10. RDD Sample (AS Orders)

In [63]:
# 10.1. parameters

sufficiency_ratio = 0.5

# 10.2. initialize the set of RDD samples (there may be multiple cutoff points within this month)
 
as_orders_rdd_sample_collection = {}

# 10.3. RDD sample generation
for k in range(len(pd_as_orders_data_cutoff_points)):
    
    cutoff_point = pd_as_orders_data_cutoff_points.iloc[k,]["ORDER_PLACED_DATE"]
    
    cutoff_point_name = pd_as_orders_data_cutoff_points.iloc[k,]["SWITCHING_STATUS"]
    
    window_length = 7
        
    # 10.3.1. We look at T days prior to the cutoff date and T days after the cutoff date
    while window_length > 0:

        # Window start date
        window_left_end = max(cutoff_point - DT.timedelta(days=window_length), min(pd_as_orders_data_cutoff_point_detection["ORDER_PLACED_DATE"]))
        
        # Window end date
        window_right_end = min(cutoff_point + DT.timedelta(days=(window_length+1)), max(pd_as_orders_data_cutoff_point_detection["ORDER_PLACED_DATE"])+ DT.timedelta(days=1))

        pd_as_orders_data_cutoff_point_detection_rdd_sample_temp = pd_as_orders_data_cutoff_point_detection[(pd_as_orders_data_cutoff_point_detection["ORDER_PLACED_DATE"] >= window_left_end) & (pd_as_orders_data_cutoff_point_detection["ORDER_PLACED_DATE"] < window_right_end) ]
        
        # [1] We want to make sure this sample only has the cutoff point that is under inspection
        if np.sum(pd_as_orders_data_cutoff_point_detection_rdd_sample_temp["SWITCHING_STATUS"].isin(["SWITCHING TO IS", "SWITCHING TO OOS"]))>1:
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========MULTIPLICITY OF CUTOFF POINTS CHECKING==========")
            print("MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!")
            window_length = window_length - 1
            if window_length == 0:
                print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
            continue
        else:
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========MULTIPLICITY OF CUTOFF POINTS CHECKING==========")
            print("NO OTHER CUTOFF POINT IS DETECTED. CHECKING DATA SUFFICIENCY")
            
        # Get RDD sample (clickstream data)
        as_orders_rdd_sample_temp = pd_as_orders_data_slice[ (pd_as_orders_data_slice["ORDER_PLACED_DATE"]>=window_left_end) & (pd_as_orders_data_slice["ORDER_PLACED_DATE"]< window_right_end)]
        
        # Keep the latest clickstream data point coming from the same customer
        as_orders_rdd_sample_temp["rank"] = as_orders_rdd_sample_temp.groupby("CUSTOMER_ID")["ORDER_PLACED_DTTM"].rank(method="dense", ascending=False).astype(int)
        as_orders_rdd_sample_cleaned = as_orders_rdd_sample_temp[as_orders_rdd_sample_temp["rank"]==1]
        
        # [2] Ensure at least 50% of days on both sides of the cutoff point need to have data 
        # Summarize daily data points
        as_orders_counts_distribution_across_days = as_orders_rdd_sample_cleaned.groupby(["ORDER_PLACED_DATE"])["CUSTOMER_ID"].count().reset_index()
        
        if (as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] < cutoff_point]["ORDER_PLACED_DATE"].nunique() >= window_length*sufficiency_ratio) & \
         (as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] > cutoff_point]["ORDER_PLACED_DATE"].nunique() >= window_length*sufficiency_ratio):
            
            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========DATA SUFFICIENCY CHECKING==========")
            print("LEFT SIDE HAS " + str(as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] < cutoff_point]["ORDER_PLACED_DATE"].nunique()) + " DAYS OF DATA")
            print("RIGHT SIDE HAS " + str(as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] > cutoff_point]["ORDER_PLACED_DATE"].nunique()) + " DAYS OF DATA")
            print("WINDOW LENGTH: " + str(window_length))
            print("PASSED!")
            
            
            print("FOR CUTOFF POINT: " + str(cutoff_point) + " WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION")
            print("THE LEFT END DATE IS: " + str(window_left_end))
            print("THE RIGHT END DATE IS: " + str(window_right_end))
            break
            
        else:

            print("==========REGARDING CUTOFF POINT: " + str(cutoff_point) + " ==========")
            print("==========WINDOW LENGTH: " + str(window_length) + " ==========")
            print("==========DATA SUFFICIENCY CHECKING==========")
            print("LEFT SIDE HAS " + str(as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] < cutoff_point]["ORDER_PLACED_DATE"].nunique()) + " DAYS OF DATA")
            print("RIGHT SIDE HAS " + str(as_orders_counts_distribution_across_days[as_orders_counts_distribution_across_days["ORDER_PLACED_DATE"] > cutoff_point]["ORDER_PLACED_DATE"].nunique()) + " DAYS OF DATA")
            print("WINDOW LENGTH: " + str(window_length))
            window_length = window_length - 1
            if window_length == 0:
                print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
            continue
        
    
    if window_length == 0:
        print("UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: " + str(cutoff_point))
        continue
        
    else:
        print("FOR CUTOFF POINT: " + str(cutoff_point) + " WE HAVE FOUND THE RDD SAMPLE FOR REGRESSION")
        print("THE LEFT END DATE IS: " + str(window_left_end))
        print("THE RIGHT END DATE IS: " + str(window_right_end))
               
        # 10.3.2. Feature Engineering
        
        # [1] The precise timestamp of the cutoff point
        
        as_orders_rdd_sample_cleaned["STATUS_RANK"] = as_orders_rdd_sample_cleaned.groupby("ORDER_ITEM_BACKORDER_FLAG")["ORDER_PLACED_DTTM"].rank(method="dense", ascending=False).astype(int)
        
        if cutoff_point_name == "SWITCHING TO IS":
        
            cutoff_point_timestamp = as_orders_rdd_sample_cleaned[as_orders_rdd_sample_cleaned["STATUS_RANK"]==1][as_orders_rdd_sample_cleaned["ORDER_ITEM_BACKORDER_FLAG"] == True].reset_index().iloc[0]["ORDER_PLACED_DTTM"]
        
        else:
            
            cutoff_point_timestamp = as_orders_rdd_sample_cleaned[as_orders_rdd_sample_cleaned["STATUS_RANK"]==1][as_orders_rdd_sample_cleaned["ORDER_ITEM_BACKORDER_FLAG"] == False].reset_index().iloc[0]["ORDER_PLACED_DTTM"]
            
        # [2] generate the hour difference between the timestamp and the cutoff point
        
        as_orders_rdd_sample_cleaned["RUNNING_VARIABLE"] = (as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM"] - cutoff_point_timestamp).dt.total_seconds()/3600
                
        # [3] treatment variable generation
        
        # if cutoff_point_name == "SWITCHING TO IS":
            
        #    as_orders_rdd_sample_cleaned["TREATMENT_VARIABLE"] = np.where(as_orders_rdd_sample_cleaned["STATUS"] == "IN STOCK", 1, 0)
        
        # else:
            
        #    as_orders_rdd_sample_cleaned["TREATMENT_VARIABLE"] = np.where(as_orders_rdd_sample_cleaned["STATUS"] == "OUT OF STOCK", 1, 0)
        
        as_orders_rdd_sample_cleaned["TREATMENT_VARIABLE"] = np.where(as_orders_rdd_sample_cleaned["ORDER_ITEM_BACKORDER_FLAG"] == True, 1, 0)
        
        # [4] Outcome Variable Construction

        customer_id_list_in_as_orders_rdd_sample = as_orders_rdd_sample_cleaned[["CUSTOMER_ID"]].drop_duplicates()["CUSTOMER_ID"].to_list()

        # Add filtering variables
        as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM_UB"] = as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM"] + pd.Timedelta(hours=24)
        as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM_END"] = as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM"] + pd.Timedelta(hours=24*365)
        
        # Max timestamp possible for CP calculation
        max_timestamp = np.max(as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM_END"])
        # Min timestamp possible for CP calculation
        min_timestamp = np.min(as_orders_rdd_sample_cleaned["ORDER_PLACED_DTTM"])
        
        # Focus on customers in clickstream data & Focus on time period that is relevant
        df_cp_as_orders = df_cp.filter( (F.col("CUSTOMER_ID").isin(customer_id_list_in_as_orders_rdd_sample)) & \
                                        (F.col("ORDER_PLACED_DTTM") >= min_timestamp) & \
                                        (F.col("ORDER_PLACED_DTTM") <= max_timestamp) )
        df_cp_as_orders = df_cp_as_orders.na.fill(value=0,subset=["IB_COST", "OB_COST", "COGS", "AVERAGE_PRICE"])
        df_cp_as_orders = df_cp_as_orders.withColumn("NET_MARGIN", (F.col("AVERAGE_PRICE")-F.col("IB_COST")-F.col("OB_COST")-F.col("COGS"))*F.col("ORDER_LINE_QUANTITY") )

        df_cp_as_orders.persist()
        
        # Transform the small cp data into pandas dataframe
        
        print("==========Converting CP data into Pandas data frame==========")
        start_time = time.time()
        pd_cp_as_orders = df_cp_as_orders.toPandas()
        print("--- %s seconds ---" % (time.time() - start_time))
        
        # Columns we need from clickstream data
        as_orders_relevant_columns = ["CUSTOMER_ID", "ORDER_PLACED_DTTM", "ORDER_PLACED_DTTM_UB", "ORDER_PLACED_DTTM_END",
                        "PRODUCT_PART_NUMBER", 
                        "ORDER_ITEM_BACKORDER_FLAG", 
                        "ORDER_PLACED_YEAR", "ORDER_PLACED_MONTH", 
                        "RUNNING_VARIABLE", "TREATMENT_VARIABLE"]
        as_orders_rdd_sample_cleaned_adj = as_orders_rdd_sample_cleaned[as_orders_relevant_columns]
	
    
        start_time_0 = time.time()    
        
        # alternative way
        for row in range(len(as_orders_rdd_sample_cleaned_adj)):
        

            
            print("==========NOW GENERATING CP FOR DATA POINT: " + str(row) + " ==========")
            
            start_time = time.time()

            # Loop through each clickstream data point

            as_order_dp = as_orders_rdd_sample_cleaned_adj.iloc[row:(row+1),:].reset_index()

            # info needed to be added to CP data
            customer_id = as_order_dp["CUSTOMER_ID"][0]
            product_part_number = as_order_dp["PRODUCT_PART_NUMBER"][0]
            timestamp_lb = as_order_dp["ORDER_PLACED_DTTM"][0]
            timestamp_ub = as_order_dp["ORDER_PLACED_DTTM_UB"][0]
            timestamp_end = as_order_dp["ORDER_PLACED_DTTM_END"][0]
            instock_status = as_order_dp["ORDER_ITEM_BACKORDER_FLAG"][0]
            running_variable = as_order_dp["RUNNING_VARIABLE"][0]
            treatment_variable = as_order_dp["TREATMENT_VARIABLE"][0]

            # get CP data that corresponds to the customer id of the clickstream dp
            pd_cp_as_orders_dp = pd_cp_as_orders[pd_cp_as_orders["CUSTOMER_ID"] == customer_id]

            # add the info from the clickstream dp into CP data
            pd_cp_as_orders_dp["PRODUCT_PART_NUMBER"] = product_part_number
            pd_cp_as_orders_dp["ORDER_PLACED_DTTM"] = timestamp_lb
            pd_cp_as_orders_dp["ORDER_PLACED_DTTM_UB"] = timestamp_ub
            pd_cp_as_orders_dp["ORDER_PLACED_DTTM_END"] = timestamp_end
            pd_cp_as_orders_dp["ORDER_ITEM_BACKORDER_FLAG"] = instock_status
            pd_cp_as_orders_dp["RUNNING_VARIABLE"] = running_variable
            pd_cp_as_orders_dp["TREATMENT_VARIABLE"] = treatment_variable
            
            if len(pd_cp_as_orders_dp) == 0:

                print("THIS DATA POINT HAS NO CP RECORD")

            else:

                print("THIS DATA POINT HAS CP RECORD: " + str(len(pd_cp_as_orders_dp)))
            
            pd_cp_as_orders_dp["CP_INCLUDED"] = np.where(
             
                ((pd_cp_as_orders_dp['PRODUCT_PART_NUMBER_IN_CP'] != pd_cp_as_orders_dp['PRODUCT_PART_NUMBER']) & (pd_cp_as_orders_dp['ORDER_PLACED_DTTM_CP'] >= pd_cp_as_orders_dp['ORDER_PLACED_DTTM']) & (pd_cp_as_orders_dp['ORDER_PLACED_DTTM_CP'] < pd_cp_as_orders_dp['ORDER_PLACED_DTTM_UB']))\
                | \

                ((pd_cp_as_orders_dp['ORDER_PLACED_DTTM_CP'] >= pd_cp_as_orders_dp['ORDER_PLACED_DTTM_UB']) & (pd_cp_as_orders_dp['ORDER_PLACED_DTTM_CP'] < pd_cp_as_orders_dp['ORDER_PLACED_DTTM_END']))
                
                , 1, 0)
            
            pd_cp_as_orders_dp["CP"] = pd_cp_as_orders_dp["CP_INCLUDED"] * pd_cp_as_orders_dp["NET_MARGIN"]

            groupVars = ["PRODUCT_PART_NUMBER", "CUSTOMER_ID", "ORDER_PLACED_DTTM", "ORDER_PLACED_DTTM_UB", "ORDER_PLACED_DTTM_END", "ORDER_ITEM_BACKORDER_FLAG", "RUNNING_VARIABLE", "TREATMENT_VARIABLE"]

            pd_agg_cp_as_orders_dp = pd_cp_as_orders_dp.groupby(groupVars)["CP"].sum().reset_index()

            if row == 0:

                pd_as_orders_cp_outcome =  pd_agg_cp_as_orders_dp

            else:
                
                pd_as_orders_cp_outcome = pd_as_orders_cp_outcome.append(pd_agg_cp_as_orders_dp)

            print("==========GENERATION IS COMPLETED FOR DATA POINT: " + str(row) + " ==========")
            print("--- %s seconds ---" % (time.time() - start_time))

        print("--- TOTAL --- %s seconds ---" % (time.time() - start_time_0))

    as_orders_rdd_sample_cleaned["CUSTOMER_ID"] = as_orders_rdd_sample_cleaned["CUSTOMER_ID"].astype(str)
    pd_as_orders_cp_outcome["CUSTOMER_ID"] = pd_as_orders_cp_outcome["CUSTOMER_ID"].astype(str)
       
    rdd_sample = pd.merge(as_orders_rdd_sample_cleaned, pd_as_orders_cp_outcome, on = groupVars, how = "left")
    
    rdd_sample = rdd_sample.fillna({"CP":0})
    
    as_orders_rdd_sample_collection[cutoff_point] = rdd_sample

MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-01
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-01
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
MORE THAN ONE CUTOFF POINT EXISTS IN THE RDD SAMPLE!
UNFORTUNATELY, WE CANNOT FIND AN IDEAL RDD SAMPLE FOR CUTOFF POINT: 2022-02-02
UNFORTUNATELY, WE CAN

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

--- 0.6750693321228027 seconds ---
THIS DATA POINT HAS CP RECORD: 15
--- 0.025208711624145508 seconds ---
THIS DATA POINT HAS CP RECORD: 13
--- 0.018680572509765625 seconds ---
THIS DATA POINT HAS CP RECORD: 11
--- 0.01909494400024414 seconds ---
THIS DATA POINT HAS CP RECORD: 4
--- 0.019606351852416992 seconds ---
THIS DATA POINT HAS CP RECORD: 8
--- 0.019266843795776367 seconds ---
THIS DATA POINT HAS CP RECORD: 12
--- 0.01953864097595215 seconds ---
THIS DATA POINT HAS CP RECORD: 76
--- 0.017281770706176758 seconds ---
THIS DATA POINT HAS CP RECORD: 76
--- 0.016559839248657227 seconds ---
THIS DATA POINT HAS CP RECORD: 44
--- 0.015479326248168945 seconds ---
THIS DATA POINT HAS CP RECORD: 42
--- 0.016103267669677734 seconds ---
THIS DATA POINT HAS CP RECORD: 4


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

--- 0.01833319664001465 seconds ---
THIS DATA POINT HAS CP RECORD: 7
--- 0.017534732818603516 seconds ---
THIS DATA POINT HAS CP RECORD: 7
--- 0.017528772354125977 seconds ---
THIS DATA POINT HAS CP RECORD: 15
--- 0.015844345092773438 seconds ---
THIS DATA POINT HAS CP RECORD: 11
--- 0.015373468399047852 seconds ---
THIS DATA POINT HAS CP RECORD: 44
--- 0.015415430068969727 seconds ---
THIS DATA POINT HAS CP RECORD: 11
--- 0.01632523536682129 seconds ---
THIS DATA POINT HAS CP RECORD: 10
--- 0.016055822372436523 seconds ---
THIS DATA POINT HAS CP RECORD: 25
--- 0.016208648681640625 seconds ---
THIS DATA POINT HAS CP RECORD: 16
--- 0.016592025756835938 seconds ---
THIS DATA POINT HAS CP RECORD: 19
--- 0.016995668411254883 seconds ---
THIS DATA POINT HAS CP RECORD: 9
--- 0.015362739562988281 seconds ---
THIS DATA POINT HAS CP RECORD: 35
--- 0.01609206199645996 seconds ---
THIS DATA POINT HAS CP RECORD: 25
--- 0.01892375946044922 seconds ---
THIS DATA POINT HAS CP RECORD: 2
--- 0.01553750

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
