## Windowed aggregations using Spark streaming and ingestion to the online feature store

### **Note: this notebook needs to be run with a PySpark kernel to work properly!**

In this notebook, we'll use Spark Structured Streaming to compute real-time feature aggregations and write them to the online and offline feature store:

- Read data from the Kafka topic.
- Compute time window aggregations using Spark Structured Streaming.
- Write the aggregated features to the online feature store.


## Import necessary libraries

In [10]:
import json

from pyspark.sql.functions import from_json, window, avg, count, stddev, explode, date_format, col, mean
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

from hops import kafka, tls, hdfs

In [11]:
# name of the kafka topic to read card transactions from
KAFKA_TOPIC_NAME = "credit_card_transactions"

## Create a stream from the kafka topic


In [12]:
df_read = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
  .option("kafka.security.protocol", kafka.get_security_protocol()) \
  .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
  .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd()) \
  .option("kafka.ssl.keystore.location", tls.get_key_store()) \
  .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.key.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", KAFKA_TOPIC_NAME) \
  .load()

In [13]:
# Define schema to read from kafka topic 
parse_schema = StructType([StructField('tid', StringType(), True),
                           StructField('datetime', StringType(), True),
                           StructField('cc_num', StringType(), True),
                           StructField('category', StringType(), True),
                           StructField('amount', StringType(), True),
                           StructField('latitude', StringType(), True),
                           StructField('longitude', StringType(), True),
                           StructField('city', StringType(), True),
                           StructField('country', StringType(), True)]
                         )

In [17]:
# Deserialise data from the Kafka topic and create a streaming query
df_deser = df_read.selectExpr("CAST(value AS STRING)")\
                   .select(from_json("value", parse_schema).alias("value"))\
                   .select("value.tid", 
                           "value.datetime", 
                           "value.cc_num", 
                           "value.category", 
                           "value.amount", 
                           "value.latitude", 
                           "value.longitude",
                           "value.city",
                           "value.country")\
                   .selectExpr("CAST(tid as string)", 
                               "CAST(datetime as timestamp)", 
                               "CAST(cc_num as long)", 
                               "CAST(category as string)", 
                               "CAST(amount as double)",
                               "CAST(latitude as double)",
                               "CAST(longitude as double)",
                               "CAST(city as string)",
                               "CAST(country as string)")

In [18]:
df_deser.isStreaming

True

In [19]:
df_deser.printSchema() 

root
 |-- tid: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
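
The parse-then-cast steps above can be illustrated on a single message without Spark. This is a minimal sketch that assumes the Kafka message value is a JSON object whose fields all arrive as strings; the sample record, the datetime format, and the `parse_transaction` helper are hypothetical and only the field names come from `parse_schema`.

```python
import json
from datetime import datetime

# Hypothetical message value: field names follow parse_schema,
# the values and the datetime format are made up for illustration.
raw_value = json.dumps({
    "tid": "t-0001",
    "datetime": "2021-05-01 13:45:10",
    "cc_num": "4444037300542691",
    "category": "Grocery",
    "amount": "25.40",
    "latitude": "59.3293",
    "longitude": "18.0686",
    "city": "Stockholm",
    "country": "SE",
})


def parse_transaction(value):
    """Parse one JSON message and cast its string fields to typed values."""
    rec = json.loads(value)
    return {
        "tid": rec["tid"],
        "datetime": datetime.strptime(rec["datetime"], "%Y-%m-%d %H:%M:%S"),
        "cc_num": int(rec["cc_num"]),
        "category": rec["category"],
        "amount": float(rec["amount"]),
        "latitude": float(rec["latitude"]),
        "longitude": float(rec["longitude"]),
        "city": rec["city"],
        "country": rec["country"],
    }


parsed = parse_transaction(raw_value)
print(parsed["cc_num"], parsed["amount"], parsed["datetime"])
```

In the streaming query, `from_json` plays the role of `json.loads` and the `CAST(... as ...)` expressions play the role of the `int`, `float` and `strptime` calls.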

In [36]:
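from math import radians

import numpy as np
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf("datetime timestamp, cc_num long, category string, amount double, loc_delta double, city string, country string", PandasUDFType.GROUPED_MAP)
def haversine(pdf):
    # Coordinates are expected in radians (see the radians() calls in the windowed query)
    long = pdf["longitude"]
    lat = pdf["latitude"]
    # Coordinates of the previous row in the group; the first row has none, so its loc_delta is NaN
    long_shifted = long.shift()
    lat_shifted = lat.shift()
    long_diff = long_shifted - long
    lat_diff = lat_shifted - lat

    # Haversine formula on the unit sphere: central angle between consecutive locations
    a = np.sin(lat_diff/2.0)**2
    b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
    pdf["loc_delta"] = 2*np.arcsin(np.sqrt(a + b))

    # Return only the columns declared in the UDF schema
    return pdf[["datetime", "cc_num", "category", "amount", "loc_delta", "city", "country"]]

# Register math.radians for use in SQL expressions; without an explicit return type the UDF would return strings
spark.udf.register("radians", radians, DoubleType())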

<function radians at 0x7fcdbf4a0f70>
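
To make the `loc_delta` computation concrete, here is a minimal plain-Python sketch of the same haversine formula applied to one pair of points. The function name `haversine_delta`, the sample coordinates, and the Earth radius constant are assumptions for illustration; the notebook itself keeps the result as an angle in radians and does not convert it to kilometres.

```python
from math import radians, sin, cos, asin, sqrt


def haversine_delta(lat1, long1, lat2, long2):
    """Central angle in radians between two points given in degrees."""
    lat1, long1, lat2, long2 = map(radians, (lat1, long1, lat2, long2))
    a = sin((lat2 - lat1) / 2.0) ** 2
    b = cos(lat1) * cos(lat2) * sin((long2 - long1) / 2.0) ** 2
    return 2 * asin(sqrt(a + b))


# Mean Earth radius, used here only to turn the angle into a distance.
EARTH_RADIUS_KM = 6371.0

# Hypothetical pair of consecutive transaction locations (degrees).
delta = haversine_delta(59.3293, 18.0686, 57.7089, 11.9746)
print(delta, delta * EARTH_RADIUS_KM)
```

A large `loc_delta` between two consecutive transactions on the same card means the card was used in two places that are far apart.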

## Create windowed aggregations over different time windows using Spark streaming

In [44]:
# 4 hour window
windowed4hTransactionDF = df_deser \
    .selectExpr("tid", 
                "datetime", 
                "cc_num", 
                "category", 
                "amount",
                "radians(latitude) as latitude",
                "radians(longitude) as longitude",
                "city",
                "country")\
    .withWatermark("datetime", "10 minutes") \
    .groupBy(window("datetime", "240 minutes"), "cc_num")\
    .apply(haversine)\
    .selectExpr(
                "datetime", 
                "cc_num", 
                "category", 
                "amount",
                "loc_delta",
                "city",
                "country")\
    .withWatermark("datetime", "10 minutes") \
    .groupBy(window("datetime", "240 minutes"), "cc_num")\
    .agg(mean("amount").alias("trans_volume_mavg"), 
         stddev("amount").alias("trans_volume_mstd"), 
         count("cc_num").alias("trans_freq"),
         mean("loc_delta").alias("loc_delta_mavg")
        )\
    .selectExpr("cc_num", "trans_volume_mavg", "trans_volume_mstd", "trans_freq", "loc_delta_mavg", "unix_timestamp(window.end) as datetime")

In [45]:
windowed4hTransactionDF.isStreaming

True

In [46]:
windowed4hTransactionDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- trans_volume_mavg: double (nullable = true)
 |-- trans_volume_mstd: double (nullable = true)
 |-- trans_freq: long (nullable = false)
 |-- loc_delta_mavg: double (nullable = true)
 |-- datetime: long (nullable = true)
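
The grouping performed by `window("datetime", "240 minutes")` can be sketched in plain Python. This is an illustration under stated assumptions, not what Spark executes: it assumes timestamps are in UTC and that windows are aligned to the Unix epoch (Spark's default when no start offset is given), it ignores watermarking, and the transactions and card numbers are made up.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean, stdev

WINDOW_SECONDS = 4 * 60 * 60  # 240 minutes, as in the query above


def window_bounds(ts):
    """Return (start, end) in epoch seconds of the tumbling window containing ts."""
    epoch = int(ts.replace(tzinfo=timezone.utc).timestamp())
    start = epoch - epoch % WINDOW_SECONDS
    return start, start + WINDOW_SECONDS


# Hypothetical transactions: (datetime, cc_num, amount)
transactions = [
    (datetime(2021, 5, 1, 0, 30), 1111, 10.0),
    (datetime(2021, 5, 1, 3, 59), 1111, 30.0),
    (datetime(2021, 5, 1, 4, 0), 1111, 50.0),
    (datetime(2021, 5, 1, 1, 15), 2222, 20.0),
]

# Group amounts by (card number, window end), mirroring groupBy(window, cc_num).
groups = defaultdict(list)
for ts, cc_num, amount in transactions:
    _, end = window_bounds(ts)
    groups[(cc_num, end)].append(amount)

features = {}
for key, amounts in groups.items():
    features[key] = {
        "trans_volume_mavg": mean(amounts),
        # Sample standard deviation, which is undefined for a single row
        "trans_volume_mstd": stdev(amounts) if len(amounts) > 1 else None,
        "trans_freq": len(amounts),
    }

for key in sorted(features):
    print(key, features[key])
```

Each window covers a half-open interval, so the transaction at exactly 04:00 falls into the next window. The window end is used as the `datetime` value of each feature row, as in the final `selectExpr` of the query.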

### Establish a connection with your Hopsworks feature store.

In [47]:
import hsfs
connection = hsfs.connection()
# Get a reference to the project's feature store. Shared feature stores can also be accessed by providing the feature store name.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

## Get feature groups from the Hopsworks feature store

In [48]:
transactions_4h_aggs = fs.get_feature_group("transactions_4h_aggs", version = 1)

## Insert streaming dataframes to the online feature group

Now we are ready to write this streaming dataframe, as a long-running application, to the online storage of the feature group.

In [49]:
query_4h = transactions_4h_aggs.insert_stream(windowed4hTransactionDF)



### Check if spark streaming query is active

In [50]:
query_4h.isActive

True

#### We can also check the status of a query and whether any exceptions were thrown.

In [51]:
query_4h.status

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

In [55]:
query_4h.exception()

StreamingQueryException('Writing job aborted.\n=== Streaming Query ===\nIdentifier: insert_stream_119_14_transactions_4h_aggs_1_onlinefs [id = ce179948-4241-402d-85fe-2aa13dd2c737, runId = d5b05c02-3781-41d9-9305-691ffb0f7824]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaV2[Subscribe[credit_card_transactions]]: {"credit_card_transactions":{"0":54000}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nWriteToMicroBatchDataSource org.apache.spark.sql.kafka010.KafkaStreamingWrite@2f466932\n+- Project [to_avro(concat(cast(cc_num#809L as string)), None) AS key#827, to_avro(struct(trans_volume_mstd, trans_volume_mstd#811, trans_volume_mavg, trans_volume_mavg#810, trans_freq, trans_freq#812L, loc_delta_mavg, loc_delta_mavg#813, cc_num, cc_num#809L, datetime, datetime#814L), Some({"type": "record", "name": "transactions_4h_aggs_1", "namespace": "fraud_stream_featurestore.db", "fields": [{"name": "trans_volume_mstd", "type": ["null", "double"]}, {"name": "t
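
The three checks above (`isActive`, `status`, `exception()`) can be gathered into one small helper. This is a sketch only: `summarize_query` is a hypothetical helper, and `FakeQuery` is a stand-in that mimics just those three members so the example runs without a Spark session. In the notebook you would pass `query_4h` instead.

```python
def summarize_query(query):
    """Collect the health indicators of a streaming query into one dict."""
    error = query.exception()
    first_line = None
    if error is not None:
        # Keep only the first line; the full message includes the whole query plan.
        first_line = (str(error).splitlines() or [""])[0]
    return {
        "active": query.isActive,
        "message": query.status.get("message"),
        "error": first_line,
    }


class FakeQuery:
    """Hypothetical stand-in exposing the members used by summarize_query."""

    def __init__(self, active, status, error=None):
        self.isActive = active
        self.status = status
        self._error = error

    def exception(self):
        return self._error


healthy = FakeQuery(True, {"message": "Processing new data"})
failed = FakeQuery(
    False,
    {"message": "Stopped"},
    RuntimeError("Writing job aborted.\n=== Streaming Query ==="),
)

print(summarize_query(healthy))
print(summarize_query(failed))
```

Note that, as the outputs above show, a query can report an exception while `isActive` and `status` still look healthy, so it is worth checking all three together.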

### Let's check whether data was ingested into the online feature store

In [33]:
fs.sql("SELECT * FROM transactions_4h_aggs_agg_1",online=True).show(20,False)

+----------------+-----------------+------------------+------------------+
|cc_num          |num_trans_per_12h|avg_amt_per_12h   |stdev_amt_per_12h |
+----------------+-----------------+------------------+------------------+
|4444037300542691|7                |154.51857142857145|238.93552871214524|
|4609072304828342|7                |176.02714285714288|263.06316920176454|
|4161715127983823|10               |928.3030000000001 |1809.7934375689888|
|4223253728365626|13               |1201.686153846154 |2724.0564739389993|
|4572259224622748|9                |1291.5500000000002|2495.189283160699 |
|4436298663019939|11               |149.78636363636366|235.75729924109365|
|4159210768503456|6                |37.303333333333335|26.403001092047596|
|4231082502226286|10               |977.8430000000001 |2071.1095165208753|
|4090612125343330|15               |646.7259999999999 |1336.9214811370616|
|4416410688550228|11               |663.0627272727273 |1631.6188600717442|
|4853206196105715|10     

### Stop queries
If you are running this from a notebook, you can kill the Spark Structured Streaming query by stopping the kernel or by calling its `.stop()` method.

In [39]:
query_4h.stop()

## TODO (Davit): gif how to run offline backfill job