## Prerequisites
1. Create a Kafka service named `my-kafka` in the `default-tenant` namespace.
2. Create Kafka input, output, and error topics, and store each topic name and the broker address as a project param under the matching key below:
    * input_topic 
    * output_topic 
    * error_topic 
    * broker 
    * All of these params are set in the project YAML
3. Create a Redis service by running these commands:
    * `helm repo add bitnami https://charts.bitnami.com/bitnami`
    * `helm repo update`
    * `helm install -n default-tenant  redis-test  --set auth.enabled=false bitnami/redis`
4. Save the Redis service address as a project param under the `redis_path` key:
    * redis_path - redis://redis-test-master.default-tenant.svc.cluster.local:6379
5. Clone this repo to your Jupyter service, and make sure you run this notebook from the repo directory.


In [None]:
import mlrun
from mlrun import feature_store as fs

In [None]:
from mlrun.datastore.targets import RedisNoSqlTarget, ParquetTarget
from mlrun.feature_store.steps import OneHotEncoder, MapValues, DateExtractor

In [None]:
import pandas as pd
import kafka

In [None]:
import os
import requests

In [None]:
# Base name of the MLRun project used by this notebook.
project_name = "kafka-fs-test"

In [None]:
# Fetch the project (creating it when missing); the returned object supplies
# the project params read in the following cells.
project = mlrun.get_or_create_project(
    project_name,
    "./kafka_redis_fs/",
    user_project=True,
)

#### Create the Kafka Admin Client and Topic Consumers

In [None]:
# Kafka topic names and broker address, read from the project params listed
# in the prerequisites (same lookup order as the keys below).
topic_in, topic_out, topic_err, brokers = (
    project.get_param(key)
    for key in ("input_topic", "output_topic", "error_topic", "broker")
)

In [None]:
# Admin client connected to the broker address taken from the project params.
kafka_admin_client = kafka.KafkaAdminClient(bootstrap_servers=brokers)

In [None]:
# Consumer subscribed to the input topic (offset reset policy "earliest",
# at most 3 records per poll).
kafka_consumer_in = kafka.KafkaConsumer(
    topic_in,
    bootstrap_servers=brokers,
    auto_offset_reset="earliest",
    max_poll_records=3,
)

In [None]:
# Consumer subscribed to the output topic (offset reset policy "earliest").
kafka_consumer_out = kafka.KafkaConsumer(
    topic_out,
    bootstrap_servers=brokers,
    auto_offset_reset="earliest",
)

In [None]:
# Consumer subscribed to the error topic (offset reset policy "earliest").
kafka_consumer_err = kafka.KafkaConsumer(
    topic_err,
    bootstrap_servers=brokers,
    auto_offset_reset="earliest",
)

#### Creating the Feature Set (FS)

In [None]:
# Redis connection URL, stored as a project param (see the prerequisites).
redis_path = project.get_param("redis_path")

In [None]:
# Location of the CSV file used for the initial batch ingestion.
source_path = "./data/data_ingest.csv"

In [None]:
# Load the transactions CSV into a DataFrame.
transactions_data = pd.read_csv(source_path)

In [None]:
# Work on a random sample of at most 100 rows. Capping n at the frame length
# avoids the ValueError pandas raises when asked to sample more rows than
# exist (sampling is without replacement by default).
transactions_data = transactions_data.sample(n=min(100, len(transactions_data)))

In [None]:
# Sort by the timestamp column so that the record with the latest timestamp
# ends up as the last row of the table.
transactions_data = transactions_data.sort_values(by=["timestamp"])

In [None]:
# mlrun: start-code

In [None]:

def len_device(event):
    """Store the length of ``event['device']`` under ``event['len_device']``.

    The event is updated in place and the same object is returned.
    """
    device = event['device']
    event['len_device'] = len(device)
    return event

def check_len_device(event):
    """Label the event by whether its ``len_device`` value exceeds 5.

    The label is written to ``event['check_len_device']``; the event is
    updated in place and the same object is returned.
    """
    exceeds_limit = event['len_device'] > 5
    event['check_len_device'] = (
        'Bigger Then 5' if exceeds_limit else 'Smaller Or Equal to 5'
    )
    return event

In [None]:
# mlrun: end-code

In [None]:
# Feature set keyed by the "source" entity, with "timestamp" as its timestamp key.
transaction_set = fs.FeatureSet(
    "transactions",
    entities=[fs.Entity("source")],
    timestamp_key="timestamp",
    description="transactions feature set",
)

# Category values that are one-hot encoded and aggregated on below.
main_categories = [
    "es_transportation",
    "es_health",
    "es_otherservices",
    "es_food",
    "es_hotelservices",
    "es_barsandrestaurants",
    "es_tech",
    "es_sportsandtoys",
    "es_wellnessandbeauty",
    "es_hyper",
    "es_fashion",
    "es_home",
    "es_contents",
    "es_travel",
    "es_leisure",
]

# One-hot encoding mapping: the fixed category list, plus whichever gender
# values occur in the loaded transactions_data frame.
one_hot_encoder_mapping = {
    "category": main_categories,
    "gender": list(transactions_data.gender.unique()),
}

# Graph steps: extract hour / day-of-week from the timestamp, map the age
# value "U" to "0" (keeping the original features), then one-hot encode.
(
    transaction_set.graph
    .to(DateExtractor(parts=["hour", "day_of_week"], timestamp_col="timestamp"))
    .to(MapValues(mapping={"age": {"U": "0"}}, with_original_features=True))
    .to(OneHotEncoder(mapping=one_hot_encoder_mapping))
    .respond()
)

# Aggregate the amount column over 2, 12, and 24 hour windows.
transaction_set.add_aggregation(
    name="amount",
    column="amount",
    operations=["avg", "sum", "count", "max"],
    windows=["2h", "12h", "24h"],
    period="1h",
)

# Count each one-hot encoded category column over a 14 day window.
for category in main_categories:
    transaction_set.add_aggregation(
        name=category,
        column=f"category_{category}",
        operations=["count"],
        windows=["14d"],
        period="1d",
    )

# Use exactly these targets (Redis for online, Parquet for offline) instead
# of the default ones.
targets = [RedisNoSqlTarget(path=redis_path), ParquetTarget()]
transaction_set.set_targets(targets=targets, with_defaults=False)

# Plot the pipeline so we can see the different steps.
transaction_set.plot(rankdir="LR", with_targets=True)

In [None]:
# Ingest the dataframe into the feature set, overwriting existing target
# data; the result is kept for the checks in the next cells.
ingest_df = transaction_set.ingest(
    transactions_data,
    overwrite=True,
    infer_options=fs.InferOptions.default(),
)

In [None]:
# Display the object returned by the ingest call.
ingest_df

In [None]:
# Check that every row of the dataframe was ingested.
len(ingest_df) == len(transactions_data)

#### Creating and deploying the ingestion service function

In [None]:
# Build a serving function named "steps" from this notebook's code
# (presumably the section delimited by the `# mlrun: start-code` /
# `# mlrun: end-code` cells — confirm against the MLRun docs).
steps = mlrun.code_to_function(
    "steps",
    kind="serving",
    image="mlrun/mlrun",
)

In [None]:
from mlrun.datastore import KafkaSource

# Kafka source that reads the input topic from the configured broker.
source = KafkaSource(brokers=brokers, topics=topic_in)

# Deploy the feature set's ingestion service, using the "steps" function
# as its run configuration.
ingest_service = transaction_set.deploy_ingestion_service(
    source=source,
    run_config=fs.RunConfig(steps),
)

#### Test data ingestion with a simple request

In [None]:
import datetime
import time
import random

# Load one sample event from disk as a pandas Series and convert it to a
# plain dict to use as the request body.
df_json = pd.read_json('./json_files/json_49991.json', orient='index', typ='series')
ingest_dict = df_json.to_dict()

# Send the event to the function's root path and print the response.
res = steps.invoke('/', ingest_dict)
print(res)

#### Send requests to the Kafka trigger

In [None]:
# Producer used to publish the test events to the Kafka broker.
producer = kafka.KafkaProducer(bootstrap_servers=[brokers])

In [None]:
from os import listdir
from os.path import isfile, join

# Names of the entries in ./json_files/ that are regular files.
onlyfiles = [
    name
    for name in listdir('./json_files/')
    if isfile(join('./json_files/', name))
]

In [None]:
# Skip the file that was already sent through the direct invoke request.
# The membership guard keeps the cell safe to re-run: an unguarded
# list.remove raises ValueError once the name is no longer in the list.
if 'json_49991.json' in onlyfiles:
    onlyfiles.remove('json_49991.json')

In [None]:
import datetime
import json

# Wall-clock time at which publishing started.
s = datetime.datetime.now()

for file in onlyfiles:
    # Read each JSON file as a pandas Series and turn it into a plain dict.
    df_json = pd.read_json(f'./json_files/{file}', typ='series')
    ingest_dict = df_json.to_dict()
    # The message value is the dict serialized to UTF-8 encoded JSON bytes.
    ms = json.dumps(ingest_dict).encode('utf-8')
    producer.send(topic=topic_in, value=ms)

# send() only queues the record for asynchronous delivery; flush() blocks
# until every queued record has been sent, so later cells that read the
# topic are not racing the producer.
producer.flush()

In [None]:
# Pull one record from the input topic per published file, printing each.
# NOTE(review): next() on the consumer appears to block until a record
# arrives, so this cell may hang if fewer messages are available — confirm.
counter = 0
while counter < len(onlyfiles):
    record = next(kafka_consumer_in)
    counter += 1
    print(record)
In [None]:
# Check that as many records were read from Kafka as files were published.
len(onlyfiles) == counter

#### Creating a feature vector

In [None]:
# Select every feature of the "transactions" feature set.
features = ["transactions.*"]

vector = fs.FeatureVector(
    "transactions-vector",
    features=features,
    description="this is my vector",
)

# Fetch the offline features (requesting the index columns too) and preview
# the last 5 rows of the resulting dataset.
resp = vector.get_offline_features(with_indexes=True)
resp.to_dataframe().tail(5)

In [None]:
# Open the online feature service, look up one entity key, and always close
# the service afterwards so it does not stay open after the lookup.
svc = vector.get_online_feature_service()
try:
    resp = svc.get([{"source": 'C1145304322'}])
finally:
    svc.close()
resp

In [None]:
# Clean up by purging the feature set's targets.
transaction_set.purge_targets()