# Spark Receiver - Consume Instrument Data, perform any necessary aggregations and save to Cassandra
Each instrument has its own Kafka topic. The Spark app name is the same as the topic.

In [1]:
# Instrument topic handled by this notebook run. The same value is used below as
# the Spark app name, the Kafka topic to consume and the Cassandra table name.
# Uncomment one of the alternatives to process a different instrument.
TOPIC = 'dmi'
# TOPIC = 'imu'
# TOPIC = 'gps'

In [2]:
import os
# Set here, ahead of the SparkContext creation in the next cell: pins the Spark UI
# port to 4040 and requests the Kafka 0.8 streaming and Cassandra connector
# packages (Scala 2.11 builds) for the pyspark shell.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as f
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Spark configuration: the app is named after the topic, runs on a local master
# with two threads, and points the Cassandra connector at the local node.
conf = SparkConf()
conf.setAppName(TOPIC)
conf.setMaster("local[2]")
conf.set("spark.cassandra.connection.host", "127.0.0.1")

# One SparkContext for the notebook, plus the SQLContext built on top of it.
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [4]:
import argparse
import csv
import time
import json

from utils import parse_line_into_schema

from fastavro import writer, reader, parse_schema
import avro_schemas

### When run as a script, the instrument is parsed from command-line arguments below; in the notebook, set `debug = True` instead

In [5]:
# When True, the instrument is taken from TOPIC instead of being parsed from
# command-line arguments (see the `if debug:` branch in the next cell).
debug = True

In [6]:
# PARSE ARGS AND SET PATHS, TOPICS
KEYSPACE = 'raw_instrument'
DATA_OUT_DIR = '/data/'
DMI_OUT_FILEPATH = DATA_OUT_DIR + 'dmi.csv'
IMU_OUT_FILEPATH = DATA_OUT_DIR + 'imu.csv'
LIDAR_OUT_FILEPATH = DATA_OUT_DIR + 'lidar.csv'
GPS_OUT_FILEPATH = DATA_OUT_DIR + 'gps.csv'
INSTRUMENTS = ['imu', 'dmi', 'lidar', 'gps']

if debug:
    instrument = TOPIC
else:
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--instrument', help='which instrument data to save to disk {0}'.format(INSTRUMENTS))
    args = parser.parse_args()
    instrument = args.instrument.lower()
    
if 'dmi' in instrument:
    filepath = DMI_OUT_FILEPATH
    schema = avro_schemas.dmi
    field_names = [d['name'] for d in schema['fields']]
    TOPIC = 'dmi'
elif 'imu' in instrument:
    filepath = IMU_OUT_FILEPATH
    schema = avro_schemas.imu
    field_names = [d['name'] for d in schema['fields']]
    TOPIC = 'imu'
elif 'lidar' in instrument:
    filepath = LIDAR_OUT_FILEPATH
    schema = avro_schemas.lidar
    field_names = [d['name'] for d in schema['fields']]
    TOPIC = 'lidar'
elif 'gps' in instrument:
    filepath = GPS_OUT_FILEPATH
    schema = avro_schemas.gps
    field_names = [d['name'] for d in schema['fields']]
    TOPIC = 'gps'
else:
    print('ERROR: ARGUMENT {0} NOT IN INSTRUMENTS {1}'.format(args.instrument, INSTRUMENTS))
    assert False

parsed_schema = parse_schema(schema)


### The schema we expect to read given the topic

In [7]:
# Display the fastavro-parsed form of the schema selected for this topic.
parse_schema(schema)

{'__fastavro_parsed': True,
 'fields': [{'name': 'coordinate_id', 'type': 'int'},
  {'name': 'record_no', 'type': 'int'},
  {'name': 'left_total', 'type': 'int'},
  {'name': 'right_total', 'type': 'int'},
  {'name': 'left', 'type': 'int'},
  {'name': 'right', 'type': 'int'}],
 'name': 'dmi.avro.sidewalk_rig',
 'type': 'record'}

## SaveToCassandra
Save each batch of aggregated rows to Cassandra. The topic name is also used as the table name.

In [8]:
def saveToCassandra(rows):
    """Append one batch of records to the Cassandra table named by TOPIC.

    `rows` is the RDD handed over by `foreachRDD`. Empty batches are skipped;
    otherwise the RDD is converted to a DataFrame via `sqlContext` and written
    in append mode to table TOPIC in keyspace KEYSPACE.
    """
    if rows.isEmpty():
        return

    batch_df = sqlContext.createDataFrame(rows)
    cassandra_writer = batch_df.write.format("org.apache.spark.sql.cassandra")
    cassandra_writer = cassandra_writer.mode('append')
    cassandra_writer = cassandra_writer.options(table=TOPIC, keyspace=KEYSPACE)
    cassandra_writer.save()

## Define Streaming Task

In [9]:
def _decode_message(message):
    """Parse the JSON text held in the second element of a stream record."""
    return json.loads(message[1])


# Streaming context on the existing SparkContext with a batch duration of 5.
ssc = StreamingContext(sc, 5)

# Receiver-based Kafka stream for the selected topic.
topic_map = {TOPIC: 1}
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", topic_map)

data = kvs.map(_decode_message)
# Pass-through map: records are forwarded unchanged.
rows = data.map(lambda record: record)

# Output operations, registered in the same order as before: save, then print.
rows.foreachRDD(saveToCassandra)
rows.pprint()

## Start streaming

In [10]:
# Start the StreamingContext `ssc`; the per-batch pprint() output appears below.
ssc.start()

-------------------------------------------
Time: 2019-05-01 13:35:05
-------------------------------------------





-------------------------------------------
Time: 2019-05-01 13:35:10
-------------------------------------------
{u'right': 150, u'record_no': 314, u'left_total': 42197, u'right_total': 42328, u'coordinate_id': 1556717695294473984, u'left': 142}
{u'right': 150, u'record_no': 315, u'left_total': 42344, u'right_total': 42478, u'coordinate_id': 1556717697295793920, u'left': 147}
{u'right': 149, u'record_no': 316, u'left_total': 42494, u'right_total': 42627, u'coordinate_id': 1556717699298831872, u'left': 150}
{u'right': 143, u'record_no': 317, u'left_total': 42644, u'right_total': 42770, u'coordinate_id': 1556717701300987904, u'left': 150}
{u'right': 150, u'record_no': 318, u'left_total': 42787, u'right_total': 42920, u'coordinate_id': 1556717703303450112, u'left': 143}
{u'right': 149, u'record_no': 319, u'left_total': 42937, u'right_total': 43069, u'coordinate_id': 1556717705306098944, u'left': 150}
{u'right': 145, u'record_no': 320, u'left_total': 43087, u'right_total': 43214, u'coordi

## Stop streaming

In [11]:
# Stop streaming gracefully (stopGraceFully=True) while leaving the underlying
# SparkContext running (stopSparkContext=False), since later cells still use
# sqlContext.
ssc.stop(stopSparkContext=False,stopGraceFully=True)

-------------------------------------------
Time: 2019-05-01 13:35:35
-------------------------------------------
{u'right': 144, u'record_no': 332, u'left_total': 44849, u'right_total': 44978, u'coordinate_id': 1556717731343833856, u'left': 150}
{u'right': 145, u'record_no': 333, u'left_total': 44999, u'right_total': 45123, u'coordinate_id': 1556717733346854912, u'left': 150}

-------------------------------------------
Time: 2019-05-01 13:35:40
-------------------------------------------

-------------------------------------------
Time: 2019-05-01 13:35:45
-------------------------------------------



## Get Cassandra table content

In [12]:
# Show which topic (and therefore which Cassandra table) the cells below query.
TOPIC

'dmi'

In [None]:
from cassandra.cluster import Cluster

# Connect with the driver's default contact points and bind the session to the
# keyspace used by the streaming job.
cluster = Cluster()
session = cluster.connect(keyspace=KEYSPACE)

In [None]:
# Fetch up to 20 rows from the table named after the topic and print each one as
# a list of (column, value) pairs.
# Stored under its own name so the `rows` DStream defined earlier is not clobbered.
cassandra_rows = session.execute('SELECT * FROM {0} LIMIT 20'.format(TOPIC))
for user_row in cassandra_rows:
    # The original iterated over an undefined `keys` name (NameError). Column
    # names are taken from the row itself: with the driver's default row factory
    # each row is a namedtuple, so `_fields` lists its columns.
    # print(...) with a single argument behaves the same on Python 2 and 3.
    print([(k, getattr(user_row, k)) for k in user_row._fields])

In [14]:
# Load the Cassandra table written by the streaming job as a DataFrame.
# Without this, `data` is still the DStream from the streaming cell (the cell
# that originally built the DataFrame is missing from the notebook), so the
# calls below fail on Restart & Run All.
# NOTE(review): reading from Cassandra is inferred from the schema printed
# below; confirm this matches the removed cell.
data = sqlContext.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table=TOPIC, keyspace=KEYSPACE) \
    .load()

# Expose the table to Spark SQL, show its schema, then query it back.
data.registerTempTable('tmp')
data.printSchema()
data = sqlContext.sql('select * from tmp')
data.show()

root
 |-- coordinate_id: long (nullable = true)
 |-- left: integer (nullable = true)
 |-- left_total: integer (nullable = true)
 |-- record_no: integer (nullable = true)
 |-- right: integer (nullable = true)
 |-- right_total: integer (nullable = true)

+-------------------+----+----------+---------+-----+-----------+
|      coordinate_id|left|left_total|record_no|right|right_total|
+-------------------+----+----------+---------+-----+-----------+
|1556717709312016896| 150|     43237|      321|  142|      43356|
|1556716972354115840| 150|      1455|       10|  139|       1431|
|1556717723328738048| 150|     44257|      328|  147|      44394|
|1556717006398350080| 150|      3971|       27|   55|       3806|
|1556716988370327040| 141|      2625|       18|  150|       2611|
|1556716970351784960| 146|      1305|        9|  150|       1292|
|1556717705306098944| 150|     42937|      319|  149|      43069|
|1556716980362796032| 148|      2042|       14|  150|       2028|
|1556717721326024960|

In [16]:
# Record count: number of rows in `data`, the result of the
# `select * from tmp` query in the previous cell.
data.count()

44