## Querying Streaming Spark DataFrames in an EMR Notebook

In this notebook, we will read data from a modified version of the Kinesis stream from last week into a Spark streaming DataFrame. Once we've loaded our streaming DataFrame, we'll perform a simple query on it and write the results of our query to S3 for further analysis.

We've provided code for starting a producer and a Kinesis stream in a script called `start_stream.py` (in our class GitHub repository). This script sends streaming tweet-like JSON data into our `test_stream` Kinesis stream in the form `{"username": ..., "age": ..., "num_followers": ..., "tweet": ...}`. If you're following along with the code in this notebook, run this script locally to send data into the stream (which you can then work with in this notebook). Just be sure to delete your Kinesis stream when you're finished with this notebook (we've provided a short script called `delete_stream.py` that will do this for you).
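To make the record format concrete, here is a minimal sketch of how one such record could be serialized and sent. This is not the contents of `start_stream.py`; the function names `make_record` and `send_record` and the sample values are hypothetical, and `kinesis_client` is assumed to be a `boto3.client('kinesis')` object created elsewhere.

```python
import json


def make_record(username, age, num_followers, tweet):
    """Serialize one tweet-like record to UTF-8 JSON bytes."""
    record = {
        "username": username,
        "age": age,
        "num_followers": num_followers,
        "tweet": tweet,
    }
    return json.dumps(record).encode("utf-8")


def send_record(kinesis_client, data, partition_key, stream_name="test_stream"):
    """Send one record to the stream.

    Assumes `kinesis_client` is a boto3 Kinesis client; not called in this sketch.
    """
    return kinesis_client.put_record(
        StreamName=stream_name, Data=data, PartitionKey=partition_key
    )


# Hypothetical sample values, for illustration only
payload = make_record("Mabel", 95, 1200, "hello world")
print(payload.decode("utf-8"))
```

The `data` column we cast to a string later in this notebook holds exactly this kind of JSON payload.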

First, let's add the [Spark Structured Streaming package](https://spark.apache.org/docs/2.4.7/structured-streaming-programming-guide.html) to our session configuration (we'll specifically add a version that makes it possible to interact with Kinesis streams):

In [None]:
%%configure -f
{ "conf": {"spark.jars.packages": "com.qubole.spark/spark-sql-kinesis_2.11/1.1.3-spark_2.4" }}

Then, we're ready to start reading from our Kinesis stream. For this demonstration, we'll start with the latest data in the stream (`LATEST`), but the `startingposition` option lets us choose other starting points as well:

In [None]:
from pyspark.sql.functions import col, json_tuple
import time

# `spark` is the SparkSession that the EMR notebook creates for us
stream_df = spark.readStream \
                 .format('kinesis') \
                 .option('streamName', 'test_stream') \
                 .option('endpointUrl', 'https://kinesis.us-east-1.amazonaws.com') \
                 .option('region', 'us-east-1') \
                 .option('startingposition', 'LATEST') \
                 .load()

if stream_df.isStreaming:
    print('======================')
    print('DataFrame is streaming')
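If we wanted to start somewhere other than the latest data, one option is to collect the reader options in a small helper and change only the starting position. The sketch below is an illustration, not part of the original notebook: `kinesis_reader_options` is a hypothetical helper, and `TRIM_HORIZON` (the oldest data still retained in the stream) is a value we understand the connector to accept, so check the connector's documentation before relying on it.

```python
def kinesis_reader_options(stream_name, region, starting_position="LATEST"):
    """Build the option dictionary for the Kinesis stream reader."""
    return {
        "streamName": stream_name,
        "endpointUrl": f"https://kinesis.{region}.amazonaws.com",
        "region": region,
        "startingposition": starting_position,
    }


# Assumption: the connector accepts TRIM_HORIZON as a starting position
opts = kinesis_reader_options("test_stream", "us-east-1",
                              starting_position="TRIM_HORIZON")
print(opts)

# In the notebook, these options could then be passed in a single call:
# stream_df = spark.readStream.format('kinesis').options(**opts).load()
```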

Now that we have our streaming DataFrame ready, let's use the Spark DataFrame `select` and `where` methods to query it. We'll then write the results out to an S3 bucket (to follow along, you'll need to specify your own bucket and append `/data` and `/checkpoints` directories to it). An individual CSV will be produced for each set of data that is processed in a micro-batch.

In [3]:
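# Start querying the streaming data: cast the raw Kinesis `data` payload to a string,
# pull the JSON fields out with `json_tuple`, keep three columns, and filter on age.
# Results are appended as CSV files in S3; the checkpoint directory tracks progress.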
query = stream_df.selectExpr('CAST(data AS STRING)', 'CAST(approximateArrivalTimestamp as TIMESTAMP)') \
    .select('approximateArrivalTimestamp', 
            json_tuple(col('data'), 'username', 'age', 'num_followers', 'tweet'
                      ).alias('username', 'age', 'num_followers', 'tweet')) \
    .select('approximateArrivalTimestamp', 'username', 'age') \
    .where('age > 35') \
    .writeStream \
    .queryName('counts') \
    .outputMode('append') \
    .format('csv') \
    .option('path', 's3://mrjob-634d5d805a7e423b/data') \
    .option('checkpointLocation','s3://mrjob-634d5d805a7e423b/checkpoints') \
    .start()

# let streaming query run for 15 seconds (and continue sending results to CSV in S3), then stop it
time.sleep(15)

# Stop query; look at results of micro-batch queries in S3 bucket in `/data` directory
query.stop()
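To see what the query above does to each record, here is a rough plain-Python equivalent of the select-and-filter steps applied to a single record. It's only a sketch for intuition: `select_and_filter` is a hypothetical name, the sample records are made up, and the explicit `int()` conversion stands in for the implicit cast that Spark applies when comparing the extracted `age` string to a number.

```python
import json


def select_and_filter(raw_data, arrival_timestamp, min_age=35):
    """Return (timestamp, username, age) if age > min_age, otherwise None."""
    fields = json.loads(raw_data)   # plays the role of json_tuple
    age = int(fields["age"])        # Spark casts the extracted string implicitly
    if age > min_age:               # mirrors .where('age > 35')
        return (arrival_timestamp, fields["username"], age)
    return None


# Hypothetical records, for illustration only
kept = select_and_filter(
    '{"username": "John", "age": 62, "num_followers": 10, "tweet": "hi"}',
    "2021-11-06T22:02:58.117Z",
)
dropped = select_and_filter(
    '{"username": "Sam", "age": 20, "num_followers": 5, "tweet": "hey"}',
    "2021-11-06T22:02:58.200Z",
)
print(kept, dropped)
```

In the streaming query, Spark applies this same logic to every record in each micro-batch and appends the surviving rows to the CSV output.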

Cool! If we take a look at one of our resulting CSVs over in our S3 bucket (the first few rows are shown below), we can see that it contains the expected results: the arrival timestamp, username, and age columns from the streaming data, filtered to records where the age is over 35. This is a great way to quickly process streaming data!

```
2021-11-06T22:02:57.862Z,Deangelo,69
2021-11-06T22:02:58.007Z,Arron,80
2021-11-06T22:02:58.044Z,Compton,88
2021-11-06T22:02:58.081Z,Mabel,95
2021-11-06T22:02:58.117Z,John,62
```
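As a quick sanity check on output like this, the sample rows above can be parsed with Python's standard library to confirm that every row passes the age filter. This is a minimal sketch that hard-codes the five rows shown above; in practice you would download a CSV from your own bucket first.

```python
import csv
from datetime import datetime

# The five sample rows shown above
sample_lines = [
    "2021-11-06T22:02:57.862Z,Deangelo,69",
    "2021-11-06T22:02:58.007Z,Arron,80",
    "2021-11-06T22:02:58.044Z,Compton,88",
    "2021-11-06T22:02:58.081Z,Mabel,95",
    "2021-11-06T22:02:58.117Z,John,62",
]

rows = []
for ts, username, age in csv.reader(sample_lines):
    # Parse the timestamp format used in the CSV output
    arrival = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    rows.append((arrival, username, int(age)))

ages = [age for _, _, age in rows]
print("all ages over 35:", all(age > 35 for age in ages))
print("youngest age in sample:", min(ages))
```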