## Spark Streaming

In [None]:
import os
import sys

# Make the bundled PySpark and py4j archives importable before any pyspark import runs.
sys.path.extend([
    '/home/mario/pydata2017/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip',
    '/home/mario/pydata2017/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip',
])

# Environment read when the Spark session is launched: install location,
# the extra Kafka streaming package, and the Python interpreter to use.
os.environ.update({
    'SPARK_HOME': '/home/mario/pydata2017/spark-2.2.0-bin-hadoop2.7',
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell',
    'PYSPARK_PYTHON': '/usr/bin/python3.6',
})

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
import json

def json_decoder(s):
    """Decode a raw Kafka message value into a Python object.

    :param s: message value as bytes, or None for a message without a value
    :return: the parsed JSON document, or None when ``s`` is None
    """
    # A null value has nothing to parse; pass it through instead of failing on .decode.
    if s is None:
        return None
    # JSON text is UTF-8; decoding as ASCII raised on any non-ASCII character.
    return json.loads(s.decode('utf-8'))
# Obtain the SparkSession through the builder's getOrCreate(); every later cell uses `spark`.
spark = SparkSession.builder.getOrCreate()

Loading points db (csv with road conditions and nearest weather station code)

In [None]:
# Read the road-points table, treating the first CSV line as the header.
points_db = spark.read.csv('road_points.csv', header=True)
# Collect the rows into a {point_id: row} dict on the driver ...
points_by_id = points_db.rdd.keyBy(lambda row: row['point_id']).collectAsMap()
# ... and broadcast it so it can be looked up via `points_dict.value`.
points_dict = spark.sparkContext.broadcast(points_by_id)

Streaming context initialization

In [None]:
# Kafka parameters used when the direct streams are created.
client_configuration = {
    'metadata.broker.list': 'localhost:9092',
}
# Topic names for the car events and the weather information.
car_events_topic, weather_topic = 'car', 'weather'
# Directory passed to the streaming context for checkpointing.
checkpoint_dir = '/tmp/pydata2017-spark'
# Window lengths, both expressed in minutes.
short_window, long_window = 1, 10

In [None]:
# The batch interval equals the short window, converted from minutes to seconds.
batch_seconds = short_window * 60
ssc = StreamingContext(spark.sparkContext, batch_seconds)
ssc.checkpoint(checkpoint_dir)

# One direct Kafka stream per topic (car events first, then weather),
# both decoding message values with json_decoder.
inductive_loop_events, weather_information = (
    KafkaUtils.createDirectStream(ssc, [topic], client_configuration, valueDecoder=json_decoder)
    for topic in (car_events_topic, weather_topic)
)

Windowing (1-minute and 10-minute) and calculating:
 - number of cars
 - average speed
 - minimum gap between cars (in meters)
 - average gap

In [None]:
def map_record(r):
    """Turn a (key, event) pair into (key, partial_stats).

    The partial stats tuple is (count, speed, gap, gap): the gap is carried
    twice so that one copy can be summed and the other minimised.
    """
    event = r[1]
    return (r[0], (1, event['speed'], event['gap_meters'], event['gap_meters']))


def get_stats(a, b):
    """Combine two partial stats tuples (count, speed_sum, gap_sum, min_gap)."""
    # Explicit comparison instead of min(): `from pyspark.sql.functions import *`
    # shadows the builtin, and `__builtin__` is the Python 2 module name, so it
    # only resolves where the interactive shell happens to inject it.
    smallest_gap = b[3] if b[3] < a[3] else a[3]
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], smallest_gap)


def map_stats(r, mode):
    """Convert (key, (count, speed_sum, gap_sum, min_gap)) into (key, stats_dict).

    :param r: key paired with the reduced stats tuple
    :param mode: prefix put in front of every key of the resulting dict
    :return: (key, dict) holding average speed, average gap, minimum gap and car count
    """
    totals = r[1]
    return (r[0], {
        '%s_avg_speed' % mode: totals[1] / totals[0],
        '%s_avg_gap' % mode: totals[2] / totals[0],
        '%s_min_gap' % mode: totals[3],
        '%s_num_cars' % mode: totals[0]
    })

def get_window(length):
    """Build per-key car statistics over a sliding window.

    :param length: window length in minutes
    :return: DStream of (key, stats_dict) whose dict keys are prefixed with str(length)
    """
    # Slide by exactly one batch interval. The slide must be a multiple of the
    # batch interval (short_window * 60 seconds), so it is derived from the same
    # constant instead of being hard-coded to 60.
    slide_seconds = short_window * 60
    windowed = inductive_loop_events.window(length * 60, slide_seconds)
    return windowed.map(map_record).reduceByKey(get_stats) \
        .map(lambda r: map_stats(r, str(length)))


# Join long- and short-window stats per key and merge the two dicts into one.
cars_stats = get_window(long_window).join(get_window(short_window)).map(lambda r: (r[0], {**r[1][0], **r[1][1]}))

Joining loop stats with the points db (offline data, previously loaded)

In [None]:
def map_with_points_data(point):
    """Attach the broadcast road-point row to a point's stats and re-key the record.

    :param point: (point_id, stats_dict) pair
    :return: (nearest_weather_station, stats_dict extended with a 'point_data' entry)
    :raises KeyError: if point_id is not present in the broadcast points dict
    """
    point_data = points_dict.value[point[0]]
    # Build a new dict instead of mutating the incoming record in place, so the
    # upstream element is left untouched if it is ever cached or recomputed.
    enriched = {**point[1], 'point_data': point_data}
    return (point_data.nearest_weather_station, enriched)

cars_stats_with_point_data = cars_stats.map(map_with_points_data)

Generating the stateful weather DStream

In [None]:
def update_function(new_values, last_state):
    """Pick the state to keep for a key after a batch.

    :param new_values: values received for the key in the current batch
    :param last_state: state kept from earlier batches
    :return: the first new value, or last_state when the batch brought none
    """
    # NOTE(review): with several values in one batch the first one wins —
    # confirm that this is intended rather than the most recent one.
    return new_values[0] if new_values else last_state

# Keep a per-key weather state across batches; update_function chooses what is stored for each key.
weather_state = weather_information.updateStateByKey(update_function)

Joining current point features with weather and mapping to final model

In [None]:
def extract_model_data(point):
    """Flatten one joined record into the feature dict consumed by the model.

    :param point: (key, (stats, weather)), where stats carries 'point_data' plus
        the '1_*' and '10_*' window statistics and weather is a dict of readings
    :return: flat dict of point attributes, window statistics and weather fields
    """
    stats = point[1][0]
    weather = point[1][1]
    site = stats['point_data']
    return {
        # static attributes of the road point
        'id': site.point_id,
        'road_shape': site.road_shape,
        'allowed_speed': site.allowed_speed,
        # statistics stored under the '1_' prefix
        'one_minute_avg_speed': stats['1_avg_speed'],
        'one_minute_num_cars': stats['1_num_cars'],
        'one_minute_min_gap': stats['1_min_gap'],
        'one_minute_avg_gap': stats['1_avg_gap'],
        # statistics stored under the '10_' prefix
        'ten_minutes_avg_speed': stats['10_avg_speed'],
        'ten_minutes_num_cars': stats['10_num_cars'],
        'ten_minutes_min_gap': stats['10_min_gap'],
        'ten_minutes_avg_gap': stats['10_avg_gap'],
        # weather readings; 'ts' is taken from the weather record's timestamp
        'temperature': weather['temp'],
        'snow': weather['snow'],
        'rain': weather['rain'],
        'ts': weather['timestamp']
    }

# Join the per-point stats (keyed by nearest weather station) with the weather state
# (presumably keyed by station code too — verify against the weather topic's message key),
# then flatten each joined record into model features.
current_point_status = cars_stats_with_point_data.join(weather_state).map(extract_model_data)

Applying ML model

In [None]:
def apply_model(data):
    """Placeholder for the ML model.

    :param data: feature dict holding at least 'id' and 'ten_minutes_num_cars'
    :return: (point id, number of cars stored under 'ten_minutes_num_cars')
    """
    # The unused per-call `import random` was removed; nothing here needs it.
    return (data['id'], data['ten_minutes_num_cars']) # here goes ML model
# Apply the model to every record and print the leading results of each batch.
current_point_status.map(apply_model).pprint()

Streaming start!

In [None]:
# Start the streaming computation defined by the cells above.
ssc.start()

In [None]:
# ssc.stop(stopSparkContext=False)