## Setup...

In [33]:
# utility functions
import shutil

from cassandra.cluster import Cluster
import pandas as pd

def rm_folder(path):
    shutil.rmtree(path, ignore_errors=True)

CASSANDRA_CLUSTER = Cluster(['127.0.0.1'], port=9042)
CASSANDRA_SESSION = CASSANDRA_CLUSTER.connect()

def cassandra_query(query):
    return pd.DataFrame(list(CASSANDRA_SESSION.execute(query)))

In [34]:
# testing online feature store on cassandra container (make cassandra-up)
cassandra_query("SELECT * FROM system_schema.tables WHERE keyspace_name = 'feature_store'")

Unnamed: 0,keyspace_name,table_name,bloom_filter_fp_chance,caching,cdc,comment,compaction,compression,crc_check_chance,dclocal_read_repair_chance,default_time_to_live,extensions,flags,gc_grace_seconds,id,max_index_interval,memtable_flush_period_in_ms,min_index_interval,read_repair_chance,speculative_retry
0,feature_store,user_chargebacks,0.01,"[keys, rows_per_partition]",,,"[class, max_threshold, min_threshold]","[chunk_length_in_kb, class]",1.0,0.1,0,[],(compound),864000,4f683170-7c92-11eb-8517-6d2c86545d91,2048,0,128,0.0,99PERCENTILE
1,feature_store,user_orders,0.01,"[keys, rows_per_partition]",,,"[class, max_threshold, min_threshold]","[chunk_length_in_kb, class]",1.0,0.1,0,[],(compound),864000,50b4d5b0-7c92-11eb-8517-6d2c86545d91,2048,0,128,0.0,99PERCENTILE


In [3]:
# setup spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


conf = SparkConf().setAll(
    [
        ("spark.sql.session.timeZone", "UTC"),
        ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
    ]
)
spark = (
    SparkSession.builder.config(conf=conf)
    .appName("legiti-challenge")
    .getOrCreate()
)

# UserOrdersPipeline Example

Showing the several interval executions for the pipeline that creates the feature set `user_orders` from `user` entity

In [4]:
# create pipeline from declaration
from legiti_challenge.feature_store_pipelines.user import UserOrdersPipeline
user_orders_pipeline = UserOrdersPipeline()

In [5]:
# clean local historical feature store table
user_orders_path = "data/feature_store/historical/user/user_orders"
rm_folder(user_orders_path)

In [6]:
# backfilling all historical data until 2020-05-10
user_orders_pipeline.run(end_date="2020-05-10")

In [7]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


In [8]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,z,2018-05-11,0,0,0
1,x,2018-06-12,0,0,0
2,y,2020-04-20,0,0,1


In [9]:
# daily run for the date 2020-05-11
user_orders_pipeline.run_for_date("2020-05-11")

In [10]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


3 new records were added to the table with feature states calculated just for the 2020-05-11 date. Records from the other table partitions were not touched.

In [11]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,x,2020-05-11,1,1,1
1,z,2020-05-11,0,0,0
2,y,2020-05-11,0,0,1


In [12]:
# daily run for the date 2020-05-12
user_orders_pipeline.run_for_date("2020-05-12")

In [13]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


In [14]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,x,2020-05-12,2,2,2
1,z,2020-05-12,1,1,1
2,y,2020-05-12,0,0,1


In [15]:
# daily run for the date 2020-05-13
user_orders_pipeline.run_for_date("2020-05-13")

In [16]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


In [17]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,x,2020-05-13,3,3,3
1,z,2020-05-13,1,1,1
2,y,2020-05-13,0,0,0


In [18]:
# daily run for the date 2020-05-14
user_orders_pipeline.run_for_date("2020-05-14")

In [19]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


In [20]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,x,2020-05-14,3,4,4
1,z,2020-05-14,1,1,1
2,y,2020-05-14,0,0,0


In [21]:
# backfilling from 2020-05-15 to 2020-07-17, this way completing all the data time line.
user_orders_pipeline.run(start_date="2020-05-15", end_date="2020-07-17")

In [22]:
# showing historical feature store results
spark.read.parquet(user_orders_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,year,month,day
0,x,2018-05-11,0,0,0,2018,5,11
1,z,2018-05-11,0,0,0,2018,5,11
2,y,2018-05-11,0,0,0,2018,5,11
3,x,2018-05-13,1,1,1,2018,5,13
4,x,2018-05-16,0,1,1,2018,5,16
5,x,2018-05-20,0,0,1,2018,5,20
6,x,2018-06-12,0,0,0,2018,6,12
7,y,2020-04-13,1,1,1,2020,4,13
8,y,2020-04-16,0,1,1,2020,4,16
9,y,2020-04-20,0,0,1,2020,4,20


In [23]:
# showing online feature store results
spark.table("online_feature_store__user_orders").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows
0,y,2020-05-15,0,0,0
1,z,2020-06-11,0,0,0
2,x,2020-07-17,2,2,2


# UserChargebacksPipeline Example

Showing the pipeline run for all the datasets timeline for the feature set `user_chargeback` from `user` entity

In [24]:
# create pipeline from declaration
from legiti_challenge.feature_store_pipelines.user import UserChargebacksPipeline
user_chargebacks_pipeline = UserChargebacksPipeline()

In [25]:
# clean local historical feature store table
user_chargebacks_path = "data/feature_store/historical/user/user_chargebacks"
rm_folder(user_chargebacks_path)

In [26]:
# backfilling all historical data until 2020-07-17
user_chargebacks_pipeline.run(end_date="2020-07-17")

In [27]:
# showing historical feature store results
spark.read.parquet(user_chargebacks_path).orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_chargebacks__count_over_3_days_rolling_windows,cpf_chargebacks__count_over_7_days_rolling_windows,cpf_chargebacks__count_over_30_days_rolling_windows,year,month,day
0,x,2020-05-12,0,0,0,2020,5,12
1,x,2020-05-13,1,1,1,2020,5,13
2,x,2020-05-16,0,1,1,2020,5,16
3,x,2020-05-20,0,0,1,2020,5,20
4,x,2020-06-12,0,0,0,2020,6,12
5,x,2020-07-02,1,1,1,2020,7,2
6,x,2020-07-05,0,1,1,2020,7,5
7,x,2020-07-09,0,0,1,2020,7,9
8,x,2020-07-15,1,1,2,2020,7,15


In [28]:
# showing online feature store results
spark.table("online_feature_store__user_chargebacks").orderBy("timestamp").toPandas()

Unnamed: 0,cpf,timestamp,cpf_chargebacks__count_over_3_days_rolling_windows,cpf_chargebacks__count_over_7_days_rolling_windows,cpf_chargebacks__count_over_30_days_rolling_windows
0,x,2020-07-15,1,1,2


# Creating the AwesomeDataset
Enriching order events with features from both feature sets

In [29]:
from legiti_challenge.dataset_pipelines import AwesomeDatasetPipeline
awesome_dataset_pipeline = AwesomeDatasetPipeline()

In [30]:
# creating dataset
awesome_dataset_pipeline.run()

In [31]:
# showing created CSV dataset
awesome_dataset_path = "data/datasets/awesome_dataset"
spark.read.option("header", True).csv(awesome_dataset_path).orderBy("timestamp").toPandas()

Unnamed: 0,order_id,timestamp,chargeback_timestamp,cpf,cpf_orders__count_over_3_days_rolling_windows,cpf_orders__count_over_7_days_rolling_windows,cpf_orders__count_over_30_days_rolling_windows,cpf_chargebacks__count_over_3_days_rolling_windows,cpf_chargebacks__count_over_7_days_rolling_windows,cpf_chargebacks__count_over_30_days_rolling_windows
0,a,2018-05-12T00:00:01.000Z,,x,0,0,0,0,0,0
1,b,2020-04-12T10:00:00.000Z,,y,0,0,0,0,0,0
2,c,2020-05-10T00:00:01.000Z,,x,0,0,0,0,0,0
3,d,2020-05-11T12:00:00.000Z,,z,0,0,0,0,0,0
4,e,2020-05-11T15:00:00.000Z,2020-05-12T20:00:00.000Z,x,1,1,1,0,0,0
5,f,2020-05-12T21:00:00.000Z,2020-07-01T11:00:00.000Z,x,2,2,2,0,0,0
6,g,2020-05-13T22:00:00.000Z,2020-07-14T12:00:00.000Z,x,3,3,3,1,1,1
7,h,2020-07-15T02:00:00.000Z,,x,0,0,0,0,0,0
8,i,2020-07-16T14:00:00.000Z,,x,1,1,1,1,1,2
