<img src="Images/Splice_logo.jpeg" width="250" height="200" align="left" >

# Train machine learning models using the Feature Store

---

## How do you find feature values at the correct point in time?
   - ### Features are updated at different times
   - ### How would you join across asynchronous timestamps?

<img src="Images/point_in_time_problem.png" width="900" align="left" >

#### Doing this by hand without a Feature Store works once or twice, but what about 5 or 50 models?

---

# Easily build point-in-time consistent training sets with our Feature Store

<img src="Images/training_set.png" width="1000"  align="left" >

## Structure of Feature Set Tables
<img src="Images/FS_tables.png" width="800" height="400" align="left" >

---

# Feature Store for Model Training

In [None]:
# Notebook setup: Spark session -> Splice context -> Feature Store -> MLflow wiring
from pyspark.sql import SparkSession
from splicemachine.spark import PySpliceContext
from splicemachine.features import FeatureStore
from splicemachine.features.constants import FeatureType

# getOrCreate() reuses an active Spark session if there is one
spark = SparkSession.builder.getOrCreate()

# Splice context over the Spark session; used to create Spark DataFrames
# through Splice's native Spark DataSource
splice = PySpliceContext(spark)

# Feature Store API handle, backed by the Splice context
fs = FeatureStore(splice)

# Splice's MLflow support module (provides `mlflow`); register the Feature Store
# and the Splice context with it so later cells can log against them
from splicemachine.mlflow_support import *
mlflow.register_feature_store(fs)
mlflow.register_splice_context(splice)

## Write any SQL to get your label. The label doesn't have to be part of the Feature Store

In [None]:
%%sql
SELECT ltv.CUSTOMERID, 
       ((w.WEEK_END_DATE - ltv.CUSTOMER_START_DATE)/ 7) CUSTOMERWEEK,
       CAST(w.WEEK_END_DATE as TIMESTAMP) CUSTOMER_TS,  
       ltv.CUSTOMER_LIFETIME_VALUE as CUSTOMER_LTV
FROM retail_rfm.weeks w --splice-properties useSpark=True
INNER JOIN 
    twimlcon_fs.customer_lifetime ltv 
    -- exclusive lower bound (>) so this preview returns the same rows as the training view SQL
    ON w.WEEK_END_DATE > ltv.CUSTOMER_START_DATE AND w.WEEK_END_DATE <= ltv.CUSTOMER_START_DATE + 28 --only first 4 weeks
ORDER BY 1,2

{limit 8}
;

## Create a Training View
#### By specifying the join key and timestamp, you automatically get all of the relevant features you need

In [None]:
# Label SQL for the training view: one row per customer per week within 28 days
# after CUSTOMER_START_DATE, carrying CUSTOMER_LIFETIME_VALUE as the label CUSTOMER_LTV.
sql = """
SELECT ltv.CUSTOMERID, 
       ((w.WEEK_END_DATE - ltv.CUSTOMER_START_DATE)/ 7) CUSTOMERWEEK,
       CAST(w.WEEK_END_DATE as TIMESTAMP) CUSTOMER_TS,  
       ltv.CUSTOMER_LIFETIME_VALUE as CUSTOMER_LTV
FROM retail_rfm.weeks w --splice-properties useSpark=True
INNER JOIN 
    twimlcon_fs.customer_lifetime ltv 
    ON w.WEEK_END_DATE > ltv.CUSTOMER_START_DATE AND w.WEEK_END_DATE <= ltv.CUSTOMER_START_DATE + 28 --only first 4 weeks
"""

# A training row is uniquely identified by the customer plus their week of spending activity
pks = [
    'CUSTOMERID',
    'CUSTOMERWEEK',
]
# Primary key of the Feature Sets this view joins to
join_keys = [
    'CUSTOMERID',
]

fs.create_training_view(
    'twimlcon_customer_lifetime_value',
    sql=sql,
    primary_keys=pks,
    join_keys=join_keys,
    # Timestamp used to match each label row with its features in time
    ts_col='CUSTOMER_TS',
    # Column the model is trained to predict
    label_col='CUSTOMER_LTV',
    desc='The current (as of queried) lifetime value of each customer per week of being a customer',
)

## Easily extract all features
#### Every time this code is re-run, you get the most up-to-date features

In [None]:
# Spark DataFrame holding the training set built from the training view.
# No `features` argument is passed, so presumably every feature the view can join to is included — confirm.
all_features = fs.get_training_set_from_view('twimlcon_customer_lifetime_value')
# Preview 8 rows as pandas; as the cell's last expression, the notebook displays it
all_features.limit(8).toPandas()

In [None]:
# With return_sql=True, get the SQL text used to generate the training-set DataFrame and print it.
# NOTE: this rebinds the global `sql`, replacing the training-view label SQL defined earlier.
sql = fs.get_training_set_from_view('twimlcon_customer_lifetime_value',return_sql=True)
print(sql)

---

## Automatic Feature Selection
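Use `get_training_view_features` to list the features available to the training view, then let `run_feature_elimination` keep the most important ones.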

In [None]:
import re

# All features the training view can join to (Feature objects with a `.name`, not SQL)
feats = fs.get_training_view_features('twimlcon_customer_lifetime_value')

# Candidate features: customer tenure plus every feature whose name contains a
# "_0W".."_4W" window tag, i.e. only up to 4 weeks of RFM values.
EARLY_WEEK_PATTERN = re.compile(r'_[0-4]W')
desired_features = ['CUSTOMER_LIFETIME_DAYS'] + [
    f.name for f in feats if EARLY_WEEK_PATTERN.search(f.name)
]

# Point-in-time training set restricted to the candidates; rows with any null are dropped
all_features = fs.get_training_set_from_view(
    'twimlcon_customer_lifetime_value', features=desired_features
).dropna()

# Feature elimination on the LTV label, logged to MLflow under its own run name.
# Presumably keeps the top n=10 features, removing `step` features per round — confirm in the splicemachine docs.
top_features, feature_importances = fs.run_feature_elimination(
    all_features,
    features=desired_features,
    label='CUSTOMER_LTV',
    n=10,
    verbose=2,
    step=30,
    model_type='regression',
    log_mlflow=True,
    mlflow_run_name='Feature_Elimination_LTV',
    return_importances=True,
)

# Final training data: only the selected features (plus view keys/label), nulls dropped
model_training_df = fs.get_training_set_from_view(
    'twimlcon_customer_lifetime_value', features=top_features
).dropna()

---

## Train a Machine Learning Model
### Splice Machine's model training is built around an integrated and enhanced version of MLflow

In [None]:
# Show the MLflow tracking UI (presumably rendered in the cell output — confirm)
from splicemachine.notebook import get_mlflow_ui
get_mlflow_ui()

In [None]:
###############
# SparkML Model
###############
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


mlflow.set_experiment('Predict Lifetime Value from Initial Customer Activity')
run_tags = {
    'project': 'TWIMLcon Demo',
    'team': 'INSERT YOUR NAME HERE',
}

# Fixed seed so the train/test split, and therefore the logged r2, is reproducible on re-runs
SPLIT_SEED = 42

# Names of the features kept by feature elimination; also persisted with %store at the end
features_list = [f.name for f in top_features]
features_str = ','.join(features_list)

# Assemble the selected columns into one vector, then standardize it
va = VectorAssembler(inputCols=features_list, outputCol='features_raw')
scaler = StandardScaler(inputCol="features_raw", outputCol="features")


with mlflow.start_run(run_name="Regression LTV", tags=run_tags):

    lr = LinearRegression(featuresCol='features', labelCol='CUSTOMER_LTV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
    # Alternative model; the model summary printed below handles either type
    #lr = RandomForestRegressor(featuresCol='features', labelCol='CUSTOMER_LTV')

    pipeline = Pipeline(stages=[va, scaler, lr])

    # log the pipeline's feature transformations and stages
    mlflow.log_feature_transformations(pipeline)
    mlflow.log_pipeline_stages(pipeline)

    # train on 80% of rows, evaluate on the held-out 20%
    mlflow.log_param('split_seed', SPLIT_SEED)
    train, test = model_training_df.randomSplit([0.80, 0.20], seed=SPLIT_SEED)
    model = pipeline.fit(train)
    predictions = model.transform(test)

    # The last pipeline stage is the fitted regressor
    lr_model = model.stages[-1]
    if hasattr(lr_model, 'coefficients'):
        # Linear models expose coefficients and an intercept
        print("Coefficients: " + str(lr_model.coefficients))
        print("Intercept: " + str(lr_model.intercept))
    else:
        # Tree ensembles (e.g. a fitted RandomForestRegressor) expose feature importances instead
        print("Feature importances: " + str(lr_model.featureImportances))

    # log metric
    pred_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="CUSTOMER_LTV", metricName="r2")
    r2 = pred_evaluator.evaluate(predictions)
    print(f"R Squared (R2) on test data = {r2:g}")
    mlflow.log_metric('r2', r2)

    mlflow.log_model(model)
    run_id = mlflow.current_run_id()

In [None]:
# Show the MLflow UI again so the training run just logged can be inspected.
# The import is repeated so this cell also works if run on its own.
from splicemachine.notebook import get_mlflow_ui
get_mlflow_ui()

---

## Store the most important features for use in the next Jupyter notebook

In [None]:
# Persist the selected feature names with IPython's %store; another notebook can reload them with `%store -r`
%store features_list
%store features_str

In [None]:
# Shut down the Spark session obtained at the top of the notebook
spark.stop()