## Querying Streaming Spark DataFrames in an EMR Notebook

In this notebook, we will read data from a modified version of the Kinesis stream from Lab 5 into a Spark streaming DataFrame. Once we've loaded our streaming DataFrame, we'll perform a simple query on it and write the results of our query to S3 for further analysis.

We've modified the producer from Lab 5 to send tweet-like JSON data into our `test_stream` Kinesis stream, adding test data for `age` and `num_followers`. Each record has the form `{"username": ..., "age": ..., "num_followers": ..., "tweet": ...}`. If you're following along with the code in this notebook, be sure to use a similar producer script to put data into a Kinesis stream:

```
import boto3
import testdata
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Continuously write Twitter-like data into the Kinesis stream
while True:
    test_tweet = {'username': testdata.get_username(),
                  'age': testdata.get_int(18, 100),
                  'num_followers': testdata.get_int(0, 10000),
                  'tweet': testdata.get_ascii_words(280)
                  }
    kinesis.put_record(StreamName="test_stream",
                       Data=json.dumps(test_tweet),
                       PartitionKey="partitionkey"
                       )
```
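If you don't have the `testdata` package available, the record-building step can be sketched with the standard library alone. This is a minimal sketch, not the Lab 5 producer: the helper names `make_test_tweet` and `to_put_record_kwargs` are hypothetical, the random usernames and words are placeholders, and keying each record on `username` (rather than the fixed `"partitionkey"` above) is an assumption made so that records would hash across shards on a multi-shard stream.

```python
import json
import random
import string


def make_test_tweet(rng: random.Random) -> dict:
    """Build one tweet-like record with the same four fields as the producer above."""
    username = "".join(rng.choices(string.ascii_lowercase, k=8))
    words = ["".join(rng.choices(string.ascii_lowercase, k=5)) for _ in range(10)]
    return {
        "username": username,
        "age": rng.randint(18, 100),
        "num_followers": rng.randint(0, 10000),
        "tweet": " ".join(words),
    }


def to_put_record_kwargs(tweet: dict, stream_name: str = "test_stream") -> dict:
    """Return keyword arguments that could be passed to kinesis.put_record(**kwargs)."""
    return {
        "StreamName": stream_name,
        # Kinesis stores the payload as bytes, so encode the JSON explicitly
        "Data": json.dumps(tweet).encode("utf-8"),
        # Assumption: partition on username instead of a single fixed key
        "PartitionKey": tweet["username"],
    }


rng = random.Random(0)  # seeded so repeated runs build the same record
kwargs = to_put_record_kwargs(make_test_tweet(rng))
print(sorted(kwargs))
```

With a boto3 client created as above, each record would then be sent with `kinesis.put_record(**kwargs)` inside the loop.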

First, let's add a [Spark Structured Streaming](https://spark.apache.org/docs/2.4.7/structured-streaming-programming-guide.html) package to our session configuration. Specifically, we'll add a connector that makes it possible to interact with Kinesis streams:

In [None]:
%%configure -f
{ "conf": {"spark.jars.packages": "com.qubole.spark/spark-sql-kinesis_2.11/1.1.3-spark_2.4" }}

Then, we're ready to start reading from our Kinesis stream. For this demonstration, we'll start with the latest data in the stream, but we could also choose a different starting position if we wanted to:

In [2]:
from pyspark.sql.functions import col, json_tuple
import time

stream_df = spark.readStream \
                 .format('kinesis') \
                 .option('streamName', 'test_stream') \
                 .option('endpointUrl', 'https://kinesis.us-east-1.amazonaws.com')\
                 .option('region', 'us-east-1') \
                 .option('startingposition', 'LATEST')\
                 .load()

if stream_df.isStreaming:
    print('======================')
    print('DataFrame is streaming')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
13,application_1620656441511_0014,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


DataFrame is streaming
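
The reader options used above can also be gathered in one place, which makes it easier to switch the starting position. The sketch below is an illustration under assumptions: `kinesis_reader_options` is a hypothetical helper, and `TRIM_HORIZON` (start from the oldest record still retained in the stream) is the alternative value that the connector's documentation should list alongside `LATEST`; check the README for the connector version you install before relying on it.

```python
def kinesis_reader_options(stream_name, region, starting_position="LATEST"):
    """Build the option dict for the Kinesis source used in this notebook."""
    # Assumption: these are the two position names the connector accepts
    allowed = {"LATEST", "TRIM_HORIZON"}
    position = starting_position.upper()
    if position not in allowed:
        raise ValueError(f"starting_position must be one of {sorted(allowed)}")
    return {
        "streamName": stream_name,
        "endpointUrl": f"https://kinesis.{region}.amazonaws.com",
        "region": region,
        "startingposition": position,
    }


opts = kinesis_reader_options("test_stream", "us-east-1", "TRIM_HORIZON")
print(opts)

# In the notebook, where `spark` is the session provided by the kernel:
# stream_df = spark.readStream.format("kinesis").options(**opts).load()
```

Validating the position name up front surfaces a typo immediately instead of when the stream is loaded.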

Now that we have our streaming DataFrame ready, let's query it with the DataFrame `select` and `where` methods. We'll then write the results out to an S3 bucket (to follow along, you'll need to specify your own bucket and append `/data` and `/checkpoints` directories to it). Spark will produce separate CSV files for each micro-batch of data that it processes.

In [3]:
# start process of querying streaming data
query = stream_df.selectExpr('CAST(data AS STRING)', 'CAST(approximateArrivalTimestamp as TIMESTAMP)') \
    .select('approximateArrivalTimestamp', 
            json_tuple(col('data'), 'username', 'age', 'num_followers', 'tweet'
                      ).alias('username', 'age', 'num_followers', 'tweet')) \
    .select('approximateArrivalTimestamp', 'username', 'age') \
    .where('age > 35') \
    .writeStream \
    .queryName('counts') \
    .outputMode('append') \
    .format('csv') \
    .option('path', 's3://mrjob-9caa69460249cdb9/data') \
    .option('checkpointLocation','s3://mrjob-9caa69460249cdb9/checkpoints') \
    .start()

# let streaming query run for 15 seconds (and continue sending results to CSV in S3), then stop it
time.sleep(15)

# Stop query; look at results of micro-batch queries in S3 bucket in `/data` directory
query.stop()

Cool! If we take a look at one of the resulting CSVs in our S3 bucket (the first few lines are shown below), we can see that it contains the expected results: a selection of columns from the streaming data, filtered by age. This is a great way to quickly process streaming data!

```
2021-05-10T22:03:40.787Z,Hailiejade,83
2021-05-10T22:03:40.824Z,Fischer,79
2021-05-10T22:03:40.866Z,Leonard,46
2021-05-10T22:03:40.902Z,Vasquez,65
2021-05-10T22:03:40.937Z,Porter,86
2021-05-10T22:03:40.978Z,Joan,56
```
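
To make the query's per-record logic concrete, here is a plain-Python sketch of what the `select` and `where` steps do to a batch. It is not how Spark executes the query; `select_and_filter` is a hypothetical helper, and the batch values are made up for illustration.

```python
import json


def select_and_filter(records, min_age=35):
    """Mimic the streaming query on an in-memory batch.

    `records` is an iterable of (arrival_timestamp, json_string) pairs,
    standing in for the `approximateArrivalTimestamp` and `data` columns.
    """
    rows = []
    for arrival_ts, payload in records:
        fields = json.loads(payload)  # plays the role of json_tuple
        age = int(fields["age"])
        if age > min_age:  # same condition as .where('age > 35')
            rows.append((arrival_ts, fields["username"], age))
    return rows


def record(ts, username, age):
    """Build one (timestamp, JSON payload) pair with placeholder extra fields."""
    payload = {"username": username, "age": age, "num_followers": 0, "tweet": "hi"}
    return ts, json.dumps(payload)


# Hypothetical batch; timestamps and usernames are invented for this example
batch = [
    record("2021-05-10T22:03:40.000Z", "user_a", 83),
    record("2021-05-10T22:03:41.000Z", "user_b", 35),
    record("2021-05-10T22:03:42.000Z", "user_c", 22),
    record("2021-05-10T22:03:43.000Z", "user_d", 36),
]

rows = select_and_filter(batch)
for row in rows:
    print(",".join(str(value) for value in row))
```

Only `user_a` and `user_d` pass the filter: `user_b` is exactly 35, and the condition is strictly greater than 35. The printed lines have the same three-column shape as the CSV sample above.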