# Ifood Feature Store

 
- Uma abordagem de feature store usando Pyspark, uma estrutura simples, flexível e confiável.
- Pelo tempo de desenvolvimento limitado, optei por não inventar a roda, e utilizar um framework que abstrai boa parte dos componentes do Pyspark, como integração com kafka, transformações de DataFrames, e armazenamento dos dados. O butterfree é um framework open source e brasileiro, desenvolvido pelo QuintoAndar, e atende bem quando o escopo é de tamanho pequeno ou médio.
- 

In [1]:
# setup spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import session, SparkSession
from pyspark.sql import HiveContext
# butterfree spark client
from butterfree.clients import SparkClient
import os
from cassandra.cluster import Cluster

from butterfree.extract import Source
from butterfree.extract.readers import FileReader
from butterfree.extract.readers import KafkaReader

from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType, BinaryType, TimestampType, DateType

from pyspark.sql import functions as F

from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
from butterfree.transform.transformations import SQLExpressionTransform, SparkFunctionTransform, CustomTransform
from butterfree.transform.transformations.h3_transform import H3HashTransform
from butterfree.constants import DataType
from butterfree.transform.utils import Function

from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)
from butterfree.load import Sink

from butterfree.configs.db import CassandraConfig
from butterfree.load.writers import OnlineFeatureStoreWriter
from butterfree.pipelines import FeatureSetPipeline

import pdb

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[6] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta pyspark-shell'

# Setting Spark Session

In [3]:
def spark_session():
    
    hive_metastore = "thrift://hive-metastore:9083"

    spark = (
        SparkSession
        .builder
        .appName("Feature Store")
        .config("spark.sql.warehouse.dir", hive_metastore)
        .config("spark.hive.metastore.uris", hive_metastore)
        .config("spark.executor.memory", "8g")
        .config("spark.executor.cores", "2")
        .config("spark.sql.shuffle.partitions", 10)
        .enableHiveSupport()
        .getOrCreate()
    )

    sc=spark.sparkContext

    spark_client = SparkClient()
    hive_context = HiveContext(sc)

    return spark_client, spark

In [4]:
spark_client, spark = spark_session()

# Understanding the Kafka source and they topics

In [5]:
client = "a49784be7f36511e9a6b60a341003dc2-1378330561.us-east-1.elb.amazonaws.com:9092"
topic = "de-order-events"

In [6]:
from kafka import KafkaConsumer, TopicPartition

nbrrecords = int(50)
nbrrecordsinserted = int(0)
nbrrecordsretreived = int(0)


consumer = KafkaConsumer(bootstrap_servers=client, group_id=None)

{'de-order-events', 'testTopic', 'queueing.transactions', 'de-order-aaa', 'order', 'de-order-status-events', 'numtest', 'client', 'my-topic', 'de-restaurant-events', 'de-consumer-events'}


In [7]:
consumer.topics()

{'client',
 'de-consumer-events',
 'de-order-aaa',
 'de-order-events',
 'de-order-status-events',
 'de-restaurant-events',
 'my-topic',
 'numtest',
 'order',
 'queueing.transactions',
 'testTopic'}

In [8]:
topics = ['de-restaurant-events','de-order-events','de-consumer-events']

for topic in topics:
    print(topic)
    tp = TopicPartition(topic,0)
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)
    # obtain the last offset value
    lastOffset = consumer.end_offsets([tp])[tp]
    print(topic, lastOffset)

de-restaurant-events
de-restaurant-events 7292
de-order-events
de-order-events 3683040
de-consumer-events
de-consumer-events 809323


# Extract

## Columns to be considered

### orders
selected columns:

cpf,
customer_id,
items,
order_total_amount,
order_created_at,
order_scheduled,
delivery_address_latitude,
delivery_address_longitude,

### restaurants
selected columns:

average_ticket,
takeout_time,
delivery_time,

## Setting kafka readers (using native spark stream kafka reader)

In [9]:
#TimestampType, DateType

orders_schema = StructType([
                        StructField('cpf', IntegerType(), True),
                        StructField('customer_id', StringType(), True),
                        StructField('merchant_id', StringType(), True),
                        StructField('items', StringType(), True),
                        StructField('order_total_amount', DoubleType(), True),
                        StructField('delivery_address_latitude', DoubleType(), True),
                        StructField('delivery_address_longitude', DoubleType(), True),
                        StructField('order_scheduled', IntegerType(), True),
                        StructField('order_created_at', TimestampType(), True)
                ])

restaurants_schema = StructType([
    
                        StructField('id', StringType(), True),
                        StructField('average_ticket', DoubleType(), True),
                        StructField('takeout_time', IntegerType(), True),
                        StructField('delivery_time', IntegerType(), True),
                ])

In [10]:
orders_topic = "de-order-events"
restaurants_topic = "de-restaurant-events"

In [11]:
# Here I've setted startingOffset to run entire pipeline more fastly
# https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

kafka_orders_reader = KafkaReader(
    id="order_events",
    topic=orders_topic,
    value_schema=orders_schema,
    connection_string=client,
    topic_options={"startingOffsets": """ {"de-order-events": {"0":3682000}} """},
    stream=False
)

kafka_restaurants_reader = KafkaReader(
    id="restaurants_events",
    topic=restaurants_topic,
    value_schema=restaurants_schema,
    connection_string=client,
    topic_options={"startingOffsets": """ {"de-restaurant-events": {"0":7200}} """},
    stream=False
)

readers = [
    kafka_orders_reader,
    kafka_restaurants_reader
    
]

query = """
select
    *
from
    order_events
    join restaurants_events
        on order_events.merchant_id = restaurants_events.id    
"""


source = Source(readers=readers, query=query)

In [33]:
source

<butterfree.extract.source.Source at 0x7ff73ed0d040>

In [12]:
source_df = source.construct(spark_client)

source_df = source_df.limit(40)

source_df.show()

+----+--------------------+--------------------+--------------------+------------------+-------------------------+--------------------------+---------------+-------------------+--------------------+--------------------+--------------+------------+-------------+--------------------+
| cpf|         customer_id|         merchant_id|               items|order_total_amount|delivery_address_latitude|delivery_address_longitude|order_scheduled|   order_created_at|      kafka_metadata|                  id|average_ticket|takeout_time|delivery_time|      kafka_metadata|
+----+--------------------+--------------------+--------------------+------------------+-------------------------+--------------------------+---------------+-------------------+--------------------+--------------------+--------------+------------+-------------+--------------------+
|null|0ddebc9b-39c1-4ea...|10554b0b-8a67-454...|[{"name": "Guioza...|              51.8|                   -47.92|                    -15.84|          

# Transform

In [13]:
from pyspark.sql.functions import col, size, split
from pyspark.sql import functions as F
from pyspark.sql import Window as W

## Writing transformations

In [14]:
# feature, custom transformations

days = lambda i: i * 86400 # convert days to seconds

def count_items(df, parent_feature, column):
    
    name = parent_feature.get_output_columns()[0]
    
    #df = df.withColumn(name, F.lit(10))
    df = df.withColumn(name, F.size(F.split(F.col(column), r"name")) - 1)
    
    return df

def avg_last_1_month(df, parent_feature, column):
    
    name = parent_feature.get_output_columns()[0]
    
    windowSpec = W.partitionBy("customer_id").orderBy(F.col("order_created_at").cast('long')).rangeBetween(-days(30), 0)
    
    df = df.withColumn(name, F.avg(column).over(windowSpec))

    return df

def divide(df, fs, column1, column2):
    name = fs.get_output_columns()[0]
    df = df.withColumn(name, F.col(column1) / F.col(column2))
    return df

def count_items_in_order():
    return Feature(
        name="items_qtd",
        description="count number of items in order",
        dtype=DataType.INTEGER,
        transformation=CustomTransform(
           transformer=count_items, column="items"
        )
    )

def avg_order_total_amount_from_last_1_month():
    return Feature(
        name="avg_order_amount_from_last_1_month_val",
        description="average order amount from last 1 month",
        dtype=DataType.DOUBLE,
        transformation=CustomTransform(
           transformer=avg_last_1_month, column="order_total_amount"
        )
    )

def order_total_amount():
    return Feature(
        name="order_total_amount",
        description="name_employer",
        dtype=DataType.STRING,
    )

def ratio_order_amount_and_items():
    return Feature(
        name="ratio_order_amount_by_items_val",
        description="ratio order amount by items count",
        dtype=DataType.DOUBLE,
        transformation=CustomTransform(
           transformer=divide, column1="order_total_amount", column2="items_qtd"
        )
    )

def ratio_order_amount_and_average_ticket():
    return Feature(
        name="ratio_order_amount_by_average_ticket_val",
        description="ratio order amount by restaurant average ticket",
        dtype=DataType.DOUBLE,
        transformation=CustomTransform(
           transformer=divide, column1="order_total_amount", column2="average_ticket"
        )
    )

## Adding transformations into a features set

In [16]:
# primary key
keys = [
    KeyFeature(
        name="customer_id",
        description="Unique identificator code for customer.",
        from_column="customer_id",
        dtype=DataType.STRING,
    )
]

ts_feature = TimestampFeature(from_column="order_created_at")

# features transformations
features = [
    #order_total_amount(),
    count_items_in_order(),
    avg_order_total_amount_from_last_1_month(),
    ratio_order_amount_and_items(),
    ratio_order_amount_and_average_ticket()
]

# joining all together
feature_set = FeatureSet(
    name="orders_feature_master_table",
    entity="orders_feature_master_table",  # entity: to which "business context" this feature set belongs
    description="Features describring events about ifood store.",
    keys=keys,
    timestamp=ts_feature,
    features=features,
)

In [17]:


feature_set_df = feature_set.construct(source_df, spark_client)

print(feature_set_df.show())




+--------------------+-------------------+---------------+----------------------------+-------------------------------+----------------------------------------+
|         customer_id|          timestamp|items_count_val|avg_order_amount_1_month_val|ratio_order_amount_by_items_val|ratio_order_amount_by_average_ticket_val|
+--------------------+-------------------+---------------+----------------------------+-------------------------------+----------------------------------------+
|0ddebc9b-39c1-4ea...|2019-01-12 23:31:47|              2|                        51.8|                           25.9|                      1.7266666666666666|
|1da1119e-cf94-47b...|2019-01-08 21:04:42|              2|                       168.3|                          84.15|                                   2.805|
|3590abad-efb5-462...|2019-01-08 22:08:14|              1|                        14.9|                           14.9|                                  0.3725|
|373e5d95-a3bd-484...|2019-01-18 0

# Load

In [24]:
"""
cluster = Cluster(['feature_store_cassandra'])
session = cluster.connect()
df = session.execute("DROP TABLE feature_store.orders_feature_master_table")
cluster.shutdown()
"""

## Cassandra DB for online feature store

In [25]:

def _cassandra_config():
    return CassandraConfig(
        username="cassandra", 
        password="cassandra",
        host="feature_store_cassandra",
        keyspace="feature_store",
        stream_checkpoint_path="./"
    )

def _create_table(feature_set_df):

    keyspace = "feature_store"
    table_name = "orders_feature_master_table"

    cassandra_mapping = {
            "TimestampType": "timestamp",
            "BinaryType": "boolean",
            "BooleanType": "boolean",
            "DateType": "timestamp",
            "DecimalType": "decimal",
            "DoubleType": "double",
            "FloatType": "float",
            "IntegerType": "int",
            "LongType": "bigint",
            "StringType": "text",
            "ArrayType(LongType,true)": "frozen<list<bigint>>",
            "ArrayType(StringType,true)": "frozen<list<text>>",
            "ArrayType(FloatType,true)": "frozen<list<float>>",
        }

    cluster = Cluster(['feature_store_cassandra'])
    session = cluster.connect()

    session.execute("CREATE KEYSPACE IF NOT EXISTS "+ keyspace +" WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };")

    sql = ", ".join([feature.name +str(" ") + cassandra_mapping[str(feature.dataType)] for feature in feature_set_df.schema]).replace("customer_id text", "customer_id text PRIMARY KEY")
    
    sql = "CREATE TABLE IF NOT EXISTS {}.{} (" + sql + ");"
    sql = sql.format(keyspace, table_name)
    session.execute(sql)
    cluster.shutdown()

db_config = _cassandra_config()

_create_table(feature_set_df)

## Setting writes (Metastore and Cassandra) and combining into pipeline

In [26]:
writers = [HistoricalFeatureStoreWriter(debug_mode=True), OnlineFeatureStoreWriter(db_config=db_config)]

#writers = [HistoricalFeatureStoreWriter(debug_mode=True)]

sink = Sink(writers=writers)

pipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)

## Running pipeline end2end
They will run until data from queue are finished.

All data will be stored into historical features, otherwise, for online features, aways will be updated with most recent data, considering consumer_id (primary key)

In [27]:
pipeline.run()

# Consuming data from feature store

## Online features, from Cassandra DB

In [28]:
from cassandra.cluster import Cluster

cluster = Cluster(['feature_store_cassandra'])
session = cluster.connect()
df = session.execute("SELECT * FROM feature_store.orders_feature_master_table")
cluster.shutdown()
# Create data frame
df = spark.createDataFrame(df)
df.toPandas()

Unnamed: 0,customer_id,avg_order_amount_1_month_val,items_count_val,ratio_order_amount_by_average_ticket_val,ratio_order_amount_by_items_val,timestamp
0,c0844066-28e1-4658-b375-91fa8173c7e2,21.0,5,0.7,4.2,2019-01-20 00:19:09
1,b38b218e-3e22-44f2-adea-fd2c9dcfb431,46.0,6,1.15,7.666667,2019-01-08 16:30:26
2,6065622a-98db-4a65-93a1-ba89e9f7ab7d,45.8,2,0.763333,22.9,2019-01-31 00:01:05
3,af30d521-5a68-4ca7-8d66-7bd8e03d7bda,54.4,6,0.906667,9.066667,2019-01-30 17:20:51
4,79677fb6-31c7-4ddc-b35c-2afe15d1f96b,65.3,3,0.81625,21.766667,2018-12-29 16:16:57
5,3590abad-efb5-4622-a98c-ed70856006a7,14.9,1,0.3725,14.9,2019-01-08 22:08:14
6,373e5d95-a3bd-484e-927e-ac4c3bdbe1c6,44.0,6,0.733333,7.333333,2019-01-18 00:17:13
7,1da1119e-cf94-47bf-ae73-3b4f9d7b7196,168.3,2,2.805,84.15,2019-01-08 21:04:42
8,fa5789d2-2ff9-4b62-8bd4-41daac2bec63,73.5,4,1.225,18.375,2019-01-29 20:55:22
9,0ddebc9b-39c1-4ea0-ad4f-6fe68e7b26ec,51.8,2,1.726667,25.9,2019-01-12 23:31:47


## Historical features, from Hive Metastore 
Here I'm using PostgreSQL as storage, but it can be switched to S2 easily

In [29]:
spark.sql("show tables").show(truncate=False)

+--------+-----------------------------------------------------+-----------+
|database|tableName                                            |isTemporary|
+--------+-----------------------------------------------------+-----------+
|        |historical_feature_store__orders_feature_master_table|true       |
|        |order_events                                         |true       |
|        |restaurants_events                                   |true       |
+--------+-----------------------------------------------------+-----------+



In [31]:
spark.table("historical_feature_store__orders_feature_master_table").toPandas()

Unnamed: 0,customer_id,timestamp,items_count_val,avg_order_amount_1_month_val,ratio_order_amount_by_items_val,ratio_order_amount_by_average_ticket_val,year,month,day
0,c0844066-28e1-4658-b375-91fa8173c7e2,2019-01-20 00:19:09,5,21.0,4.2,0.7,2019,1,20
1,6065622a-98db-4a65-93a1-ba89e9f7ab7d,2019-01-31 00:01:05,2,45.8,22.9,0.763333,2019,1,31
2,0ddebc9b-39c1-4ea0-ad4f-6fe68e7b26ec,2019-01-12 23:31:47,2,51.8,25.9,1.726667,2019,1,12
3,79677fb6-31c7-4ddc-b35c-2afe15d1f96b,2018-12-29 16:16:57,3,65.3,21.766667,0.81625,2018,12,29
4,af30d521-5a68-4ca7-8d66-7bd8e03d7bda,2019-01-30 17:20:51,6,54.4,9.066667,0.906667,2019,1,30
5,373e5d95-a3bd-484e-927e-ac4c3bdbe1c6,2019-01-18 00:17:13,6,44.0,7.333333,0.733333,2019,1,18
6,fa5789d2-2ff9-4b62-8bd4-41daac2bec63,2019-01-29 20:55:22,4,73.5,18.375,1.225,2019,1,29
7,1da1119e-cf94-47bf-ae73-3b4f9d7b7196,2019-01-08 21:04:42,2,168.3,84.15,2.805,2019,1,8
8,3590abad-efb5-4622-a98c-ed70856006a7,2019-01-08 22:08:14,1,14.9,14.9,0.3725,2019,1,8
9,b38b218e-3e22-44f2-adea-fd2c9dcfb431,2019-01-08 16:30:26,6,46.0,7.666667,1.15,2019,1,8
