## Customer, Order, Communications History Data Sets - Data Engineering

We will be working with three tables that contains Customer data, Communications data and Order data for a Retail Store. These dataset can be used to understand the consumer behaviour to predict churn, in this example.

These datasets were generated for this demo using a Kaggle dataset below.

Reference: https://www.kaggle.com/uttamp/store-data

In [1]:
from snowflake.snowpark import Session, Window
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
import preprocessing as pp
from snowflake.snowpark.functions import sproc, col, min, max, avg
import snowflake.snowpark
import json

### 1. Create a Snowpark Session

First, we will be setting our parameters for stage name and source file name. Then we will initialize a Snowpark session reading the configuration parameters from creds.json.

In [2]:
with open('creds.json') as f:
        data = json.load(f)
        connection_parameters = {
          'account': data['account'],
          'user': data['user'],
          'password': data['password'], #getpass.getpass(),
          'schema': data['schema'],
          'database': data['database'],
          'warehouse': data['warehouse']}
session = Session.builder.configs(connection_parameters).create()
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='RETAIL_CHURN', CURRENT_SCHEMA()='PUBLIC')]


# Let's look the 3 tables we will be working with:

### Customer Table

![Original Data Frame](images/customer.png)

### Communication History Table

![Original Data Frame](images/comm.png)

### Order Table

![Original Data Frame](images/order.png)

In [3]:
display(session.table("SRC_CUSTOMER").to_pandas().head())

Unnamed: 0,VALUE,CUSTOMER_ID,CREATED_DT,CITY,STATE,FAV_DELIVERY_DAY,REFILL,DOOR_DELIVERY,PAPERLESS,CUSTOMER_NAME,RETAINED
0,"{\n ""c3"": ""Dallas"",\n ""c4"": ""TX"",\n ""c5"": ""...",,,Dallas,TX,Wednesday,0,0,0,wU2AcrEU3F,
1,"{\n ""c3"": ""Houston"",\n ""c4"": ""TX"",\n ""c5"": ...",,,Houston,TX,Tuesday,1,0,1,1SqzfEhoeW,
2,"{\n ""c1"": ""6H6T6N"",\n ""c10"": ""0"",\n ""c2"": ""...",6H6T6N,2012-09-28,Dallas,TX,Monday,0,0,0,flNK83fi4f,0.0
3,"{\n ""c1"": ""APCENR"",\n ""c10"": ""1"",\n ""c2"": ""...",APCENR,2010-12-19,Dallas,TX,Friday,1,1,1,ljCODBugAV,1.0
4,"{\n ""c1"": ""7UP6MS"",\n ""c10"": ""0"",\n ""c2"": ""...",7UP6MS,2010-10-03,Dallas,TX,Wednesday,0,0,0,5B7qnNdi32,0.0


In [4]:
display(session.table("SRC_COMMUNICATION_HIST").to_pandas().head())

Unnamed: 0,VALUE,CUSTOMER_ID,ESENT,EOPENRATE,ECLICKRATE
0,"{\n ""c1"": ""6H6T6N"",\n ""c2"": ""29"",\n ""c3"": ""...",6H6T6N,29,100.0,3.448276
1,"{\n ""c1"": ""APCENR"",\n ""c2"": ""95"",\n ""c3"": ""...",APCENR,95,92.631579,10.526316
2,"{\n ""c1"": ""7UP6MS"",\n ""c2"": ""0"",\n ""c3"": ""0...",7UP6MS,0,0.0,0.0
3,"{\n ""c1"": ""7ZEW8G"",\n ""c2"": ""0"",\n ""c3"": ""0...",7ZEW8G,0,0.0,0.0
4,"{\n ""c1"": ""8V726M"",\n ""c2"": ""30"",\n ""c3"": ""...",8V726M,30,90.0,13.333333


In [5]:
display(session.table("SRC_ORDER").to_pandas().head())

Unnamed: 0,VALUE,CUSTOMER_ID,ORDER_DT,CITY,STATE,ORDER_AMOUNT,ORDER_ID
0,"{\n ""c3"": ""Dallas"",\n ""c4"": ""TX"",\n ""c6"": ""...",,,Dallas,TX,,2004860729612798827
1,"{\n ""c3"": ""Houston"",\n ""c4"": ""TX"",\n ""c6"": ...",,,Houston,TX,,3421577291682474489
2,"{\n ""c1"": ""7UP6MS"",\n ""c2"": ""12/1/2010"",\n ...",7UP6MS,12/1/2010,Dallas,TX,42.78,1878685737792973460
3,"{\n ""c1"": ""9SVNMF"",\n ""c2"": ""12/19/2010"",\n ...",9SVNMF,12/19/2010,Dallas,TX,89.08,2208082681814434881
4,"{\n ""c1"": ""JD96M8"",\n ""c2"": ""6/13/2011"",\n ...",JD96M8,6/13/2011,Dallas,TX,94.81,5863032514623560934


In [6]:
session.add_packages('snowflake-snowpark-python')
table_name = 'TRANSFORMED_CUSTOMER_CHURN'

### 2. Let's apply transformations like joins and aggregations

In [7]:
def createTransformed(dfCust, dfOrd, dfCom):
    
    #Calculate first_order_date, last_order_date and avg_order amount for each customer
    window = Window.partition_by("CUSTOMER_ID")
    df_lastorder = dfOrd.select(col("CUSTOMER_ID"),max("ORDER_DT").over(window).alias("LAST_ORDER_DT")).distinct()
    df_firstorder = dfOrd.select(col("CUSTOMER_ID"),min("ORDER_DT").over(window).alias("FIRST_ORDER_DT")).distinct()
    df_avgorder = dfOrd.select(col("CUSTOMER_ID"),avg("ORDER_AMOUNT").over(window).alias("AVG_ORDER")).distinct()
    
    
    df_1 = dfCust.join(dfCom, dfCust["CUSTOMER_ID"] == dfCom["CUSTOMER_ID"]) \
                   .join(df_lastorder, dfCust["CUSTOMER_ID"] == df_lastorder["CUSTOMER_ID"]) \
                   .join(df_firstorder, dfCust["CUSTOMER_ID"] == df_firstorder["CUSTOMER_ID"]) \
                   .join(df_avgorder, dfCust["CUSTOMER_ID"] == df_avgorder["CUSTOMER_ID"]) \
                   .drop(dfCom["CUSTOMER_ID"],df_lastorder["CUSTOMER_ID"], df_avgorder["CUSTOMER_ID"], df_firstorder["CUSTOMER_ID"]) \
                   .rename(dfCust["CUSTOMER_ID"], "CUSTOMER_ID")
    
    #calculate DIFF_BETWEEN_LAST_FIRST_DAYS and DIFF_BETWEEN_FIRST_CREATED_DAYS
    df_2 = df_1.with_columns(["DIFF_BETWEEN_LAST_FIRST_DAYS", "DIFF_BETWEEN_FIRST_CREATED_DAYS"], 
                   [F.datediff("DAY", df_1["FIRST_ORDER_DT"].try_cast(DateType()), df_1["LAST_ORDER_DT"].try_cast(DateType())),
                   F.datediff("DAY", df_1["CREATED_DT"].try_cast(DateType()), df_1["FIRST_ORDER_DT"].try_cast(DateType()))
                   ])
    
    return df_2.na.drop()

In [8]:
def transformData(session: Session) -> str:
    
    dfCust=session.table("SRC_CUSTOMER").select(col("CUSTOMER_ID"), 
                       col("CREATED_DT"), 
                       col("CITY"), 
                       col("FAV_DELIVERY_DAY"),
                       col("REFILL"),
                       col("DOOR_DELIVERY"),
                       col("PAPERLESS"),
                       col("RETAINED"))
    
    dfCom=session.table("SRC_COMMUNICATION_HIST").select(col("CUSTOMER_ID"), 
                       col("ESENT"), 
                       col("EOPENRATE"), 
                       col("ECLICKRATE"))
    
    dfOrd=session.table("SRC_ORDER").select(col("CUSTOMER_ID"), 
                       col("ORDER_ID"), 
                       col("ORDER_DT"), 
                       col("CITY"),
                       col("ORDER_AMOUNT"))
    
    df=createTransformed(dfCust, dfOrd, dfCom)
    df.write.mode("overwrite").save_as_table(table_name)
    return 'SUCCESS'

# Create an instance of StoredProcedure using the sproc() function
data_transform_sp = sproc(transformData, replace=True)

In [9]:
data_transform_sp()

'SUCCESS'

![Original Data Frame](images/image2.png)

In [10]:
display(session.table(table_name).to_pandas().head())

Unnamed: 0,CUSTOMER_ID,CREATED_DT,CITY,FAV_DELIVERY_DAY,REFILL,DOOR_DELIVERY,PAPERLESS,RETAINED,ESENT,EOPENRATE,ECLICKRATE,LAST_ORDER_DT,FIRST_ORDER_DT,AVG_ORDER,DIFF_BETWEEN_LAST_FIRST_DAYS,DIFF_BETWEEN_FIRST_CREATED_DAYS
0,6H6T6N,2012-09-28,Dallas,Monday,0,0,0,0,29,100.0,3.448276,8/11/2013,8/11/2013,5.32,0,317
1,APCENR,2010-12-19,Dallas,Friday,1,1,1,1,95,92.631579,10.526316,4/1/2011,1/19/2014,83.69,-1024,1127
2,7UP6MS,2010-10-03,Dallas,Wednesday,0,0,0,0,0,0.0,0.0,7/6/2011,12/1/2010,33.58,217,59
3,7ZEW8G,2010-10-22,Houston,Thursday,0,0,0,0,0,0.0,0.0,3/28/2011,3/28/2011,45.76,0,157
4,8V726M,2010-11-27,Houston,Monday,0,0,0,1,30,90.0,13.333333,11/29/2010,1/28/2013,111.91,-791,793
