# Consuming the AWS Kinesis Stream
## Analytics using PySpark Structured Streaming

In this notebook we use a custom Java package to connect to Amazon Kinesis from PySpark Structured Streaming, so we can work with our data using our favorite PySpark SQL commands. The package is downloaded from the Maven repository, which serves Java packages much as PyPI serves Python packages.
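For orientation, Maven identifies a package by three coordinates, which Spark documents in the form `groupId:artifactId:version`. The sketch below assembles a `PYSPARK_SUBMIT_ARGS` value from those three parts. The helper name `build_submit_args` is made up for illustration and is not part of PySpark; the coordinate values are the ones this notebook uses.

```python
def build_submit_args(group_id: str, artifact_id: str, version: str) -> str:
    """Assemble a PYSPARK_SUBMIT_ARGS value that requests one Maven package.

    Hypothetical helper for illustration; not part of PySpark.
    """
    coordinate = f"{group_id}:{artifact_id}:{version}"
    # PYSPARK_SUBMIT_ARGS is expected to end with 'pyspark-shell'.
    return f"--packages {coordinate} pyspark-shell"


submit_args = build_submit_args(
    "com.qubole.spark",        # group: who publishes the package
    "spark-sql-kinesis_2.12",  # artifact: the _2.12 suffix conventionally marks the Scala version
    "1.2.0_spark-3.0",         # version of the connector
)
print(submit_args)
```

Splitting the string this way makes it easier to see which part to change when the Spark or Scala version of the cluster differs.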

In [20]:
import findspark
import time
findspark.init()
import os

# tell Spark to download a custom package that lets us connect to Amazon Kinesis
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages=com.qubole.spark/spark-sql-kinesis_2.12/1.2.0_spark-3.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as f
from pyspark.sql.functions import col, json_tuple,from_json,get_json_object
from pyspark.sql.types import StringType, FloatType
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from datetime import datetime, timedelta


In [21]:
stream_name = 'pyspark-kinesis-test' # set this stream name to the same one you used in the producer notebook
region_name = 'us-east-1'

In [22]:
spark = SparkSession.builder.appName("streaming").getOrCreate()

In [23]:
spark

Start off by running the following cells to connect to the Kinesis stream as a consumer and explore the format of the output data.

In [24]:
kinesis = spark \
        .readStream \
        .format('kinesis') \
        .option('streamName', stream_name) \
        .option('endpointUrl', f'https://kinesis.{region_name}.amazonaws.com')\
        .option('region', region_name) \
        .load()

In [25]:
kinesis

DataFrame[data: binary, streamName: string, partitionKey: string, sequenceNumber: string, approximateArrivalTimestamp: timestamp]

In [26]:
kinesis.printSchema()

root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)



What happens when we run the `take` method?

In [27]:
kinesis.take(5)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kinesis

It errors! Structured Streaming sources do not behave like normal sources: we cannot pull rows from them directly. Instead, we have to start a streaming query with `writeStream` and work with the data one batch at a time.

The following cells will show us how to view some data from the stream.

In [28]:
kinesis_simple = kinesis
(
    kinesis_simple.writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(once=True)
    .start()
    .awaitTermination()
)

22/11/07 05:30:33 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-f188c4cc-8d57-4ca8-bb27-719428400aa9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:30:33 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+------------+--------------------------------------------------------+---------------------------+
|data                                                                                                                                                                                                                                                                                                                                                                                  

The payload in the `data` column is in binary format! That won't be helpful for our analysis, so we need to cast the `data` column to a string. The following cell does this by running a PySpark SQL command on the streaming dataframe that selects only the `data` column and converts it to a string. This command should already be familiar.

In [29]:
(
kinesis.select(col('data').cast('string'))
    .writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(once=True)
    .start()
    .awaitTermination()
)

22/11/07 05:30:47 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-7a36d165-bf6b-4695-b96f-e781ed217cb3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:30:47 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------+
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20882.25, "trade_dt": "11/07/2022, 05:30:45.818000", "volume": 0.00021, "conditions": null}|
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20882.24, "trade_dt": "11/07/2022, 05:30:45.818000", "volume": 0.00111, "conditions": null}|
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20881.86, "trade_dt": "11/07/2022, 05:30:45.818000", "volume": 0.00266, "conditions": null}|
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20881.87, "trade_dt": "11/07/2
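Conceptually, casting the binary column to a string decodes each record's bytes as UTF-8 text. A plain-Python sketch of the same step on a single record, assuming the producer sent UTF-8 encoded JSON (the sample payload is copied from the batch output):

```python
# One record as a consumer receives it: raw bytes (assumed to be UTF-8 encoded JSON).
raw = b'{"symbol": "BINANCE:BTCUSDT", "price_last": 20882.25, "trade_dt": "11/07/2022, 05:30:45.818000", "volume": 0.00021, "conditions": null}'

# Decoding turns the bytes into readable text.
text = raw.decode("utf-8")

print(type(raw).__name__, "->", type(text).__name__)
print(text)
```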

We want to save this change, so the following cell will make a new dataframe object with just the string `data` column.

In [30]:
kinesis_datastring = kinesis.select(col('data').cast('string'))

This cell shows us once more the format of the `data` column. It is a string representation of a JSON object, which is very similar to a dictionary in Python.

In [31]:
(
kinesis_datastring
    .writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(once=True)
    .start()
    .awaitTermination()
)

22/11/07 05:30:53 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-e7f09c97-6362-46bf-9818-407042665def. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:30:53 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------+
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20886.59, "trade_dt": "11/07/2022, 05:30:50.694000", "volume": 0.001, "conditions": null}  |
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20886.59, "trade_dt": "11/07/2022, 05:30:50.697000", "volume": 0.019, "conditions": null}  |
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20886.59, "trade_dt": "11/07/2022, 05:30:50.706000", "volume": 0.01, "conditions": null}   |
|{"symbol": "BINANCE:BTCUSDT", "price_last": 20886.59, "trade_dt": "11/07/2
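To make the dictionary comparison concrete, Python's standard `json` module parses such a string into a `dict`. This only illustrates the shape of one record; it is not how Spark processes the stream.

```python
import json

# One record from the stream, already cast from bytes to a string.
record = '{"symbol": "BINANCE:BTCUSDT", "price_last": 20886.59, "trade_dt": "11/07/2022, 05:30:50.694000", "volume": 0.001, "conditions": null}'

# json.loads maps a JSON object to a dict, numbers to float/int, and null to None.
parsed = json.loads(record)

print(parsed["symbol"])       # BINANCE:BTCUSDT
print(parsed["price_last"])   # 20886.59
print(parsed["conditions"])   # None
```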

The following cell will extract the values from the JSON object in a structured manner instead of using regex. Note the functions being used here. Also note the three different ways we can reference the `data` column.

In [32]:
kinesis_processed = kinesis_datastring.select(
                                                get_json_object(kinesis_datastring.data,'$.symbol').alias('symbol'),
                                                get_json_object(col('data'),'$.price_last').alias('price_last'),
                                                get_json_object(kinesis_datastring['data'],'$.trade_dt').alias('trade_dt'),
                                                get_json_object(col('data'),'$.volume').alias('volume'),
                                                get_json_object(col('data'),'$.conditions').alias('conditions')
                                             )

Just like we review the output of our code using the `take()` method for a regular dataframe, the following code is needed to check the output of our structured streaming code.

In [33]:
(kinesis_processed
    .writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(once=True)
    .start()
    .awaitTermination()
)

22/11/07 05:30:59 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-a7f82eed-fc7d-4664-b339-743a954b11cc. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:30:59 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+----------+---------------------------+-------+----------+
|symbol         |price_last|trade_dt                   |volume |conditions|
+---------------+----------+---------------------------+-------+----------+
|BINANCE:BTCUSDT|20884.51  |11/07/2022, 05:30:58.177000|0.00478|null      |
|BINANCE:BTCUSDT|20884.51  |11/07/2022, 05:30:58.228000|0.01972|null      |
|BINANCE:BTCUSDT|20884.48  |11/07/2022, 05:30:58.256000|0.02093|null      |
|BINANCE:BTCUSDT|20884.5   |11/07/2022, 05:30:58.399000|0.00946|null      |
|BINANCE:BTCUSDT|20884.5   |11/07/2022, 05:30:58.439000|1.0E-5 |null      |
|BINANCE:BTCUSDT|20884.5   |11/07/2022, 05:30:58.479000|0.009  |null      |
|BINANCE:BTCUSDT|20884.65  |11/07/2022, 05:30:58.486000|9.0E-4 |null      |
|BINANCE:BTCUSDT|20884.24  |11/07/2022, 05:30:58.500000|6.4E-4 |null      |
|BINANCE:BTCUSDT|20884.5   |11/07/2022, 05:30:58.666000|0.00412|nul

#### **TO-DO: Convert the data types for the streaming data such that price_last and volume are float type and trade_dt is a timestamp type**

The cell provided has most of the code from the last transformation, but we are missing the data type conversion.

What is the method to `cast` variables to different data types? And yes, the same commands will work with Structured Streaming too!

Modify the code below to accomplish this goal. Note that you will have to customize the datetime formatting. See [this link](https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/) for examples and more info.

In [34]:
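# import only what the cells below use; note that this `max` shadows Python's built-in max
from pyspark.sql.functions import avg, count, max, to_timestamp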

In [35]:
kinesis_processed = kinesis_datastring.select(
                                                get_json_object(kinesis_datastring.data,'$.symbol').alias('symbol'),
                                                get_json_object(kinesis_datastring.data,'$.price_last').alias('price_last'),
                                                get_json_object(kinesis_datastring.data,'$.trade_dt').alias('trade_dt'),
                                                get_json_object(kinesis_datastring.data,'$.volume').alias('volume'),
                                                get_json_object(kinesis_datastring.data,'$.conditions').alias('conditions')
                                             )

kinesis_processed = kinesis_processed.withColumn('trade_dt', to_timestamp('trade_dt', 'MM/dd/yyyy, HH:mm:ss.SSSSSS'))
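A quick way to sanity-check the layout of a timestamp string is to parse one sample value in plain Python first. The sketch below uses `datetime.strptime`, whose directives (`%m`, `%d`, `%Y`, ...) differ from Spark's pattern letters (`MM`, `dd`, `yyyy`, ...), so it checks the structure of the string rather than the Spark pattern itself. Also note that `strptime` raises an error on a mismatch, whereas this notebook describes `to_timestamp` as producing a null.

```python
from datetime import datetime

sample = "11/07/2022, 05:30:45.818000"  # a trade_dt value from the stream output

# Month/day/year, a comma and a space, then 24-hour time with six fractional digits.
parsed = datetime.strptime(sample, "%m/%d/%Y, %H:%M:%S.%f")
print(parsed)  # 2022-11-07 05:30:45.818000

# A pattern that does not match the string exactly fails to parse.
try:
    datetime.strptime(sample, "%Y-%m-%d %H:%M:%S")
except ValueError as err:
    print("mismatch:", err)
```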

In [36]:
kinesis_processed.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- price_last: string (nullable = true)
 |-- trade_dt: timestamp (nullable = true)
 |-- volume: string (nullable = true)
 |-- conditions: string (nullable = true)



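Confirm that your schema change worked by printing a single batch of the stream using the cell below. **Remember, sometimes you have to re-run the cell if you get zero rows back.**

If the timestamp conversion did not work, you will get a **null** in the `trade_dt` column. The format pattern for the string-to-timestamp conversion has to match the input EXACTLY!

Note that the schema printed above still lists `price_last` and `volume` as strings. Chaining `.cast('float')` onto those two `get_json_object(...)` expressions, before `.alias(...)`, completes the TO-DO.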

In [37]:
(kinesis_processed
    .writeStream
    .format("console")
    .option('truncate',False)
    .trigger(once=True)
    .start()
    .awaitTermination()
)

22/11/07 05:31:07 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-ef2626ae-d782-4245-af7d-5c9d759dd636. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:31:07 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+----------+-----------------------+-------+----------+
|symbol         |price_last|trade_dt               |volume |conditions|
+---------------+----------+-----------------------+-------+----------+
|BINANCE:BTCUSDT|20884.89  |2022-11-07 05:31:04.061|0.04   |null      |
|BINANCE:BTCUSDT|20884.88  |2022-11-07 05:31:04.061|9.9E-4 |null      |
|BINANCE:BTCUSDT|20884.5   |2022-11-07 05:31:04.061|0.00134|null      |
|BINANCE:BTCUSDT|20884.45  |2022-11-07 05:31:04.061|0.36082|null      |
|BINANCE:BTCUSDT|20884.44  |2022-11-07 05:31:04.061|0.06158|null      |
|BINANCE:BTCUSDT|20884.43  |2022-11-07 05:31:04.061|0.01281|null      |
|BINANCE:BTCUSDT|20884.43  |2022-11-07 05:31:04.067|0.00897|null      |
|BINANCE:BTCUSDT|20884.4   |2022-11-07 05:31:04.067|0.35185|null      |
|BINANCE:BTCUSDT|20884.55  |2022-11-07 05:31:04.092|0.00693|null      |
|BINANCE:BTCUSDT|20884.44  |2022-11-07 

### Sending streaming data to in-memory dataset

We can send the output of the Kinesis stream into an in-memory SQL table that we can query and turn into a PySpark SQL dataframe!

In [41]:
(
kinesis_processed
    .writeStream
    .queryName("data_stream") # the name of the in-memory table
    .format("memory") # keep the results in driver memory as a queryable table
    .trigger(once=True)
    .start()
)

22/11/07 05:31:22 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-f2c2c8f4-47ac-41cd-bb82-00a172c92b12. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:31:22 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7fc580695590>

In [42]:
df_stream = spark.sql("select * from data_stream")

Remember, the following `take` command may return no data at all: the streaming query writes to the table asynchronously, and a batch can also come back empty. If so, re-run the **writeStream cell** to get a batch with data.

Once you run the command you should see the appropriately typed columns you changed in a prior cell.

In [43]:
df_stream.take(5)

[Row(symbol='BINANCE:BTCUSDT', price_last='20884.61', trade_dt=datetime.datetime(2022, 11, 7, 5, 31, 21, 669000), volume='0.00539', conditions=None),
 Row(symbol='BINANCE:BTCUSDT', price_last='20884.61', trade_dt=datetime.datetime(2022, 11, 7, 5, 31, 21, 669000), volume='0.00518', conditions=None),
 Row(symbol='BINANCE:BTCUSDT', price_last='20884.62', trade_dt=datetime.datetime(2022, 11, 7, 5, 31, 21, 669000), volume='0.00478', conditions=None),
 Row(symbol='BINANCE:BTCUSDT', price_last='20884.63', trade_dt=datetime.datetime(2022, 11, 7, 5, 31, 21, 669000), volume='0.00519', conditions=None),
 Row(symbol='BINANCE:BTCUSDT', price_last='20884.63', trade_dt=datetime.datetime(2022, 11, 7, 5, 31, 21, 669000), volume='0.00497', conditions=None)]

### **Save a five-row dataset as `dict_stream_5` for the final solution JSON.** **Make sure that it has data in all five rows!**

In [44]:
dict_stream_5 = df_stream.limit(5).toPandas().to_dict()

### Make the stream append to a dataframe

In the cell below we are executing the same streaming command without limiting our trigger to execute once.

In [45]:
(
kinesis_processed
    .writeStream
    .queryName("data_stream")
    .format("memory")
    #.trigger(once=True)
    .start()
)

22/11/07 05:31:37 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-f9856ed6-e3cf-4f43-bdc2-28d4a2248047. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/07 05:31:37 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7fc580592fd0>

Note that we can choose among several trigger options depending on the nature of the incoming streaming data. The choices available for the trigger argument are described here: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/Trigger.html

In [46]:
df_stream = spark.sql("select * from data_stream")

In [47]:
for i in range(10):
    print(f'{datetime.now()} - size of dataframe: {df_stream.count()}')
    time.sleep(3)

2022-11-07 05:31:43.542380 - size of dataframe: 705
2022-11-07 05:31:47.251601 - size of dataframe: 1150
2022-11-07 05:31:50.575749 - size of dataframe: 1613
2022-11-07 05:31:53.791109 - size of dataframe: 1995
2022-11-07 05:31:56.987120 - size of dataframe: 2481
2022-11-07 05:32:00.214704 - size of dataframe: 2925
2022-11-07 05:32:03.451122 - size of dataframe: 3348
2022-11-07 05:32:06.695119 - size of dataframe: 3716
2022-11-07 05:32:09.955126 - size of dataframe: 4086
2022-11-07 05:32:13.199494 - size of dataframe: 4556

### Analytics on growing dataframe **(TO-DO)**

1. Using `timedelta` from the `datetime` library, find the average price within the last 5 seconds.
        
    - `filter` to only rows with a datetime within the last 5 seconds. Check out [this link](https://www.geeksforgeeks.org/python-datetime-timedelta-function/) for a hint at a function you could use
    - Use the `agg` function to take summary stats of the data in the last 5 seconds
        - Include the following stats: average price, number of trades, and latest timestamp of trade
        
The resulting table should look like:


|mean_price_sec5|num_trades|last_trade_dt          |
|---------------|----------|-----------------------|
|41744.988      |5         |2022-01-09 06:07:41.492|


2. Save the resulting table into a Pandas DataFrame and then into a dictionary.
3. Next, use a `for` loop so your average price summary executes once every 5 seconds.


In [50]:
from datetime import datetime, timedelta
ini_time_for_now = datetime.now()

In [52]:
(df_stream.filter(ini_time_for_now - timedelta(seconds = 5) <= col("trade_dt"))
        .agg(avg(col("price_last")).alias('mean_price_sec5'),
            count(col("price_last")).alias('num_trades'),
            max(col("trade_dt")).alias('last_trade_dt'))).show(truncate=False)

+------------------+----------+-----------------------+
|mean_price_sec5   |num_trades|last_trade_dt          |
+------------------+----------+-----------------------+
|20891.968617886167|1599      |2022-11-07 05:35:19.523|
+------------------+----------+-----------------------+



                                                                                

In [53]:
data_sum = (df_stream.filter(ini_time_for_now - timedelta(seconds = 5) <= col("trade_dt"))
                    .agg(avg(col("price_last")).alias('mean_price_sec5'),
                        count(col("price_last")).alias('num_trades'),
                        max(col("trade_dt")).alias('last_trade_dt')))

In [54]:
dict_stream_summary = data_sum.toPandas().to_dict()

                                                                                

In [63]:
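for i in range(5):
    ini_time_for_now = datetime.now()
    # rebuild the summary on each pass; the data_sum defined earlier keeps its original cutoff
    data_sum = (df_stream.filter(ini_time_for_now - timedelta(seconds = 5) <= col("trade_dt"))
                    .agg(avg(col("price_last")).alias('mean_price_sec5'), count(col("price_last")).alias('num_trades'),
                        max(col("trade_dt")).alias('last_trade_dt')))
    print(f'{ini_time_for_now} - output 1 summary: {data_sum.count()}')  # a global agg always yields one row
    time.sleep(3)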

                                                                                

2022-11-07 05:42:36.403679 - output 1 summary: 1
2022-11-07 05:42:39.992288 - output 1 summary: 1
2022-11-07 05:42:43.611107 - output 1 summary: 1
2022-11-07 05:42:47.678548 - output 1 summary: 1
2022-11-07 05:42:51.599116 - output 1 summary: 1
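One thing to keep in mind for a loop like this: a cutoff such as `datetime.now() - timedelta(seconds=5)` is an ordinary Python value, computed once at the moment the expression runs. For the window to move, the cutoff and the summary built from it have to be recomputed on every pass. The plain-Python sketch below shows that pattern on a small in-memory list; `summarize_last_seconds` and the sample trades are made up for illustration and stand in for the Spark query.

```python
from datetime import datetime, timedelta


def summarize_last_seconds(trades, now, seconds=5):
    """Return (mean_price, num_trades, last_trade_dt) for trades in the window.

    `trades` is a list of (trade_dt, price) tuples. Hypothetical helper.
    """
    cutoff = now - timedelta(seconds=seconds)  # recomputed on every call
    # The upper bound is only needed here because the sample list holds "future" trades.
    recent = [(dt, price) for dt, price in trades if cutoff <= dt <= now]
    if not recent:
        return None, 0, None
    mean_price = sum(price for _, price in recent) / len(recent)
    last_trade_dt = max(dt for dt, _ in recent)
    return mean_price, len(recent), last_trade_dt


# Made-up trades, one per second starting at 05:31:00.
start = datetime(2022, 11, 7, 5, 31, 0)
trades = [(start + timedelta(seconds=i), 20880.0 + i) for i in range(20)]

# Simulate three passes of the loop, 5 seconds apart; the window moves each time.
for step in range(3):
    now = start + timedelta(seconds=9 + 5 * step)
    print(now, summarize_last_seconds(trades, now))
```

Passing `now` in as an argument, rather than calling `datetime.now()` inside the function, keeps the sketch deterministic and easy to check.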


                                                                                

## **Save your analytics results to a JSON object - then add, commit, and push your notebook and JSON to GitHub!**
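When saving the results, keep in mind that Python's `json` module cannot serialize timestamp objects by default. A minimal sketch, assuming a dictionary shaped like the output of `DataFrame.to_dict()` (column name, then row index, then value). The values are copied from the summary table above, and the file name `results.json` is a placeholder rather than a required name.

```python
import json
from datetime import datetime

# Stand-in for dict_stream_summary; real values would come from toPandas().to_dict().
summary = {
    "mean_price_sec5": {0: 20891.968617886167},
    "num_trades": {0: 1599},
    "last_trade_dt": {0: datetime(2022, 11, 7, 5, 35, 19, 523000)},
}

# default=str converts anything json cannot encode natively (here, the timestamp).
payload = json.dumps(summary, default=str, indent=2)

with open("results.json", "w") as fh:  # placeholder file name
    fh.write(payload)

print(payload)
```

The integer row indexes become string keys in the JSON output, so a reader of the file would look up `"0"` rather than `0`.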

# MAKE SURE TO STOP YOUR CLUSTER

spark.stop()

# MAKE SURE YOU DELETE YOUR KINESIS STREAM IN THE PRODUCER NOTEBOOK