## Connect to the Feature Store

In [1]:
import hsfs
import json
from hops import kafka, tls
from hsfs import engine

from pyspark.sql.functions import from_json, window, avg,count, stddev, explode, date_format,col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

# Open a connection to the feature store service, then fetch the feature store
# handle (`fs`) that the rest of the notebook uses for lookups and SQL queries.
connection = hsfs.connection()
fs = connection.get_feature_store()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
61,application_1618238932364_0066,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

## Create a stream from the Kafka topic


In [2]:
# Schema used to parse the JSON payload of each Kafka message. Every field is
# declared as a nullable string; typed casts are applied after parsing.
parse_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("tid", "datetime", "cc_num", "amount")
    ]
)


In [3]:
# Subscribe to the Kafka topic as a structured stream, reading from the
# earliest available offset. Broker endpoints, security protocol and TLS
# material all come from the `hops` helpers.
#
# Each SSL password option uses the getter that matches its store: the trust
# store password for the trust store, the key store password for the key store
# and the private key. (Previously the trust-store and key passwords were
# taken from the opposite getters.)
df_read = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
  .option("kafka.security.protocol", kafka.get_security_protocol()) \
  .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
  .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd()) \
  .option("kafka.ssl.keystore.location", tls.get_key_store()) \
  .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.key.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", "credit_card_transactions_2") \
  .load()

# Decode the Kafka message value as a JSON string, parse it with
# `parse_schema`, flatten the resulting struct, and cast each field to the
# type used downstream (tid/datetime as string, cc_num as long, amount as double).
df_deser = (
    df_read
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", parse_schema).alias("value"))
    .select("value.tid", "value.datetime", "value.cc_num", "value.amount")
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as string)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
)

In [4]:
# Sanity check: confirm the parsed DataFrame is a streaming DataFrame.
df_deser.isStreaming

True

In [5]:
# Print the column names and types produced by the JSON parsing and casts.
df_deser.printSchema()

root
 |-- tid: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- amount: double (nullable = true)

## Create windowing aggregations 

In [6]:
# Per-card aggregates over 10-minute windows on `datetime` (cast to timestamp),
# with a 60-minute watermark: transaction count, average amount and standard
# deviation of the amount. The final select keeps only the card number and the
# aggregates, so the window start/end bounds are not part of the output.
windowed10mSignalDF = (
    df_deser
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "10 minutes"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_10m"),
        stddev("amount").alias("stdev_amt_per_10m"),
        count("cc_num").alias("num_trans_per_10m"),
    )
    .select("cc_num", "num_trans_per_10m", "avg_amt_per_10m", "stdev_amt_per_10m")
)

In [7]:
# Sanity check: the 10-minute aggregate is still a streaming DataFrame.
windowed10mSignalDF.isStreaming

True

In [8]:
# Print the schema of the 10-minute aggregate (card number plus three aggregates).
windowed10mSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_10m: long (nullable = false)
 |-- avg_amt_per_10m: double (nullable = true)
 |-- stdev_amt_per_10m: double (nullable = true)

In [9]:
# Per-card aggregates over 60-minute windows on `datetime` (cast to timestamp),
# with a 60-minute watermark: transaction count, average amount and standard
# deviation of the amount. Only the card number and the aggregates are kept.
windowed1hSignalDF = (
    df_deser
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "60 minutes"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_1h"),
        stddev("amount").alias("stdev_amt_per_1h"),
        count("cc_num").alias("num_trans_per_1h"),
    )
    .select("cc_num", "num_trans_per_1h", "avg_amt_per_1h", "stdev_amt_per_1h")
)

In [10]:
# Sanity check: the 1-hour aggregate is still a streaming DataFrame.
windowed1hSignalDF.isStreaming

True

In [11]:
# Print the schema of the 1-hour aggregate (card number plus three aggregates).
windowed1hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_1h: long (nullable = false)
 |-- avg_amt_per_1h: double (nullable = true)
 |-- stdev_amt_per_1h: double (nullable = true)

In [12]:
# Per-card aggregates over 12-hour windows on `datetime` (cast to timestamp),
# with a 60-minute watermark: transaction count, average amount and standard
# deviation of the amount. Only the card number and the aggregates are kept.
windowed12hSignalDF = (
    df_deser
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "12 hours"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_12h"),
        stddev("amount").alias("stdev_amt_per_12h"),
        count("cc_num").alias("num_trans_per_12h"),
    )
    .select("cc_num", "num_trans_per_12h", "avg_amt_per_12h", "stdev_amt_per_12h")
)

In [60]:
# Sanity check: the 12-hour aggregate is still a streaming DataFrame.
windowed12hSignalDF.isStreaming

True

In [61]:
# Print the schema of the 12-hour aggregate (card number plus three aggregates).
windowed12hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_12h: long (nullable = false)
 |-- avg_amt_per_12h: double (nullable = true)
 |-- stdev_amt_per_12h: double (nullable = true)

## Get feature groups

In [15]:
# Look up version 6 of the raw-transaction feature group and of each
# windowed-aggregate feature group (10 minutes, 1 hour, 12 hours) by name.
card_transactions = fs.get_feature_group(
    "card_transactions",
    version=6,
)
card_transactions_10m_agg = fs.get_feature_group(
    "card_transactions_10m_agg",
    version=6,
)
card_transactions_1h_agg = fs.get_feature_group(
    "card_transactions_1h_agg",
    version=6,
)
card_transactions_12h_agg = fs.get_feature_group(
    "card_transactions_12h_agg",
    version=6,
)

## Insert the stream to the online feature group

Now we are ready to write each streaming dataframe, as a long-running application, to the online storage of its corresponding feature group.

In [16]:
# Start streaming the parsed transactions into the `card_transactions` feature
# group. The returned query handle is kept so the stream can be monitored
# (`isActive`) and stopped (`stop()`) later in the notebook.
query_transactions = card_transactions.insert_stream(df_deser)



In [17]:
# Start streaming the 10-minute aggregates into `card_transactions_10m_agg`;
# the returned handle is used later for monitoring and shutdown.
query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)



In [18]:
# Start streaming the 1-hour aggregates into `card_transactions_1h_agg`;
# the returned handle is used later for monitoring and shutdown.
query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)



In [19]:
# Start streaming the 12-hour aggregates into `card_transactions_12h_agg`;
# the returned handle is used later for monitoring and shutdown.
query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)



## Check if spark streaming query is active

In [39]:
# Check that the raw-transaction streaming query is still running.
query_transactions.isActive

True

In [40]:
# Check that the 10-minute aggregation streaming query is still running.
query_10m.isActive

True

In [41]:
# Inspect the query's current status: a message plus flags for whether data is
# available and whether a trigger is active.
query_10m.status

{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}

In [42]:
# Retrieve the exception that terminated the query, if any. No output was
# recorded for this cell in the saved run.
query_10m.exception()

In [43]:
# Check that the 1-hour aggregation streaming query is still running.
query_1h.isActive

True

In [44]:
# Check that the 12-hour aggregation streaming query is still running.
query_12h.isActive

True

## Check if data was ingested in the online feature store

In [45]:
# Query the online store (online=True) for the ingested transactions and show
# up to 20 rows without truncating column values.
fs.sql("SELECT * FROM card_transactions_6",online=True).show(20,False)

+--------------------------------+-------------------+----------------+-------+
|tid                             |datetime           |cc_num          |amount |
+--------------------------------+-------------------+----------------+-------+
|a7da91ddadd1f991f99cd7a0f8528c57|2021-04-01 00:09:47|4783713328425247|85.4   |
|5118456824eef09b03cb99f35c44b59c|2021-04-01 00:14:47|4161715127983823|765.34 |
|a466e4d3e711ee73f22a66b834808461|2021-04-01 00:19:19|4858129660270572|1793.42|
|11d38fd0dceb580803747f235bcb8f48|2021-04-01 00:22:19|4676706014866559|93.04  |
|cd065ed76a7f866c07e8681b15e37bb8|2021-04-01 00:22:34|4789490563144262|86.15  |
|bf6a88b79a9b5da5db3822e4e336d576|2021-04-01 00:28:06|4388795302273767|179.29 |
|95c8de0edfe852c11e08f31423ef513a|2021-04-01 00:40:25|4879493428361965|252.06 |
|f9781738ad83a0d7f4fd37bc350219a6|2021-04-01 00:42:14|4436298663019939|831.82 |
|47fee4d3810beb5c8985cab4838cae58|2021-04-01 00:42:22|4829328237114208|809.14 |
|71bea46fa722fd1665cff411ec83de7a|2021-0

In [46]:
# Count the transaction rows currently present in the online store.
fs.sql("SELECT * FROM card_transactions_6",online=True).count()

5400

In [47]:
# Show the online-store rows again with the default display settings
# (long values such as `tid` are truncated in the recorded output).
fs.sql("SELECT * FROM card_transactions_6",online=True).show()

+--------------------+-------------------+----------------+-------+
|                 tid|           datetime|          cc_num| amount|
+--------------------+-------------------+----------------+-------+
|a7da91ddadd1f991f...|2021-04-01 00:09:47|4783713328425247|   85.4|
|5118456824eef09b0...|2021-04-01 00:14:47|4161715127983823| 765.34|
|a466e4d3e711ee73f...|2021-04-01 00:19:19|4858129660270572|1793.42|
|11d38fd0dceb58080...|2021-04-01 00:22:19|4676706014866559|  93.04|
|cd065ed76a7f866c0...|2021-04-01 00:22:34|4789490563144262|  86.15|
|bf6a88b79a9b5da5d...|2021-04-01 00:28:06|4388795302273767| 179.29|
|95c8de0edfe852c11...|2021-04-01 00:40:25|4879493428361965| 252.06|
|f9781738ad83a0d7f...|2021-04-01 00:42:14|4436298663019939| 831.82|
|47fee4d3810beb5c8...|2021-04-01 00:42:22|4829328237114208| 809.14|
|71bea46fa722fd166...|2021-04-01 00:46:59|4546601918511609|  89.44|
|8a272fe54370c2fd5...|2021-04-01 00:50:59|4226063306212844| 5661.8|
|39c9846d8e3730649...|2021-04-01 00:51:41|495075

# Insert into the offline feature group (FG)

In [48]:
def foreach_batch_function_card(batchDF, epoch_id):
    """Insert one micro-batch of parsed transactions into ``card_transactions``.

    Used as a ``foreachBatch`` callback. The batch is cached for the duration
    of the insert and is always released afterwards, even if the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed as a progress trace.

    Raises:
        Whatever ``card_transactions.insert`` raises; the error is propagated
        after the cached batch has been released.
    """
    # Hudi ("hoodie.*") writer settings passed through as write options:
    # shuffle parallelism of 1 for bulk-insert/insert/upsert and a parquet
    # compression ratio of 0.5.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        # Transform and write batchDF
        card_transactions.insert(batchDF, write_options=extra_hudi_options)
    finally:
        # Release the cached batch on both success and failure; otherwise a
        # failed insert would leave the batch persisted.
        batchDF.unpersist()

# Launch the streaming query that hands each micro-batch of parsed
# transactions to foreach_batch_function_card, triggering every 10 seconds
# and checkpointing to the given HDFS location.
hudi_card = (
    df_deser.writeStream
    .foreachBatch(foreach_batch_function_card)
    .option("checkpointLocation", "hdfs:///Projects/card_fraud_demo/Resources/checkpoint-card")
    .trigger(processingTime="10 seconds")
    .start()
)

In [49]:
def foreach_batch_function_10m(batchDF, epoch_id):
    """Insert one micro-batch of 10-minute aggregates into ``card_transactions_10m_agg``.

    Used as a ``foreachBatch`` callback. The batch is cached for the duration
    of the insert and is always released afterwards, even if the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed as a progress trace.

    Raises:
        Whatever ``card_transactions_10m_agg.insert`` raises; the error is
        propagated after the cached batch has been released.
    """
    # Hudi ("hoodie.*") writer settings passed through as write options:
    # shuffle parallelism of 1 for bulk-insert/insert/upsert and a parquet
    # compression ratio of 0.5.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        # Transform and write batchDF
        card_transactions_10m_agg.insert(batchDF, write_options=extra_hudi_options)
    finally:
        # Release the cached batch on both success and failure; otherwise a
        # failed insert would leave the batch persisted.
        batchDF.unpersist()

# Launch the streaming query that hands each micro-batch of 10-minute
# aggregates to foreach_batch_function_10m, triggering every 10 seconds and
# checkpointing to the given HDFS location.
hudi_10m = (
    windowed10mSignalDF.writeStream
    .foreachBatch(foreach_batch_function_10m)
    .option("checkpointLocation", "hdfs:///Projects/card_fraud_demo/Resources/checkpoint-data10m")
    .trigger(processingTime="10 seconds")
    .start()
)

In [50]:
def foreach_batch_function_1h(batchDF, epoch_id):
    """Insert one micro-batch of 1-hour aggregates into ``card_transactions_1h_agg``.

    Used as a ``foreachBatch`` callback. The batch is cached for the duration
    of the insert and is always released afterwards, even if the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed as a progress trace.

    Raises:
        Whatever ``card_transactions_1h_agg.insert`` raises; the error is
        propagated after the cached batch has been released.
    """
    # Hudi ("hoodie.*") writer settings passed through as write options:
    # shuffle parallelism of 1 for bulk-insert/insert/upsert and a parquet
    # compression ratio of 0.5.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        # Transform and write batchDF
        card_transactions_1h_agg.insert(batchDF, write_options=extra_hudi_options)
    finally:
        # Release the cached batch on both success and failure; otherwise a
        # failed insert would leave the batch persisted.
        batchDF.unpersist()

# Launch the streaming query that hands each micro-batch of 1-hour aggregates
# to foreach_batch_function_1h, triggering every 10 seconds and checkpointing
# to the given HDFS location.
hudi_1h = (
    windowed1hSignalDF.writeStream
    .foreachBatch(foreach_batch_function_1h)
    .option("checkpointLocation", "hdfs:///Projects/card_fraud_demo/Resources/checkpoint-1h")
    .trigger(processingTime="10 seconds")
    .start()
)

In [51]:
def foreach_batch_function_12h(batchDF, epoch_id):
    """Insert one micro-batch of 12-hour aggregates into ``card_transactions_12h_agg``.

    Used as a ``foreachBatch`` callback. The batch is cached for the duration
    of the insert and is always released afterwards, even if the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed as a progress trace.

    Raises:
        Whatever ``card_transactions_12h_agg.insert`` raises; the error is
        propagated after the cached batch has been released.
    """
    # Hudi ("hoodie.*") writer settings passed through as write options:
    # shuffle parallelism of 1 for bulk-insert/insert/upsert and a parquet
    # compression ratio of 0.5.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        # Transform and write batchDF
        card_transactions_12h_agg.insert(batchDF, write_options=extra_hudi_options)
    finally:
        # Release the cached batch on both success and failure; otherwise a
        # failed insert would leave the batch persisted.
        batchDF.unpersist()

# Launch the streaming query that hands each micro-batch of 12-hour aggregates
# to foreach_batch_function_12h, triggering every 10 seconds and checkpointing
# to the given HDFS location.
hudi_12h = (
    windowed12hSignalDF.writeStream
    .foreachBatch(foreach_batch_function_12h)
    .option("checkpointLocation", "hdfs:///Projects/card_fraud_demo/Resources/checkpoint-12h")
    .trigger(processingTime="10 seconds")
    .start()
)

In [56]:
# Check that the foreachBatch query writing raw transactions is still running.
hudi_card.isActive

True

In [57]:
# Check that the foreachBatch query writing 10-minute aggregates is still running.
hudi_10m.isActive

True

In [58]:
# Check that the foreachBatch query writing 1-hour aggregates is still running.
hudi_1h.isActive

True

In [59]:
# Check that the foreachBatch query writing 12-hour aggregates is still running.
hudi_12h.isActive

True

# Stop queries
If you are running this from a notebook, you can kill the Spark Structured Streaming Query by stopping the Kernel or by calling its `.stop()` method.

In [67]:
# Stop the four insert_stream queries, in the same order they were started.
for _stream_query in (query_transactions, query_10m, query_1h, query_12h):
    _stream_query.stop()

In [68]:
# Stop the four foreachBatch queries, in the same order they were started.
for _hudi_query in (hudi_card, hudi_10m, hudi_1h, hudi_12h):
    _hudi_query.stop()