## Streaming data aggregation and ingestion to the online feature store

### **Note: this notebook needs to be run with a PySpark kernel to work properly!**

In this notebook, we'll use Spark Structured Streaming to compute feature aggregations in real time and write them to the online and offline feature stores. This entails the following steps:

- Read data from the Kafka topic introduced in the previous notebook
- Compute time window aggregations using Spark Structured Streaming
- Write the aggregated features to the online Hopsworks feature store

## Import necessary libraries

In [1]:
import json

from math import radians

from hops import kafka, tls, hdfs

from pyspark.sql.functions import from_json, window, avg, count, stddev, explode, date_format, col, mean, pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
6,application_1653397815695_0016,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
# Name of the Kafka topic to read card transactions from. Introduced in previous notebook.
KAFKA_TOPIC_NAME = "credit_card_transactions"

## 1. Create a stream from the Kafka topic

Here we create a stream that is subscribed to the Kafka topic, define the schema used to read from the topic, and define how the data should be deserialized.

In [3]:
df_read = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
  .option("kafka.security.protocol", kafka.get_security_protocol()) \
  .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
  .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd()) \
  .option("kafka.ssl.keystore.location", tls.get_key_store()) \
  .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.key.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", KAFKA_TOPIC_NAME) \
  .load()

In [4]:
# Define the schema used to parse messages from the Kafka topic; every field is read as a string
parse_schema = StructType([StructField('tid', StringType(), True),
                           StructField('datetime', StringType(), True),
                           StructField('cc_num', StringType(), True),
                           StructField('category', StringType(), True),
                           StructField('amount', StringType(), True),
                           StructField('latitude', StringType(), True),
                           StructField('longitude', StringType(), True),
                           StructField('city', StringType(), True),
                           StructField('country', StringType(), True)]
                         )

In [5]:
# Deserialise the Kafka message values and cast each field to its target type
df_deser = df_read.selectExpr("CAST(value AS STRING)")\
                   .select(from_json("value", parse_schema).alias("value"))\
                   .select("value.tid", 
                           "value.datetime", 
                           "value.cc_num", 
                           "value.category", 
                           "value.amount", 
                           "value.latitude", 
                           "value.longitude",
                           "value.city",
                           "value.country")\
                   .selectExpr("CAST(tid as string)", 
                               "CAST(datetime as timestamp)", 
                               "CAST(cc_num as long)", 
                               "CAST(category as string)", 
                               "CAST(amount as double)",
                               "CAST(latitude as double)",
                               "CAST(longitude as double)",
                               "CAST(city as string)",
                               "CAST(country as string)")

In [6]:
df_deser.isStreaming

True

In [7]:
df_deser.printSchema() 

root
 |-- tid: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
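
To make the parse-then-cast steps above concrete, here is a plain-Python sketch of what happens to a single message value. It does not use Spark: the sample message, `cast_or_none`, `to_timestamp`, and `deserialise` are hypothetical names introduced for illustration, and returning `None` on a failed cast is meant to mimic Spark's `CAST` when ANSI mode is off, which is an assumption about the cluster configuration.

```python
import json
from datetime import datetime

# Hypothetical message value, shaped like the records described by parse_schema.
raw_value = json.dumps({
    "tid": "t-0001",
    "datetime": "2022-01-01 00:00:24",
    "cc_num": "4444037300542691",
    "category": "Grocery",
    "amount": "72.33",
    "latitude": "35.98",
    "longitude": "-86.52",
    "city": "Smyrna",
    "country": "US",
})


def cast_or_none(value, caster):
    """Return the cast value, or None for missing or malformed input."""
    if value is None:
        return None
    try:
        return caster(value)
    except (TypeError, ValueError):
        return None


def to_timestamp(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Target type per field, mirroring the CAST expressions in the notebook cell.
casters = {
    "tid": str, "datetime": to_timestamp, "cc_num": int, "category": str,
    "amount": float, "latitude": float, "longitude": float,
    "city": str, "country": str,
}


def deserialise(value):
    fields = json.loads(value)  # stage 1: JSON text -> dict of strings
    # stage 2: cast every expected field to its target type
    return {name: cast_or_none(fields.get(name), caster)
            for name, caster in casters.items()}


record = deserialise(raw_value)
```

Reading every field as a string first and casting afterwards keeps parsing tolerant: a malformed number affects only that one field instead of the whole record.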

In [None]:
# Output schema of the haversine function below: the deserialised columns plus loc_delta
schema = StructType([StructField('tid', StringType(), True),
                     StructField('datetime', TimestampType(), True),
                     StructField('cc_num', LongType(), True),
                     StructField('category', StringType(), True),
                     StructField('amount', DoubleType(), True),
                     StructField('latitude', DoubleType(), True),
                     StructField('longitude', DoubleType(), True),
                     StructField('city', StringType(), True),
                     StructField('country', StringType(), True),
                     StructField('loc_delta', DoubleType(), True)
                    ])

The haversine function from notebook 1 is used to compute the difference in location between consecutive transactions. It returns the great-circle angle between the two points, in radians, as `loc_delta`.

In [8]:
def haversine(pdf):
    import numpy as np
    # Coordinates are expected in radians. np.float was removed in NumPy 1.24, so use float.
    long = pdf["longitude"].astype(float)
    lat = pdf["latitude"].astype(float)

    # Coordinates of the previous row (NaN for the first row of each group)
    long_shifted = long.shift()
    lat_shifted = lat.shift()

    long_diff = long_shifted - long
    lat_diff = lat_shifted - lat

    # Haversine formula: central angle between the two points, in radians
    a = np.sin(lat_diff/2.0)**2
    b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
    pdf["loc_delta"] = 2*np.arcsin(np.sqrt(a + b))

    return pdf

spark.udf.register("radians", radians)

<function radians at 0x7f4292ff8940>
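
As a sanity check on the formula used in `haversine`, the sketch below evaluates it for a single pair of points with the standard library only. The function name `central_angle`, the two sample points, and the Earth radius constant are assumptions added for illustration; the notebook itself stores the angle in radians and does not convert it to a distance.

```python
from math import asin, cos, radians, sin, sqrt

# Mean Earth radius in km; an assumption for illustration, not used by the notebook.
EARTH_RADIUS_KM = 6371.0


def central_angle(lat1, lon1, lat2, lon2):
    """Haversine central angle in radians; all inputs are already in radians."""
    a = sin((lat2 - lat1) / 2.0) ** 2
    b = cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2.0) ** 2
    return 2 * asin(sqrt(a + b))


# Two hypothetical points on the equator, 90 degrees of longitude apart.
angle = central_angle(radians(0.0), radians(0.0), radians(0.0), radians(90.0))
distance_km = angle * EARTH_RADIUS_KM
```

Multiplying the angle by a sphere radius gives the great-circle distance, so `loc_delta` can be read as a distance on a unit sphere.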

## 2. Create aggregations with Spark streaming

Let's create windowing aggregations over 4-hour time windows using Spark streaming.

In [9]:
# 4 hour window
windowed4hTransactionDF = df_deser \
    .selectExpr("tid", 
                "datetime", 
                "cc_num", 
                "category", 
                "CAST(amount as double)",
                "radians(latitude) as latitude",
                "radians(longitude) as longitude",
                "city",
                "country")\
    .selectExpr("tid", 
                "datetime", 
                "cc_num", 
                "category", 
                "CAST(amount as double)",
                "CAST(latitude as double)",
                "CAST(longitude as double)",
                "city",
                "country")\
    .withWatermark("datetime", "480 minutes") \
    .groupBy(window("datetime", "240 minutes"), "cc_num")\
    .applyInPandas(haversine, schema=schema)\
    .selectExpr(
                "datetime", 
                "cc_num", 
                "category", 
                "CAST(amount as double)",
                "CAST(loc_delta as double)",
                "city",
                "country")\
    .withWatermark("datetime", "10 minutes") \
    .groupBy(window("datetime", "240 minutes"), "cc_num")\
    .agg(mean("amount").alias("trans_volume_mavg"), 
         stddev("amount").alias("trans_volume_mstd"), 
         count("cc_num").alias("trans_freq"),
         mean("loc_delta").alias("loc_delta_mavg")
        )\
    .selectExpr("cc_num", 
                "trans_volume_mavg", 
                "trans_volume_mstd", 
                "CAST(trans_freq as double)", 
                "loc_delta_mavg", 
                "unix_timestamp(window.end) as datetime")

Check that this is indeed a streaming object.

In [10]:
windowed4hTransactionDF.isStreaming

True

In [11]:
windowed4hTransactionDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- trans_volume_mavg: double (nullable = true)
 |-- trans_volume_mstd: double (nullable = true)
 |-- trans_freq: double (nullable = false)
 |-- loc_delta_mavg: double (nullable = true)
 |-- datetime: long (nullable = true)
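
The 4-hour window aggregation can be illustrated without Spark. The sketch below is a minimal batch approximation, assuming tumbling windows aligned to the Unix epoch and a sample standard deviation; the sample transactions and the helper `window_end` are hypothetical, and watermarking and `loc_delta` are left out.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean, stdev

WINDOW_SECONDS = 4 * 60 * 60  # 240 minutes


def window_end(ts):
    """End (epoch seconds) of the epoch-aligned 4-hour window containing ts."""
    epoch = int(ts.timestamp())
    return epoch - epoch % WINDOW_SECONDS + WINDOW_SECONDS


# Hypothetical transactions: (cc_num, event time in UTC, amount).
transactions = [
    (1111, datetime(2022, 1, 1, 0, 30, tzinfo=timezone.utc), 10.0),
    (1111, datetime(2022, 1, 1, 3, 59, tzinfo=timezone.utc), 30.0),
    (1111, datetime(2022, 1, 1, 4, 0, tzinfo=timezone.utc), 50.0),
    (2222, datetime(2022, 1, 1, 1, 15, tzinfo=timezone.utc), 5.0),
]

# Group amounts by (card, window end), like groupBy(window(...), "cc_num").
groups = defaultdict(list)
for cc_num, ts, amount in transactions:
    groups[(cc_num, window_end(ts))].append(amount)

features = {
    key: {
        "trans_volume_mavg": mean(amounts),
        # The sample standard deviation is undefined for a single value.
        "trans_volume_mstd": stdev(amounts) if len(amounts) > 1 else None,
        "trans_freq": float(len(amounts)),
    }
    for key, amounts in groups.items()
}
```

Each key pairs a card number with the end of its window in epoch seconds, which corresponds to the `unix_timestamp(window.end) as datetime` column in the streaming query.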

## 3. Write aggregated features to the online Hopsworks feature store

First, establish a connection with your Hopsworks feature store.

In [12]:
import hsfs
connection = hsfs.connection()
# Get a reference to the feature store.
# You could also access shared feature stores by providing the feature store name.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

### Get feature groups from feature store

We want the feature group containing transactions aggregated over 4h time windows, `transactions_4h_aggs`, which was introduced in notebook 1.

In [13]:
transactions_4h_aggs = fs.get_feature_group("transactions_4h_aggs", version=1)

### Insert streaming dataframes to the online feature group

Now we are ready to write this streaming dataframe to the online storage of the feature group.

In [14]:
query_4h = transactions_4h_aggs.insert_stream(windowed4hTransactionDF)



### Check if Spark streaming query is active

In [15]:
query_4h.isActive

True

We can also check the status of the query and whether it has thrown any exceptions.

In [16]:
query_4h.status

{'message': 'Getting offsets from KafkaV2[Subscribe[credit_card_transactions]]', 'isDataAvailable': False, 'isTriggerActive': True}

In [18]:
query_4h.exception()
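
The three checks above can be combined into one small helper. This is only a sketch: `summarize_query` and `FakeQuery` are hypothetical names, and the stub stands in for the streaming query so that the example runs without a Spark session. In the notebook you would pass `query_4h` instead.

```python
def summarize_query(query):
    """Collect the health of a streaming query handle into one dict."""
    error = query.exception()  # None when no exception has been thrown
    return {
        "active": query.isActive,
        "message": query.status.get("message"),
        "data_available": query.status.get("isDataAvailable"),
        "error": None if error is None else str(error),
    }


class FakeQuery:
    """Hypothetical stand-in exposing isActive, status, and exception()."""
    isActive = True
    status = {"message": "Waiting for data to arrive",
              "isDataAvailable": False, "isTriggerActive": False}

    def exception(self):
        return None


summary = summarize_query(FakeQuery())
```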

### Stop queries
If you are running this from a notebook, you can kill the Spark Structured Streaming query by stopping the kernel or by calling the query's `.stop()` method.

In [39]:
query_4h.stop()
