# Churn Analysis with Pachyderm and Snowflake

<img src="images/kkbox_dag.png" alt="Drawing" style="width: 300px;"/>

[Predicting churn](https://www.investopedia.com/terms/c/churnrate.asp) means determining whether or not a user will renew their subscription to a particular service.

In this example, we use the [KKBox Churn Prediction challenge dataset](https://www.kaggle.com/competitions/kkbox-churn-prediction-challenge/data) from [Kaggle](https://www.kaggle.com/) to show a real-world setup for predicting churn for a music service. To this end, we use [Snowflake](https://www.snowflake.com/) as the data warehouse where our source data resides and [Pachyderm](https://www.pachyderm.com/) as our data versioning and processing platform for non-SQL transformations.

Specifically, this example shows how to pull data from a data warehouse and apply code-based transformations to it.

In it you'll learn how to:
- Ingest data from Snowflake using the [Data Warehouse Integration](https://docs.pachyderm.com/latest/how-tos/basic-data-operations/sql-ingest/#data-warehouse-integration)
- Transform your data to prepare it for model training
- Train a churn model
- Predict on new data with our churn model
- Egress our predictions back to Snowflake using [Pachyderm's Egress Feature](https://docs.pachyderm.com/latest/how-tos/basic-data-operations/export-data-out-pachyderm/sql-egress/#egress-to-an-sql-database)

## 1. Install and Connect to Snowflake
First, we need to set everything up for the example. You'll need: 
1. Snowflake account access (SYSADMIN role)
2. [Pachyderm Cluster](https://docs.pachyderm.com/latest/deploy-manage/deploy/)
3. [kubectl](https://kubernetes.io/docs/tasks/tools/) installed with Kubernetes access to where your Pachyderm cluster is running 

In [None]:
!pip install snowflake-connector-python pyarrow"<6.1.0,>=6.0.0"

In [2]:
import snowflake.connector

For more information on setting these variables, [check here](https://docs.snowflake.com/en/user-guide/python-connector-install.html#step-2-verify-your-installation).

In [None]:
user_name = '<your_username>'
password = '<your_password>'
account_identifier = '<your_account_id>'
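
Hardcoding credentials in a notebook makes them easy to leak. As a minimal sketch, assuming you export them as environment variables before starting the notebook, they could be read like this. The variable names `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, and `SNOWFLAKE_ACCOUNT` and the helper `load_snowflake_credentials` are hypothetical; the connector does not require them.

```python
import os


def load_snowflake_credentials(env=os.environ):
    """Read Snowflake credentials from environment variables.

    The variable names are illustrative assumptions, not a convention
    required by snowflake-connector-python.
    """
    names = ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return tuple(env[name] for name in names)
```

With the variables set, `user_name, password, account_identifier = load_snowflake_credentials()` would replace the cell above.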

First, we'll check that our Snowflake connection details are set correctly by returning the current Snowflake version.

In [10]:
ctx = snowflake.connector.connect(
    user=user_name,
    password=password,
    account=account_identifier
    )
cs = ctx.cursor()
try:
    cs.execute("SELECT current_version()")
    one_row = cs.fetchone()
    print(one_row[0])
finally:
    cs.close()
# ctx.close()

6.19.0


## 2. Set Up Databases in Snowflake
In the setup step, we'll create all the necessary components for this example. By default, we'll use the `COMPUTE_WH` warehouse that is common to Snowflake accounts. The `SYSADMIN` role is also required for everything in this tutorial.

In [17]:
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

In [18]:
warehouse_name = "COMPUTE_WH"
database_name = "KKBOX_CHURN_EXAMPLE"
schema_name = "PUBLIC"
role = "SYSADMIN"

In [19]:
# Create connector
ctx = snowflake.connector.connect(
    user=user_name,
    password=password,
    account=account_identifier,
    client_session_keep_alive=True
    )

# Create a cursor object
cur = ctx.cursor()

# Define Setup properties for Snowflake

# Starting with the Role.
sql = "USE ROLE {role}".format(role=role)
cur.execute(sql)

# Specify the warehouse to use
sql = "USE WAREHOUSE {wh_name}".format(wh_name = warehouse_name)
cur.execute(sql)

# See if the desired database exists.
sql = "CREATE DATABASE IF NOT EXISTS {db_name}".format(db_name = database_name)
cur.execute(sql)

# And then use it.
sql = "USE DATABASE {db_name}".format(db_name = database_name)
cur.execute(sql)

# Do the same with the Schema.
sql = "CREATE SCHEMA IF NOT EXISTS {sc_name}".format(sc_name = schema_name)
cur.execute(sql)

# And then use it.
sql = "USE SCHEMA {sc_name}".format(sc_name = schema_name)
cur.execute(sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7fee68c917c0>
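
The setup cell above repeats the same format-and-execute pattern six times. As an illustrative sketch, the statements could be generated in one place. `setup_statements` is a hypothetical helper, not part of the connector, and it adds a basic check for unquoted identifiers because the names are interpolated directly into SQL.

```python
import re

# Unquoted Snowflake identifiers: a letter or underscore, followed by
# letters, digits, underscores, or dollar signs.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def setup_statements(role, warehouse, database, schema):
    """Return the setup statements in the order the cell above runs them."""
    for name in (role, warehouse, database, schema):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsafe identifier: {name!r}")
    return [
        f"USE ROLE {role}",
        f"USE WAREHOUSE {warehouse}",
        f"CREATE DATABASE IF NOT EXISTS {database}",
        f"USE DATABASE {database}",
        f"CREATE SCHEMA IF NOT EXISTS {schema}",
        f"USE SCHEMA {schema}",
    ]


statements = setup_statements(
    "SYSADMIN", "COMPUTE_WH", "KKBOX_CHURN_EXAMPLE", "PUBLIC"
)
for sql in statements:
    print(sql)
```

With the cursor created above, each statement would then be run with `cur.execute(sql)`.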

### 2.1 Create Kubernetes Secret to access Snowflake in Pachyderm
To securely access our Snowflake data in Pachyderm, we set up a Kubernetes secret to store our password. Here, we're directly using [Kubernetes secrets](https://kubernetes.io/docs/concepts/configuration/secret/), but you could also use a [Pachyderm Secret](https://docs.pachyderm.com/latest/reference/pachctl/pachctl_create_secret/).

Note: You may have to modify the command if you're running Pachyderm in a specific Kubernetes namespace.

In [None]:
# Must have kubectl and connect to your cluster
!kubectl create secret generic snowflakesecret --type=string --from-literal=PACHYDERM_SQL_PASSWORD='<your_snowflake_password>'
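
Kubernetes stores the values of a secret base64-encoded, which is an encoding and not encryption. To illustrate roughly what the command above creates, the sketch below builds a comparable Secret manifest in Python. `build_secret_manifest` is a hypothetical helper, the `default` namespace is an assumption you would change to match your cluster, and the manifest omits the `type` field for simplicity.

```python
import base64
import json


def build_secret_manifest(name, literals, namespace="default"):
    """Build a Kubernetes Secret manifest with base64-encoded values."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in literals.items()
        },
    }


manifest = build_secret_manifest(
    "snowflakesecret",
    {"PACHYDERM_SQL_PASSWORD": "<your_snowflake_password>"},
)
print(json.dumps(manifest, indent=2))
```

Avoid committing the resulting JSON anywhere, since anyone who can read it can decode the password.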

### 2.2 Create Snowflake Tables from CSV files
This will simulate a production data warehouse in Snowflake. We will be able to run queries and add data to Snowflake just like a production application. But when it comes to performing ML or anything else that requires reproducibility and automation, Pachyderm will handle the data and processing.

**Download:** [kkbox-churn-prediction-challenge.zip](https://www.kaggle.com/competitions/kkbox-churn-prediction-challenge/data) from Kaggle. 

Note: You need to have a Kaggle account to download the dataset.  

In [None]:
# Unzip the data
!unzip kkbox-churn-prediction-challenge.zip

In [20]:
# Helper to build Snowflake column definitions from a DataFrame's pandas dtypes
def get_table_metadata(df):
    def map_dtypes(x):
        if (x == 'object') or (x == 'category'):
            return 'VARCHAR'
        elif 'date' in x:
            return 'DATETIME'
        elif 'int' in x:
            return 'NUMERIC'
        elif 'float' in x:
            return 'REAL'
        elif ('bytes' in x) or ('bytearray' in x):
            return 'BINARY'
        elif 'bool' in x:
            return 'BOOLEAN'
        else:
            # Fail loudly: returning None here would break the join below
            raise ValueError(f"cannot parse pandas dtype: {x}")
    sf_dtypes = [map_dtypes(str(s)) for s in df.dtypes]
    # Produces e.g. "MSNO VARCHAR, IS_CHURN NUMERIC"
    table_metadata = ", ".join(f"{col.upper()} {sf_type}" for col, sf_type in zip(df.columns, sf_dtypes))
    return table_metadata

# Create/replace a table from a DataFrame, or insert its rows into an existing table
def df_to_snowflake_table(table_name, operation, df, conn=ctx):
    if operation == 'create_replace':
        df.columns = [c.upper() for c in df.columns]
        table_metadata = get_table_metadata(df)
        conn.cursor().execute(f"CREATE OR REPLACE TABLE {table_name} ({table_metadata})")
        write_pandas(conn, df, table_name.upper())
    elif operation == 'insert':
        # Renders the rows as "(v1, v2), (v3, v4)"; only safe for values without brackets or quotes
        table_rows = str(list(df.itertuples(index=False, name=None))).replace('[','').replace(']','')
        conn.cursor().execute(f"INSERT INTO {table_name} VALUES {table_rows}")
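
The `insert` branch above builds the `VALUES` clause by string manipulation, which can break on values that contain quotes or brackets. One alternative, sketched here under the assumption that the connector is using its default `pyformat` parameter style (`%s` placeholders), is to let the connector bind the values. `build_insert_statement` and `insert_rows` are hypothetical helpers, not part of the original notebook.

```python
def build_insert_statement(table_name, n_columns):
    """Return an INSERT statement with one %s placeholder per column."""
    placeholders = ", ".join(["%s"] * n_columns)
    return f"INSERT INTO {table_name} VALUES ({placeholders})"


def insert_rows(cursor, table_name, rows):
    """Insert equal-length tuples of plain Python values; return the row count."""
    rows = [tuple(row) for row in rows]
    if not rows:
        return 0
    sql = build_insert_statement(table_name, len(rows[0]))
    # The connector binds each tuple to the placeholders.
    cursor.executemany(sql, rows)
    return len(rows)


print(build_insert_statement("PREDICTIONS", 2))
```

Here `cursor` would be something like `ctx.cursor()`, and `table_name` is still interpolated directly, so it should come from trusted code rather than user input.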

### 2.3 Push Tables to Snowflake
Now, we'll uncompress the data (using p7zip) and push it to Snowflake using pandas and `snowflake-connector-python`.

In [None]:
!sudo apt-get install p7zip

In [None]:
!7zr -y x *.7z

#### 2.3.1 Members table

In [16]:
members = pd.read_csv('members_v3.csv')

print(members.dtypes)

print('Pushing data (this may take a while)...')
df_to_snowflake_table('MEMBERSHIPS', 'create_replace', members) 
print('Done.')

In [None]:
# Free memory
del members

#### 2.3.2 Train table

In [21]:
train = pd.read_csv('train.csv')

print(train.dtypes)

print('Pushing data (this may take a while)...')
df_to_snowflake_table('TRAIN', 'create_replace', train) 
print('Done.')

msno        object
is_churn     int64
dtype: object
Pushing data (this may take a while)...


In [28]:
# Free memory
del train

#### 2.3.3 Transactions table

In [None]:
transactions = pd.read_csv('transactions.csv')

print(transactions.dtypes)

print('Pushing data (this may take a while)...')
df_to_snowflake_table('TRANSACTIONS', 'create_replace', transactions) 
print('Done.')

In [30]:
# Free memory
del transactions

#### 2.3.4 User Logs table

In [None]:
user_logs = pd.read_csv('user_logs.csv')

print(user_logs.dtypes)

print('Pushing data (this may take a while)...')
df_to_snowflake_table('USER_LOGS', 'create_replace', user_logs) 
print('Done.')

In [None]:
# Free memory
del user_logs

## 3. Import Churn Data From Snowflake to Pachyderm
In this step we will use the Pachyderm Data Warehouse integration to ingress data. The pipelines will: 

1. Execute a SQL query
2. Create a CSV file for the result
3. Commit the CSV file to a Pachyderm versioned data repository

The Data Warehouse integration also lets us run the ingress operation on a schedule. In this example, we set the schedule to every 24 hours and kick off jobs manually, but in a real-world scenario we would schedule ingestion to fit our use case. For example, we may only want to update our dataset once per day or once per week.

### 3.1 Create Connection URL
The data warehouse integration will parse a standardized URL to connect to our Snowflake instance. We'll automatically generate the URL using the details that were provided earlier in this demo. 

In [22]:
url = 'snowflake://{usr}@{account}/{database_name}/{schema_name}?warehouse={warehouse_name}'.format(
    usr=user_name,
    account=account_identifier,
    database_name=database_name,
    schema_name=schema_name,
    warehouse_name=warehouse_name,
)
print(url)

snowflake://jimmy@of28881.us-central1.gcp/KKBOX_CHURN_EXAMPLE/PUBLIC?warehouse=COMPUTE_WH
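
If a username, database, or schema name contains characters that are reserved in URLs, plain string formatting can produce a URL that does not parse. The sketch below percent-encodes those parts with the standard library. `build_snowflake_url` is a hypothetical helper, and whether Pachyderm's URL parser expects percent-encoding for such characters is an assumption you should verify before relying on it.

```python
from urllib.parse import quote, urlencode


def build_snowflake_url(user, account, database, schema, warehouse):
    """Assemble the connection URL, percent-encoding the variable parts."""
    query = urlencode({"warehouse": warehouse})
    user_part = quote(user, safe="")
    database_part = quote(database, safe="")
    schema_part = quote(schema, safe="")
    return f"snowflake://{user_part}@{account}/{database_part}/{schema_part}?{query}"


print(
    build_snowflake_url(
        "jimmy",
        "of28881.us-central1.gcp",
        "KKBOX_CHURN_EXAMPLE",
        "PUBLIC",
        "COMPUTE_WH",
    )
)
```

For the values used in this example, nothing needs encoding, so the result matches the URL printed above. Note that the password is not part of the URL; it comes from the Kubernetes secret.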


### 3.2 Create SQL Query
The queries that we execute in this example join features from the different sources to create our dataset. These joins are performed on the user's ID, which is our `msno` column.

The structure of the queries is:

**Training Data**:
```sql
select
  distinct *
from
  KKBOX_CHURN_EXAMPLE.PUBLIC.USER_LOGS A
  inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRAIN B sample (0.1) on A.msno = B.msno
  inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRANSACTIONS C on A.msno = C.msno
  inner join KKBOX_CHURN_EXAMPLE.PUBLIC.MEMBERSHIPS D on A.msno = D.msno;
```

**Prediction Data**: 

```sql
select
  distinct *
from
  KKBOX_CHURN_EXAMPLE.PUBLIC.USER_LOGS A
  inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRANSACTIONS C sample (1) on A.msno = C.msno
  inner join KKBOX_CHURN_EXAMPLE.PUBLIC.MEMBERSHIPS D on A.msno = D.msno;
```

Both queries are essentially the same. The only difference is that the training data includes the training label, `is_churn`, a binary flag indicating whether or not that user churned.

Note: In the pipeline commands in the next section, we list the column names explicitly instead of using `*`, because the inner joins would otherwise return the `msno` column once per joined table. We also add a `sample` clause to reduce the amount of data we query and make the example faster.
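
Writing out every qualified column by hand is error-prone. As a sketch, the select list for the prediction query could be generated from a mapping of table alias to column names. `build_select_list` is a hypothetical helper; the aliases and column names are the ones used in the prediction query in the next section.

```python
def build_select_list(columns_by_alias):
    """Return 'A.COL1, A.COL2, C.COL3, ...' from a mapping of alias to columns."""
    return ", ".join(
        f"{alias}.{column}"
        for alias, columns in columns_by_alias.items()
        for column in columns
    )


prediction_columns = {
    # USER_LOGS
    "A": ["MSNO", "DATE", "NUM_25", "NUM_50", "NUM_75", "NUM_985",
          "NUM_100", "NUM_UNQ", "TOTAL_SECS"],
    # TRANSACTIONS
    "C": ["PAYMENT_METHOD_ID", "PAYMENT_PLAN_DAYS", "PLAN_LIST_PRICE",
          "ACTUAL_AMOUNT_PAID", "IS_AUTO_RENEW", "TRANSACTION_DATE",
          "MEMBERSHIP_EXPIRE_DATE", "IS_CANCEL"],
    # MEMBERSHIPS
    "D": ["CITY", "BD", "GENDER", "REGISTERED_VIA", "REGISTRATION_INIT_TIME"],
}

select_list = build_select_list(prediction_columns)
print(f"select distinct {select_list}")
```

Because `MSNO` is taken only from alias `A`, the result contains the join key once.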

### 3.3 Deploy the Ingestion Pipelines
We can now use our configuration to query Snowflake and store our result in a Pachyderm repository. 

The integration uses [Jsonnet Pipeline Specifications](https://docs.pachyderm.com/latest/how-tos/pipeline-operations/jsonnet-pipeline-specs/#jsonnet-pipeline-specifications) to create two pipelines. The first one (`TRAIN_DATA_queries`) functions as a cron pipeline and runs automatically according to the `cronSpec` we specify. When it runs, it writes our SQL query out to a file, which triggers the second pipeline (`TRAIN_DATA`) to run the query and store the result as a CSV file.


<img src="images/data_warehouse_ingest.png" alt="Drawing" style="width: 600px;"/>

Note: These queries will take a little while to run since the dataset is quite large. 

In [None]:
!pachctl update pipeline --jsonnet ./pachyderm/snowflake_import.jsonnet  \
    --arg name=TRAIN_DATA \
    --arg url="snowflake://jimmy@of28881.us-central1.gcp/KKBOX_CHURN_EXAMPLE/PUBLIC?warehouse=COMPUTE_WH" \
    --arg query="select distinct A.MSNO, A.DATE, A.NUM_25, A.NUM_50, A.NUM_75, A.NUM_985, A.NUM_100, A.NUM_UNQ, A.TOTAL_SECS, B.IS_CHURN, C.PAYMENT_METHOD_ID, C.PAYMENT_PLAN_DAYS, C.PLAN_LIST_PRICE, C.ACTUAL_AMOUNT_PAID, C.IS_AUTO_RENEW, C.TRANSACTION_DATE, C.MEMBERSHIP_EXPIRE_DATE, C.IS_CANCEL, D.CITY, D.BD, D.GENDER, D.REGISTERED_VIA, D.REGISTRATION_INIT_TIME from KKBOX_CHURN_EXAMPLE.PUBLIC.USER_LOGS A inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRAIN B sample (0.1) on A.msno = B.msno inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRANSACTIONS C on A.msno = C.msno inner join KKBOX_CHURN_EXAMPLE.PUBLIC.MEMBERSHIPS D on A.msno = D.msno;" \
    --arg cronSpec="@every 24h" \
    --arg secretName="snowflakesecret" \
    --arg format=csv

In [None]:
# Force a cron tick to run the pipeline initially (we don't want to wait 24 hours)
!pachctl run cron TRAIN_DATA_queries

In [None]:
# Prediction data table
!pachctl update pipeline --jsonnet ./pachyderm/snowflake_import.jsonnet  \
    --arg name=PRED_DATA \
    --arg url="snowflake://jimmy@of28881.us-central1.gcp/KKBOX_CHURN_EXAMPLE/PUBLIC?warehouse=COMPUTE_WH" \
    --arg query="select distinct A.MSNO, A.DATE, A.NUM_25, A.NUM_50, A.NUM_75, A.NUM_985, A.NUM_100, A.NUM_UNQ, A.TOTAL_SECS, C.PAYMENT_METHOD_ID, C.PAYMENT_PLAN_DAYS, C.PLAN_LIST_PRICE, C.ACTUAL_AMOUNT_PAID, C.IS_AUTO_RENEW, C.TRANSACTION_DATE, C.MEMBERSHIP_EXPIRE_DATE, C.IS_CANCEL, D.CITY, D.BD, D.GENDER, D.REGISTERED_VIA, D.REGISTRATION_INIT_TIME from KKBOX_CHURN_EXAMPLE.PUBLIC.USER_LOGS A inner join KKBOX_CHURN_EXAMPLE.PUBLIC.TRANSACTIONS C sample (1) on A.msno = C.msno inner join KKBOX_CHURN_EXAMPLE.PUBLIC.MEMBERSHIPS D on A.msno = D.msno;" \
    --arg cronSpec="@every 24h" \
    --arg secretName="snowflakesecret" \
    --arg format=csv

In [None]:
# Force a cron tick to run the pipeline initially (we don't want to wait 24 hours)
!pachctl run cron PRED_DATA_queries
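
The two `pachctl update pipeline` invocations above differ only in the pipeline name and the query. As a sketch, the argument list could be assembled in Python so the shared settings live in one place. `import_pipeline_command` is a hypothetical helper; it uses only the flags and `--arg` keys that appear in the cells above.

```python
import shlex


def import_pipeline_command(
    name,
    url,
    query,
    cron_spec="@every 24h",
    secret_name="snowflakesecret",
    jsonnet_path="./pachyderm/snowflake_import.jsonnet",
):
    """Build the pachctl argument list for one ingestion pipeline."""
    jsonnet_args = {
        "name": name,
        "url": url,
        "query": query,
        "cronSpec": cron_spec,
        "secretName": secret_name,
        "format": "csv",
    }
    command = ["pachctl", "update", "pipeline", "--jsonnet", jsonnet_path]
    for key, value in jsonnet_args.items():
        command += ["--arg", f"{key}={value}"]
    return command


cmd = import_pipeline_command(
    "PRED_DATA",
    "snowflake://jimmy@of28881.us-central1.gcp/KKBOX_CHURN_EXAMPLE/PUBLIC?warehouse=COMPUTE_WH",
    "select 1",  # placeholder query for illustration only
)
print(shlex.join(cmd))
```

Passing the list to `subprocess.run(cmd, check=True)` would run it without going through a shell, which avoids quoting problems in long queries.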

## 4. Process Data with Python and R
Now that our data is in Pachyderm, we can run arbitrary processing on the data. This means that we can apply our Python code to do data cleaning, feature engineering, model training, and anything else we desire through [Pachyderm pipelines](https://docs.pachyderm.com/latest/concepts/pipeline-concepts/pipeline/).

If we look at one of these files, we can see that we're calling Python to process our CSV data. When the pipeline runs, Pachyderm makes the versioned data stored in its data repositories available as inputs on the container's file system.

In [28]:
!cat pachyderm/data-cleaning.json

{
    "pipeline": {
      "name": "clean-data"
    },
    "description": "Clean and curate dataset.",
    "input": {
      "pfs": {
        "repo": "TRAIN_DATA",
        "glob": "/"
      }
    },
    "transform": {
        "cmd": [
          "python","/workdir/data-cleaning.py","--data","/pfs/TRAIN_DATA/0000","--output","/pfs/out/"
        ],
      "image": "jimmywhitaker/py_wsdm:dev0.18"
    }
  }
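
The script `data-cleaning.py` itself is not shown here. Purely as an illustration of how a script invoked this way can consume its input, the sketch below accepts the same `--data` and `--output` arguments, reads the CSV file from the input path, and writes a cleaned copy to the output directory. Everything in it is hypothetical, including the cleaning rule (dropping rows with empty fields) and the output file name `cleaned.csv`; it is not the author's implementation.

```python
import argparse
import csv
from pathlib import Path


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Hypothetical cleaning step")
    parser.add_argument("--data", required=True,
                        help="Input CSV file, e.g. /pfs/TRAIN_DATA/0000")
    parser.add_argument("--output", required=True,
                        help="Output directory, e.g. /pfs/out/")
    return parser.parse_args(argv)


def clean_rows(rows):
    """Yield only rows in which every field is non-empty."""
    for row in rows:
        if all(field.strip() for field in row):
            yield row


def main(argv=None):
    args = parse_args(argv)
    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)
    # Stream rows from the input file to the output file.
    with open(args.data, newline="") as src, \
            open(out_dir / "cleaned.csv", "w", newline="") as dst:
        csv.writer(dst).writerows(clean_rows(csv.reader(src)))
```

In a pipeline container, the script would end by calling `main()`. Anything written under `/pfs/out/` becomes the pipeline's output commit.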

### 4.1 Create Pachyderm Pipelines
Once created, our pipelines will automatically execute whenever our data changes, such as when we query new data from Snowflake. 

In [38]:
# Clean our dataset
!pachctl create pipeline -f pachyderm/clean-data.json

# Create features from our cleaned data
!pachctl create pipeline -f pachyderm/feature-engineering.json

# Train a churn classification model
!pachctl create pipeline -f pachyderm/model.json

# Generate some data visualization for our tables
# !pachctl create pipeline -f pachyderm/visualizations.json

## 5. Predict Churn for Users
Now that we have a trained model, we can apply it to our new data to figure out who is likely to churn according to the data we have on their usage of the KKBox service. 



### 5.1 Create a Snowflake Table for Our Results
We may want to query or otherwise work with our predicted churn data once we've computed it, so we'll egress this data from Pachyderm back to Snowflake. Before we do that, we'll create a table to hold our results.

In [23]:
# Create the table that will hold our predictions (reuses the cursor from step 2)
tbl_name = 'PREDICTIONS'
sql = "CREATE TABLE IF NOT EXISTS {tbl_name} (msno varchar (100), churn_prediction integer);".format(tbl_name = tbl_name)
cur.execute(sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7fee68c917c0>

### 5.2 Create Pachyderm Pipelines
Before we predict on our data, we'll clean it and extract features so that it matches what the model we created expects.

In [None]:
# Clean the data we're going to predict on
!pachctl create pipeline -f pachyderm/clean-data-pred.json

# Extract features from our prediction data
!pachctl create pipeline -f pachyderm/feature-engineering-pred.json

# Predict churn and egress the predictions to Snowflake
!pachctl create pipeline -f pachyderm/predict.json

The final pipeline, `predict`, uses the Pachyderm [egress feature](https://docs.pachyderm.com/latest/how-tos/basic-data-operations/export-data-out-pachyderm/sql-egress/). This lets us write data to Snowflake by adding the appropriate connection information to our pipeline specification; Pachyderm handles the rest.

In [33]:
!cat pachyderm/predict.json

{
    "pipeline": {
      "name": "predict"
    },
    "description": "Predict churn probability for KKBox customers.",
    "input": {
      "cross": [
        {
          "pfs": {
            "repo": "feature-engineering-pred",
            "glob": "/"
          }
        },
        {
          "pfs": {
            "repo": "model",
            "glob": "/"
          }
        }
      ]
    },
    "transform": {
      "cmd": [
        "python", "predict.py", "--model", "/pfs/model/logistic_regression.sav", "--features", "/pfs/feature-engineering-pred/inference_features.csv", "--output", "/pfs/out/predictions/"
      ],
      "image": "jimmywhitaker/py_wsdm:dev0.19"
    },
    "egress": {
        "sql_database": {
            "url": "snowflake://jimmy@of28881.us-central1.gcp/KK_BOX_CHURN_EXAMPLE/PUBLIC?warehouse=COMPUTE_WH",
            "file_format": {
                "type": "CSV",
                "columns": ["msno", "churn_prediction"]
            },
            "secret": {
           

## 6. View Results!

We can now go to Snowflake and view all the users that are at risk of churning on our service!

<img src="images/churn_prediction.png" alt="Drawing" style="width: 700px;"/>
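
The same check can be done from the notebook. As a sketch, the helper below queries the `PREDICTIONS` table created in step 5.1 for users flagged as likely to churn. `fetch_at_risk_users` is a hypothetical helper, and it assumes that a `churn_prediction` value of 1 means "predicted to churn", mirroring the 0/1 `is_churn` label in the training data.

```python
def fetch_at_risk_users(cursor, table_name="PREDICTIONS", limit=10):
    """Return up to `limit` user IDs whose churn_prediction is 1."""
    cursor.execute(
        f"SELECT msno FROM {table_name} "
        f"WHERE churn_prediction = 1 LIMIT {int(limit)}"
    )
    return [row[0] for row in cursor.fetchall()]
```

With the cursor from step 2, `fetch_at_risk_users(cur)` would return a list of `msno` values.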

## Clean Up

In [33]:
!pachctl delete pipeline --all