# Snowpark Python - TPC-DS - Customer Lifetime Value

This demo uses the [TPC-DS sample](https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html) dataset, which is made available through a Snowflake share. It can be configured to run on either the 10 TB or the 100 TB version of the dataset.

It illustrates how to use Snowpark for feature engineering, training, inference, streams, and tasks to answer a common question for retailers: What is the value of a customer across all sales channels?

We will use sales data, customer information, and demographic data to see if we can accurately predict the lifetime value of a customer.

In the repo you can also find a Streamlit app showing how to connect to Snowflake as a data source.
Streamlit will soon be available in Snowsight as well.

### Setup:
The TPC-DS data is already available in your Snowflake account as a shared database through Snowflake's data sharing. This means you, as the user, never incur the cost of storing this large dataset.

 1. Create a conda environment using the provided *environment.yml* file.
    1. `conda env create -f environment.yml`
    2. Activate the new conda environment with `conda activate snowpark_ml_test`
 2. Edit the *creds.json* file with your account information so the notebook can connect to your account. Refer to *creds(example).json* for how to configure the login credentials.
 3. Start Jupyter or an equivalent notebook environment and begin executing the notebook.
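
The notebook later reads the credentials file as a JSON object with the keys `user`, `password`, `account`, `warehouse`, and `role`. A minimal sketch of a loader that fails early when a key is missing could look like the following. The helper name `load_creds` and the placeholder values are hypothetical and are not part of the repo.

```python
import json
import os
import tempfile

# Keys the notebook reads from creds.json when building the session.
REQUIRED_KEYS = ("user", "password", "account", "warehouse", "role")


def load_creds(path):
    """Load a creds file and check that every expected key is present."""
    with open(path) as f:
        creds = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in creds]
    if missing:
        raise KeyError(f"creds file is missing keys: {missing}")
    return creds


# Demonstration with a throwaway file holding placeholder values only.
example = {
    "user": "<your user>",
    "password": "<your password>",
    "account": "<your account identifier>",
    "warehouse": "<your warehouse>",
    "role": "<your role>",
}
with tempfile.TemporaryDirectory() as tmp_dir:
    example_path = os.path.join(tmp_dir, "creds.json")
    with open(example_path, "w") as f:
        json.dump(example, f)
    loaded = load_creds(example_path)
```

Checking the keys up front gives a clearer error than a `KeyError` raised halfway through the connection cell.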

### Table of Contents:
1. Connecting to Snowflake
2. Environment Setup
3. Feature Engineering
4. Model Training
5. Infer Predictions
6. Streams and Tasks

### Cost Performance

Below is a table of performance figures I observed in AWS US East (Ohio). All times are reported in seconds, and costs assume Enterprise Edition list pricing.

| Dataset       	| Data prep/Feature Eng Warehouse 	| Snowpark Optimized Warehouse 	| Time for feature eng/prep 	| Cost for feature eng/prep 	| Time for training 	| Cost for training 	| Time for inference 	| Cost for inference 	|
|---------------	|---------------------------------	|------------------------------	|---------------------------	|---------------------------	|-------------------	|-------------------	|--------------------	|--------------------	|
| TPC-DS 10 TB  	| 3XL                             	| Medium                       	| 60                        	| $3.20                     	| 1400.4            	| $7.07             	| 9.8                	| $0.52              	|
| TPC-DS 100 TB 	| 3XL                             	| Medium                       	| 311.6                     	| $16.51                    	| 2210              	| $11.05            	| 24.6               	| $1.30              	|
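
The cost columns follow from run time, warehouse credit rate, and credit price. The sketch below shows that arithmetic under assumed rates that the table does not state: 64 credits per hour for a standard 3XL warehouse, 6 credits per hour for a Medium Snowpark-optimized warehouse, and $3.00 per credit. With those assumptions it reproduces the $3.20 feature-engineering entry (10 TB) and the $11.05 training entry (100 TB). Other entries are not guaranteed to reproduce exactly under the same assumed rates.

```python
# Assumed rates (not stated in the table above).
CREDITS_PER_HOUR = {
    "3XL": 64,                       # standard 3X-Large warehouse
    "Medium Snowpark-optimized": 6,  # Snowpark-optimized Medium warehouse
}
PRICE_PER_CREDIT_USD = 3.00          # assumed Enterprise Edition list price


def run_cost(seconds, warehouse):
    """Cost in USD of keeping a warehouse busy for the given number of seconds."""
    hours = seconds / 3600
    return hours * CREDITS_PER_HOUR[warehouse] * PRICE_PER_CREDIT_USD


prep_cost = run_cost(60, "3XL")                            # 10 TB feature eng/prep
train_cost = run_cost(2210, "Medium Snowpark-optimized")   # 100 TB training
print(f"prep: ${prep_cost:.2f}, training: ${train_cost:.2f}")
```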

## Connecting to Snowflake

In [2]:
import snowflake.snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v
from snowflake.snowpark.types import (
    Variant,
    IntegerType,
    BooleanType,
    FloatType,
    StringType,
    DoubleType,
    DateType,
    StructType,
    StructField
)
from snowflake.snowpark.functions import (
    udtf,
    udf,
    col,
    lit,
    row_number,
    table_function
)
import json 

# See creds(example).json for an example of how to build your connection parameters.
with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['user']
    PASSWORD = data['password']
    SF_ACCOUNT = data['account']
    SF_WH = data['warehouse']
    SF_ROLE = data['role']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
   "role": SF_ROLE,
   "warehouse": SF_WH
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()

## Environment Setup

In [2]:
# Using the data marketplace, create the sample data database from the share. This can be done with any marketplace data made available to your account.
session.use_role('ACCOUNTADMIN')
session.sql('''create database if not exists snowflake_sample_data from share sfc_samples.sample_data''').collect()

[Row(status='SNOWFLAKE_SAMPLE_DATA already exists, statement succeeded.')]

In [3]:
# Let's create the context for our demo.
# Note that a larger warehouse shortens the run: the 3X-Large statement below is commented out and an X-Large warehouse is created instead.
# Refer to the Cost Performance table in the readme for a cost comparison of warehouse sizes.

session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()
# session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='3X-LARGE'").collect()
session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='X-LARGE'").collect()
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()
session.use_warehouse('FE_AND_INFERENCE_WH')

Select either 100 or 10 for the TPC-DS dataset size to use below. See [here](https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html) for more information. If you choose 100, I recommend a warehouse of size 3XL or larger.

In [98]:
# Selecting either the 10TB or 100TB set of sample data
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'SNOWFLAKE_SAMPLE_DATA' # Name of Snowflake Sample Database might be different...

if TPCDS_SIZE_PARAM == 100: 
    TPCDS_SCHEMA = 'TPCDS_SF100TCL'
elif TPCDS_SIZE_PARAM == 10:
    TPCDS_SCHEMA = 'TPCDS_SF10TCL'
else:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")

# read the Snowflake marketplace tables into Snowpark dataframes
store_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store_sales')
catalog_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.catalog_sales') 
web_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.web_sales') 
date = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.date_dim')
dim_stores = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store')
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')
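
The schema selection above can also be written as a lookup, which keeps the size-to-schema mapping in one place. This is only a sketch: the helper name `tpcds_table_name` is hypothetical, while the schema names and the error message are taken from the cell above.

```python
# Mapping from dataset size in TB to schema name, as used in the cell above.
TPCDS_SCHEMAS = {10: "TPCDS_SF10TCL", 100: "TPCDS_SF100TCL"}


def tpcds_table_name(table, size_tb=10, database="SNOWFLAKE_SAMPLE_DATA"):
    """Return the fully qualified name of a TPC-DS sample table."""
    if size_tb not in TPCDS_SCHEMAS:
        raise ValueError("Invalid TPCDS_SIZE_PARAM selection")
    return f"{database}.{TPCDS_SCHEMAS[size_tb]}.{table}"


store_sales_name = tpcds_table_name("store_sales", size_tb=10)

# In the notebook this would be used as, for example:
# store_sales = session.table(tpcds_table_name("store_sales", TPCDS_SIZE_PARAM))
```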

## Feature Engineering / Building Feature Table
We will aggregate sales by customer across all channels (web, store, catalog) and join the result to customer demographic data.

In [99]:
# Grouping all sales by customer and summing them. 
# Note that F.sum() refers to the package snowflake.snowpark.functions.
# The link below points to the Scala API reference for the functions in this library:
# https://docs.snowflake.com/ko/developer-guide/snowpark/reference/scala/com/snowflake/snowpark/functions$.html

store_sales_agged = store_sales.group_by('ss_customer_sk').agg(F.sum('ss_sales_price').as_('total_sales'))
web_sales_agged = web_sales.group_by('ws_bill_customer_sk').agg(F.sum('ws_sales_price').as_('total_sales'))
catalog_sales_agged = catalog_sales.group_by('cs_bill_customer_sk').agg(F.sum('cs_sales_price').as_('total_sales'))
store_sales_agged = store_sales_agged.rename('ss_customer_sk', 'customer_sk')
web_sales_agged = web_sales_agged.rename('ws_bill_customer_sk', 'customer_sk')
catalog_sales_agged = catalog_sales_agged.rename('cs_bill_customer_sk', 'customer_sk')

In [100]:
# Unioning the sales from the different sales channels into a single snowpark dataframe
total_sales = store_sales_agged.union_all(web_sales_agged)
total_sales = total_sales.union_all(catalog_sales_agged)

In [101]:
# vertically summing the sales from the unioned snowpark dataframe and grouping by customer
total_sales = total_sales.group_by('customer_sk').agg(F.sum('total_sales').as_('total_sales'))
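
To make the union-then-sum logic concrete, here is a small plain-Python sketch that mirrors it on made-up numbers. The customer keys and amounts are hypothetical; in the notebook the same steps run inside Snowflake on the Snowpark dataframes.

```python
from collections import defaultdict

# Hypothetical per-channel totals, shaped like the aggregated dataframes:
# (customer_sk, total_sales)
store = [(1, 100.0), (2, 50.0)]
web = [(1, 25.0), (3, 10.0)]
catalog = [(2, 5.0)]

# union_all: stack the rows from every channel, keeping duplicates
unioned = store + web + catalog

# group_by('customer_sk') + sum('total_sales'): one total per customer
totals = defaultdict(float)
for customer_sk, total_sales in unioned:
    totals[customer_sk] += total_sales
total_sales_by_customer = dict(totals)
```

Customer 1 appears in two channels, so its rows collapse into a single total. This is why the second `group_by` is needed after the union.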

In [102]:
# query a subset of the customer information into a snowpark dataframe
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')

In [103]:
# join the query results to a query subset from the address table
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
# join the customer and address query results to demographic information from demo table
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
# renaming customer_sk column
customer = customer.rename('c_customer_sk', 'customer_sk')
customer.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_SK"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_CUSTOMER_ID"   |"C_BIRTH_YEAR"  |"CA_ADDRESS_SK"  |"CA_ZIP"  |"CD_DEMO_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|28639804       |6777                  |17517826             |AAAAAAAAMDCAFLBA  |1984            |17517826         |76871     |6777          |M            |W                    |Good                |Advanced Degree        |1               |
|28639805       |3350               

In [104]:
# join our aggregated sales information to our customer information query results.
final_df = total_sales.join(customer, on='customer_sk')

In [None]:
# write this final snowpark dataframe back to snowflake
session.use_database('tpcds_xgboost')
session.use_schema('demo')
final_df.write.mode('overwrite').save_as_table('feature_store')

## Write the same joins in SQL
- Highlight the simplicity of Snowpark vs. SQL
- Highlight the strength of the query optimizer in Snowpark vs. SQL
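
As a starting point for that comparison, the sketch below builds one SQL statement that mirrors the Snowpark steps from the feature engineering section: per-channel aggregation, `UNION ALL`, a second aggregation, and the three joins. The function name `build_feature_sql` is hypothetical, and the statement has not been run against the sample data. It is a hand-written equivalent, not the SQL that Snowpark itself generates.

```python
def build_feature_sql(database="SNOWFLAKE_SAMPLE_DATA", schema="TPCDS_SF10TCL"):
    """Return a SQL statement mirroring the Snowpark feature engineering steps."""
    src = f"{database}.{schema}"
    return f"""
WITH channel_sales AS (
    SELECT ss_customer_sk AS customer_sk, SUM(ss_sales_price) AS total_sales
    FROM {src}.store_sales GROUP BY ss_customer_sk
    UNION ALL
    SELECT ws_bill_customer_sk, SUM(ws_sales_price)
    FROM {src}.web_sales GROUP BY ws_bill_customer_sk
    UNION ALL
    SELECT cs_bill_customer_sk, SUM(cs_sales_price)
    FROM {src}.catalog_sales GROUP BY cs_bill_customer_sk
),
total_sales AS (
    SELECT customer_sk, SUM(total_sales) AS total_sales
    FROM channel_sales GROUP BY customer_sk
)
SELECT t.customer_sk, t.total_sales,
       c.c_current_hdemo_sk, c.c_current_addr_sk, c.c_customer_id, c.c_birth_year,
       a.ca_address_sk, a.ca_zip,
       d.cd_demo_sk, d.cd_gender, d.cd_marital_status, d.cd_credit_rating,
       d.cd_education_status, d.cd_dep_count
FROM total_sales t
JOIN {src}.customer c ON t.customer_sk = c.c_customer_sk
JOIN {src}.customer_address a ON c.c_current_addr_sk = a.ca_address_sk
JOIN {src}.customer_demographics d ON c.c_current_hdemo_sk = d.cd_demo_sk
"""


feature_sql = build_feature_sql()

# In the notebook the statement could be executed with, for example:
# session.sql(feature_sql).write.mode('overwrite').save_as_table('feature_store')
```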

# Write here how much storage we have brought down from the 10 TB dataset onto our Snowflake account

## Model Training

In [5]:
# create the context for model training
session.use_database('tpcds_xgboost')
session.use_schema('demo')
session.use_warehouse('snowpark_opt_wh')
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost')

The version of package scikit-learn in the local environment is 1.2.1, which does not fit the criteria for the requirement scikit-learn. Your UDF might not work when the package version is different between the server and your local environment
The version of package xgboost in the local environment is 1.7.3, which does not fit the criteria for the requirement xgboost. Your UDF might not work when the package version is different between the server and your local environment


In [5]:
# create the stage
session.sql('CREATE OR REPLACE STAGE ml_models ').collect()

[Row(status='Stage area ML_MODELS successfully created.')]

In [6]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer
from xgboost import XGBRegressor
import joblib
import os

def train_model(session: snowflake.snowpark.Session) -> float:
    # read the table we assembled in the prior section into a snowpark dataframe
    snowdf = session.table("feature_store")
    # drop the non-predictor columns
    snowdf = snowdf.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
    # split the data into test/train
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82) 

    # save the train and test sets as tables in Snowflake
    snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")
    # drop the outcome for training set
    train_x = snowdf_train.drop("TOTAL_SALES").to_pandas()
    # select the outcome for training set
    train_y = snowdf_train.select("TOTAL_SALES").to_pandas()
    # drop the outcome for test set
    test_x = snowdf_test.drop("TOTAL_SALES").to_pandas()
    # select the outcome for test set
    test_y = snowdf_test.select("TOTAL_SALES").to_pandas()
    # define the categorical columns for the pipeline feature engineering
    cat_cols = ['CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
    # define the numerical columns for the pipeline feature engineering
    num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

    # define the numerical portion of the pipeline
    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    # define the pre-processor
    preprocessor = ColumnTransformer(
    transformers=[('num', num_pipeline, num_cols),
                  ('encoder', OneHotEncoder(handle_unknown="ignore"), cat_cols) ])

    # assemble the pipe with pre-processor and model
    pipe = Pipeline([('preprocessor', preprocessor), 
                        ('xgboost', XGBRegressor())])
    # fit the data with pre-processing and model
    pipe.fit(train_x, train_y)

    # predict the outcomes for test data
    test_preds = pipe.predict(test_x)
    # calculate the mean squared error between predictions and actuals for test data
    # (mean_squared_error returns the MSE, not its square root)
    mse = mean_squared_error(test_y, test_preds)
    # define path for pipe and model file
    model_file = os.path.join('/tmp', 'model.joblib')
    # dump model pipe and model to joblib file
    joblib.dump(pipe, model_file)
    # put the joblib file in the ml_models stage
    session.file.put(model_file, "@ml_models",overwrite=True)
    return mse



In [7]:
# create the stored procedure
train_model_sp = F.sproc(train_model, session=session, replace=True)
# Switch to Snowpark Optimized Warehouse for training and to run the stored proc
session.use_warehouse('snowpark_opt_wh')
# run the stored procedure
train_model_sp(session=session)

34213493.08788544
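
By default, scikit-learn's `mean_squared_error` returns the mean of the squared errors, so the value printed above is in squared sales units. Taking its square root gives an error in the same units as `TOTAL_SALES`. The sketch below shows the relationship in plain Python on made-up numbers and then applies the square root to the reported value, which comes out at roughly 5,849.

```python
import math


def mean_squared_error_plain(actuals, predictions):
    """Mean of squared differences between actual and predicted values."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actuals, predictions)]
    return sum(squared_errors) / len(squared_errors)


# Made-up values for illustration only.
actuals = [100.0, 200.0, 300.0]
predictions = [110.0, 190.0, 330.0]

mse = mean_squared_error_plain(actuals, predictions)   # squared units
rmse = math.sqrt(mse)                                   # same units as the target

# Square root of the value reported by the stored procedure above.
reported_value = 34213493.08788544
rmse_of_reported = math.sqrt(reported_value)
```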

## Infer Predictions Using Stored Model

In [6]:
# Switch back to feature engineering/inference warehouse
session.use_warehouse('FE_AND_INFERENCE_WH')
session.use_database('TPCDS_XGBOOST')
session.use_schema('DEMO')

In [7]:
import os
import sys
import pandas as pd
import cachetools
import joblib
from snowflake.snowpark import types as T

# add the pipeline file on the stage to the UDF's imports
session.add_import("@ml_models/model.joblib.gz")

# define the feature list, in the order the UDF receives the columns
features = ['C_BIRTH_YEAR', 'CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS', 'CD_DEP_COUNT']

# cache the loaded pipeline so the file is read only once per process
@cachetools.cached(cache={})
def read_file(filename):
    # the import directory is only set when the code runs inside Snowflake
    import_dir = sys._xoptions.get("snowflake_import_directory")

    # load the pipeline file from the import directory
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

# create the UDF on Snowflake. The @ decorator registers the function that follows as the UDF.
@F.pandas_udf(session=session, max_batch_size=10000, is_permanent=True, stage_location='@ml_models', name="clv_xgboost_udf", replace=True)
def predict(df: T.PandasDataFrame[int, str, str, str, str, str, int]) -> T.PandasSeries[float]:
    m = read_file('model.joblib.gz')
    # name the incoming columns so they match what the pipeline was trained on
    df.columns = features
    return m.predict(df)
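
The cached loader matters because the UDF is called once per batch, and reloading the model for every batch would be wasteful. The sketch below illustrates the same load-once pattern with the standard library's `functools.lru_cache` instead of `cachetools`, and with a stand-in dictionary instead of a real model, so that it runs without Snowflake. The names `load_model` and `predict_batch` are hypothetical.

```python
from functools import lru_cache

load_calls = []  # records every real load, for demonstration only


@lru_cache(maxsize=None)
def load_model(filename):
    """Stand-in for joblib.load: 'loads' the model and records the call."""
    load_calls.append(filename)
    return {"name": filename}


def predict_batch(batch):
    # Every batch asks for the model, but only the first call loads it.
    model = load_model("model.joblib.gz")
    return [model["name"]] * len(batch)


# Three batches, one load.
for batch in ([1, 2], [3], [4, 5, 6]):
    predict_batch(batch)
```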

In [8]:
%%time
# read our prepared feature store table in our Snowflake account storage into Snowpark dataframe
inference_df = session.table('feature_store')
# drop the non-feature columns
inference_df = inference_df.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
# drop the outcome
inputs = inference_df.drop("TOTAL_SALES")
# call the udf for inference
snowdf_results = inference_df.select(*inputs,
                    predict(*inputs).alias('PREDICTION'), 
                    (F.col('TOTAL_SALES')).alias('ACTUAL_SALES')
                    )
# write the results to a table in snowflake
snowdf_results.write.mode('overwrite').save_as_table('predictions')

CPU times: user 18.6 ms, sys: 3.98 ms, total: 22.5 ms
Wall time: 56 s


In [9]:
snowdf_results.count()

62726989

# UNDER CONSTRUCTION UNDER HERE

## Streams and Tasks (Change Data Capture and Scheduling)
- Still need to get metadata to reset for update

In [3]:
# delete me, just for piecework to set context
session.use_database('TPCDS_XGBOOST')
session.use_schema('DEMO')
session.use_warehouse('compute_wh')
session.use_role('ACCOUNTADMIN')

create or replace TABLE ROI_PRED (
	SEARCH_ENGINE NUMBER(38,0),
	SOCIAL_MEDIA NUMBER(38,0),
	VIDEO NUMBER(38,0),
	EMAIL NUMBER(38,0),
	PREDICTED_ROI FLOAT
)

In [84]:
session.sql('drop table if exists feature_stage').collect()
session.sql("""
	create or replace TABLE feature_stage (
	CUSTOMER_SK VARCHAR(50),
	TOTAL_SALES NUMBER(38,2),
	C_CURRENT_HDEMO_SK VARCHAR(50),
	C_CURRENT_ADDR_SK VARCHAR(50),
	C_CUSTOMER_ID VARCHAR(50),
	C_BIRTH_YEAR NUMBER(38,0),
	CA_ADDRESS_SK NUMBER(38,0),
	CA_ZIP NUMBER(38,0),
	CD_DEMO_SK NUMBER(38,0),
	CD_GENDER VARCHAR(50),
	CD_MARITAL_STATUS VARCHAR(50),
	CD_CREDIT_RATING VARCHAR(50),
    CD_EDUCATION_STATUS VARCHAR(50),
	CD_DEP_COUNT NUMBER(38,0),
	PREDICTION NUMBER(38,0)
)
""").collect()

[Row(status='Table FEATURE_STAGE successfully created.')]

In [85]:
session.sql("""
create or replace stream xg_boost_stream on table feature_stage;
""").collect()

[Row(status='Stream XG_BOOST_STREAM successfully created.')]

In [86]:
# features = ['C_BIRTH_YEAR', 'CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS', 'CD_DEP_COUNT']
session.sql("select * from xg_boost_stream").show()
feature_store = session.table('feature_store')
el_df = feature_store.select("*").distinct().limit(100)
el_df = el_df.withColumn('PREDICTION', F.lit(None).cast(DoubleType()))
el_df.write.mode('append').save_as_table('feature_stage')

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_SK"  |"TOTAL_SALES"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_CUSTOMER_ID"  |"C_BIRTH_YEAR"  |"CA_ADDRESS_SK"  |"CA_ZIP"  |"CD_DEMO_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |"PREDICTION"  |"METADATA$ACTION"  |"METADATA$ISUPDATE"  |"METADATA$ROW_ID"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|

In [87]:
session.sql("select * from xg_boost_stream").show()


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_SK"  |"TOTAL_SALES"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_CUSTOMER_ID"   |"C_BIRTH_YEAR"  |"CA_ADDRESS_SK"  |"CA_ZIP"  |"CD_DEMO_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |"PREDICTION"  |"METADATA$ACTION"  |"METADATA$ISUPDATE"  |"METADATA$ROW_ID"                         |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [88]:
# read the staging table, and the unscored inserts captured by the stream, into Snowpark dataframes
feature_stage = session.table('feature_stage')
base_table = session.table('xg_boost_stream').filter((col('METADATA$ACTION') == 'INSERT') & col("PREDICTION").isNull())
base_table.show()

# drop the key columns and the stream metadata columns
inference_df = base_table.drop(['C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK', "METADATA$ACTION", "METADATA$ISUPDATE", "METADATA$ROW_ID"])
# drop the outcome, the customer key, and the empty prediction column, leaving only the model inputs
inputs = inference_df.drop(["TOTAL_SALES", "CUSTOMER_SK", "PREDICTION"])

# call the udf for inference
snowdf_results = feature_stage.select(*inputs,
                    predict(*inputs).alias('PREDICTION'), 
                    (F.col('TOTAL_SALES')).alias('ACTUAL_SALES'),
                    (F.col('CUSTOMER_SK')),
                    (F.col('C_CURRENT_ADDR_SK')),
                    F.col('C_CURRENT_HDEMO_SK'),
                    F.col('C_CUSTOMER_ID'),
                    F.col('CA_ADDRESS_SK'),
                    F.col('CD_DEMO_SK'),
                    )
snowdf_results.show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_SK"  |"TOTAL_SALES"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_CUSTOMER_ID"   |"C_BIRTH_YEAR"  |"CA_ADDRESS_SK"  |"CA_ZIP"  |"CD_DEMO_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |"PREDICTION"  |"METADATA$ACTION"  |"METADATA$ISUPDATE"  |"METADATA$ROW_ID"                         |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [92]:
session.sql("ALTER SESSION SET ERROR_ON_NONDETERMINISTIC_MERGE = FALSE;").collect()
# match rows on the customer key and every feature column
merge_condition = (
    (feature_stage['CUSTOMER_SK'] == snowdf_results["CUSTOMER_SK"])
    & (feature_stage["C_BIRTH_YEAR"] == snowdf_results["C_BIRTH_YEAR"])
    & (feature_stage['CA_ZIP'] == snowdf_results["CA_ZIP"])
    & (feature_stage['CD_GENDER'] == snowdf_results["CD_GENDER"])
    & (feature_stage['CD_MARITAL_STATUS'] == snowdf_results["CD_MARITAL_STATUS"])
    & (feature_stage['CD_CREDIT_RATING'] == snowdf_results["CD_CREDIT_RATING"])
    & (feature_stage['CD_EDUCATION_STATUS'] == snowdf_results["CD_EDUCATION_STATUS"])
    & (feature_stage['CD_DEP_COUNT'] == snowdf_results["CD_DEP_COUNT"])
)
# write the prediction back to every matched row
feature_stage.merge(snowdf_results, merge_condition,
                    [F.when_matched().update({"PREDICTION" : snowdf_results['PREDICTION']})])

MergeResult(rows_inserted=0, rows_updated=100, rows_deleted=0)
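
This section does not yet create a task. One possible shape for the scheduling step is a task that runs only when the stream has new rows. The sketch below just builds the DDL string. The task name `clv_inference_task`, the five-minute schedule, and the stored procedure `run_clv_inference` are hypothetical placeholders, since the notebook does not define them. The stream and warehouse names are the ones created earlier.

```python
def build_task_sql(task_name="clv_inference_task",
                   warehouse="FE_AND_INFERENCE_WH",
                   stream_name="XG_BOOST_STREAM",
                   procedure_call="run_clv_inference()",
                   schedule="5 MINUTE"):
    """Return DDL for a task that calls a procedure when the stream has data."""
    return (
        f"CREATE OR REPLACE TASK {task_name}\n"
        f"  WAREHOUSE = {warehouse}\n"
        f"  SCHEDULE = '{schedule}'\n"
        f"WHEN SYSTEM$STREAM_HAS_DATA('{stream_name}')\n"
        f"AS\n"
        f"  CALL {procedure_call}"
    )


task_sql = build_task_sql()
resume_sql = "ALTER TASK clv_inference_task RESUME"

# In the notebook these could be executed with, for example:
# session.sql(task_sql).collect()
# session.sql(resume_sql).collect()  # tasks are created suspended
```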

In [94]:
feature_stage.show()
session.sql("select * from xg_boost_stream").show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_SK"  |"TOTAL_SALES"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_CUSTOMER_ID"   |"C_BIRTH_YEAR"  |"CA_ADDRESS_SK"  |"CA_ZIP"  |"CD_DEMO_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |"PREDICTION"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|60191755       |43455.91       |6119                  |23899421             |AAAAAAAALAEHGJDA  |1957            |23899421         |64593     |6119          |M            |U           