## Feature Store

As data grows, running separate ETL pipelines for each machine learning project becomes expensive because the same operations are repeated. A feature store addresses this problem: features are computed once and reused across projects, so similar processing does not have to be repeated in each one.

This tutorial covers how to create a feature store for Starbucks transactions. We will build ETL pipelines with the Butterfree library to load data into a feature store, so that the data can be served to machine learning algorithms, whether for training or for prediction.

## Example
We simulate the following scenario:

- A streaming JSON data source with events of Starbucks orders captured in real time.
- A CSV dataset with more information about the drinks.


Objective: 

We want to parse the JSON from the streaming source, enrich it with the drinks data and compute new features, store all rows in cheap storage (like S3), and serve the most recent transactions from a low-latency database like Cassandra.

We want an output with the following schema:

- **id_employer**: int
- **name_employer**: string
- **name_client**: string
- **payment**: string
- **timestamp**: timestamp
- **product_name**: string
- **product_size**: string
- **product_price**: float
- **percent_carbo**: float
- **final_price**: float


The following code blocks show how to generate this feature set with the Butterfree library, using the following architecture:

- Apache Kafka as the data source (streaming input data);
- A Hive metastore to store metadata (such as table schemas and locations) in a relational database (PostgreSQL in this tutorial);
- Apache Cassandra to store the most recent data;
- Amazon S3 to store historical features (or temporary table views when running in debug mode).

<img src="arc.png">



<b>Historical Feature Store:</b> all features calculated over time;

<b>Online Feature Store:</b> hot/latest data (the last record per key) stored in a low-latency database (Cassandra).
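The difference between the two stores can be illustrated without Spark. The sketch below is a plain-Python illustration, not Butterfree code: the helper names `historical_view` and `online_view` are hypothetical, and the sample rows are a few records taken from the results shown at the end of this tutorial. The historical view keeps every record, while the online view keeps only the most recent record per `id_employer`.

```python
from datetime import datetime

# A few (id_employer, timestamp, product_name) records from the tutorial's results.
events = [
    (1, datetime(2023, 5, 18, 1, 4, 5), "Cinnamon Dolce Latte"),
    (1, datetime(2024, 1, 4, 3, 4, 5), "Caramel Macchiato"),
    (0, datetime(2024, 1, 4, 20, 4, 5), "Caramel Macchiato"),
    (0, datetime(2023, 10, 6, 20, 4, 5), "Caramel Macchiato"),
]


def historical_view(rows):
    """Historical store (hypothetical helper): keep every record, ordered by key and time."""
    return sorted(rows, key=lambda r: (r[0], r[1]))


def online_view(rows):
    """Online store (hypothetical helper): keep only the latest record for each key."""
    latest = {}
    for row in rows:
        key, ts = row[0], row[1]
        if key not in latest or ts > latest[key][1]:
            latest[key] = row
    return latest


historical = historical_view(events)  # all 4 records
online = online_view(events)          # one record per id_employer
```

Keying the online view by `id_employer` mirrors the Cassandra table created later, whose primary key is `id_employer`.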


## Observations

<b>In this tutorial, the historical data will be stored locally. However, you can easily add an S3 bucket.</b>

<b>Check the documentation here: https://butterfree.readthedocs.io/en/latest/configuration.html?highlight=s3#historical-feature-store-spark-metastore-and-s3</b>

<b>We will run a batch process, but you can switch to stream processing with minor modifications.</b>

<b>Check the documentation here: https://butterfree.readthedocs.io/en/latest/stream.html</b>

### Download packages

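```python
import os

# Must be set before the SparkSession is created: runs Spark locally with 6 threads and
# makes spark-submit download the Kafka and Cassandra connectors (Spark 3.0 / Scala 2.12).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[6] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta pyspark-shell'
```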

### Spark Instance

Create a Spark session connected to the Hive metastore:

```python
# setup spark
from pyspark.sql import SparkSession, HiveContext
# butterfree spark client
from butterfree.clients import SparkClient

spark = (
    SparkSession
    .builder
    .appName("Feature Store")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

# SparkClient should pick up the session created above (it uses getOrCreate).
spark_client = SparkClient()
# HiveContext is deprecated since Spark 2.0; spark.sql(...) is the modern equivalent.
hive_context = HiveContext(sc)
```



### Extract

First, we need to define our data schemas.

```python
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType

# Schema of the JSON value of each Kafka message (one order event).
schema_kafka = StructType([StructField('name_employer', StringType(), True),
                           StructField('id_employer', IntegerType(), True),
                           StructField('name_client', StringType(), True),
                           StructField('transaction_id', IntegerType(), True),
                           StructField('payment', StringType(), True),
                           StructField('timestamp', StringType(), True),
                           StructField('product_name', StringType(), True),
                           StructField('product_size', StringType(), True),
                           StructField('product_price', DoubleType(), True),
                           StructField('percent_discount', IntegerType(), True)])

# Schema of the drinks nutrition CSV file.
schema_file = StructType([StructField('name', StringType(), True),
                          StructField('calories', IntegerType(), True),
                          StructField('fat(g)', IntegerType(), True),
                          StructField('carb(g)', IntegerType(), True),
                          StructField('fiber(g)', IntegerType(), True),
                          StructField('protein', IntegerType(), True),
                          StructField('sodium', IntegerType(), True)])
```
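To make the role of `schema_kafka` concrete, the sketch below parses one message value in plain Python. The message is a hypothetical example built from a row of the results shown later; `transaction_id` and `product_size` are made-up values. `parse_event` is also hypothetical and only loosely imitates a schema-driven JSON parse: fields that are missing or have the wrong type become `None`, like a null in a nullable column. Spark's own handling of malformed records differs in the details.

```python
import json

# Field names and Python types mirroring schema_kafka
# (IntegerType -> int, StringType -> str, DoubleType -> float).
KAFKA_FIELDS = {
    "name_employer": str,
    "id_employer": int,
    "name_client": str,
    "transaction_id": int,
    "payment": str,
    "timestamp": str,
    "product_name": str,
    "product_size": str,
    "product_price": float,
    "percent_discount": int,
}


def parse_event(raw):
    """Parse a Kafka message value into a row; bad or missing fields become None."""
    data = json.loads(raw)
    row = {}
    for name, typ in KAFKA_FIELDS.items():
        value = data.get(name)
        # Accept JSON integers for double fields (e.g. 4 -> 4.0).
        if typ is float and isinstance(value, int) and not isinstance(value, bool):
            value = float(value)
        ok = isinstance(value, typ) and not isinstance(value, bool)
        row[name] = value if ok else None
    return row


# Hypothetical message value (transaction_id and product_size are invented).
raw = json.dumps({
    "name_employer": "Denver", "id_employer": 5, "name_client": "Mike Soto",
    "transaction_id": 1, "payment": "credit", "timestamp": "2024-01-04 14:04:05",
    "product_name": "Cinnamon Dolce Latte", "product_size": "tall",
    "product_price": 3.65, "percent_discount": 30,
})
event = parse_event(raw)
```

`timestamp` is kept as a string here, just as in `schema_kafka`; it is converted to a real timestamp later by the `TimestampFeature`.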



Define the readers (Kafka for the order events and a CSV file for the drinks nutrition data) and the query that joins them:

```python
from butterfree.extract import Source
from butterfree.extract.readers import FileReader
from butterfree.extract.readers import KafkaReader

# stream=False reads the topic as a batch; set it to True for a streaming DataFrame.
kafka_reader = KafkaReader(
    id="events",
    topic="queueing.transactions",
    value_schema=schema_kafka,
    connection_string="broker:9092",
    stream=False
)

readers = [
    kafka_reader,
    FileReader(id="nutrients", path="starbucks-menu-nutrition-drinks.csv", format="csv", schema=schema_file)
]

# Each reader is registered as a view named after its id, so the query can join them.
query = """
select
    *
from
    events
    join nutrients
        on events.product_name = nutrients.name
"""

source = Source(readers=readers, query=query)
```
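The query uses a plain `join`, which in Spark SQL is an inner join: an event whose `product_name` has no matching `name` in the CSV file is dropped, and drinks that were never ordered do not appear. The plain-Python sketch below illustrates that behaviour with a small hash join; `inner_join` is a hypothetical helper and the rows are illustrative (the "Unknown Drink" row is invented to show an unmatched event).

```python
# Illustrative rows shaped like the two sources.
events = [
    {"name_client": "Mike Soto", "product_name": "Cinnamon Dolce Latte"},
    {"name_client": "Brandon Novak", "product_name": "Caramel Macchiato"},
    {"name_client": "Hypothetical Client", "product_name": "Unknown Drink"},
]
nutrients = [
    {"name": "Cinnamon Dolce Latte", "calories": 260},
    {"name": "Caramel Macchiato", "calories": 250},
    {"name": "White Chocolate Mocha", "calories": 360},
]


def inner_join(left, right, left_key, right_key):
    """Hash-join equivalent of `left JOIN right ON left_key = right_key`.

    Only pairs with equal keys are emitted; unmatched rows on either side are dropped.
    """
    index = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


joined = inner_join(events, nutrients, "product_name", "name")
```

If some orders must be kept even without nutrition data, a `left join` would keep them with null nutrition columns instead.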

```python
# Read both sources and run the join query.
source_df = source.construct(spark_client)
```

```python
# check whether it is a Spark streaming DataFrame (False here, since stream=False)
source_df.isStreaming
```

```
False
```

```python
# schema
source_df.printSchema()
```

```
root
 |-- name_employer: string (nullable = true)
 |-- id_employer: integer (nullable = true)
 |-- name_client: string (nullable = true)
 |-- transaction_id: integer (nullable = true)
 |-- payment: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- percent_discount: integer (nullable = true)
 |-- kafka_metadata: struct (nullable = false)
 |    |-- key: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- value: string (nullable = true)
 |    |-- partition: integer (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- timestamp: timestamp (nullable = true)
 |    |-- timestampType: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- fat(g): integer (nullable = true)
 |-- carb(g): integer (nullable = true)
 |-- fiber(g): integer (nullable = true)
```

### Transform
- In the transform step, a set of `Feature` objects is declared.
- An instance of `FeatureSet` holds the features.
- A `FeatureSet` can only be created when each row is identified by a unique tuple formed by the key columns and a time reference. This is an **architectural requirement** for the data, so at least one `KeyFeature` and one `TimestampFeature` are needed.
- Every `Feature` needs a unique name, a description, and a data type definition.
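The uniqueness requirement on (keys, timestamp) can be checked before building a feature set. The sketch below is a plain-Python illustration, not part of Butterfree: `duplicate_keys` is a hypothetical helper, and the sample rows use `id_employer`/`timestamp` values from the historical results shown later (the last row is an intentional duplicate).

```python
from collections import Counter


def duplicate_keys(rows, key_columns, timestamp_column):
    """Return the (key..., timestamp) tuples that occur more than once.

    An empty list means every row is identified by its keys plus its timestamp.
    """
    counts = Counter(
        tuple(row[c] for c in key_columns) + (row[timestamp_column],)
        for row in rows
    )
    return [k for k, n in counts.items() if n > 1]


rows = [
    {"id_employer": 1, "timestamp": "2023-05-18 01:04:05"},
    {"id_employer": 1, "timestamp": "2021-05-14 17:04:05"},
    {"id_employer": 0, "timestamp": "2023-10-06 20:04:05"},
]
clean = duplicate_keys(rows, ["id_employer"], "timestamp")  # no duplicates

# Same employer, same instant: this violates the requirement.
rows_with_dup = rows + [{"id_employer": 1, "timestamp": "2023-05-18 01:04:05"}]
dups = duplicate_keys(rows_with_dup, ["id_employer"], "timestamp")
```

In PySpark the same check could be expressed as a `groupBy` on the key and timestamp columns followed by a filter on `count > 1`.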


```python
from pyspark.sql import functions as F

from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
from butterfree.transform.transformations import SQLExpressionTransform, CustomTransform
from butterfree.constants import DataType


def divide(df, parent_feature, column1, column2):
    """Custom transform: write column1 / column2 into the feature's output column."""
    name = parent_feature.get_output_columns()[0]
    df = df.withColumn(name, F.col(column1) / F.col(column2))
    return df


keys = [
    KeyFeature(
        name="id_employer",
        description="Unique identifier code for the employer.",
        from_column="id_employer",
        dtype=DataType.INTEGER,
    )
]

# The source timestamp is a "yyyy-MM-dd HH:mm:ss" string, which TimestampFeature casts
# to a Spark timestamp. from_ms=True would only be needed for epoch milliseconds.
ts_feature = TimestampFeature(from_column="timestamp")

features = [
    Feature(
        name="name_employer",
        description="Name of the employer.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="name_client",
        description="Name of the client.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="product_name",
        description="Name of the product.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="product_price",
        description="Product price.",
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="payment",
        description="Payment method.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="calories",
        description="Calories of the drink.",
        dtype=DataType.INTEGER,
    ),
    # custom transformation: carb(g) / calories
    Feature(
        name="percent_carbo",
        description="Carbohydrates (g) divided by calories.",
        transformation=CustomTransform(transformer=divide, column1="carb(g)", column2="calories"),
        dtype=DataType.FLOAT,
    ),
    # SQL transformation: price after the percentage discount
    Feature(
        name="final_price",
        description="Product price after the discount.",
        transformation=SQLExpressionTransform("product_price * ((100 - percent_discount)/100)"),
        dtype=DataType.FLOAT,
    ),
]

# The feature set name is used for the output tables, e.g. the Cassandra table
# starbucks_order_events and the historical view historical_feature_store__starbucks_order_events.
feature_set = FeatureSet(
    name="starbucks_order_events",
    entity="events",  # entity: to which "business context" this feature set belongs
    description="Features describing events about Starbucks stores.",
    keys=keys,
    timestamp=ts_feature,
    features=features,
)
```
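The two derived features can be checked by hand. The plain-Python sketch below mirrors them: `carb_ratio` and `final_price` are hypothetical helpers (named to avoid shadowing the `divide` transform above), and the carbohydrate grams used in the usage lines (35, 40, 53) are inferred from the `percent_carbo` values in the results, not read from the CSV file. Note that `percent_carbo` is grams of carbohydrate per calorie, not a percentage. `as_float32` imitates storing `product_price` as a 32-bit `FLOAT`, which is a likely source of long decimals such as `2.9200000762939453` in the metastore output.

```python
import struct


def carb_ratio(carb_g, calories):
    """Mirror of the custom transform: carb(g) / calories.

    Returns None for a zero or missing denominator (Spark's non-ANSI division yields null).
    """
    if not calories:
        return None
    return carb_g / calories


def final_price(product_price, percent_discount):
    """Mirror of the SQL expression product_price * ((100 - percent_discount)/100)."""
    return product_price * ((100 - percent_discount) / 100)


def as_float32(x):
    """Round a Python float to 32-bit precision, as a FLOAT column would store it."""
    return struct.unpack("f", struct.pack("f", x))[0]


caramel = carb_ratio(35, 250)        # 0.14, as shown for Caramel Macchiato
cinnamon = carb_ratio(40, 260)       # about 0.153846, Cinnamon Dolce Latte
mocha = carb_ratio(53, 360)          # about 0.147222, White Chocolate Mocha
discounted = final_price(3.65, 30)   # about 2.555, a 30% discount
float_price = final_price(as_float32(3.65), 20)  # about 2.9200000762939453
```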

```python
feature_set_df = feature_set.construct(source_df, spark_client)
```



```python
# schema
feature_set_df.printSchema()
```

```
root
 |-- id_employer: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- name_employer: string (nullable = true)
 |-- name_client: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- payment: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- percent_carbo: double (nullable = true)
 |-- final_price: double (nullable = true)
```



### Load

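- The `HistoricalFeatureStoreWriter` runs in debug mode, which creates a temporary view with the historical data; the `OnlineFeatureStoreWriter` writes to Cassandra using the `CassandraConfig` below.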

```python
from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)
from butterfree.load import Sink
from butterfree.configs.db import CassandraConfig

# Connection settings for the online feature store (Cassandra).
db_config = CassandraConfig(
    username="cassandra",
    password="cassandra",
    host="feature_store_cassandra",
    keyspace="feature_store",
    stream_checkpoint_path="./"
)

writers = [HistoricalFeatureStoreWriter(debug_mode=True), OnlineFeatureStoreWriter(db_config=db_config)]
sink = Sink(writers=writers)
```

### Cassandra tables

- Let's create a keyspace and a table to store the online features.

```python
from cassandra.cluster import Cluster

keyspace = "feature_store"
table_name = "starbucks_order_events"

# Spark type (string form used by Spark 3.0) -> CQL type.
cassandra_mapping = {
    "TimestampType": "timestamp",
    "BinaryType": "blob",
    "BooleanType": "boolean",
    "DateType": "timestamp",
    "DecimalType": "decimal",
    "DoubleType": "double",
    "FloatType": "float",
    "IntegerType": "int",
    "LongType": "bigint",
    "StringType": "text",
    "ArrayType(LongType,true)": "frozen<list<bigint>>",
    "ArrayType(StringType,true)": "frozen<list<text>>",
    "ArrayType(FloatType,true)": "frozen<list<float>>",
}

cluster = Cluster(['feature_store_cassandra'])
session = cluster.connect()

session.execute(
    "CREATE KEYSPACE IF NOT EXISTS " + keyspace
    + " WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };"
)

# "column type" pairs from the feature set schema; id_employer becomes the primary key,
# so the table holds one row per employer (writes with the same key overwrite each other).
columns = ", ".join(
    field.name + " " + cassandra_mapping[str(field.dataType)]
    for field in feature_set_df.schema
)
columns = columns.replace("id_employer int", "id_employer int PRIMARY KEY")
sql = "CREATE TABLE IF NOT EXISTS {}.{} ({});".format(keyspace, table_name, columns)
session.execute(sql)
cluster.shutdown()
```
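The DDL above marks the primary key by string-replacing `"id_employer int"`, which silently does nothing if the column type changes. A slightly sturdier pattern is to declare the key explicitly. The sketch below is plain Python; `create_table_cql` is a hypothetical helper (not part of Butterfree or the Cassandra driver), the type mapping is a subset of `cassandra_mapping`, and the column list is copied from the feature set schema printed earlier.

```python
CASSANDRA_TYPES = {
    "TimestampType": "timestamp",
    "IntegerType": "int",
    "StringType": "text",
    "FloatType": "float",
    "DoubleType": "double",
}


def create_table_cql(keyspace, table, columns, primary_key):
    """Build a CREATE TABLE statement from (name, spark_type) pairs.

    The primary key columns are declared in a PRIMARY KEY clause instead of
    being patched into the column list.
    """
    names = {name for name, _ in columns}
    missing = [c for c in primary_key if c not in names]
    if missing:
        raise ValueError(f"primary key columns not in schema: {missing}")
    defs = ", ".join(f"{name} {CASSANDRA_TYPES[spark_type]}" for name, spark_type in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} "
        f"({defs}, PRIMARY KEY ({', '.join(primary_key)}));"
    )


SCHEMA = [
    ("id_employer", "IntegerType"),
    ("timestamp", "TimestampType"),
    ("name_employer", "StringType"),
    ("name_client", "StringType"),
    ("product_name", "StringType"),
    ("product_price", "FloatType"),
    ("payment", "StringType"),
    ("calories", "IntegerType"),
    ("percent_carbo", "DoubleType"),
    ("final_price", "DoubleType"),
]
cql = create_table_cql("feature_store", "starbucks_order_events", SCHEMA, ["id_employer"])
```

With PySpark, the pairs could come from `[(f.name, str(f.dataType)) for f in feature_set_df.schema]`; keep in mind that the string form of Spark types may differ between Spark versions (newer releases may print `IntegerType()`), so the mapping keys might need adjusting.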

### Final Pipeline

```python
from butterfree.pipelines import FeatureSetPipeline

pipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)
```

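```python
# Runs extract, transform and load for this batch source. With a streaming source,
# the writers would instead start streaming queries that run asynchronously.
pipeline.run()
```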

## Showing the results

### Online features in Cassandra

```python
print(">>> Online Feature Store  table:")

from cassandra.cluster import Cluster

cluster = Cluster(['feature_store_cassandra'])
session = cluster.connect()
# Materialize the rows before shutting down the connection.
rows = list(session.execute("SELECT * FROM feature_store.starbucks_order_events"))
cluster.shutdown()
# Create a Spark data frame from the returned rows and display it with pandas.
df = spark.createDataFrame(rows)
df.toPandas()
```

```
>>> Online Feature Store  table:
```

| | id_employer | calories | final_price | name_client | name_employer | payment | percent_carbo | product_name | product_price | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5 | 260 | 2.555 | Mike Soto | Denver | credit | 0.153846 | Cinnamon Dolce Latte | 3.65 | 2024-01-04 14:04:05 |
| 1 | 1 | 250 | 3.325 | Brandon Novak | Alex | debit | 0.14 | Caramel Macchiato | 4.75 | 2024-01-04 03:04:05 |
| 2 | 0 | 250 | 3.56 | Diane Erickson | Alicia | credit | 0.14 | Caramel Macchiato | 4.45 | 2024-01-04 20:04:05 |
| 3 | 2 | 360 | 3.325 | Darryl Shea | Julian | credit | 0.147222 | White Chocolate Mocha | 4.75 | 2024-01-03 19:04:05 |
| 4 | 4 | 250 | 3.375 | Brooke Fowler | Mark | cash | 0.14 | Caramel Macchiato | 3.75 | 2024-01-04 18:04:05 |
| 5 | 6 | 260 | 2.975 | Brittany Ford | Luiza | credit | 0.153846 | Cinnamon Dolce Latte | 4.25 | 2024-01-04 16:04:05 |
| 6 | 3 | 260 | 3.72 | Shannon Tanner | Cassandra | cash | 0.153846 | Cinnamon Dolce Latte | 4.65 | 2024-01-04 19:04:05 |


## Accessing the metastore

### Historical features

```python
print(">>> Historical Feature Store:")
spark.table("historical_feature_store__starbucks_order_events").toPandas()
```

```
>>> Historical Feature Store:
```

| | id_employer | timestamp | name_employer | name_client | product_name | product_price | payment | calories | percent_carbo | final_price | year | month | day |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2023-05-18 01:04:05 | Alex | Aaron Murphy | Cinnamon Dolce Latte | 3.65 | cash | 260 | 0.153846 | 2.920 | 2023 | 5 | 18 |
| 1 | 1 | 2021-05-14 17:04:05 | Alex | Adrian Hernandez | Cinnamon Dolce Latte | 4.25 | cash | 260 | 0.153846 | 3.400 | 2021 | 5 | 14 |
| 2 | 1 | 2021-07-24 03:04:05 | Alex | Adrian Miller | White Chocolate Mocha | 4.45 | credit | 360 | 0.147222 | 3.115 | 2021 | 7 | 24 |
| 3 | 1 | 2021-12-03 06:04:05 | Alex | Alexander Adams | White Chocolate Mocha | 4.45 | credit | 360 | 0.147222 | 3.560 | 2021 | 12 | 3 |
| 4 | 1 | 2023-05-18 03:04:05 | Alex | Alexander Wheeler | Caramel Macchiato | 3.75 | debit | 250 | 0.140000 | 3.000 | 2023 | 5 | 18 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 41228 | 0 | 2023-10-06 20:04:05 | Alicia | Stephanie Gordon | Caramel Macchiato | 4.45 | debit | 250 | 0.140000 | 3.115 | 2023 | 10 | 6 |
| 41229 | 0 | 2022-03-13 15:04:05 | Alicia | Stephanie Stewart | Cinnamon Dolce Latte | 4.65 | debit | 260 | 0.153846 | 4.185 | 2022 | 3 | 13 |
| 41230 | 0 | 2023-08-16 23:04:05 | Alicia | Tiffany Macdonald | Caramel Macchiato | 4.75 | cash | 250 | 0.140000 | 3.325 | 2023 | 8 | 16 |
| 41231 | 0 | 2021-02-21 06:04:05 | Alicia | Todd Stafford | Cinnamon Dolce Latte | 3.65 | cash | 260 | 0.153846 | 2.555 | 2021 | 2 | 21 |
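The `year`, `month` and `day` columns at the end of the historical table are derived from `timestamp`. They look like Hive-style partition columns; that the historical writer uses them to partition its output is an assumption here, not something shown above. The plain-Python sketch below (hypothetical helpers `partition_columns` and `partition_path`) shows the derivation and the `column=value` directory layout that Spark's `partitionBy` produces for such columns.

```python
from datetime import datetime


def partition_columns(ts):
    """Derive year/month/day from a 'yyyy-MM-dd HH:mm:ss' timestamp string."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    return {"year": dt.year, "month": dt.month, "day": dt.day}


def partition_path(ts):
    """Hive-style partition directory for a record, e.g. year=2023/month=5/day=18."""
    parts = partition_columns(ts)
    return "/".join(f"{name}={value}" for name, value in parts.items())


first = partition_columns("2023-05-18 01:04:05")   # matches row 0 of the table
last = partition_columns("2021-02-21 06:04:05")    # matches row 41231
path = partition_path("2023-05-18 01:04:05")
```

Partitioning by date lets queries that filter on `year`, `month` and `day` skip whole directories instead of scanning the full history.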


### Metastore
In this tutorial no S3 bucket is configured, so the historical features can also be queried with SQL through the Hive context:

```python
hive_context.sql("select * from historical_feature_store__starbucks_order_events").show()
```


+-----------+-------------------+-------------+-----------------+--------------------+-------------+-------+--------+-------------------+------------------+----+-----+---+
|id_employer|          timestamp|name_employer|      name_client|        product_name|product_price|payment|calories|      percent_carbo|       final_price|year|month|day|
+-----------+-------------------+-------------+-----------------+--------------------+-------------+-------+--------+-------------------+------------------+----+-----+---+
|          1|2023-05-18 01:04:05|         Alex|     Aaron Murphy|Cinnamon Dolce Latte|         3.65|   cash|     260|0.15384615384615385|2.9200000762939453|2023|    5| 18|
|          1|2021-05-14 17:04:05|         Alex| Adrian Hernandez|Cinnamon Dolce Latte|         4.25|   cash|     260|0.15384615384615385|3.4000000000000004|2021|    5| 14|
|          1|2021-07-24 03:04:05|         Alex|    Adrian Miller|White Chocolate M...|         4.45| credit|     360|0.14722222222222223|3.1