## 1) Deps setup

In [None]:
%pip install "feast[spark,aws]" xgboost

In [None]:
import numpy as np
import pandas as pd
import xgboost as xgb
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, IntegerType, DoubleType, TimestampType

import pandas as pd
import os

## 2) EDA

In [None]:
# Load the three raw demo tables (books, users, ratings) from S3 as Spark DataFrames.
# `spark` is not created in this notebook; it is expected to be provided by the runtime.
books = spark.read.parquet("s3://tecton-demo-data/apply-book-recsys/books.parquet")
users = spark.read.parquet("s3://tecton-demo-data/apply-book-recsys/users.parquet")
ratings = spark.read.parquet("s3://tecton-demo-data/apply-book-recsys/ratings.parquet")

In [None]:
%sql
-- CREATE DATABASE book_recsys_apply

In [None]:
# books.write.mode("overwrite").saveAsTable("book_recsys_apply.books")
# users.write.mode("overwrite").saveAsTable("book_recsys_apply.users")
# ratings.write.mode("overwrite").saveAsTable("book_recsys_apply.ratings")

In [None]:
# Inspect column names and types of the users table.
users.printSchema()

In [None]:
# Inspect column names and types of the books table.
books.printSchema()

In [None]:
# Count users per raw country string; the output shows the free-text values are
# noisy (stray quotes, multi-part locations), which matters for the country encoding later.
display(users.groupby("country").count())

country,count
"thailand""",1
guernsey,3
aruba,3
usa (currently living in england),1
"north sumatera, indonesia",1
finland,158
"china""",1
australia,2672
greece,41
"wales, united kingdom",2


In [None]:
# Preview the books table (`display` is the notebook runtime's rich table renderer).
display(books)

isbn,book_title,book_author,year_of_publication,publisher,Summary,Language,Category,created_at,__index_level_0__
0000913154,The Way Things Work: An Illustrated Encyclopedia of Technology,C. van Amerongen (translator),1967,Simon & Schuster,"Scientific principles, inventions, and chemical, mechanical, and industrial processes are explained for the general reader with the help of drawings and diagrams",en,Technology & Engineering,1967-01-01T12:00:00.000+0000,948518
0001055607,Cereus Blooms At Night,Shani Mootoo,1994,Fairmount Books Ltd Remainders,"When Mala, old and notoriously crazy, arrives at the Paradise Alms House, she is placed in the tender care of Tyler, a gay male nurse, and an extraordinary relationship begins to develop.",en,Adult child abuse victims,1994-01-01T12:00:00.000+0000,826650
0001061127,CHESS FOR YOUNG BEGINNERS,William T. McLeod,1975,HarperCollins Publishers,A step by step guide to playing chess,en,Chess,1975-01-01T12:00:00.000+0000,908526
0001374362,When It's Time for Bed (Collins Baby & Toddler S.),Nick Butterworth,1994,Collins,Shows baby and his animal friends preparing for bedtime. 1-2 yrs.,en,Animals,1994-01-01T12:00:00.000+0000,1025127
0001711253,The Big Honey Hunt,Stan Berenstein,1942,HarperCollins Publishers,"Father Bear takes Small Bear on a honey hunt. After many problems, they go to their local store.",en,Bears,1942-01-01T12:00:00.000+0000,935227
000171421X,It's Not Easy Being a Bunny (A Beginner Book),Marilyn Sadler,1984,HarperCollins Publishers,P.J. Funnybunny did not like being a bunny.,en,Animals,1984-01-01T12:00:00.000+0000,1028471
000184251X,February's Road,John Verney,1987,HarperCollins Publishers,"The new road is to go right through the Callendar family's garden and February Callendar, while trying to change the Ministry's plans, discovers some very fishy things going on.",en,"Children's stories, English",1987-01-01T12:00:00.000+0000,907750
0001850121,A place called Lantern Light,Ellen Miller,1975,Collins,They were nearing the end of their long journey.,en,Children's stories,1975-01-01T12:00:00.000+0000,917054
0001856367,The Collins Book of Ballet and Dance,Jean Ure,1996,HarperCollins Publishers,A collection of short stories about young people dedicated to the world of dance___,en,Ballet,1996-01-01T12:00:00.000+0000,907137
0001935968,The Brambly Hedge Treasury,Jill Barklem,1991,HarperCollins Publishers,"For this is the home of mice of Brambly Hills""--Back cover.",en,Country life,1991-01-01T12:00:00.000+0000,999997


In [None]:
# List the distinct book categories to gauge cardinality of the Category column.
display(books.select("Category").distinct())

Category
Broadcast journalism.
Entrepreneurship
Drug abuse
"Yugoslav War, 1991-1995."
Infants (Newborn)
Organic gardening
Authors and publishers
Fishes
Paris (France)
Information society


In [None]:
# List the distinct language codes; this column is string-indexed later as Feature #3.
display(books.select("Language").distinct())

Language
en
vi
ro
pl
pt
tl
gl
ko
ms
de


## 3) Init Feast Object

In [None]:
from feast import FeatureStore, RepoConfig
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig
import warnings
# Suppress DeprecationWarning and RuntimeWarning output for the rest of the session.
warnings.filterwarnings("ignore", category=DeprecationWarning) 
warnings.filterwarnings("ignore", category=RuntimeWarning) 

# Build the Feast configuration in code: an S3-hosted registry file, a Spark
# offline store, a Spark batch engine and a DynamoDB online store.
repo_config = RepoConfig(
    # Placeholder: replace the bracketed text with your own bucket before running.
    registry="s3://[INSERT YOUR BUCKET]/feast-registry.db",
    project="feast_repo",
    provider="aws",
    offline_store=SparkOfflineStoreConfig(
      # Spark session settings handed to the offline store.
      spark_conf={
        "spark.ui.enabled": "false",
        "spark.eventLog.enabled": "false",
        "spark.sql.catalogImplementation": "hive",
        "spark.sql.parser.quotedRegexColumnNames": "true",
        "spark.sql.session.timeZone": "UTC"
      }
    ),
    batch_engine={
      "type": "spark.engine",
      "partitions": 10
    },
    # NOTE(review): the online store is in us-west-1 while the Kinesis stream read
    # later in this notebook is in us-west-2 -- confirm the regions are intentional.
    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
    entity_key_serialization_version=2
)
# Feature store handle used for historical retrieval, online reads, materialization and pushes.
store = FeatureStore(config=repo_config)

## 4) Create upstream transformations (some of which go into dbt)

In [None]:
from pyspark.ml.feature import StringIndexer

# Feature #0: "last n" is not implemented here -- vanilla Spark needs custom aggregation state for it.
# Feature #1: Encode user country (the fitted indexer is applied after features come back from Feast).
# Countries are indexed by descending frequency; values not seen during fitting are
# kept and mapped to an extra index instead of raising.
stringIndexer = StringIndexer(
    inputCol="country",
    outputCol="country_idx",
    stringOrderType="frequencyDesc",
    handleInvalid="keep",
)
userCountryIndexerModel = stringIndexer.fit(users)
display(userCountryIndexerModel.transform(users))

user_id,location,age,city,state,country,signup_date,__index_level_0__,country_idx
2,"stockton, california, usa",18.0,stockton,california,usa,2021-09-12T06:14:27.197+0000,0,0.0
8,"timmins, ontario, canada",34.74389988072476,timmins,ontario,canada,2020-02-10T07:22:09.616+0000,3,1.0
9,"germantown, tennessee, usa",34.74389988072476,germantown,tennessee,usa,2020-04-01T12:05:52.654+0000,32775,0.0
10,"albacete, wisconsin, spain",26.0,albacete,wisconsin,spain,2020-07-04T18:18:19.970+0000,193428,5.0
12,"fort bragg, california, usa",34.74389988072476,fort bragg,california,usa,2020-07-23T04:19:17.666+0000,196326,0.0
14,"mediapolis, iowa, usa",34.74389988072476,mediapolis,iowa,usa,2020-03-11T14:45:24.762+0000,196414,0.0
16,"albuquerque, new mexico, usa",34.74389988072476,albuquerque,new mexico,usa,2020-02-14T18:14:31.964+0000,384041,0.0
17,"chesapeake, virginia, usa",34.74389988072476,chesapeake,virginia,usa,2020-03-29T06:12:18.883+0000,426729,0.0
19,"weston, ,",14.0,weston,",",,2021-05-23T16:16:05.804+0000,450331,414.0
20,"langhorne, pennsylvania, usa",19.0,langhorne,pennsylvania,usa,2020-03-28T01:53:40.955+0000,451548,0.0


In [None]:
%sql
-- Feature #2: Bucketize user age (precomputed prior to Feast)
-- Ten-year bands: negative ages -> -1, (0..10] -> 0, (10..20] -> 1, ... (90..100] -> 9.
-- CASE branches are evaluated top-down, so each branch only needs its upper bound.
-- Anything that matches no branch (age > 100, or a NULL age) falls through to bucket 10.
SELECT
  user_id,
  signup_date,
  CASE
    WHEN age < 0    THEN -1
    WHEN age <= 10  THEN 0
    WHEN age <= 20  THEN 1
    WHEN age <= 30  THEN 2
    WHEN age <= 40  THEN 3
    WHEN age <= 50  THEN 4
    WHEN age <= 60  THEN 5
    WHEN age <= 70  THEN 6
    WHEN age <= 80  THEN 7
    WHEN age <= 90  THEN 8
    WHEN age <= 100 THEN 9
    ELSE 10
  END AS bucketedAge
FROM
  book_recsys_apply.users

user_id,signup_date,bucketedAge
2,2021-09-12T06:14:27.197+0000,1
8,2020-02-10T07:22:09.616+0000,3
9,2020-04-01T12:05:52.654+0000,3
10,2020-07-04T18:18:19.970+0000,2
12,2020-07-23T04:19:17.666+0000,3
14,2020-03-11T14:45:24.762+0000,3
16,2020-02-14T18:14:31.964+0000,3
17,2020-03-29T06:12:18.883+0000,3
19,2021-05-23T16:16:05.804+0000,1
20,2020-03-28T01:53:40.955+0000,1


In [None]:
# Feature #3: Book language model (applied after Feast retrieval to turn language codes into numbers).
# Languages are indexed by descending frequency; unseen codes are kept as an extra index.
from pyspark.ml.feature import StringIndexer
bookLanguageIndexer = StringIndexer(
    inputCol="Language",
    outputCol="Language_idx",
    stringOrderType="frequencyDesc",
    handleInvalid="keep",
)
bookLanguageIndexerModel = bookLanguageIndexer.fit(books)

# Sanity check: show the raw code next to its numeric index.
display(
    bookLanguageIndexerModel
    .transform(books)
    .select(["isbn", "Language", "Language_idx"])
)

isbn,Language,Language_idx
0000913154,en,0.0
0001055607,en,0.0
0001061127,en,0.0
0001374362,en,0.0
0001711253,en,0.0
000171421X,en,0.0
000184251X,en,0.0
0001850121,en,0.0
0001856367,en,0.0
0001935968,en,0.0


In [None]:
%sql
-- Feature #4: Average book rating in last year (precomputed)
-- For every rating event, aggregate that book's ratings over the trailing 365 days
-- (inclusive of the current row). Both aggregates share one named window.
SELECT
  isbn,
  rating_timestamp,
  AVG(rating)   OVER trailing_year_per_book AS avg_yr_book_rating,
  COUNT(rating) OVER trailing_year_per_book AS count_yr_book_rating
FROM book_recsys_apply.ratings
WINDOW trailing_year_per_book AS (
  PARTITION BY isbn
  ORDER BY rating_timestamp
  RANGE BETWEEN INTERVAL 365 day PRECEDING AND CURRENT ROW
)

isbn,rating_timestamp,avg_yr_book_rating,count_yr_book_rating
0000913154,2022-09-15T11:12:16.381+0000,8.0,1
0001046438,2021-11-25T10:29:14.737+0000,9.0,1
0001046934,2021-05-25T02:33:18.293+0000,0.0,1
0001047213,2022-11-26T22:59:22.500+0000,9.0,1
0001047647,2021-08-23T15:36:27.677+0000,0.0,1
0001047868,2021-07-21T09:00:24.500+0000,0.0,1
0001048473,2022-07-30T17:44:49.510+0000,0.0,1
0001053744,2021-05-19T12:13:23.050+0000,5.0,1
0001372564,2020-10-13T22:30:58.980+0000,0.0,1
0001382381,2021-11-24T13:31:14.968+0000,0.0,1


In [None]:
%sql
-- Feature #5: Average user rating in last year (precomputed)
-- For every rating event, aggregate that user's ratings over the trailing 365 days
-- (inclusive of the current row). Both aggregates share one named window.
SELECT
  user_id,
  rating_timestamp,
  AVG(rating)   OVER trailing_year_per_user AS avg_yr_user_rating,
  COUNT(rating) OVER trailing_year_per_user AS count_yr_user_rating
FROM book_recsys_apply.ratings
WINDOW trailing_year_per_user AS (
  PARTITION BY user_id
  ORDER BY to_timestamp(rating_timestamp)
  RANGE BETWEEN INTERVAL 365 day PRECEDING AND CURRENT ROW
)

user_id,rating_timestamp,avg_yr_user_rating,count_yr_user_rating
9,2020-04-01T12:05:52.654+0000,6.0,1
9,2021-09-25T13:10:05.814+0000,0.0,1
9,2022-09-02T01:44:06.535+0000,0.0,2
19,2021-05-23T16:16:05.804+0000,7.0,1
22,2021-01-05T13:13:26.804+0000,7.0,1
22,2021-04-25T13:43:22.954+0000,3.5,2
22,2021-07-10T10:31:00.945+0000,2.333333333333333,3
22,2022-08-10T11:05:21.767+0000,0.0,1
26,2020-03-03T17:30:29.355+0000,9.0,1
26,2021-09-16T02:41:36.437+0000,10.0,1


In [None]:
%sql
-- Feature #6: Average user rating per book category (precomputed)
-- For every rating event, aggregate the ratings of all books in the same category
-- over the trailing 365 days (inclusive of the current row).
--
-- The window must be partitioned by the Category *column*. Writing it as "Category"
-- (double quotes) makes Spark SQL treat it as a string literal, i.e. a constant, which
-- collapses every rating into one global partition and yields a site-wide running
-- average instead of a per-category one.
-- Ratings whose isbn has no match in books get a NULL category and share one partition.
SELECT
  r.isbn,
  r.rating_timestamp,
  AVG(r.rating) OVER (
    PARTITION BY b.Category
    ORDER BY r.rating_timestamp
    RANGE BETWEEN INTERVAL 365 day PRECEDING AND CURRENT ROW
  ) AS avg_yr_category_rating,
  COUNT(r.rating) OVER (
    PARTITION BY b.Category
    ORDER BY r.rating_timestamp
    RANGE BETWEEN INTERVAL 365 day PRECEDING AND CURRENT ROW
  ) AS count_yr_category_rating
FROM book_recsys_apply.ratings AS r
LEFT JOIN book_recsys_apply.books AS b
  ON r.isbn = b.isbn

isbn,rating_timestamp,avg_yr_category_rating,count_yr_category_rating
0395754909,2020-01-01T00:01:29.484+0000,0.0,1
0451151259,2020-01-01T00:01:32.965+0000,3.5,2
044023722X,2020-01-01T00:02:05.674+0000,5.333333333333333,3
0941711196,2020-01-01T00:03:15.002+0000,6.25,4
0451408934,2020-01-01T00:04:25.161+0000,5.0,5
1903998166,2020-01-01T00:08:16.869+0000,5.666666666666667,6
0769604285,2020-01-01T00:11:00.756+0000,4.857142857142857,7
0029023807,2020-01-01T00:11:23.173+0000,4.25,8
0312144075,2020-01-01T00:12:04.140+0000,4.666666666666667,9
0671679449,2020-01-01T00:12:51.210+0000,5.2,10


In [None]:
from pyspark.ml import Pipeline

# Create PipelineModel
# Chain the two already-fitted indexer models (country, language) so both encodings
# can be applied to a retrieved feature DataFrame in one transform call.
pipeline = Pipeline(stages=[userCountryIndexerModel, bookLanguageIndexerModel])

In [None]:
# Create final tables (optional, done in dbt)
# avgBookRatings.write.mode("overwrite").saveAsTable("book_recsys_apply.agg_book_ratings")
# avgUserRatings.write.mode("overwrite").saveAsTable("book_recsys_apply.agg_user_ratings")
# transformedUsers.write.mode("overwrite").saveAsTable("book_recsys_apply.transformed_users")
# avgUserRatingsPerCategory.write.mode("overwrite").saveAsTable("book_recsys_apply.avg_book_category_ratings")

## 5) Retrieve features (train models)

In [None]:
# Preview the labeled events: one row per (user_id, isbn) rating with its timestamp.
display(ratings.head(10))

user_id,isbn,rating,rating_timestamp
2,195153448,0,2022-08-04T16:03:16.862+0000
8,2005018,5,2020-08-15T16:00:36.000+0000
11400,2005018,0,2022-07-19T08:58:57.021+0000
11676,2005018,8,2020-02-18T18:43:18.503+0000
41385,2005018,0,2020-11-22T14:33:19.457+0000
67544,2005018,8,2022-08-19T22:17:44.554+0000
85526,2005018,0,2021-03-31T07:40:07.129+0000
96054,2005018,0,2022-12-15T01:23:46.267+0000
116866,2005018,9,2022-10-03T14:47:45.018+0000
123629,2005018,9,2021-10-07T01:11:34.382+0000


In [None]:
# Collect all ratings into a pandas DataFrame; it is passed to Feast as the entity
# dataframe for every historical retrieval below.
ratings_df = ratings.toPandas()

### 5a) Model v1: book language, country, bucketed user age

In [None]:
# Join the rating events with the features registered under the "model_v1"
# feature service and return the result as a Spark DataFrame.
features_v1 = store.get_historical_features(
  entity_df=ratings_df, 
  features=store.get_feature_service("model_v1")
).to_spark_df()

In [None]:
# Produce the PipelineModel that applies the country and language indexers;
# it is reused unchanged for the v2, v3 and v4 feature sets.
transformationModel = pipeline.fit(features_v1)

In [None]:
# Check which columns Feast returned before the indexers are applied.
features_v1.printSchema()

In [None]:
# Run prior transformations
# (adds the numeric index columns produced by the country and language indexers)
features_v1_transformed = transformationModel.transform(features_v1)
features_v1_transformed.printSchema()

In [None]:
# Prep training data for v1
# Time-based split: ratings up to 2022-01-01 train the model, later ratings test it.
# The label (rating), the identifier/time columns and the raw string columns (already
# replaced by their index columns) are dropped from the feature matrices.
# TODO: this split is repeated verbatim for v2-v4; consider a shared helper.
features_v1_all = features_v1_transformed.toPandas()
train_v1_all = features_v1_all[features_v1_all["rating_timestamp"] <= "2022-01-01"]
train_Y = train_v1_all["rating"]
train_X = train_v1_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])
test_v1_all = features_v1_all[features_v1_all["rating_timestamp"] > "2022-01-01"]
test_Y = test_v1_all["rating"]
test_X = test_v1_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])

In [None]:
# Regressor with a squared-error objective and a fixed random_state.
xgb_v1 = xgb.XGBRegressor(objective='reg:squarederror', random_state=233)
 
# Fitting the model
xgb_v1.fit(train_X, train_Y)

In [None]:
from sklearn.metrics import mean_squared_error as MSE

# Score the held-out (post-2022-01-01) ratings with the v1 model.
# Note: `pred` and `rmse` are overwritten by each later model's evaluation cell.
pred = xgb_v1.predict(test_X)
# RMSE Computation
rmse = np.sqrt(MSE(test_Y, pred))
print("RMSE : % f" %(rmse))

### 5b) Model v2: Add in avg book rating

In [None]:
# Same rating events as v1, joined with the features of the "model_v2" feature service.
features_v2 = store.get_historical_features(
  entity_df=ratings_df, 
  features=store.get_feature_service("model_v2")
).to_spark_df()

In [None]:
# Run prior transformations
# (reuses the transformationModel created in the v1 section)
features_v2_transformed = transformationModel.transform(features_v2)
features_v2_transformed.printSchema()

In [None]:
# Prep training data for v2
# Same time-based split and dropped columns as v1: train on ratings up to 2022-01-01,
# test on later ratings.
features_v2_all = features_v2_transformed.toPandas()
train_v2_all = features_v2_all[features_v2_all["rating_timestamp"] <= "2022-01-01"]
train_v2_Y = train_v2_all["rating"]
train_v2_X = train_v2_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])
test_v2_all = features_v2_all[features_v2_all["rating_timestamp"] > "2022-01-01"]
test_v2_Y = test_v2_all["rating"]
test_v2_X = test_v2_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])

In [None]:
# Same model settings as v1 so that only the feature set differs between versions.
xgb_v2 = xgb.XGBRegressor(objective='reg:squarederror', random_state=233)
 
# Fitting the model
xgb_v2.fit(train_v2_X, train_v2_Y)

In [None]:
from sklearn.metrics import mean_squared_error as MSE

# Score the held-out ratings with the v2 model (overwrites `pred` and `rmse`).
pred = xgb_v2.predict(test_v2_X)
# RMSE Computation
rmse = np.sqrt(MSE(test_v2_Y, pred))
print("RMSE : % f" %(rmse))

### 5c) Model v3: Add in average user rating in the last year

In [None]:
# Same rating events, joined with the features of the "model_v3" feature service.
features_v3 = store.get_historical_features(
  entity_df=ratings_df, 
  features=store.get_feature_service("model_v3")
).to_spark_df()
# Mark the retrieved DataFrame for caching; it is read again in the next cell.
features_v3.cache()

In [None]:
# Run prior transformations
# (reuses the transformationModel created in the v1 section)
features_v3_transformed = transformationModel.transform(features_v3)
features_v3_transformed.printSchema()

# Prep training data for v3
# Same time-based split and dropped columns as v1/v2.
features_v3_all = features_v3_transformed.toPandas()
train_v3_all = features_v3_all[features_v3_all["rating_timestamp"] <= "2022-01-01"]
train_v3_Y = train_v3_all["rating"]
train_v3_X = train_v3_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])
test_v3_all = features_v3_all[features_v3_all["rating_timestamp"] > "2022-01-01"]
test_v3_Y = test_v3_all["rating"]
test_v3_X = test_v3_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])

In [None]:
# Same model settings as v1/v2 so that only the feature set differs between versions.
xgb_v3 = xgb.XGBRegressor(objective='reg:squarederror', random_state=233)
 
# Fitting the model
xgb_v3.fit(train_v3_X, train_v3_Y)

In [None]:
from sklearn.metrics import mean_squared_error as MSE

# Score the held-out ratings with the v3 model (overwrites `pred` and `rmse`).
pred = xgb_v3.predict(test_v3_X)
# RMSE Computation
rmse = np.sqrt(MSE(test_v3_Y, pred))
print("RMSE : % f" %(rmse))

### 5d) Model v4: Add average rating per book category

In [None]:
# To train a model, you need labeled events and point-in-time-correct features. Feast provides the features.
# Labeled events: historical log of user ratings
# 1) Pull in the user + book + rating features that you need to train your model
features_v4 = store.get_historical_features(
  entity_df=ratings_df, 
  features=store.get_feature_service("model_v4")
).to_spark_df()
# Mark the retrieved DataFrame for caching; it is read again below.
features_v4.cache()

# Run prior transformations
# (reuses the transformationModel created in the v1 section)
features_v4_transformed = transformationModel.transform(features_v4)
features_v4_transformed.printSchema()

# Prep training data for v4
# Same time-based split and dropped columns as v1-v3.
features_v4_all = features_v4_transformed.toPandas()
train_v4_all = features_v4_all[features_v4_all["rating_timestamp"] <= "2022-01-01"]
train_v4_Y = train_v4_all["rating"]
train_v4_X = train_v4_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])
test_v4_all = features_v4_all[features_v4_all["rating_timestamp"] > "2022-01-01"]
test_v4_Y = test_v4_all["rating"]
test_v4_X = test_v4_all.drop(columns=["rating", "isbn", "rating_timestamp", "country", "Language", "Category"])

In [None]:
# 2) For an incoming request, pull the latest feature values for the given user and book
# Note: Feast decouples the caller from the feature source, so the caller does not need
# to know whether a value came from a batch or a streaming pipeline.
feature_vector = store.get_online_features(
    features=store.get_feature_service("model_v4"),
    entity_rows=[
        {
            "user_id": "213915",
            "isbn": "0061099686"
        }
    ],
).to_dict()
# model.predict(feature_vector)

def print_online_features(feature_vector):
    """Print each entry of a feature dict as ``name  :  value``, ordered by name.

    Args:
        feature_vector: mapping of feature name to value(s), e.g. the result of
            ``get_online_features(...).to_dict()``.
    """
    for name in sorted(feature_vector):
        print(name, " : ", feature_vector[name])

# Show what the online store returned for the example user/book pair.
print_online_features(feature_vector)

In [None]:
# Same model settings as v1-v3 so that only the feature set differs between versions.
xgb_v4 = xgb.XGBRegressor(objective='reg:squarederror', random_state=233)
 
# Fitting the model
xgb_v4.fit(train_v4_X, train_v4_Y)

In [None]:
from sklearn.metrics import mean_squared_error as MSE

# Score the held-out ratings with the v4 model (overwrites `pred` and `rmse`).
pred = xgb_v4.predict(test_v4_X)
# RMSE Computation
rmse = np.sqrt(MSE(test_v4_Y, pred))
print("RMSE : % f" %(rmse))

## 6) Moving the latest feature values into a low latency store
We do this by materializing batch-precomputed features into the online store and by pushing stream-precomputed features as they arrive.

In [None]:
from datetime import datetime
# Materialize batch feature values whose timestamps fall in [start_date, end_date].
# The very early start date is there to include all history.
# NOTE(review): end_date is 2022-04-21, but the ratings shown above run into
# late 2022 -- confirm whether the end date should be later (e.g. "now").
store.materialize(
  start_date=datetime(1900,4,20), 
  end_date=datetime(2022,4,21), 
)

## Computing + pushing streaming feature values

In [None]:
def raw_data_deserialization(df):
    """Decode raw stream records into typed rating columns.

    The `data` column of `df` is cast to a string, parsed as JSON with four
    string fields (user_id, isbn, rating_timestamp, rating) and flattened.

    Args:
        df: DataFrame with a `data` column holding the JSON payload.

    Returns:
        DataFrame with columns user_id, isbn, rating_timestamp (converted with
        from_utc_timestamp using 'UTC') and rating (cast to long).
    """
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StringType, StructField, StructType

    # Every payload field arrives as a JSON string; typing happens in the final select.
    payload_fields = ('user_id', 'isbn', 'rating_timestamp', 'rating')
    payload_schema = StructType(
        [StructField(field_name, StringType(), False) for field_name in payload_fields]
    )

    json_df = df.selectExpr('cast (data as STRING) jsonData')
    parsed_df = json_df.select(from_json('jsonData', payload_schema).alias('payload'))

    return parsed_df.select(
        col('payload.user_id').alias('user_id'),
        col('payload.isbn').alias('isbn'),
        from_utc_timestamp('payload.rating_timestamp', 'UTC').alias('rating_timestamp'),
        col('payload.rating').cast('long').alias('rating'),
    )

def ingest_and_deserialize_ratings_stream_df():
    """Open the Kinesis ratings stream and return it deserialized and watermarked.

    Reads the "book-ratings-stream" Kinesis stream through the notebook's `spark`
    session, decodes each record with `raw_data_deserialization`, and attaches a
    25-hour event-time watermark on `rating_timestamp`.

    Returns:
        The deserialized streaming DataFrame with the watermark applied.
    """
    import datetime
    options = {
        "streamName": "book-ratings-stream",
        "region": "us-west-2",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
        "roleArn": "arn:aws:iam::[REDACTED]:role/REDACTED"
    }
    reader = spark.readStream.format("kinesis").options(**options)
    ratings_stream_df = raw_data_deserialization(reader.load())

    # timedelta.seconds is only the sub-day remainder (3600 for 25 hours);
    # total_seconds() yields the full duration (90000), which is the intended delay.
    watermark_seconds = int(datetime.timedelta(hours=25).total_seconds())
    watermark = "{} seconds".format(watermark_seconds)
    return ratings_stream_df.withWatermark("rating_timestamp", watermark)
  
# 1) Ingest and deserialize streaming events from Kinesis stream
# The result is a streaming DataFrame carrying an event-time watermark on rating_timestamp.
ratings_stream = ingest_and_deserialize_ratings_stream_df()

In [None]:
# Peek at live records to confirm the payload is decoded into the expected columns.
display(ratings_stream)

user_id,isbn,rating_timestamp,rating
2766,0943233437,2022-12-06T05:53:22.062+0000,0
183915,0440224845,2022-12-06T05:53:22.062+0000,7
86122,140003065X,2022-12-06T05:53:22.062+0000,0
108752,0061097845,2022-12-06T05:53:22.062+0000,0
254160,1878448900,2022-12-06T05:53:22.062+0000,0
234623,0316780154,2022-12-06T05:53:22.062+0000,0
264321,0380896761,2022-12-06T05:53:22.062+0000,0
72214,0771576900,2022-12-06T05:53:22.062+0000,10
97160,0425155153,2022-12-06T05:53:22.062+0000,0
190709,0441054668,2022-12-06T05:53:22.062+0000,7


In [None]:
from pyspark.sql.functions import *
import pandas as pd

# 2) Aggregate feature values
# Per user, compute the rating count and mean over 1-day sliding windows that
# advance every 10 minutes.
stream_agg = (
  ratings_stream
    .groupBy(
      "user_id", 
      window(timeColumn="rating_timestamp", windowDuration="1 day", slideDuration="10 minutes")
    )
    .agg(
        count("rating").alias("count_day_user_rating"),
        avg("rating").alias("avg_day_user_rating")
    )
    .select(
      "user_id", 
      # The window end is exposed under the name rating_timestamp, so downstream
      # code sees it as the timestamp of the aggregated feature row.
      col("window.end").alias("rating_timestamp"), 
      "count_day_user_rating", 
      "avg_day_user_rating"
    )
)

# 3) Push transformed features into Feast
# Re-create the store handle from the same repo_config; send_to_feast below reads
# this module-level `store`.
from feast import FeatureStore
store = FeatureStore(config=repo_config)

def send_to_feast(df, epoch):
    """Push the newest aggregate row per user from one micro-batch into Feast.

    Intended as a `foreachBatch` sink. The batch is collected to pandas, reduced
    to the row with the latest `rating_timestamp` for each `user_id`, and pushed
    to the "agg_user_features_1d_source" push source via the module-level `store`.

    Args:
        df: micro-batch DataFrame exposing `toPandas()`; expected to contain
            `user_id` and `rating_timestamp` columns.
        epoch: micro-batch id supplied by Spark (unused).
    """
    pandas_df: pd.DataFrame = df.toPandas()
    if pandas_df.empty:
        return

    # The event-time column is named "rating_timestamp". Guarding on "timestamp"
    # never matched, so nothing was ever pushed.
    if "rating_timestamp" not in pandas_df.columns:
        return

    # Keep only the latest window per user: sort ascending and keep the last row of
    # each user. drop_duplicates leaves user_id as a regular column for the push.
    latest_per_user = (
        pandas_df
        .sort_values(by=["user_id", "rating_timestamp"])
        .drop_duplicates(subset="user_id", keep="last")
        .reset_index(drop=True)
    )
    store.push("agg_user_features_1d_source", latest_per_user)

# 4) Launch streaming job to do all the above
# Every 30 seconds the pending aggregate rows are handed to send_to_feast as a micro-batch.
query = (
    stream_agg
        .writeStream
        .outputMode("append") 
        # NOTE(review): the checkpoint lives under /tmp; presumably fine for a demo,
        # but use durable storage if the job must resume after a restart.
        .option("checkpointLocation", "/tmp/feast-book-recsys-workshop/")
        .trigger(processingTime="30 seconds")
        .foreachBatch(send_to_feast)
        .start()
)

# 5) (not shown) Orchestrate this job so it's self-healing. Monitor to ensure features land in Feast.
# For the demo, the stream runs for at most 30 seconds and is then stopped explicitly.
query.awaitTermination(timeout=30)
query.stop()