# Generate Data 

## Pandas DataFrames

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from sklearn.datasets import make_hastie_10_2
import warnings
# NOTE(review): this silences *every* DeprecationWarning for the rest of the
# kernel session; the warning it was meant to hide isn't identified here —
# consider narrowing it with `module=` / `message=` so new deprecations still surface.
warnings.filterwarnings("ignore", category=DeprecationWarning)

def generate_entities(size, rng=None):
    """Return the integers ``0 .. size-1`` in random order (a random permutation).

    The result is used as the pool of unique ``entity_id`` values.

    Args:
        size: Number of entities; must be non-negative.
        rng: Optional ``numpy.random.Generator`` (or ``RandomState``) to make the
            output reproducible. When omitted, numpy's legacy global RNG is used,
            exactly as before, so ``np.random.seed`` still controls the result.

    Returns:
        1-D integer array of length ``size`` containing each of ``0 .. size-1``
        exactly once.

    Raises:
        ValueError: if ``size`` is negative.
    """
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")
    source = np.random if rng is None else rng
    # Sampling the whole population without replacement is just a shuffle;
    # on the legacy RNG this draws the same values as
    # np.random.choice(size, size=size, replace=False).
    return source.permutation(size)

def generate_data(entities, year=2021, month=10, day=1, random_state=0, end_hour=22) -> pd.DataFrame:
    """Build one day of synthetic rows, one row per entity.

    Features ``f0``..``f9`` and label ``y`` come from sklearn's
    ``make_hastie_10_2``. Each row also gets a random event time on the given
    UTC day (drawn from numpy's global, unseeded RNG), a ``created`` time and a
    ``month_year`` partition value.

    Args:
        entities: Sequence of entity ids; its length sets the number of rows.
        year, month, day: Calendar day (UTC) on which event times fall.
        random_state: Seed for ``make_hastie_10_2``. The default 0 keeps the
            original behaviour, where every call with the same number of
            entities (i.e. every day) gets the *identical* feature matrix.
            Pass a different value (e.g. derived from ``day``) to make
            features vary between days.
        end_hour: Event times are drawn from ``[00:00, end_hour:00)`` UTC on
            that day (0-23). The default 22 matches the original window.

    Returns:
        DataFrame with columns f0..f9, y, entity_id, datetime, created, month_year.
    """
    n_samples = len(entities)
    X, y = make_hastie_10_2(n_samples=n_samples, random_state=random_state)
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])])
    df["y"] = y
    df["entity_id"] = entities

    day_start = datetime(year, month, day, 0, tzinfo=timezone.utc)
    window_end = datetime(year, month, day, end_hour, tzinfo=timezone.utc)
    # Integer epoch-second bounds; randint samples from [low, high).
    df["datetime"] = pd.to_datetime(
        np.random.randint(int(day_start.timestamp()), int(window_end.timestamp()), size=n_samples),
        unit="s",  # no utc=True: values are tz-naive, holding UTC wall-clock time
    )
    # NOTE(review): datetime.now() is the local wall clock and tz-naive, unlike the
    # UTC-based columns here — confirm that is intended.
    df["created"] = pd.to_datetime(datetime.now())
    # Despite its name this holds the day's midnight (UTC, tz-aware): one distinct
    # value per day, not the first of the month.
    df["month_year"] = pd.to_datetime(day_start, utc=True)
    return df

# One million unique entity ids, in random order.
entities = generate_entities(1_000_000)

# Query frame for Feast: every entity is looked up as of the same UTC instant.
entity_df = pd.DataFrame(data=entities, columns=['entity_id']).assign(
    event_timestamp=datetime(2021, 1, 14, 23, 59, 42, tzinfo=timezone.utc)
)

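## Create the Delta Lake table

Appends one day of synthetic data per iteration to `./dataset/all`, partitioned by `month_year`; in the run below each day took roughly two minutes to write.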

In [6]:
import time
from pathlib import Path

DELTA_PATH = "./dataset/all"
N_DAYS = 14  # January 1..14; entity_df's event_timestamp is 2021-01-14 23:59:42 UTC

# Replaces the former unconditional `break  # TMP :)`, which made this loop dead
# code, so a fresh environment never created the table read by later cells.
# The guard skips the slow (~2 min/day) writes when the table already exists,
# so re-running does not append duplicate days.
# NOTE: a partially written table (e.g. an interrupted run) also counts as
# existing — delete the directory to regenerate from scratch.
if (Path(DELTA_PATH) / "_delta_log").exists():
    print(f"Delta table already present at {DELTA_PATH}; skipping generation")
else:
    for d in range(1, N_DAYS + 1):
        print(f"DAY {d}")

        start_time = time.perf_counter()
        data = generate_data(entities, month=1, day=d)
        print(f"## GENERATED - {time.perf_counter() - start_time} s")

        start_time = time.perf_counter()
        spark.createDataFrame(data).write.format("delta").mode("append").partitionBy('month_year').save(DELTA_PATH)
        print(f"## DELTA CREATED - {time.perf_counter() - start_time} s")

DAY 1
## GENERATED - 1.9863653182983398 s
## DELTA CREATED - 118.46784734725952 s
DAY 2
## GENERATED - 2.2533488273620605 s
## DELTA CREATED - 113.56314516067505 s
DAY 3
## GENERATED - 2.090444326400757 s
## DELTA CREATED - 117.54949474334717 s
DAY 4
## GENERATED - 2.137775421142578 s
## DELTA CREATED - 113.69700503349304 s
DAY 5
## GENERATED - 2.0107674598693848 s
## DELTA CREATED - 112.49230170249939 s
DAY 6
## GENERATED - 2.04490327835083 s
## DELTA CREATED - 116.83132553100586 s
DAY 7
## GENERATED - 2.12314772605896 s
## DELTA CREATED - 114.3579614162445 s
DAY 8
## GENERATED - 2.1742141246795654 s
## DELTA CREATED - 115.68657755851746 s
DAY 9
## GENERATED - 2.001004695892334 s
## DELTA CREATED - 112.91505312919617 s
DAY 10
## GENERATED - 2.1537675857543945 s
## DELTA CREATED - 113.79394125938416 s
DAY 11
## GENERATED - 2.077458620071411 s
## DELTA CREATED - 116.54374861717224 s
DAY 12
## GENERATED - 2.2862818241119385 s
## DELTA CREATED - 119.25584959983826 s
DAY 13
## GENERATED - 

## Delta Lake history 

In [2]:
from delta.tables import DeltaTable

# Open the Delta table at ./dataset/all and show its commit history
# (one row per table version).
deltaTable = DeltaTable.forPath(spark, "./dataset/all")

fullHistoryDF = deltaTable.history()
fullHistoryDF.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|     13|2022-02-11 01:08:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|         12|  Serializable|         true|{numFiles -> 12, ...|        null|Apache-Spark/3.2....|
|     12|2022-02-11 01:06:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|         11|  Serializable|         true|{numFiles -> 12, ...|        null|Apache-Spark/3.2....|
|     11|2

# Feast Apply 

In [3]:
import shutil
from feast.repo_operations import apply_total
from feast.repo_config import load_repo_config
from pathlib import Path

# NOTE(review): presumably removed so Feast's repo scan doesn't pick up checkpoint
# copies of the definition files — confirm. Unlike `!rm -r`, this does not print
# an error when the directory is absent (e.g. on a fresh checkout).
shutil.rmtree(".ipynb_checkpoints", ignore_errors=True)

# NOTE(review): absolute path. The later FeatureStore(repo_path=".") uses the
# current directory instead, and the final traceback mentions
# /home/jovyan/feast-spark/... — make sure both cells refer to the same repo.
repo = Path('/home/jovyan/feast-pyspark/feature_repo/')

repo_config = load_repo_config(repo)
# Third positional argument is a boolean flag of apply_total (presumably
# skip_source_validation) — verify against the installed Feast version.
apply_total(repo_config, repo, True)

Created entity [1m[32mentity_id[0m
Created feature view [1m[32mmy_statistics[0m

Created sqlite table [1m[32mrepo_my_statistics[0m





In [4]:
# Query only entity ids <= 500, a small subset of the generated ids.
edf = entity_df.loc[entity_df["entity_id"] <= 500]
edf

Unnamed: 0,entity_id,event_timestamp
5173,386,2021-01-14 23:59:42+00:00
5598,388,2021-01-14 23:59:42+00:00
6891,25,2021-01-14 23:59:42+00:00
7249,196,2021-01-14 23:59:42+00:00
14082,97,2021-01-14 23:59:42+00:00
...,...,...
993487,256,2021-01-14 23:59:42+00:00
993530,462,2021-01-14 23:59:42+00:00
996428,391,2021-01-14 23:59:42+00:00
996937,200,2021-01-14 23:59:42+00:00


In [9]:
from feast import FeatureStore

import time
from feast_pyspark import SparkOfflineStore

# Hand the notebook's Spark session to the offline store (set as a class attribute).
SparkOfflineStore.spark = spark
# NOTE(review): resolved against the kernel's cwd, while the apply cell uses an
# absolute repo path — keep both pointing at the same feature repo.
store = FeatureStore(repo_path=".")

# Feature references: generated columns f0..f9 plus label y from `my_statistics`.
feature_refs = [f"my_statistics:f{i}" for i in range(10)] + ["my_statistics:y"]

start_time = time.perf_counter()
training_df = store.get_historical_features(
    entity_df=edf,
    features=feature_refs,
).to_df()


print("--- %s seconds ---" % (time.perf_counter() - start_time))

training_df

AnalysisException: `/home/jovyan/feast-spark/feature_repo/dataset/all` is not a Delta table.