In [None]:
%cd /home/jovyan/feast-dask
from IPython.core.display import HTML
HTML("""
<link rel="stylesheet" type="text/css" href="//fonts.googleapis.com/css?family=Bangers" />
<style>
h1 {font-family: Bangers !important;}
h2 {font-family: Bangers !important;}
h3 {font-family: Bangers !important;}
h4 {font-family: Bangers !important;}
h5 {font-family: Bangers !important;}
.controls
{
    display: none !important;
}

.slide-number
{
    display: none !important;
}
</style>
""")

## DOCKER

In [None]:
# NOTE(review): presumably meant to be run from a host shell to start this demo
# image (8888 is likely Jupyter; 8787 is the Dask dashboard port used in
# run_dask.sh). Skip it when the notebook already runs inside the container.
!docker run --name feast -d --rm -p 8888:8888 -p 8787:8787 qooba/feast:dask

# Generate data

In [None]:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from datetime import datetime, timezone
from sklearn.datasets import make_hastie_10_2
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Seed numpy's global RNG so the generated entity ids and event timestamps are
# the same on every Restart & Run All.
SEED = 42
np.random.seed(SEED)

In [None]:
def generate_entities(size):
    """Return the integer ids ``0 .. size-1`` in random order, each exactly once."""
    # A full-length draw without replacement is a shuffle of range(size);
    # permutation draws from the same global numpy RNG.
    return np.random.permutation(size)

In [None]:
def generate_data(entities, year=2021, month=10, day=1, random_state=0) -> pd.DataFrame:
    """Build one day of synthetic feature rows, one row per entity.

    Features ``f0``..``f9`` and the label ``y`` come from
    ``sklearn.datasets.make_hastie_10_2``. Each row gets a random event time
    (whole seconds, UTC) in ``[00:00, 22:00)`` of the given day, plus a
    ``created`` column holding the current naive local time.

    Args:
        entities: Entity ids, one per output row (stored as ``entity_id``).
        year, month, day: Calendar day the event times fall on.
        random_state: Seed passed to ``make_hastie_10_2``. With the default
            ``0``, every call with the same number of entities yields identical
            features/labels; only the event times differ. Pass ``None``
            (numpy's global RNG) or a different int per call to get distinct
            feature values per day.

    Returns:
        DataFrame with columns f0..f9, y, entity_id, datetime, created.
    """
    n_samples = len(entities)
    X, y = make_hastie_10_2(n_samples=n_samples, random_state=random_state)
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])])
    df["y"] = y
    df["entity_id"] = entities

    # NOTE(review): the window ends at 22:00, not midnight -- confirm intended.
    # np.random.randint takes integer bounds, while timestamp() returns a float.
    window_start = int(datetime(year, month, day, 0, tzinfo=timezone.utc).timestamp())
    window_end = int(datetime(year, month, day, 22, tzinfo=timezone.utc).timestamp())
    df["datetime"] = pd.to_datetime(
        np.random.randint(window_start, window_end, size=n_samples),
        unit="s",
        utc=True,
    )
    df["created"] = pd.to_datetime(datetime.now())
    return df

In [None]:
# Number of distinct entities; each generated day holds one row per entity.
N_ENTITIES = 1_000_000
entities = generate_entities(N_ENTITIES)

In [None]:
# One query row per entity, all asked "as of" 2021-01-14 23:59:42 UTC, which is
# after every event generated for 2021-01-01..14. To benchmark on a subset,
# filter entity_df (e.g. on entity_id) before the retrieval cells.
entity_df = pd.DataFrame({"entity_id": entities}).assign(
    event_timestamp=datetime(2021, 1, 14, 23, 59, 42, tzinfo=timezone.utc)
)
entity_df

In [None]:
# One synthetic day per date, 2021-01-01 .. 2021-01-14, every entity present
# each day. Despite its name, all_dd is a plain pandas DataFrame; Feast reads
# the data back from the parquet file written here.
all_data = [generate_data(entities, month=1, day=d) for d in range(1, 15)]

# Each per-day frame is indexed from 0, so a plain concat would leave a
# duplicate-valued index that to_parquet stores as an extra column;
# ignore_index gives a RangeIndex that is kept only as metadata.
all_dd = pd.concat(all_data, ignore_index=True)
all_dd.to_parquet("./all_data.parquet")
all_dd

# Feast

In [None]:
%%writefile feature_store.yaml
# Feast repo config: local provider, with the registry and the online store
# kept as files under data/.
project: default
registry: data/registry.db
provider: local
online_store:
    path: data/online_store.db

In [None]:
%%writefile features.py
from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from feast import FileSource
from feast.data_format import ParquetFormat

my_stats = FileSource(
    path="./all_data.parquet",
    event_timestamp_column="datetime",
)
my_entity = Entity(name="entity_id", value_type=ValueType.INT64, description="entity id",)
mystats_view = FeatureView(
    name="my_statistics",
    entities=["entity_id"],
    ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f0", dtype=ValueType.FLOAT),
        Feature(name="f1", dtype=ValueType.FLOAT),
        Feature(name="f2", dtype=ValueType.FLOAT),
        Feature(name="f3", dtype=ValueType.FLOAT),
        Feature(name="f4", dtype=ValueType.FLOAT),
        Feature(name="f5", dtype=ValueType.FLOAT),
        Feature(name="f6", dtype=ValueType.FLOAT),
        Feature(name="f7", dtype=ValueType.FLOAT),
        Feature(name="f8", dtype=ValueType.FLOAT),
        Feature(name="f9", dtype=ValueType.FLOAT),
        Feature(name="y", dtype=ValueType.FLOAT),
    ], online=True, input=my_stats, tags={},)

In [None]:
!rm -r .ipynb_checkpoints
!feast apply

In [None]:
!pip install feast-schema

In [None]:
from feast_schema import FeastSchema

# Show the schema of the Feast repo in the current directory.
repo_schema = FeastSchema(".")
repo_schema.show_schema()

# dask cluster

In [None]:
%%writefile run_dask.sh
#!/usr/bin/env bash
# Start a local Dask cluster: one scheduler (port 8786, dashboard on 8787) and
# four workers on fixed ports 8701-8704, all in the background.
# nohup + per-process log files detach them from the launching shell, so they
# survive it exiting and do not stream their output into the notebook cell.

nohup dask-scheduler --host 0.0.0.0 --port 8786 --dashboard-address :8787 \
    > dask-scheduler.log 2>&1 &

for port in 8701 8702 8703 8704; do
    nohup dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port "$port" \
        > "dask-worker-$port.log" 2>&1 &
done


In [None]:
# Start the Dask scheduler and workers defined in run_dask.sh (each is launched
# in the background with `&`).
!bash ./run_dask.sh

In [None]:
from dask.distributed import Client

# Connect to the scheduler started by run_dask.sh. Its four workers start
# asynchronously, so wait until they have all registered; otherwise the timed
# retrieval could run on a partially started cluster. A timeout error is
# raised if they do not appear within the limit.
N_DASK_WORKERS = 4
client = Client("tcp://0.0.0.0:8786")
client.wait_for_workers(n_workers=N_DASK_WORKERS, timeout=120)
client

# Feast historical features

In [None]:
%%time
from feast import FeatureStore

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df, 
    feature_refs = [
        'my_statistics:f0',
        'my_statistics:f1',
        'my_statistics:f2',
        'my_statistics:f3',
        'my_statistics:f4',
        'my_statistics:f5',
        'my_statistics:f6',
        'my_statistics:f7',
        'my_statistics:f8',
        'my_statistics:f9',
        'my_statistics:y',
    ],
).to_df()
training_df

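# FEAST BENCHMARK (pandas vs dask)

Timings of the historical-retrieval cell above (1,000,000 entities, 14 days of data), run with Feast's file offline store on pandas and on dask (the `feature/dask` branch linked below). With dask, "CPU times" only count the notebook process, because the join itself runs on the Dask workers. Compare the wall times.

### pandas
```
CPU times: user 2min 51s, sys: 6.64 s, total: 2min 57s
Wall time: 2min 52s
```

### dask
```
CPU times: user 206 ms, sys: 25.9 ms, total: 232 ms
Wall time: 10.4 s
```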

# Train model 

In [None]:
# Check what the timed .to_df() call returned.
training_df.__class__

In [None]:
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Same request as the timed retrieval, but finished with evaluation_function()
# instead of to_df(); the type() line shows what that yields.
# NOTE(review): evaluation_function() comes from the dask-enabled Feast branch;
# presumably it returns the lazy dask DataFrame -- confirm against that branch.
requested_refs = [f"my_statistics:f{i}" for i in range(10)]
requested_refs.append("my_statistics:y")
training_dd = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=requested_refs,
).evaluation_function()
type(training_dd)

In [None]:
import joblib
from sklearn.ensemble import GradientBoostingClassifier
from dask_ml.model_selection import train_test_split

feature_columns = [f"f{i}" for i in range(10)]
predictors = training_dd[feature_columns]
# A 1-D label Series: sklearn expects y of shape (n_samples,), and a
# one-column frame would trigger a DataConversionWarning on fit.
targets = training_dd["y"]

# random_state pins the split so reruns score the same held-out rows.
X_train, X_test, y_train, y_test = train_test_split(
    predictors, targets, test_size=0.3, random_state=0
)

# NOTE(review): GradientBoostingClassifier has no n_jobs, so the joblib dask
# backend is unlikely to parallelise this fit; sklearn also converts the dask
# collections to in-memory numpy arrays. Confirm whether the backend is needed.
with joblib.parallel_backend("dask"):
    clf = GradientBoostingClassifier(
        n_estimators=100, learning_rate=1.0, max_depth=1, random_state=0, verbose=1
    ).fit(X_train, y_train)

    score = clf.score(X_test, y_test)

score

# Feast Materialization 

In [None]:
%%time
from feast import FeatureStore
from datetime import datetime, timedelta
from feast import flags, flags_helper, utils

start_date=utils.make_tzaware(datetime.fromisoformat('2020-01-03T14:30:00'))
end_date=utils.make_tzaware(datetime.fromisoformat('2022-01-03T14:30:00'))

store = FeatureStore(repo_path=".")

store.materialize(start_date=start_date,end_date=end_date)

## CODE
https://github.com/qooba/feast/tree/feature/dask 

## FEAST PR

https://github.com/feast-dev/feast/pull/1954
