# Spark Structured Streaming from Kafka

Example of using Spark to connect to Kafka and using Spark Structured Streaming to process a Kafka stream of alerts whose values are Python dict literals sent as plain strings (not Avro).

## Notes

Useful references.

https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

https://spark.apache.org/docs/2.1.0/structured-streaming-programming-guide.html

## Prep environment

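Spark needs extra packages to talk to Kafka. They are passed via PYSPARK_SUBMIT_ARGS, which must be set before the Spark session is created.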

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 pyspark-shell'
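Only the spark-sql-kafka-0-10 package is needed by the Structured Streaming Kafka source described in the integration guide linked above. The spark-streaming-kafka-0-8 package appears to be for the older DStream API, which this notebook does not use. A minimal sketch of building the argument string, assuming Spark 2.1.0 built for Scala 2.11 as above; kafka_submit_args is a hypothetical helper, not part of PySpark:

```python
import os

def kafka_submit_args(spark_version="2.1.0", scala_version="2.11"):
    """Hypothetical helper: build a PYSPARK_SUBMIT_ARGS value that pulls in
    the Structured Streaming Kafka source."""
    # Maven coordinate: groupId:artifactId_<Scala binary version>:<Spark version>
    package = "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version)
    # PYSPARK_SUBMIT_ARGS is conventionally terminated with "pyspark-shell".
    return "--packages {} pyspark-shell".format(package)

# Set before the first SparkSession/SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args()
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The Scala suffix and Spark version must match the Spark installation; mismatched versions are a common source of class-loading errors.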


In [2]:
from ast import literal_eval  # parses the Python-literal alert strings into dicts

## Start a Kafka stream for Spark to subscribe to

With lsst-dm/alert_stream, in an external shell:

Send some alerts so that the stream (Kafka topic) exists for Spark to connect to:

docker run -it --network=alertstream_default alert_stream python bin/sendAlertStream.py my-stream 10 --no-stamps --encode-off

## Set up the Spark session and the connection to Kafka

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("SSKafka") \
    .getOrCreate()
    
# startingOffsets defaults to "latest" for streaming queries; "earliest"
# rewinds to the start of the topic so previously sent alerts are not missed
dsraw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "my-stream") \
  .option("startingOffsets", "earliest") \
  .load()
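Besides "earliest" and "latest", the Kafka integration guide documents a JSON form of startingOffsets that sets a starting offset per partition, where -2 means earliest and -1 means latest. A small sketch that builds such a string with the standard json module; starting_offsets is a hypothetical helper, and a single partition 0 for my-stream is an assumption:

```python
import json

# Special offsets used in the startingOffsets JSON form.
EARLIEST, LATEST = -2, -1

def starting_offsets(topic, partition_offsets):
    """Hypothetical helper: build a startingOffsets JSON string for one topic.

    partition_offsets maps partition number -> offset (or EARLIEST/LATEST).
    """
    # The expected shape is {"topic": {"partition": offset}}; JSON keys are strings.
    return json.dumps({topic: {str(p): off for p, off in partition_offsets.items()}})

offsets = starting_offsets("my-stream", {0: EARLIEST})
print(offsets)
```

The result could then be passed as .option("startingOffsets", offsets) in place of "earliest".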
  

In [4]:
ds = dsraw.selectExpr("CAST(value AS STRING)")

In [5]:
print(type(dsraw))
print(type(ds))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


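dsraw is the raw data stream, with the fixed schema of the Kafka source (key, value, topic, partition, offset, timestamp, timestampType).

ds keeps only the Kafka "value" column, cast from binary to string: the actual alert data.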

## Create output for Spark Structured Streaming

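Calling writeStream ... start() on a streaming DataFrame starts a streaming query. Its output can be written to a sink, such as files on disk, or kept in memory for follow-up SQL operations.
Below, both streams are written to the in-memory sink, and each queryName becomes a table name that spark.sql can query. The programming guide describes the memory sink as intended for debugging, since the output is kept in driver memory.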


In [6]:
rawQuery = dsraw \
        .writeStream \
        .queryName("qraw")\
        .format("memory")\
        .start()

In [7]:
alertQuery = ds \
        .writeStream \
        .queryName("qalerts")\
        .format("memory")\
        .start()

Send more alerts so the running queries have new data to process:

docker run -it --network=alertstream_default alert_stream python bin/sendAlertStream.py my-stream 10 --no-stamps --encode-off

Run SQL against the in-memory tables named by queryName.

In [8]:
raw = spark.sql("select * from qraw")
raw.show()

+----+--------------------+---------+---------+------+--------------------+-------------+
| key|               value|    topic|partition|offset|           timestamp|timestampType|
+----+--------------------+---------+---------+------+--------------------+-------------+
|null|[7B 27 61 6C 65 7...|my-stream|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 27 61 6C 65 7...|my-stream|        0|     7|1969-12-31 23:59:...|            0|
|null|[7B 

Because this stream was read with format("kafka"), the table schema is that of Kafka records, not of our alert content, which is stored as bytes in "value".

In [9]:
raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
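The hex shown in the value column of raw.show() above is simply the encoded alert text. A quick check in plain Python, assuming the alerts are UTF-8 encoded str(dict) output (an assumption about what --encode-off sends), shows why CAST(value AS STRING) recovers readable text:

```python
# Leading bytes of "value" as displayed by raw.show() above.
shown_hex = "7B 27 61 6C 65"
print(bytes.fromhex(shown_hex).decode("utf-8"))  # {'ale

# Assumption: an alert sent with --encode-off is str(dict) encoded as UTF-8.
alert = {'alertId': 1231321321}
value = str(alert).encode("utf-8")
print(value[:5].hex().upper())  # 7B27616C65
```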



Now select from the stream in which the Kafka "value" has been cast to a string.

In [10]:
alerts = spark.sql("select * from qalerts")
alerts.show()

+--------------------+
|               value|
+--------------------+
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
|{'alertId': 12313...|
+--------------------+



The alert data has no schema beyond a single string column.

In [11]:
alerts.printSchema()

root
 |-- value: string (nullable = true)



In [12]:
type(alerts)

pyspark.sql.dataframe.DataFrame

## Convert the values in the SQL DataFrame to an RDD of dicts

Mapping literal_eval over the underlying RDD turns each alert string into a Python dict, a more convenient structure for filtering.
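Why literal_eval rather than json.loads? The alert strings use Python dict syntax with single quotes, which is not valid JSON. literal_eval only evaluates Python literals (strings, numbers, tuples, lists, dicts, sets, booleans, None), so unlike eval it will not run code embedded in a message. A minimal sketch with a shortened, made-up alert string:

```python
import json
from ast import literal_eval

# Shortened alert string in the same Python-literal style as the "value" column.
value = "{'alertId': 1231321321, 'diaSource': {'filterName': 'r', 'ra': 351.570546978}}"

alert = literal_eval(value)        # parses literals only; no code is executed
print(alert['diaSource']['ra'])    # 351.570546978

try:
    json.loads(value)              # single-quoted keys are not valid JSON
except ValueError as err:          # json.JSONDecodeError subclasses ValueError
    print("json.loads failed:", err)
```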

In [13]:
rddAlertsRdd = alerts.rdd.map(lambda alert: literal_eval(alert['value']))

In [14]:
rddAlertsRdd

PythonRDD[24] at RDD at PythonRDD.scala:48

In [15]:
rddAlerts = rddAlertsRdd.collect()

In [16]:
type(rddAlerts)

list

In [17]:
rddAlerts[0]

{'alertId': 1231321321,
 'diaObject': {'decl': 0.126243049656,
  'diaObjectId': 281323062375219201,
  'flags': 0,
  'parallax': 2.124124,
  'pmDecl': 0.00014,
  'pmParallaxChi2': 0.00013,
  'pmParallaxLnL': 0.00013,
  'pmParallaxNdata': 1214,
  'pmRa': 0.00013,
  'pm_parallax_Cov': {'parallaxSigma': 0.00013,
   'pmDeclSigma': 0.00013,
   'pmDecl_parallax_Cov': 0.00013,
   'pmRaSigma': 0.00013,
   'pmRa_parallax_Cov': 0.00013,
   'pmRa_pmDecl_Cov': 0.00013},
  'ra': 351.570546978,
  'ra_decl_Cov': {'declSigma': 0.00028,
   'raSigma': 0.00028,
   'ra_decl_Cov': 0.00029},
  'radecTai': 1480360995},
 'diaObjectL2': {'decl': 0.126243049656,
  'diaObjectId': 281323062375219201,
  'flags': 0,
  'parallax': 2.124124,
  'pmDecl': 0.00014,
  'pmParallaxChi2': 0.00013,
  'pmParallaxLnL': 0.00013,
  'pmParallaxNdata': 1214,
  'pmRa': 0.00013,
  'pm_parallax_Cov': {'parallaxSigma': 0.00013,
   'pmDeclSigma': 0.00013,
   'pmDecl_parallax_Cov': 0.00013,
   'pmRaSigma': 0.00013,
   'pmRa_parallax_Cov'

Filter the collected list of dicts with a list comprehension.

In [18]:
ra_all = [alert for alert in rddAlerts
          if alert['diaSource']['ra'] > 350]

In [19]:
print(len(ra_all))

20
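The filter above runs on the driver after collect() has pulled every alert over. Below is a sketch of the same test as a reusable predicate; ra_above is a hypothetical helper. Assuming it behaves the same on the executors, it could be passed as rddAlertsRdd.filter(ra_above(350)).collect() so that only matching alerts are collected. Here it runs on a small made-up list so it works without Spark:

```python
def ra_above(threshold):
    """Hypothetical helper: return a predicate selecting alerts whose
    diaSource RA exceeds threshold."""
    def predicate(alert):
        # Guard against alerts with a missing/None diaSource or a missing 'ra'.
        ra = (alert.get('diaSource') or {}).get('ra')
        return ra is not None and ra > threshold
    return predicate

alerts = [
    {'alertId': 1, 'diaSource': {'ra': 351.57}},
    {'alertId': 2, 'diaSource': {'ra': 10.0}},
    {'alertId': 3, 'diaSource': {}},
]
selected = [a['alertId'] for a in alerts if ra_above(350)(a)]
print(selected)  # [1]
```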


In [20]:
ra_all[0:2]

[{'alertId': 1231321321,
  'diaObject': {'decl': 0.126243049656,
   'diaObjectId': 281323062375219201,
   'flags': 0,
   'parallax': 2.124124,
   'pmDecl': 0.00014,
   'pmParallaxChi2': 0.00013,
   'pmParallaxLnL': 0.00013,
   'pmParallaxNdata': 1214,
   'pmRa': 0.00013,
   'pm_parallax_Cov': {'parallaxSigma': 0.00013,
    'pmDeclSigma': 0.00013,
    'pmDecl_parallax_Cov': 0.00013,
    'pmRaSigma': 0.00013,
    'pmRa_parallax_Cov': 0.00013,
    'pmRa_pmDecl_Cov': 0.00013},
   'ra': 351.570546978,
   'ra_decl_Cov': {'declSigma': 0.00028,
    'raSigma': 0.00028,
    'ra_decl_Cov': 0.00029},
   'radecTai': 1480360995},
  'diaObjectL2': {'decl': 0.126243049656,
   'diaObjectId': 281323062375219201,
   'flags': 0,
   'parallax': 2.124124,
   'pmDecl': 0.00014,
   'pmParallaxChi2': 0.00013,
   'pmParallaxLnL': 0.00013,
   'pmParallaxNdata': 1214,
   'pmRa': 0.00013,
   'pm_parallax_Cov': {'parallaxSigma': 0.00013,
    'pmDeclSigma': 0.00013,
    'pmDecl_parallax_Cov': 0.00013,
    'pmRaSigma

In [21]:
ra_empty = [alert for alert in rddAlerts
            if alert['diaSource']['ra'] < 350]

In [22]:
print(len(ra_empty))

0


## Converting from RDD of dicts to a dataframe is not a good idea

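Spark infers the schema from the data: each nested dict becomes a map column (shown as Map(...) below), and values that don't fit the inferred map type are silently replaced with null, as shown below. So don't try to filter with SQL DataFrames built this way.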

In [23]:
df = rddAlertsRdd.toDF()



In [24]:
type(df)

pyspark.sql.dataframe.DataFrame

In [25]:
df.show()

+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|   alertId|           diaObject|         diaObjectL2|           diaSource|        diaSourcesL2|   l1dbId|      prv_diaSources|            ssObject|
+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|1231321321|Map(decl -> 0.126...|Map(decl -> 0.126...|Map(x -> null, cc...|[Map(x -> null, c...|222222222|[Map(x -> null, c...|Map(orbFitNdata -...|
|1231321321|Map(decl -> 0.126...|Map(decl -> 0.126...|Map(x -> null, cc...|[Map(x -> null, c...|222222222|[Map(x -> null, c...|Map(orbFitNdata -...|
|1231321321|Map(decl -> 0.126...|Map(decl -> 0.126...|Map(x -> null, cc...|[Map(x -> null, c...|222222222|[Map(x -> null, c...|Map(orbFitNdata -...|
|1231321321|Map(decl -> 0.126...|Map(decl -> 0.126...|Map(x -> null, cc...|[Map(x -> null, c...|222222222|

In [26]:
alertIds = df.select('alertId')

In [27]:
alertIds.show()

+----------+
|   alertId|
+----------+
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
|1231321321|
+----------+



Selecting a column _appears_ to work above, for data that was not lost. The queries below show NULLs where data has been lost.

In [28]:
diaSources = df.select('diaSource').where('diaSource.diaSourceId is not NULL')

In [29]:
diaSources_empty = df.select('diaSource').where('diaSource.diaSourceId is NULL')

In [30]:
type(diaSources)

pyspark.sql.dataframe.DataFrame

In [31]:
diaSources.show()

+---------+
|diaSource|
+---------+
+---------+



In [32]:
diaSources_empty.show()

+--------------------+
|           diaSource|
+--------------------+
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
+--------------------+



Take a closer look at diaSources_empty with a pandas dataframe.

In [33]:
pd = diaSources_empty.toPandas()

In [34]:
pd

|   | diaSource |
|---|---|
| 0 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 1 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 2 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 3 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 4 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 5 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 6 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 7 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 8 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |
| 9 | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... |


In [35]:
type(pd['diaSource'][0])

dict

In [36]:
pd['diaSource'][0]

{'ccdVisitId': None,
 'decl': None,
 'diaSourceId': None,
 'filterName': None,
 'flags': None,
 'midPointTai': None,
 'psFlux': None,
 'ra': None,
 'ra_decl_Cov': {'declSigma': 0.00028,
  'raSigma': 0.00028,
  'ra_decl_Cov': 0.00029},
 'snr': None,
 'x': None,
 'x_y_Cov': {'xSigma': 1.2, 'x_y_Cov': 1.2, 'ySigma': 1.1},
 'y': None}

Every scalar field came back as None; only the nested dicts ra_decl_Cov and x_y_Cov survived.
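Why do exactly those fields disappear? The sketch below is a simplified model, an assumption based on PySpark's schema inference for dicts rather than code taken from it: a nested dict is inferred as a map whose value type comes from the first non-None value, and values of any other type end up as null. With a nested dict first, as in the output above, every scalar field is lost:

```python
def infer_map_value_type(d):
    """Simplified model (assumption): the map's value type is taken from
    the first non-None value in the dict."""
    for value in d.values():
        if value is not None:
            return type(value)
    return type(None)

def values_lost(d):
    """Keys whose values do not match the inferred value type (become null)."""
    vtype = infer_map_value_type(d)
    return sorted(k for k, v in d.items() if not isinstance(v, vtype))

# A trimmed diaSource with a nested dict first, as in the pandas output above.
dia_source = {
    'ra_decl_Cov': {'raSigma': 0.00028, 'declSigma': 0.00028, 'ra_decl_Cov': 0.00029},
    'x_y_Cov': {'xSigma': 1.2, 'ySigma': 1.1, 'x_y_Cov': 1.2},
    'diaSourceId': 281323062375219200,
    'filterName': 'r',
    'ra': 351.570546978,
}
print(infer_map_value_type(dia_source).__name__)  # dict
print(values_lost(dia_source))  # ['diaSourceId', 'filterName', 'ra']
```

In Python versions before 3.7, dict order was not guaranteed, so which value counts as "first" could vary.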

Was it the pandas conversion that lost the data? Check the Spark DataFrame directly, before converting to pandas:

In [39]:
diaSources_empty.filter('diaSource.filterName is NULL').show()

+--------------------+
|           diaSource|
+--------------------+
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
|Map(x -> null, cc...|
+--------------------+



No: as shown above, the data was already lost before the pandas conversion. Don't use RDD.toDF() on an RDD of nested dicts.
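One possible workaround, not tested in this notebook, is to flatten each alert into a single level of fields before calling toDF(), so that each field gets its own primitive column instead of sharing a map's value type. flatten is a hypothetical helper; lists such as prv_diaSources are left as-is and would still need separate handling, for example an explicit schema:

```python
def flatten(d, prefix="", sep="_"):
    """Hypothetical helper: flatten nested dicts into one level of keys.

    Nested keys are joined with sep (not "." since dots need escaping in
    Spark column names). Non-dict values, including lists, are kept as-is.
    """
    flat = {}
    for key, value in d.items():
        name = prefix + sep + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))  # recurse into nested dicts
        else:
            flat[name] = value
    return flat

alert = {'alertId': 1231321321,
         'diaSource': {'ra': 351.570546978, 'filterName': 'r',
                       'ra_decl_Cov': {'raSigma': 0.00028}}}
print(flatten(alert))
```

In the notebook this could be applied as rddAlertsRdd.map(flatten).toDF().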

## Converting the original SQL DataFrame to pandas is OK

In [40]:
pdAlerts = alerts.toPandas()

In [41]:
pdAlerts['value'][0]

"{'alertId': 1231321321, 'l1dbId': 222222222, 'diaSource': {'diaSourceId': 281323062375219200, 'ccdVisitId': 111111, 'midPointTai': 1480360995, 'filterName': 'r', 'ra': 351.570546978, 'decl': 0.126243049656, 'ra_decl_Cov': {'raSigma': 0.00028, 'declSigma': 0.00028, 'ra_decl_Cov': 0.00029}, 'x': 112.1, 'y': 121.1, 'x_y_Cov': {'xSigma': 1.2, 'ySigma': 1.1, 'x_y_Cov': 1.2}, 'snr': 41.1, 'psFlux': 1241.0, 'flags': 0}, 'prv_diaSources': [{'diaSourceId': 281323062375219198, 'ccdVisitId': 111111, 'midPointTai': 1480360995, 'filterName': 'r', 'ra': 351.570546978, 'decl': 0.126243049656, 'ra_decl_Cov': {'raSigma': 0.00028, 'declSigma': 0.00028, 'ra_decl_Cov': 0.00029}, 'x': 112.1, 'y': 121.1, 'x_y_Cov': {'xSigma': 1.2, 'ySigma': 1.1, 'x_y_Cov': 1.2}, 'snr': 41.1, 'psFlux': 1241.0, 'flags': 0}, {'diaSourceId': 281323062375219199, 'ccdVisitId': 111111, 'midPointTai': 1480360995, 'filterName': 'r', 'ra': 351.570546978, 'decl': 0.126243049656, 'ra_decl_Cov': {'raSigma': 0.00028, 'declSigma': 0.0002

In [42]:
type(pdAlerts['value'][0])

str

In [43]:
pdAlertsSeries = pdAlerts['value'].map(literal_eval)

In [44]:
type(pdAlertsSeries)

pandas.core.series.Series

In [45]:
pdAlertsSeries[0:3]

0    {'prv_diaSources': [{'ra_decl_Cov': {'raSigma'...
1    {'prv_diaSources': [{'ra_decl_Cov': {'raSigma'...
2    {'prv_diaSources': [{'ra_decl_Cov': {'raSigma'...
Name: value, dtype: object

In [46]:
type(pdAlertsSeries[0])

dict

In [48]:
import pandas
pdAlertsDf = pandas.DataFrame(list(pdAlertsSeries))

In [49]:
type(pdAlertsDf)

pandas.core.frame.DataFrame

In [50]:
pdAlertsDf.head()

|   | alertId | diaObject | diaObjectL2 | diaSource | diaSourcesL2 | l1dbId | prv_diaSources | ssObject |
|---|---|---|---|---|---|---|---|---|
| 0 | 1231321321 | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | 222222222 | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | {'uG2': 231.2313, 'arc': 2.124124, 'uG2Err': 2... |
| 1 | 1231321321 | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | 222222222 | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | {'uG2': 231.2313, 'arc': 2.124124, 'uG2Err': 2... |
| 2 | 1231321321 | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | 222222222 | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | {'uG2': 231.2313, 'arc': 2.124124, 'uG2Err': 2... |
| 3 | 1231321321 | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | 222222222 | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | {'uG2': 231.2313, 'arc': 2.124124, 'uG2Err': 2... |
| 4 | 1231321321 | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'pmRa': 0.00013, 'pmParallaxNdata': 1214, 'pm... | {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_... | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | 222222222 | [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl... | {'uG2': 231.2313, 'arc': 2.124124, 'uG2Err': 2... |


In [51]:
pdAlertsDf['diaSource'][0]

{'ccdVisitId': 111111,
 'decl': 0.126243049656,
 'diaSourceId': 281323062375219200,
 'filterName': 'r',
 'flags': 0,
 'midPointTai': 1480360995,
 'psFlux': 1241.0,
 'ra': 351.570546978,
 'ra_decl_Cov': {'declSigma': 0.00028,
  'raSigma': 0.00028,
  'ra_decl_Cov': 0.00029},
 'snr': 41.1,
 'x': 112.1,
 'x_y_Cov': {'xSigma': 1.2, 'x_y_Cov': 1.2, 'ySigma': 1.1},
 'y': 121.1}

In [52]:
type(pdAlertsDf['diaSource'][0])

dict

In [53]:
type(pdAlertsDf['diaSource'][0]['ra_decl_Cov'])

dict

In [54]:
pdAlertsDf['diaSource'][0]['ra_decl_Cov']

{'declSigma': 0.00028, 'raSigma': 0.00028, 'ra_decl_Cov': 0.00029}

The nested dicts survive intact when a pandas DataFrame is built from the list of dicts in the pandas Series.

But creating a pyspark.sql DataFrame from the same dicts, without a schema, again infers the structure incorrectly:

In [55]:
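# Note: the extra brackets make a single row whose columns _1, _2, ... are the
# alerts; createDataFrame(list(pdAlertsSeries)) would give one row per alert.
sparkdf = spark.createDataFrame([list(pdAlertsSeries)])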

In [56]:
type(sparkdf)

pyspark.sql.dataframe.DataFrame

In [57]:
sparkdf.collect()

[Row(_1={'prv_diaSources': [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_Cov': 0.00029, 'declSigma': 0.00028}, 'x_y_Cov': {'ySigma': 1.1, 'x_y_Cov': 1.2, 'xSigma': 1.2}, 'diaSourceId': None, 'x': None, 'decl': None, 'snr': None, 'psFlux': None, 'y': None, 'filterName': None, 'flags': None, 'ra': None, 'midPointTai': None, 'ccdVisitId': None}, {'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_Cov': 0.00029, 'declSigma': 0.00028}, 'x_y_Cov': {'ySigma': 1.1, 'x_y_Cov': 1.2, 'xSigma': 1.2}, 'diaSourceId': None, 'x': None, 'decl': None, 'snr': None, 'psFlux': None, 'y': None, 'filterName': None, 'flags': None, 'ra': None, 'midPointTai': None, 'ccdVisitId': None}], 'alertId': None, 'diaObject': None, 'l1dbId': None, 'diaSourcesL2': [{'ra_decl_Cov': {'raSigma': 0.00028, 'ra_decl_Cov': 0.00029, 'declSigma': 0.00028}, 'x_y_Cov': {'ySigma': 1.1, 'x_y_Cov': 1.2, 'xSigma': 1.2}, 'diaSourceId': None, 'x': None, 'decl': None, 'snr': None, 'psFlux': None, 'y': None, 'filterName': None, 'flags': None, 'r

## Summary

Using Spark Structured Streaming to filter a Kafka stream whose alert values are unstructured strings (not Avro) is possible, but roundabout. Either of the following approaches works:


### Filter using list comprehension
1. Read a structured stream from Kafka.
2. Cast the "value" column to strings.
3. Construct a pyspark.sql DataFrame selecting all of the values.
4. Use rdd.map with literal_eval to convert the strings to an RDD of dicts.
5. Collect the RDD into a list of dicts.
6. Filter the list of dicts with a list comprehension.

But data can be silently lost if, after step 4, you convert back to a pyspark.sql DataFrame (with RDD.toDF()) to do the filtering.


### Filter using pandas
1. Read a structured stream from Kafka.
2. Cast the "value" column to strings.
3. Construct a pyspark.sql DataFrame selecting all of the values.
4. Convert that DataFrame with toPandas().
5. Map literal_eval over the "value" column to convert the strings to a pandas Series of dicts.
6. Create a pandas DataFrame from list(series) and filter with pandas.
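The notebook does not show the pandas filter of step 6 itself. A minimal sketch of steps 5 and 6 on two made-up alert strings, which stand in for alerts.toPandas(); pulling diaSource's ra into its own column is one choice among several:

```python
from ast import literal_eval
import pandas

# Toy stand-in for alerts.toPandas(): one alert string per row in "value".
pdAlerts = pandas.DataFrame({'value': [
    "{'alertId': 1, 'diaSource': {'ra': 351.57, 'filterName': 'r'}}",
    "{'alertId': 2, 'diaSource': {'ra': 10.0, 'filterName': 'g'}}",
]})

# Step 5: parse each string into a dict.
pdAlertsSeries = pdAlerts['value'].map(literal_eval)

# Step 6: build a DataFrame from the dicts and expose a nested field as a column.
pdAlertsDf = pandas.DataFrame(list(pdAlertsSeries))
pdAlertsDf['ra'] = pdAlertsDf['diaSource'].map(lambda s: s['ra'])

# Filter with an ordinary boolean mask.
high_ra = pdAlertsDf[pdAlertsDf['ra'] > 350]
print(high_ra['alertId'].tolist())  # [1]
```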

But, again, data can be silently lost if, after step 5, you create a pyspark.sql DataFrame from the Series of dicts to filter with pyspark.sql.

The main value of Spark Structured Streaming is the ability to use pyspark.sql on structured data, so for this unstructured example it isn't particularly useful.
