# Customer propensity to purchase

The example based on:

https://www.kaggle.com/benpowis/customer-propensity-to-purchase

Before you start datasets:

https://www.kaggle.com/benpowis/customer-propensity-to-purchase?select=testing_sample.csv

https://www.kaggle.com/benpowis/customer-propensity-to-purchase?select=training_sample.csv

In [1]:
# default_exp train

# Prepare data 

## Prepare bucket

In [2]:
#hide
import os
import boto3

bucket_name="propensity"
feast_bucket_name="feast"
mlflow_bucket_name="mlflow"
s3_client = boto3.client('s3', endpoint_url=os.environ['MLFLOW_S3_ENDPOINT_URL'])
s3_client.create_bucket(Bucket=bucket_name)
s3_client.create_bucket(Bucket=feast_bucket_name)
s3_client.create_bucket(Bucket=mlflow_bucket_name)

{'ResponseMetadata': {'RequestId': '167D825524F0C640',
  'HostId': '',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'content-security-policy': 'block-all-mixed-content',
   'location': '/mlflow',
   'server': 'MinIO',
   'vary': 'Origin',
   'x-amz-request-id': '167D825524F0C640',
   'x-xss-protection': '1; mode=block',
   'date': 'Sun, 09 May 2021 21:12:23 GMT'},
  'RetryAttempts': 0},
 'Location': '/mlflow'}

## Convert data to parquets and upload to s3 

In [3]:
#hide
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from pyarrow import fs
from datetime import datetime

s3 = fs.S3FileSystem(endpoint_override=os.environ.get("FEAST_S3_ENDPOINT_URL"))
def s3_upload(filename: str, entity_name: str):
    df = pd.read_csv(f"{filename}.csv")
    df['datetime'] = datetime(2021, 4, 12, 10, 59, 42)
    df['created'] = datetime(2021, 4, 12, 10, 59, 42)
    table = pa.Table.from_pandas(df)
    pq.write_table(table, f'{bucket_name}/{filename}.parquet', filesystem=s3)
    entities_table = pa.Table.from_pandas(df[entity_name].to_frame())
    pq.write_table(entities_table, f'{bucket_name}/{filename}_entities.parquet', filesystem=s3)
    
s3_upload('training_sample','UserID')
s3_upload('testing_sample','UserID')

# Feast feature store

In [4]:
#export
import feast
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [5]:
#hide
!feast apply

Registered entity [1m[32mUserID[0m
Registered feature view [1m[32mpropensity_data[0m
Deploying infrastructure for [1m[32mpropensity_data[0m


### Inspect Feast schema

In [6]:
#hide
from feast import FeatureStore
from IPython.core.display import display, HTML
import json
from json2html import *
import warnings
warnings.filterwarnings('ignore')

class FeastSchema:
    def __init__(self, repo_path: str):
        self.store = FeatureStore(repo_path=repo_path)
    
    def show_schema(self, skip_meta: bool= False):
        feast_schema=self.__project_show_schema(skip_meta)        
        display(HTML(json2html.convert(json = feast_schema)))

    def show_table_schema(self, table: str, skip_meta: bool= False):
        feasture_tables_dictionary=self.__project_show_schema(skip_meta)
        display(HTML(json2html.convert(json = {table:feasture_tables_dictionary[table]})))

    def __project_show_schema(self, skip_meta: bool= False):
        entities_dictionary={}
        feast_entities=self.store.list_entities()
        for entity in feast_entities:
            entity_dictionary=entity.to_dict()
            entity_spec=entity_dictionary['spec']
            entities_dictionary[entity_spec['name']]=entity_spec
        
        feasture_tables_dictionary={}
        feast_feature_tables=self.store.list_feature_views()
        for feature_table in feast_feature_tables:
            feature_table_dict=json.loads(str(feature_table))
            feature_table_spec=feature_table_dict['spec']
            feature_table_name=feature_table_spec['name']
            feature_table_spec.pop('name',None)
            if 'entities' in feature_table_spec:
                feature_table_entities=[]
                for entity in feature_table_spec['entities']:
                    feature_table_entities.append(entities_dictionary[entity])
                feature_table_spec['entities']=feature_table_entities
                
            if not skip_meta:
                feature_table_spec['meta']=feature_table_dict['meta']
            else:
                feature_table_spec.pop('input',None)
                feature_table_spec.pop('ttl',None)
                feature_table_spec.pop('online',None)
                
            feasture_tables_dictionary[feature_table_name]=feature_table_spec
        
        return feasture_tables_dictionary
    

        
    
FeastSchema(".").show_schema()
#FeastSchema(".").show_schema(skip_meta=True)
#FeastSchema(".").show_table_schema('driver_hourly_stats')
#FeastSchema().show_tables()

name,valueType,description,joinKey
name,valueType,Unnamed: 2_level_1,Unnamed: 3_level_1
UserID,STRING,user id,UserID
basket_icon_click,DOUBLE,,
basket_add_list,DOUBLE,,
basket_add_detail,DOUBLE,,
sort_by,DOUBLE,,
image_picker,DOUBLE,,
account_page_click,DOUBLE,,
promo_banner_click,DOUBLE,,
detail_wishlist_add,DOUBLE,,
list_size_dropdown,DOUBLE,,

name,valueType,description,joinKey
name,valueType,Unnamed: 2_level_1,Unnamed: 3_level_1
UserID,STRING,user id,UserID
basket_icon_click,DOUBLE,,
basket_add_list,DOUBLE,,
basket_add_detail,DOUBLE,,
sort_by,DOUBLE,,
image_picker,DOUBLE,,
account_page_click,DOUBLE,,
promo_banner_click,DOUBLE,,
detail_wishlist_add,DOUBLE,,
list_size_dropdown,DOUBLE,,

name,valueType,description,joinKey
UserID,STRING,user id,UserID

name,valueType
basket_icon_click,DOUBLE
basket_add_list,DOUBLE
basket_add_detail,DOUBLE
sort_by,DOUBLE
image_picker,DOUBLE
account_page_click,DOUBLE
promo_banner_click,DOUBLE
detail_wishlist_add,DOUBLE
list_size_dropdown,DOUBLE
closed_minibasket_click,DOUBLE

0,1
type,BATCH_FILE
eventTimestampColumn,datetime
createdTimestampColumn,created
fileOptions,fileUrls3://propensity/training_sample.parquet

0,1
fileUrl,s3://propensity/training_sample.parquet

0,1
createdTimestamp,1970-01-01T00:00:00Z


## Input parameters for mlflow project 

In [None]:
#export 
import argparse
parser= argparse.ArgumentParser()

parser.add_argument('--var_smoothing', type=float)

args = parser.parse_args()
input_params = args.__dict__

In [30]:
#hide
input_params = {'var_smoothing':1e-9}

In [28]:
#export
import os
import pandas as pd
import warnings
import sys
import numpy as np
import joblib
import pyarrow.parquet as pq
import pyarrow as pa
import sklearn
from pyarrow import fs
from datetime import datetime
from feast import FeatureStore
from sklearn.model_selection  import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

## Train and load to mlflow  

### Train, Save and Evaluate 

In [31]:
#export
bucket_name="propensity"
filename="training_sample"

store = FeatureStore(repo_path=".")

s3 = fs.S3FileSystem(endpoint_override=os.environ.get("FEAST_S3_ENDPOINT_URL"))
entity_df=pd.read_parquet(f'{bucket_name}/{filename}_entities.parquet', filesystem=s3)
entity_df["event_timestamp"]=datetime.now()


training_df = store.get_historical_features(
    entity_df=entity_df, 
    feature_refs = [
        'propensity_data:basket_icon_click',
        'propensity_data:basket_add_list',
        'propensity_data:basket_add_detail',
        'propensity_data:sort_by',
        'propensity_data:image_picker',
        'propensity_data:account_page_click',
        'propensity_data:promo_banner_click',
        'propensity_data:detail_wishlist_add',
        'propensity_data:list_size_dropdown',
        'propensity_data:closed_minibasket_click',
        'propensity_data:checked_delivery_detail',
        'propensity_data:checked_returns_detail',
        'propensity_data:sign_in',
        'propensity_data:saw_checkout',
        'propensity_data:saw_sizecharts',
        'propensity_data:saw_delivery',
        'propensity_data:saw_account_upgrade',
        'propensity_data:saw_homepage',
        'propensity_data:device_mobile',
        'propensity_data:device_computer',
        'propensity_data:device_tablet',
        'propensity_data:returning_user',
        'propensity_data:loc_uk',
        'propensity_data:ordered'
    ],
).to_df()

predictors = training_df.drop(['propensity_data__ordered','UserID','event_timestamp'], axis=1)
targets = training_df['propensity_data__ordered']

X_train, X_test, y_train, y_test = train_test_split(predictors, targets, test_size=.3)

classifier=GaussianNB(var_smoothing=input_params['var_smoothing'])
classifier=classifier.fit(X_train,y_train)

predictions=classifier.predict(X_test)

conf_matrix=sklearn.metrics.confusion_matrix(y_test,predictions)
ac_score=sklearn.metrics.accuracy_score(y_test, predictions)

propensity_model_path = 'propensity.joblib'
joblib.dump(classifier, propensity_model_path)

artifacts = {
    "propensity_model": propensity_model_path,
    "feature_store": "feature_store.yaml"
}

### Custom MLflow model wrapper

In [9]:
#export 
import mlflow.pyfunc
class PropensityWrapper(mlflow.pyfunc.PythonModel):
    
    def load_context(self, context):
        import joblib
        from feast import FeatureStore
        import pandas as pd 
        import os
        
        self.model = joblib.load(context.artifacts["propensity_model"])
        self.store = FeatureStore(repo_path=os.environ.get("FEAST_REPO_PATH"))
        
    def predict(self, context, model_input):
        users=list(model_input.to_dict()["UserID"].values())
        
        feature_vector = self.store.get_online_features(
            feature_refs=[
                'propensity_data:basket_icon_click',
                'propensity_data:basket_add_list',
                'propensity_data:basket_add_detail',
                'propensity_data:sort_by',
                'propensity_data:image_picker',
                'propensity_data:account_page_click',
                'propensity_data:promo_banner_click',
                'propensity_data:detail_wishlist_add',
                'propensity_data:list_size_dropdown',
                'propensity_data:closed_minibasket_click',
                'propensity_data:checked_delivery_detail',
                'propensity_data:checked_returns_detail',
                'propensity_data:sign_in',
                'propensity_data:saw_checkout',
                'propensity_data:saw_sizecharts',
                'propensity_data:saw_delivery',
                'propensity_data:saw_account_upgrade',
                'propensity_data:saw_homepage',
                'propensity_data:returning_user',
                'propensity_data:loc_uk'
            ],
            entity_rows=[{"UserID": uid} for uid in users]
        ).to_dict()
        
        data=pd.DataFrame.from_dict(feature_vector)
        merged_data = pd.merge(model_input,data, how="inner", on=["UserID"], suffixes=('_x', '')).drop(['UserID'], axis=1)
        return self.model.predict(merged_data)

### Conda environment 

In [8]:
#export 
from sys import version_info
import sklearn
import cloudpickle

PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"

conda_env = {
    'channels': ['defaults'],
    'dependencies': [
        f'python={PYTHON_VERSION}',
        'pip',
        {
            'pip':[
                'mlflow',
                f'scikit-learn=={sklearn.__version__}',
                f'cloudpickle=={cloudpickle.__version__}'
            ]
        }
    ],
    'name': 'serving_propensity_model'
}

In [None]:
#export

import warnings
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
import mlflow.sklearn
import mlflow.pyfunc

#conda_env=mlflow.pyfunc.get_default_conda_env()

with mlflow.start_run():
    
    #mlflow.log_param("var_smoothing", input_params['var_smoothing'])
    mlflow.log_metric("accuracy_score", ac_score)
    
    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

    if tracking_url_type_store != "file":
        mlflow.pyfunc.log_model("model",
                                 registered_model_name="propensity_model",
                                 python_model=PropensityWrapper(),
                                 artifacts=artifacts,
                                 conda_env=conda_env)
    else:
        mlflow.pyfunc.log_model("model",
                                 path=my_model_path,
                                 python_model=PropensityWrapper(),
                                 artifacts=artifacts,
                                 conda_env=conda_env)

## Export train code 

The above code will be exported to the python file using nbdev library (export, hide, default_exp keyworkd are needed ) 

In [7]:
#hide
from nbdev.export import *
notebook2script()

Converted mlflow_feast.ipynb.


## Train from command using mlflow

In [8]:
!mlflow run . --no-conda --experiment-name="propensity" -P var_smoothing=1e-9

INFO: 'propensity' does not exist. Creating a new experiment
2021/05/09 21:15:22 INFO mlflow.projects.utils: === Created directory /tmp/tmp_l3rorpu for downloading remote URIs passed to arguments of type 'path' ===
2021/05/09 21:15:22 INFO mlflow.projects.backend.local: === Running command 'python3 ./mlflow_feast/train.py --var_smoothing 1e-9' in run with ID 'ffaaf42e82d24f21a532b521343c8492' === 
Successfully registered model 'propensity_model'.
2021/05/09 21:15:35 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: propensity_model, version 1
Created version '1' of model 'propensity_model'.
2021/05/09 21:15:35 INFO mlflow.projects: === Run (ID 'ffaaf42e82d24f21a532b521343c8492') succeeded ===


In [10]:
%env REDIS_CONNECTION_STRING=redis.qooba.svc.cluster.local:6379,db=0
!feast materialize 2021-03-22T23:42:00 2021-06-22T23:42:00

env: REDIS_CONNECTION_STRING=redis.qooba.svc.cluster.local:6379,db=0
Materializing feature view [1m[32mpropensity_data[0m from [1m[32m2021-03-22 23:42:00+00:00[0m to [1m[32m2021-06-22 23:42:00+00:00[0m done!


# Test locally 

## Load from mlflow repository and test 

In [11]:
%env FEAST_REPO_PATH=.
import mlflow.sklearn
#sk_model = mlflow.pyfunc.load_model("runs:/96771d893a5e46159d9f3b49bf9013e2/sk_models")
#sk_model = mlflow.pyfunc.load_model("s3://mlflow/mlruns/2/5610d55090ec4b499a9cd14fd409c05d/artifacts/model")
#sk_model = mlflow.pyfunc.load_model("models:/propensity_model/13")
sk_model = mlflow.pyfunc.load_model("models:/propensity_model/Production")

env: FEAST_REPO_PATH=.


In [12]:
import pandas as pd

data=pd.DataFrame.from_dict(data=[
    {"UserID": "a720-6b732349-a720-4862-bd21-644732",
     'propensity_data:device_mobile': 1.0,
     'propensity_data:device_computer': 0.0,
     'propensity_data:device_tablet': 0.0
    }
])

res=sk_model.predict(data)
res

array([0])

# Test microservice

In [17]:
import requests
import json

url="http://mlflow-serving.qooba.svc.cluster.local:5000/invocations"

headers={
    'Content-Type': 'application/json; format=pandas-records'
}

data=[
    {"UserID": "a720-6b732349-a720-4862-bd21-644732",
     'propensity_data:device_mobile': 1.0,
     'propensity_data:device_computer': 0.0,
     'propensity_data:device_tablet': 0.0
    }
]

response=requests.post(url, data=json.dumps(data), headers=headers)
response.text

'[0]'