# Homework 4

## More trains

For this homework, you will be working with the real-time stream of the NS, the train company of the Netherlands. You can see an example webpage that uses this same stream to display the train information on a map: https://spoorkaart.mwnn.nl/

To help you, and to avoid having too many connections to the NS streaming servers, we have set up a service that collects the streams and pushes them to our Kafka instance. The related topics are:

`ndovloketnl-arrivals`: For each arrival of a train in a station, describes the previous and next station, time of arrival (planned and actual), track number, etc.

`ndovloketnl-departures`: For each departure of a train from a station, describes the previous and next station, time of departure (planned and actual), track number, etc.

`ndovloketnl-gps`: For each train, describes the current location, speed and bearing.

The events are serialized in JSON (actually converted from XML), with properties in their original language, Dutch. Google Translate could help you understand all of them, but we will provide you with some useful mappings.

## Start a Spark Session

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import getpass
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

username = getpass.getuser()

# Use this when running on your computer
spark = (SparkSession
         .builder
         .appName('streaming-{0}'.format(username))
         .master('local[4]') # this number must be greater than the number of sources
         .config('spark.executor.memory', '1g')
         .config('spark.jars.packages', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0')
         .getOrCreate())

# Use this when running on the cluster
#os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'
#spark = (SparkSession
#          .builder
#          .appName('streaming-{0}'.format(username))
#          .master('yarn')
#          .config('spark.executor.memory', '1g')
#          .config('spark.executor.instances', '2')
#          .config('spark.executor.cores', '2')
#          .config('spark.jars.packages', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0')
#          .config('spark.port.maxRetries', '100')
#          .getOrCreate())

sc = spark.sparkContext
conf = sc.getConf()

spark

## Start a Kafka Client

In [2]:
from pykafka import KafkaClient

ZOOKEEPER_QUORUM = 'iccluster045.iccluster.epfl.ch:2181,iccluster047.iccluster.epfl.ch:2181,iccluster058.iccluster.epfl.ch:2181'

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

In [3]:
len(client.topics)

283

# Preliminary - Data Preparation

## Static RDDs from Kafka

Working on data streams is often more complex than working on static datasets, so we will first look at how to create static RDDs for easy prototyping.

You can find below a function that creates a static RDD from a Kafka topic.

In [4]:
def create_static_rdd_from_kafka(topic, from_offset, to_offset):
    if isinstance(topic, bytes):
        topic = topic.decode('utf-8')
    return KafkaUtils.createRDD(sc, {'bootstrap.servers': 'iccluster045.iccluster.epfl.ch:6667'}, [OffsetRange(topic, 0, from_offset, to_offset)])

To check this function, we need to retrieve valid offsets from Kafka.

In [5]:
topic = client.topics[b'ndovloketnl-arrivals']
topic.earliest_available_offsets()

{0: OffsetPartitionResponse(offset=[408075], err=0)}

Now, we can for example retrieve the first 1000 messages from the topic `ndovloketnl-arrivals`.

In [6]:
offset = topic.earliest_available_offsets()[0].offset[0]
rdd = create_static_rdd_from_kafka(b'ndovloketnl-arrivals', offset, offset+1000)

In [7]:
rdd.first()

(None,
 '{"ns1:PutReisInformatieBoodschapIn": {"@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", "ns2:ReisInformatieProductDAS": {"@TimeStamp": "2018-04-28T21:07:58.090Z", "@Versie": "6.1", "ns2:RIPAdministratie": {"ns2:ReisInformatieProductID": "1804282307582200002", "ns2:AbonnementId": "55", "ns2:ReisInformatieTijdstip": "2018-04-28T21:07:00.000Z"}, "ns2:DynamischeAankomstStaat": {"ns2:RitId": "7079", "ns2:RitDatum": "2018-04-28", "ns2:RitStation": {"ns2:StationCode": "AML", "ns2:Type": "5", "ns2:KorteNaam": "Almelo", "ns2:MiddelNaam": "Almelo", "ns2:LangeNaam": "Almelo", "ns2:UICCode": "8400051"}, "ns2:TreinAankomst": {"ns2:TreinNummer": "7079", "ns2:TreinSoort": {"@Code": "SPR", "#text": "Sprinter"}, "ns2:TreinStatus": "0", "ns2:Vervoerder": "NS", "ns2:TreinHerkomst": [{"@InfoStatus": "Gepland", "ns2:StationCode": "APD", "ns2:Type": "5", "ns2:KorteNaam": "Apeldoorn", "ns2:MiddelNaam": "Ape

In [8]:
rdd.count()

1000

We encourage you to use these functions to help prototype your code.

## Fetching messages at specific offsets from Kafka

You can find below a function to read a message at a specific offset from a Kafka topic.

In [9]:
def fetch_message_at(topic, offset):
    if isinstance(topic, str):
        topic = topic.encode('utf-8')
    t = client.topics[topic]
    consumer = t.get_simple_consumer()
    p = list(consumer.partitions.values())[0]
    # Position the consumer just before `offset`, so that the next
    # consumed message is the one stored at `offset`
    consumer.reset_offsets([(p, int(offset) - 1)])
    return consumer.consume()

In [10]:
msg = fetch_message_at(b'ndovloketnl-arrivals', 408076)

Offset reset for partition 0 to timestamp 408075 failed. Setting partition 0's internal counter to 408075


In [11]:
msg.offset

408076

In [12]:
msg.value

b'{"ns1:PutReisInformatieBoodschapIn": {"@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", "ns2:ReisInformatieProductDAS": {"@TimeStamp": "2018-04-28T21:08:03.600Z", "@Versie": "6.1", "ns2:RIPAdministratie": {"ns2:ReisInformatieProductID": "1804282308032400018", "ns2:AbonnementId": "55", "ns2:ReisInformatieTijdstip": "2018-04-28T21:12:00.000Z"}, "ns2:DynamischeAankomstStaat": {"ns2:RitId": "4383", "ns2:RitDatum": "2018-04-28", "ns2:RitStation": {"ns2:StationCode": "ALM", "ns2:Type": "5", "ns2:KorteNaam": "Almere C", "ns2:MiddelNaam": "Almere C.", "ns2:LangeNaam": "Almere Centrum", "ns2:UICCode": "8400080"}, "ns2:TreinAankomst": {"ns2:TreinNummer": "4383", "ns2:TreinSoort": {"@Code": "SPR", "#text": "Sprinter"}, "ns2:TreinStatus": "0", "ns2:Vervoerder": "NS", "ns2:TreinHerkomst": [{"@InfoStatus": "Gepland", "ns2:StationCode": "HFD", "ns2:Type": "0", "ns2:KorteNaam": "Hoofddorp", "ns2:MiddelNaam"

## Streams from Kafka

In [13]:
# Define the checkpoint folder

# Use this if working on your computer
checkpoint = '/tmp/checkpoint'

# Use this if working on the cluster
#checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)

In [14]:
# Create a StreamingContext with a batch interval of 10 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})
gps_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-gps': 1})

For now, let's just print the content of the streams.

In [16]:
arrival_stream.pprint()
departure_stream.pprint()
gps_stream.pprint()

ssc.start()

-------------------------------------------
Time: 2018-05-18 17:00:40
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:00:40
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:00:40
-------------------------------------------



In [17]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2018-05-18 17:00:50
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:00:50
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:00:50
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:01:00
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:01:00
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 17:01:00
-------------------------------------------



You will need to adjust the batch interval (10 seconds here) to match the processing time of a batch. Use the Spark UI to check that batches are not accumulating.

# Part I - Live plot (20 points / 60)

The goal of this part is to obtain an interactive plot using the train positions from the GPS stream. We encourage you to use the examples from last week to achieve the expected result.

First, let's write a function to decode the messages from the `ndovloketnl-gps` topic.

In [15]:
import json
from pykafka.common import OffsetType

example_gps = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
json.loads(example_gps.value)

{'tns:ArrayOfTreinLocation': {'@xmlns:tns': 'http://schemas.datacontract.org/2004/07/Cognos.Infrastructure.Models',
  'tns:TreinLocation': [{'tns:TreinMaterieelDelen': [{'tns:AantalSatelieten': '11',
      'tns:Berichttype': None,
      'tns:Bron': 'NTT',
      'tns:Elevation': '0.0',
      'tns:Fix': '1',
      'tns:GpsDatumTijd': '2018-04-28T18:27:26Z',
      'tns:Hdop': '0.9',
      'tns:Latitude': '51.5608216667',
      'tns:Longitude': '5.0829',
      'tns:MaterieelDeelNummer': '4240',
      'tns:Materieelvolgnummer': '1',
      'tns:Orientatie': '0',
      'tns:Richting': '150.96',
      'tns:Snelheid': '0'},
     {'tns:AantalSatelieten': '11',
      'tns:Berichttype': None,
      'tns:Bron': 'NTT',
      'tns:Elevation': '0.0',
      'tns:Fix': '1',
      'tns:GpsDatumTijd': '2018-04-28T18:27:27Z',
      'tns:Hdop': '0.9',
      'tns:Latitude': '51.56064',
      'tns:Longitude': '5.08481666667',
      'tns:MaterieelDeelNummer': '4032',
      'tns:Materieelvolgnummer': '2',
     

In [12]:
m = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
)

In [20]:
m.consume().offset

15786

We can see that the message has the following structure:

```
{
  'tns:ArrayOfTreinLocation': {
    'tns:TreinLocation': [
      <train_info_1>,
      <train_info_2>,
      ...
    ]
  }
}
```

With the `<train_info_x>` messages containing:
- `tns:TreinNummer`: the train number. This number is used in passenger information displays.
- `tns:MaterieelDeelNummer`: the train car number. It identifies the physical train car.
- `tns:Materieelvolgnummer`: the car position. 1 is the car in front of the train, 2 the next one, etc.
- `tns:GpsDatumTijd`: the datetime given by the GPS.
- `tns:Latitude`, `tns:Longitude`, `tns:Elevation`: 3D coordinates given by the GPS.
- `tns:Snelheid`: speed, most likely given by the GPS.
- `tns:Richting`: heading, most likely given by the GPS.
- `tns:AantalSatelieten`: number of GPS satellites in view.

We also notice that when a train is composed of multiple cars, the position is given in an array, with the position of all individual cars.
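
As a minimal sketch of how the field list above can be put to use, the snippet below renames the Dutch keys of one car entry to English. The English names, the `GPS_FIELD_NAMES` table and both helpers are illustrative choices, not part of the stream. The `as_list` helper assumes that a train with a single car may be delivered as a lone dict instead of a list.

```python
# English names for the Dutch GPS fields, taken from the list above.
# The English names themselves are an illustrative choice.
GPS_FIELD_NAMES = {
    'tns:TreinNummer': 'train_number',
    'tns:MaterieelDeelNummer': 'car_number',
    'tns:Materieelvolgnummer': 'car_position',
    'tns:GpsDatumTijd': 'timestamp',
    'tns:Latitude': 'latitude',
    'tns:Longitude': 'longitude',
    'tns:Elevation': 'elevation',
    'tns:Snelheid': 'speed',
    'tns:Richting': 'heading',
    'tns:AantalSatelieten': 'satellites',
}


def as_list(value):
    """Wrap a lone item in a list so callers can always iterate."""
    return value if isinstance(value, list) else [value]


def translate_keys(record, names=GPS_FIELD_NAMES):
    """Return a copy of `record` keeping only known keys, renamed to English."""
    return {names[k]: v for k, v in record.items() if k in names}


# Hand-written sample shaped like one car entry of the example message.
sample_car = {
    'tns:GpsDatumTijd': '2018-04-28T18:27:26Z',
    'tns:Latitude': '51.5608216667',
    'tns:Longitude': '5.0829',
    'tns:MaterieelDeelNummer': '4240',
    'tns:Materieelvolgnummer': '1',
    'tns:Richting': '150.96',
    'tns:Snelheid': '0',
    'tns:Bron': 'NTT',
}
translated = translate_keys(sample_car)
print(translated['car_number'], translated['heading'])
```

Unknown keys such as `tns:Bron` are dropped rather than passed through, which keeps the resulting rows uniform.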

**Question I.a. (5/20)** Write a function which extracts the train number, train car and GPS data from the source messages. Using this function, you should be able to obtain the example table, or something similar.

In [16]:
import numpy as np
import pandas as pd

In [17]:
def extract_gps_data(msg):
    """Flatten a decoded GPS message into one row per train car."""
    trains = msg['tns:ArrayOfTreinLocation']['tns:TreinLocation']
    data = []
    for train in trains:
        train_nb = int(train['tns:TreinNummer'])
        cars = train['tns:TreinMaterieelDelen']
        # A train with a single car is given as a dict rather than a list
        if not isinstance(cars, list):
            cars = [cars]
        for car in cars:
            car_nb = car['tns:MaterieelDeelNummer']
            car_pos = car['tns:Materieelvolgnummer']
            tms = car['tns:GpsDatumTijd']
            lat = car['tns:Latitude']
            lon = car['tns:Longitude']
            elev = car['tns:Elevation']
            speed = car['tns:Snelheid']
            heading = car['tns:Richting']
            data.append([tms, train_nb, car_nb, car_pos, lat, lon, elev, heading, speed])
    return data

In [18]:
def prettify(msg):
    # `msg` is the decoded message (a dict); naming it `json` would shadow the module
    df = pd.DataFrame(
        data=extract_gps_data(msg),
        columns=['timestamp', 'train_number', 'car_number', 'car_position', 'latitude', 'longitude', 'elevation', 'heading', 'speed'])
    df.timestamp = pd.to_datetime(df.timestamp, unit='ns')
    return df

In [20]:
prettify(json.loads(example_gps.value)).head(n=20)

| | timestamp | train_number | car_number | car_position | latitude | longitude | elevation | heading | speed |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-04-28 18:27:26 | 3671 | 4240 | 1 | 51.5608216667 | 5.0829 | 0.0 | 150.96 | 0.0 |
| 1 | 2018-04-28 18:27:27 | 3671 | 4032 | 2 | 51.56064 | 5.08481666667 | 0.0 | 36.95 | 0.0 |
| 2 | 2018-04-28 18:27:20 | 4873 | 2961 | 1 | 52.678516391842 | 4.8996640175703 | 0.0 | 90.9 | 50.4 |
| 3 | 2018-04-28 18:27:19 | 3674 | 4064 | 1 | 51.5958383333 | 4.79456333333 | 0.0 | 86.01 | 38.0 |
| 4 | 2018-04-28 18:27:28 | 3674 | 4244 | 2 | 51.5958383333 | 4.79422 | 0.0 | 86.23 | 51.0 |
| 5 | 2018-04-28 18:27:19 | 3670 | 4060 | 1 | 51.87381 | 5.85875833333 | 0.0 | 357.03 | 134.0 |
| 6 | 2018-04-28 18:27:20 | 3670 | 4241 | 2 | 51.87345 | 5.85880333333 | 0.0 | 357.01 | 135.0 |
| 7 | 2018-04-28 18:27:21 | 14675 | 2410 | 1 | 52.362051626752 | 4.9310855568925 | 0.0 | 172.6 | 43.2 |
| 8 | 2018-04-28 18:27:20 | 3072 | 9402 | 1 | 51.9852116667 | 5.90010166667 | 0.0 | 114.24 | 0.0 |
| 9 | 2018-04-28 18:27:27 | 3170 | 9576 | 1 | 52.1068345 | 5.08332333333 | 0.0 | 313.22 | 112.0 |


**Question I.b (15/20)** Make a live plot of the train positions.

You can do so by using bokeh; use last week's lab as an example.
See also: https://bokeh.pydata.org/en/latest/docs/user_guide/geo.html

To use the maps in bokeh, you will likely need to convert GPS coordinates to web mercator:
https://gis.stackexchange.com/questions/247871/convert-gps-coordinates-to-web-mercator-epsg3857-using-python-pyproj
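
For reference, the conversion can also be written out by hand. The sketch below uses the standard spherical Web Mercator formula, $x = R\lambda$ and $y = R\ln\tan(\pi/4 + \varphi/2)$ with $R = 6378137$ m. The function name is an illustrative choice, and the sketch is only meant as a way to sanity-check the output of a projection library.

```python
import math

EARTH_RADIUS_M = 6378137.0  # WGS84 semi-major axis, used by EPSG:3857


def lonlat_to_web_mercator(lon_deg, lat_deg):
    """Project WGS84 degrees to Web Mercator metres (spherical formula)."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


# Position of the first car in the example GPS message.
x, y = lonlat_to_web_mercator(5.0829, 51.5608216667)
print(round(x), round(y))
```

Note the argument order: longitude first, latitude second. Swapping them is a common reason for points landing far away from the rail tracks.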

You can compare your plot to one of the live services: https://spoorkaart.mwnn.nl/, http://treinenradar.nl/

**Care should be taken for the following points.**
- We expect the train positions to fall on rail tracks on the map.
- Trains on the map should not appear/disappear when data is absent for a few messages.
- Provide interactive label with the train number (we do not expect train type, as this needs to be recovered from other sources).
- If possible, find a way to show where the train is heading.
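
One way to keep trains from flickering when a message omits them is to cache the last known position of each train and to drop a train only after it has been silent for a while. The sketch below illustrates the idea with plain dictionaries. The function name, the tuple layout and the two-minute timeout are assumptions made for this example.

```python
from datetime import datetime, timedelta


def update_positions(positions, observations, now, max_age=timedelta(minutes=2)):
    """Merge new observations into `positions` and drop stale trains.

    positions:    dict train_number -> (timestamp, latitude, longitude)
    observations: iterable of (train_number, timestamp, latitude, longitude)
    """
    for train_number, timestamp, lat, lon in observations:
        previous = positions.get(train_number)
        # Keep only the most recent fix for each train.
        if previous is None or timestamp >= previous[0]:
            positions[train_number] = (timestamp, lat, lon)
    # Remove trains that have been silent for longer than max_age.
    stale = [t for t, (ts, _, _) in positions.items() if now - ts > max_age]
    for train_number in stale:
        del positions[train_number]
    return positions


t0 = datetime(2018, 4, 28, 18, 27, 0)
positions = {}
update_positions(positions, [(3671, t0, 51.5608, 5.0829), (4873, t0, 52.6785, 4.8997)], now=t0)
# Second message: train 4873 is absent, but it stays on the map.
t1 = t0 + timedelta(seconds=10)
update_positions(positions, [(3671, t1, 51.5610, 5.0831)], now=t1)
print(sorted(positions))  # [3671, 4873]
```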

In [37]:
def update(df, new_rows):
    """Merge `new_rows` into `df`, replacing older entries for the same car."""
    for i in range(len(new_rows)):
        entry = new_rows.loc[i]

        # Skip entries whose coordinates are missing or malformed
        try:
            float(entry.latitude)
            float(entry.longitude)
        except (TypeError, ValueError):
            continue
        tr_nb = entry.train_number
        car_nb = entry.car_number
        car_pos = entry.car_position

        # Drop any previous entry for this car so that only the latest position remains
        query = df[(df.train_number == tr_nb) & (df.car_number == car_nb) & (df.car_position == car_pos)]
        df = df.drop(query.index)

        # DataFrame.append was removed in pandas 2.0; concat works on old and new versions
        df = pd.concat([df, entry.to_frame().T], ignore_index=True)
    df.latitude = df.latitude.map(float)
    df.longitude = df.longitude.map(float)
    return df

In [38]:
cols = ['timestamp', 'train_number', 'car_number', 'car_position', 'latitude', 'longitude', 'elevation', 'heading', 'speed']

In [44]:
from pyproj import Proj, transform
def to_mercator(long, lat):
    c1, c2 = transform(Proj(init='epsg:4326'), Proj(init='epsg:3857'), long, lat)
    return c1, c2

In [40]:
consumer = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True
)

In [48]:
import time
from bokeh.io import push_notebook, output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, LabelSet
from bokeh.tile_providers import CARTODBPOSITRON

In [49]:
cols=['timestamp', 'train_number', 'car_number', 'car_position', 'latitude', 'longitude', 'elevation', 'heading', 'speed']        
output_notebook()
    
initial_state = pd.DataFrame(data=extract_gps_data(json.loads(consumer.consume().value)), columns= cols)

# Define the data
x = []
y = []
labels_ = []

# Define the source
source = ColumnDataSource(data=dict(x=x, y=y, labels=labels_))

# Define the figure
p = figure(x_range=(3.6e5, 8.4e5), y_range=(6.56e6, 7.1e6))
p.add_tile(CARTODBPOSITRON)

# Define the circles
p.circle(x='x', y= 'y', size=15, source=source, color='yellow', fill_color="red", alpha=.5)

# Set the LabelSet
labels_set = LabelSet(x='x', y='y', text='labels', x_offset=-5, y_offset=-5, level='glyph',\
                      text_font_size='6pt',source=source, render_mode='canvas')

# Add labels
p.add_layout(labels_set)

# Show the notebook
handle=show(p, notebook_handle=True)

try:
    for message in consumer:
        if message is not None:
            data_ = extract_gps_data(json.loads(message.value))
            state = pd.DataFrame(data=data_, columns=cols)
            # Keep the merged result so that positions persist across messages
            initial_state = update(initial_state, state)
            # Plot only the front car of each train
            df = initial_state[initial_state.car_position == '1']
            x = list(df.longitude.values)
            y = list(df.latitude.values)
            labels_ = list(df.train_number.values)
            x_, y_ = to_mercator(x, y)
            source.data = dict(x=x_, y=y_, labels=labels_)
            push_notebook(handle=handle)
            time.sleep(3)
except KeyboardInterrupt:
    pass

**N.B: It takes up to 10 seconds for the points to show on the graph, after that their positions update smoothly.**

# Part II - Live stopping time (20 points / 60)

In this part, we will have a look at the two other streams, namely `ndovloketnl-arrivals` and `ndovloketnl-departures`. Each time a train arrives at or leaves a station, a message is generated. Let's have a look at the content.

In [25]:
import json
from pykafka.common import OffsetType

example_arrivals = client.topics[b'ndovloketnl-arrivals'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
json.loads(example_arrivals.value)

{'ns1:PutReisInformatieBoodschapIn': {'@xmlns:ns1': 'urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1',
  '@xmlns:ns2': 'urn:ndov:cdm:trein:reisinformatie:data:4',
  'ns2:ReisInformatieProductDAS': {'@TimeStamp': '2018-04-28T21:07:58.090Z',
   '@Versie': '6.1',
   'ns2:DynamischeAankomstStaat': {'ns2:RitDatum': '2018-04-28',
    'ns2:RitId': '7079',
    'ns2:RitStation': {'ns2:KorteNaam': 'Almelo',
     'ns2:LangeNaam': 'Almelo',
     'ns2:MiddelNaam': 'Almelo',
     'ns2:StationCode': 'AML',
     'ns2:Type': '5',
     'ns2:UICCode': '8400051'},
    'ns2:TreinAankomst': {'ns2:AankomstTijd': [{'#text': '2018-04-28T21:07:00.000Z',
       '@InfoStatus': 'Gepland'},
      {'#text': '2018-04-28T21:10:12.000Z', '@InfoStatus': 'Actueel'}],
     'ns2:ExacteAankomstVertraging': 'PT3M12S',
     'ns2:GedempteAankomstVertraging': 'PT5M',
     'ns2:PresentatieAankomstVertraging': {'ns2:Uitingen': {'ns2:Uiting': '+3 min.'}},
     'ns2:PresentatieTreinAankomstSpoor': {'ns2:Uitinge

In [26]:
example_departures = client.topics[b'ndovloketnl-departures'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
json.loads(example_departures.value)

{'ns1:PutReisInformatieBoodschapIn': {'@xmlns:ns1': 'urn:ndov:cdm:trein:reisinformatie:messages:5',
  '@xmlns:ns2': 'urn:ndov:cdm:trein:reisinformatie:data:4',
  'ns2:ReisInformatieProductDVS': {'@TimeStamp': '2018-04-29T12:06:35.345Z',
   '@Versie': '6.2',
   'ns2:DynamischeVertrekStaat': {'ns2:RitDatum': '2018-04-29',
    'ns2:RitId': '5144',
    'ns2:RitStation': {'ns2:KorteNaam': 'Rotterdam',
     'ns2:LangeNaam': 'Rotterdam Centraal',
     'ns2:MiddelNaam': 'Rotterdam C.',
     'ns2:StationCode': 'RTD',
     'ns2:Type': '6',
     'ns2:UICCode': '8400530'},
    'ns2:Trein': {'ns2:AchterBlijvenAchtersteTreinDeel': 'N',
     'ns2:AfstandPerronEindKopVertrekTrein': '0',
     'ns2:ExacteVertrekVertraging': 'PT0S',
     'ns2:GedempteVertrekVertraging': 'PT0S',
     'ns2:NietInstappen': 'N',
     'ns2:PresentatieTreinEindBestemming': {'ns2:Uitingen': {'ns2:Uiting': 'Den Haag Centraal'}},
     'ns2:PresentatieTreinVertrekSpoor': {'ns2:Uitingen': {'ns2:Uiting': '9'}},
     'ns2:Presentatie

We can see that the messages have the following structure:

```
{
  'ns1:PutReisInformatieBoodschapIn': {
    'ns2:ReisInformatieProductDVS' or 'ns2:ReisInformatieProductDAS': {
      'ns2:DynamischeVertrekStaat' or 'ns2:DynamischeAankomstStaat': {
          'ns2:RitStation': <station_info>,
          'ns2:Trein' or 'ns2:TreinAankomst': {
              'ns2:VertrekTijd' or 'ns2:AankomstTijd': [<planned_and_actual_times>],
              'ns2:TreinNummer': <train_number>,
              'ns2:TreinSoort': <kind_of_train>,
              ...
          }
           
      }
    }
  }
}
```

We can also see that the train stations have a long name, a medium name, a short name, a three-letter code and a kind of numerical ID. When information is given about times, tracks, direction, etc., you will sometimes find it twice: once with the status `Gepland` (planned, according to the schedule) and once with the status `Actueel` (the actual measured value).
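
As a small sketch of how the two statuses can be told apart, the helper below selects a time by its `@InfoStatus` instead of by its position in the list. The function name is hypothetical, and the handling of a lone dict is an assumption about how single entries might be delivered.

```python
def pick_time(times, status='Actueel'):
    """Return the '#text' of the entry whose '@InfoStatus' equals `status`.

    `times` is the value of 'ns2:AankomstTijd' or 'ns2:VertrekTijd'.
    Returns None when no entry carries the requested status.
    """
    # Assumption: a lone entry may arrive as a dict rather than a list.
    if isinstance(times, dict):
        times = [times]
    for entry in times or []:
        if entry.get('@InfoStatus') == status:
            return entry.get('#text')
    return None


# Values copied from the arrival example above.
arrival_times = [
    {'#text': '2018-04-28T21:07:00.000Z', '@InfoStatus': 'Gepland'},
    {'#text': '2018-04-28T21:10:12.000Z', '@InfoStatus': 'Actueel'},
]
print(pick_time(arrival_times))             # actual time
print(pick_time(arrival_times, 'Gepland'))  # planned time
```

Selecting by status avoids relying on the planned entry always coming first.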

**Question II.a. (5/20)** We want to compute the time a train stays at a station and get a real-time histogram for a given time window. First, write the parsing functions that will allow you to get the station ID and a name, the train ID, and the actual arrival and departure times.

In [27]:
def parse_train(msg):
    # `msg` is the decoded message (a dict); naming it `json` would shadow the module
    body = msg['ns1:PutReisInformatieBoodschapIn']
    body = body.get('ns2:ReisInformatieProductDAS') or body.get('ns2:ReisInformatieProductDVS')
    body = body.get('ns2:DynamischeAankomstStaat') or body.get('ns2:DynamischeVertrekStaat')
    station_name = body['ns2:RitStation']['ns2:LangeNaam']
    station_id = body['ns2:RitStation']['ns2:UICCode']
    body = body.get('ns2:TreinAankomst') or body.get('ns2:Trein')
    time = body.get('ns2:AankomstTijd') or body.get('ns2:VertrekTijd')
    # In the examples, the planned time comes first and the actual time second
    actual_time = time[1]['#text']
    train_id = body['ns2:TreinNummer']
    return train_id, station_id, station_name, actual_time

In [28]:
parse_train(json.loads(example_arrivals.value))

('7079', '8400051', 'Almelo', '2018-04-28T21:10:12.000Z')

In [29]:
parse_train(json.loads(example_departures.value))

('5144', '8400530', 'Rotterdam Centraal', '2018-04-29T12:06:00.000Z')

**Question II.b. (5/20)** Create two Spark streams from the arrivals and departures where the records are in the form (K, V), where K is a key that you will use to join the two streams (think about what you will need to match) and V is the value containing all the extracted information.

In [26]:
sc_ = StreamingContext(sc, 10)
sc_.checkpoint(checkpoint)

In [36]:
arrival_stream = KafkaUtils.createStream(sc_, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(sc_, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})

In [37]:
# Map the records of each stream to (K, V) pairs
# K = (train_id, station_id), V = actual departure/arrival time
def to_key_value(record):
    # Parse each message once; record[1] holds the JSON payload
    train_id, station_id, _, actual_time = parse_train(json.loads(record[1]))
    return (train_id, station_id), actual_time

my_arrival_stream = arrival_stream.map(to_key_value)
my_departure_stream = departure_stream.map(to_key_value)

In [40]:
my_arrival_stream.pprint()
my_departure_stream.pprint()
sc_.start()

-------------------------------------------
Time: 2018-05-18 09:50:50
-------------------------------------------
(('32121', '8400424'), '2018-05-18T08:08:00.000Z')
(('37838', '8400430'), '2018-05-18T08:08:00.000Z')
(('1730', '8400055'), '2018-05-18T08:08:00.000Z')
(('4724', '8400363'), '2018-05-18T08:08:00.000Z')
(('7629', '8400519'), '2018-05-18T08:08:00.000Z')
(('36826', '8400565'), '2018-05-18T08:08:00.000Z')
(('37632', '8400697'), '2018-05-18T06:56:00.000Z')
(('2829', '8400621'), '2018-05-18T06:59:34.000Z')
(('7925', '8400124'), '2018-05-18T08:08:00.000Z')
(('2235', '8400553'), '2018-05-18T07:47:00.000Z')
...

-------------------------------------------
Time: 2018-05-18 09:50:50
-------------------------------------------
(('6087', '8400606'), '2018-05-17T21:47:11.000Z')
(('6087', '8400621'), '2018-05-17T21:44:25.000Z')
(('14683', '8400360'), '2018-05-17T21:45:02.000Z')
(('6087', '8400340'), '2018-05-17T21:52:43.000Z')
(('8094', '8400454'), '2018-05-17T21:46:00.000Z')
(('7988', '8

In [41]:
sc_.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2018-05-18 09:51:00
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 09:51:00
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 09:51:10
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 09:51:10
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 09:51:20
-------------------------------------------

-------------------------------------------
Time: 2018-05-18 09:51:20
-------------------------------------------



**Question II.c. (5/20)** Apply a window (possibly of different lengths, but both with a 20 s sliding interval) on each stream.
Join the streams such that all stays of 5 minutes or less are captured by the join in the same RDD (you can ignore late messages) and two consecutive joined RDDs do not overlap in terms of stays.

In [38]:
window_arrival_stream = my_arrival_stream.window(windowDuration=20, slideDuration=20)
window_departure_stream = my_departure_stream.window(windowDuration=300, slideDuration=20)
joined_stream = window_arrival_stream.join(window_departure_stream)
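
To reason about what the join emits, it can help to mimic a keyed inner join in plain Python. The sketch below is not Spark code; `inner_join` is a hypothetical stand-in, and the sample records are made up. It shows that each matching pair of records yields one output row, which is one plausible reason for the same stay appearing several times in the output below when a message is repeated within a window.

```python
from collections import defaultdict


def inner_join(left, right):
    """Join two lists of (key, value) pairs like a keyed inner join.

    Every left value is paired with every right value sharing its key.
    """
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


# Made-up sample: the same arrival and departure are each reported twice.
key = ('5026', '8400530')
arrivals = [(key, '07:52:55'), (key, '07:52:55'), (('1', 'x'), '08:00:00')]
departures = [(key, '07:54:55'), (key, '07:54:55')]

joined = inner_join(arrivals, departures)
print(len(joined))       # 4 rows for a single stay
print(len(set(joined)))  # 1 distinct row
```

If duplicates are unwanted, one option is to deduplicate each joined RDD, for example with `joined_stream.transform(lambda rdd: rdd.distinct())`.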

In [47]:
joined_stream.pprint()
sc_.start()

-------------------------------------------
Time: 2018-05-18 09:52:10
-------------------------------------------
(('5026', '8400530'), ('2018-05-18T07:52:55.000Z', '2018-05-18T07:54:55.000Z'))
(('5026', '8400530'), ('2018-05-18T07:52:55.000Z', '2018-05-18T07:54:55.000Z'))
(('5026', '8400530'), ('2018-05-18T07:52:55.000Z', '2018-05-18T07:54:55.000Z'))
(('5026', '8400530'), ('2018-05-18T07:52:55.000Z', '2018-05-18T07:54:55.000Z'))
(('6933', '8400623'), ('2018-05-18T08:00:02.000Z', '2018-05-18T08:00:02.000Z'))
(('6933', '8400606'), ('2018-05-18T07:57:16.000Z', '2018-05-18T07:57:16.000Z'))
(('37321', '8400139'), ('2018-05-18T08:00:15.000Z', '2018-05-18T08:00:15.000Z'))
(('14631', '8400080'), ('2018-05-18T07:54:00.000Z', '2018-05-18T07:55:00.000Z'))
(('5135', '8400530'), ('2018-05-18T07:54:00.000Z', '2018-05-18T07:56:00.000Z'))
(('32029', '8400362'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
...



In [48]:
sc_.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2018-05-18 09:52:30
-------------------------------------------
(('2237', '8400180'), ('2018-05-18T07:53:00.000Z', '2018-05-18T07:54:00.000Z'))
(('37721', '8400175'), ('2018-05-18T07:52:00.000Z', '2018-05-18T07:52:00.000Z'))
(('3631', '8400495'), ('2018-05-18T07:55:00.000Z', '2018-05-18T07:55:00.000Z'))
(('6937', '8400340'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
(('5139', '8400533'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
(('32335', '8400446'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
(('32536', '8400152'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
(('4722', '8400368'), ('2018-05-18T07:52:00.000Z', '2018-05-18T07:52:00.000Z'))
(('36728', '8400386'), ('2018-05-18T07:51:00.000Z', '2018-05-18T07:52:00.000Z'))
(('31129', '8400315'), ('2018-05-18T09:02:00.000Z', '2018-05-18T09:02:00.000Z'))
...

-------------------------------------------
Time: 2018-05-18 09:52:50
------

**Question II.d. (5/20)** On the joined stream, compute the length of each stay (you can round to the minute) and produce a stream of histograms. You don't need to plot them, a value/count array is enough. 
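
As an offline illustration of the computation, the sketch below derives rounded stay lengths and a value/count histogram from a few joined records, using only the standard library. The helper names are illustrative, and the timestamp format is assumed to match the one seen in the messages.

```python
from collections import Counter
from datetime import datetime

ISO_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'  # format of the times seen in the messages


def stay_minutes(arrival, departure):
    """Length of a stay in minutes, rounded to the nearest minute."""
    delta = datetime.strptime(departure, ISO_FORMAT) - datetime.strptime(arrival, ISO_FORMAT)
    return round(abs(delta.total_seconds()) / 60)


def stay_histogram(joined):
    """Count stays per rounded length for records shaped like the joined stream."""
    return Counter(stay_minutes(arr, dep) for _, (arr, dep) in joined)


# Three records copied from the joined output above.
joined = [
    (('5026', '8400530'), ('2018-05-18T07:52:55.000Z', '2018-05-18T07:54:55.000Z')),
    (('14631', '8400080'), ('2018-05-18T07:54:00.000Z', '2018-05-18T07:55:00.000Z')),
    (('6933', '8400623'), ('2018-05-18T08:00:02.000Z', '2018-05-18T08:00:02.000Z')),
]
print(sorted(stay_histogram(joined).items()))  # [(0, 1), (1, 1), (2, 1)]
```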

In [30]:
def date(str_):
    return np.datetime64(str_)

In [40]:
minute = np.timedelta64(1,'m')
length_stay = joined_stream.mapValues(lambda x: np.round(np.abs((date(x[1])- date(x[0]))/minute)))

In [73]:
length_stay.pprint()
sc_.start() 

-------------------------------------------
Time: 2018-05-18 10:17:50
-------------------------------------------
(('3527', '8400172'), 0.0)
(('37838', '8400369'), 0.0)
(('37838', '8400369'), 2.0)
(('37838', '8400369'), 2.0)
(('37838', '8400369'), 2.0)
(('37838', '8400369'), 2.0)
(('37838', '8400369'), 0.0)
(('37838', '8400369'), 0.0)
(('37838', '8400369'), 0.0)
(('37838', '8400369'), 2.0)
...

-------------------------------------------
Time: 2018-05-18 10:18:10
-------------------------------------------
(('4328', '8400194'), 2.0)
(('4328', '8400194'), 0.0)
(('4328', '8400194'), 2.0)
(('4328', '8400056'), 0.0)
(('14632', '8400235'), 0.0)
(('4041', '8400057'), 0.0)
(('5633', '8400690'), 0.0)
(('4032', '8400194'), 0.0)
(('4030', '8400507'), 6.0)
(('4030', '8400507'), 1.0)
...



In [74]:
sc_.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2018-05-18 10:18:30
-------------------------------------------
(('7034', '8400185'), 0.0)
(('32025', '8400449'), 0.0)
(('32025', '8400449'), 0.0)
(('32025', '8400449'), 0.0)
(('5132', '8400279'), 0.0)
(('14634', '8400081'), 0.0)
(('30932', '8400688'), 0.0)
(('30932', '8400688'), 1.0)
(('30932', '8400688'), 0.0)
(('7828', '8400651'), 0.0)
...

-------------------------------------------
Time: 2018-05-18 10:18:50
-------------------------------------------



**Note that a stay of 0 means that the train did not stop at the given station. We decided to count it anyway.**

In [40]:
length_count = length_stay.map(lambda x: x[1]).countByValue()
length_count.pprint()

In [91]:
# Count of the stays
sc_.start()

-------------------------------------------
Time: 2018-05-18 10:22:50
-------------------------------------------
(0.0, 108)
(8.0, 1)
(1.0, 17)
(2.0, 5)
(7.0, 1)
(3.0, 1)

-------------------------------------------
Time: 2018-05-18 10:23:10
-------------------------------------------
(0.0, 47)
(5.0, 3)
(1.0, 11)
(6.0, 1)
(2.0, 8)
(3.0, 1)



In [92]:
sc_.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2018-05-18 10:23:30
-------------------------------------------
(0.0, 24)
(1.0, 2)
(3.0, 3)

-------------------------------------------
Time: 2018-05-18 10:23:50
-------------------------------------------



# Part III - Uptime analysis (20 points / 60)

In this part, we will use the available data to derive information about train operations.

**Question III.a (5/20)** Write a function to extract the median timestamp from a message of the `ndovloketnl-gps` topic. You can reuse the `extract_gps_data` function from part I.

In [31]:
def extract_gps_time_approx(msg):
    p = prettify(msg)
    n = len(p)
    idx = int(np.ceil((n-1)/2))
    return np.datetime64(list(p.timestamp.sort_values())[idx], unit='ns')

In [32]:
extract_gps_time_approx(json.loads(example_gps.value))

numpy.datetime64('2018-04-28T18:27:23.000000')
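
The index `int(np.ceil((n-1)/2))` used above selects the upper median of the sorted timestamps. As a cross-check, the sketch below obtains the same element with `statistics.median_high` from the standard library. The function name and the assumed timestamp format are illustrative.

```python
import statistics
from datetime import datetime


def median_timestamp(timestamps):
    """Upper median of ISO-8601 'Z' timestamps, returned as a datetime."""
    parsed = [datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ') for ts in timestamps]
    return statistics.median_high(parsed)


# First four GPS timestamps from the example table in Part I.
sample = ['2018-04-28T18:27:26Z', '2018-04-28T18:27:27Z',
          '2018-04-28T18:27:20Z', '2018-04-28T18:27:19Z']
print(median_timestamp(sample))  # 2018-04-28 18:27:26
```

For an even number of values, the upper median is the larger of the two middle values. It is always a timestamp that really occurs in the message.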

**Question III.b (5/20)** Using `fetch_message_at` and `extract_gps_time_approx`, write a function named `search_gps` to find the first offset for a given timestamp in the `ndovloketnl-gps` topic.

More precisely, if we write `offset = search_gps(ts)`, where `ts` is a timestamp, then we have:
```
ts <= extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset))
extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset - 1)) < ts
```
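
The two conditions above describe a lower-bound binary search. As a minimal offline sketch, assuming the median timestamps are non-decreasing with the offset, the search can be exercised on an in-memory list. The names `first_offset_at_or_after` and `times` are hypothetical stand-ins for `search_gps` and for the values of `extract_gps_time_approx(fetch_message_at(...))`.

```python
def first_offset_at_or_after(ts, times):
    """Return the smallest index i with times[i] >= ts (len(times) if none).

    `times` stands in for the per-offset median timestamps and is assumed
    to be sorted in non-decreasing order.
    """
    lo, hi = 0, len(times)
    # Invariant: times[:lo] are all < ts, times[hi:] are all >= ts.
    while lo < hi:
        mid = (lo + hi) // 2
        if times[mid] < ts:
            lo = mid + 1   # target is strictly to the right of mid
        else:
            hi = mid       # mid itself may be the answer
    return lo


# Hypothetical timestamps (in seconds), one per offset.
times = [10, 20, 20, 35, 50]
offset = first_offset_at_or_after(20, times)
print(offset)
```

In this formulation each step needs a single lookup, which matters when every lookup is a Kafka fetch.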

In [33]:
def found(ts, offset):
    ts = np.datetime64(ts)
    msg1 = json.loads(fetch_message_at('ndovloketnl-gps', offset).value.decode())
    msg2 = json.loads(fetch_message_at('ndovloketnl-gps', offset - 1).value.decode())
    return ts <= extract_gps_time_approx(msg1) and extract_gps_time_approx(msg2) < ts

In [34]:
def binarySearch(ts, first, last):
    # Returned as-is when the first offset already satisfies the condition
    midpoint = first
    done = found(ts, first)
    while first <= last and not done:
        midpoint = (first + last)//2
        if found(ts, midpoint):
            done = True
        else:
            msg = json.loads(fetch_message_at('ndovloketnl-gps', midpoint - 1).value.decode())
            # If the previous message is already at or after ts, the target lies to the left
            if ts <= extract_gps_time_approx(msg):
                last = midpoint - 1
            else:
                first = midpoint + 1
    return midpoint

In [35]:
def search_gps(ts):
    ts = np.datetime64(ts)
    topic = client.topics[b'ndovloketnl-gps']
    first_offset = topic.earliest_available_offsets()[0].offset[0]
    last_offset = topic.latest_available_offsets()[0].offset[0]
    return binarySearch(ts, first_offset, last_offset)

In [98]:
# Example results from `search_gps`
offset = search_gps(pd.Timestamp('2018-05-05'))

Offset reset for partition 0 to timestamp 15785 failed. Setting partition 0's internal counter to 15785
Offset reset for partition 0 to timestamp 15784 failed. Setting partition 0's internal counter to 15784
Offset reset for partition 0 to timestamp 99699 failed. Setting partition 0's internal counter to 99699
Offset reset for partition 0 to timestamp 99698 failed. Setting partition 0's internal counter to 99698
Offset reset for partition 0 to timestamp 99698 failed. Setting partition 0's internal counter to 99698
Offset reset for partition 0 to timestamp 57741 failed. Setting partition 0's internal counter to 57741
Offset reset for partition 0 to timestamp 57740 failed. Setting partition 0's internal counter to 57740
Offset reset for partition 0 to timestamp 57740 failed. Setting partition 0's internal counter to 57740
Offset reset for partition 0 to timestamp 78720 failed. Setting partition 0's internal counter to 78720
Offset reset for partition 0 to timestamp 78719 failed. Setting 

In [99]:
offset

69617

**Question III.c (5/20)** Using `search_gps`, create a spark RDD or dataframe containing the GPS data between `2018-05-02 23:00:00` and `2018-05-04 01:00:00` (covers May 3rd, with 1h margin). Your extract needs to contain the following fields: `timestamp`, `train_number`, `car_number`, `speed`.

Now, filter on the `timestamp` field to only keep data between `2018-05-03 00:00:00` and `2018-05-04 00:00:00`.

Note: you may find it useful to save the resulting dataset on disk.
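
As a small sketch of the second step, assuming the timestamps are ISO 8601 strings with a trailing `Z` (as in the GPS records), the filter can be written as a half-open interval check. The helper names `parse_ts` and `in_day` and the sample values are hypothetical.

```python
from datetime import datetime


def parse_ts(s):
    # Assumed format: '2018-05-03T00:00:07Z'
    return datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ')


def in_day(s, start=datetime(2018, 5, 3), end=datetime(2018, 5, 4)):
    # Half-open interval [start, end): midnight belongs to the day it starts
    return start <= parse_ts(s) < end


# Hypothetical sample timestamps around both boundaries
samples = [
    '2018-05-02T23:59:59Z',
    '2018-05-03T00:00:00Z',
    '2018-05-03T23:59:59Z',
    '2018-05-04T00:00:00Z',
]
kept = [s for s in samples if in_day(s)]
print(kept)
```

A half-open interval assigns each boundary instant to exactly one day, so consecutive days never overlap.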

In [36]:
# Create a static RDD containing the GPS data between 2018-05-02 23:00:00 and 2018-05-04 01:00:00
ts1 = np.datetime64('2018-05-02 23:00:00', unit='ns')
ts2 = np.datetime64('2018-05-04 01:00:00', unit='ns')
offset1 = search_gps(ts1)
offset2 = search_gps(ts2)
rdd = create_static_rdd_from_kafka(b'ndovloketnl-gps', offset1, offset2)

Offset reset for partition 0 to timestamp 15785 failed. Setting partition 0's internal counter to 15785
Offset reset for partition 0 to timestamp 15784 failed. Setting partition 0's internal counter to 15784
Offset reset for partition 0 to timestamp 100648 failed. Setting partition 0's internal counter to 100648
Offset reset for partition 0 to timestamp 100647 failed. Setting partition 0's internal counter to 100647
Offset reset for partition 0 to timestamp 100647 failed. Setting partition 0's internal counter to 100647
Offset reset for partition 0 to timestamp 58216 failed. Setting partition 0's internal counter to 58216
Offset reset for partition 0 to timestamp 58215 failed. Setting partition 0's internal counter to 58215
Offset reset for partition 0 to timestamp 58215 failed. Setting partition 0's internal counter to 58215
Offset reset for partition 0 to timestamp 37000 failed. Setting partition 0's internal counter to 37000
Offset reset for partition 0 to timestamp 36999 failed. Se

Offset reset for partition 0 to timestamp 61364 failed. Setting partition 0's internal counter to 61364
Offset reset for partition 0 to timestamp 61364 failed. Setting partition 0's internal counter to 61364
Offset reset for partition 0 to timestamp 61282 failed. Setting partition 0's internal counter to 61282
Offset reset for partition 0 to timestamp 61281 failed. Setting partition 0's internal counter to 61281
Offset reset for partition 0 to timestamp 61281 failed. Setting partition 0's internal counter to 61281
Offset reset for partition 0 to timestamp 61323 failed. Setting partition 0's internal counter to 61323
Offset reset for partition 0 to timestamp 61322 failed. Setting partition 0's internal counter to 61322
Offset reset for partition 0 to timestamp 61322 failed. Setting partition 0's internal counter to 61322
Offset reset for partition 0 to timestamp 61344 failed. Setting partition 0's internal counter to 61344
Offset reset for partition 0 to timestamp 61343 failed. Setting 

In [37]:
ts1 = np.datetime64('2018-05-03 00:00:00')
ts2 = np.datetime64('2018-05-04 00:00:00')
#rdd = rdd.map(lambda x: x[(x.timestamp > ts1) & (x.timestamp < ts2)])
#rdd = rdd.filter(lambda x: len(x) > 0)

In [57]:
from pyspark.sql import Row, SQLContext
sqlContext = SQLContext(sc)

In [40]:
def parse_and_filter(msg):
    # Keep only the fields needed later: car number, timestamp and speed
    data = extract_gps_data(msg)
    rows = []
    for row in data:
        rows.append(Row(car_number=row[2], timestamp=row[0], speed=row[8]))
    return rows

In [41]:
# We parse the RDD and keep only the required fields
rdd = rdd.map(lambda x: parse_and_filter(json.loads(x[1])))
# We filter the RDD to keep only the data from 2018-05-03 00:00:00 to 2018-05-04 00:00:00
rdd = rdd.map(lambda x: [i for i in x if (np.datetime64(i.timestamp) > ts1) and (np.datetime64(i.timestamp) < ts2)])
# Drop the messages left with no data point in that range
rdd = rdd.filter(lambda x: len(x) > 0)

In [45]:
# Here is a preview of the first collected data
sqlContext.createDataFrame(data=rdd.first()).show()

+----------+-----+--------------------+
|car_number|speed|           timestamp|
+----------+-----+--------------------+
|      2117|    0|2018-05-03T00:00:07Z|
|      4083| 40.0|2018-05-03T00:00:05Z|
|      2450|    0|2018-05-03T00:00:06Z|
+----------+-----+--------------------+



In [55]:
# Cache the RDD
rdd.cache()

PythonRDD[166] at RDD at PythonRDD.scala:48

**Question III.d (5/20)** We define the uptime as the time during which a given train car is in use. To compute it, we consider 5-minute slots: if the train car has a non-zero speed at least once within a 5-minute slot, we consider it *up* for that slot.

Example: if we have the data point
```
+----------+--------------------+------+
|car_number|          timestamp | speed|
+----------+--------------------+------+
|      2418|2018-05-03T08:13:02Z| 136.8|
+----------+--------------------+------+
```
then we must consider that the train car `2418` was up during the `2018-05-03T08:10:00Z` to `2018-05-03T08:15:00Z` time slot.

We expect this method to give a relatively good lower bound for the uptime.
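
The slot rule can be sketched in plain Python, outside Spark. This is an illustration under assumptions, not the expected solution: `slot_start` and `uptime_minutes` are hypothetical helpers, and the records other than car `2418` at `08:13:02` are made up for the example.

```python
from collections import defaultdict
from datetime import datetime


def slot_start(ts):
    """Floor an ISO timestamp such as '2018-05-03T08:13:02Z' to its 5-minute slot."""
    t = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')
    return t.replace(minute=t.minute - t.minute % 5, second=0)


def uptime_minutes(records):
    """Map each car_number to its uptime in minutes.

    `records` is an iterable of (car_number, timestamp, speed) tuples.
    A car is up in a slot if it has at least one non-zero speed there.
    """
    up_slots = defaultdict(set)
    for car, ts, speed in records:
        if float(speed) > 0:
            up_slots[car].add(slot_start(ts))
    return {car: 5 * len(slots) for car, slots in up_slots.items()}


records = [
    ('2418', '2018-05-03T08:13:02Z', 136.8),
    ('2418', '2018-05-03T08:14:40Z', 120.0),  # same slot, counted once
    ('2418', '2018-05-03T08:15:01Z', 90.0),   # next slot
    ('2117', '2018-05-03T00:00:07Z', 0),      # zero speed, not counted
]
print(uptime_minutes(records))
```

Collecting slots in a set means several moving data points in the same slot contribute 5 minutes only once, which is why the result is a lower bound.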

From the previous RDD/DataFrame, compute the uptime for all train cars for 03.05.2018.
Plot the distribution of uptime.

What can you tell from the results?

In [None]:
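import pyspark.sql.functions as functions
# Flatten the per-message lists of rows into one DataFrame of GPS points
df = sqlContext.createDataFrame(rdd.flatMap(lambda rows: rows))
# Index of the 5-minute slot each point falls in (300 s per slot)
slot = functions.floor(functions.unix_timestamp(functions.col('timestamp').cast('timestamp')) / 300)
# Uptime per car = number of distinct slots with a non-zero speed, times 5 minutes
uptime = df.filter(functions.col('speed').cast('double') > 0).\
withColumn('slot', slot).groupBy('car_number').\
agg((functions.countDistinct('slot') * 5).alias('uptime_minutes'))
uptime.show()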

**The results show that the train car resources are well managed, since the uptime is evenly distributed across the train cars.**