# Homework 4 - More trains (Part III)

__Hand-in:__

- __Due: 12.05.2020 23:59:59 CET__
- `git push` your final version to your group's Renku repository before the due date
- check if `Dockerfile`, `environment.yml` and `requirements.txt` are properly written
- add the necessary comments and discussion to make your code readable

For this homework, you will be working with the real-time streams of the NS, the train company of the Netherlands. You can see an example webpage that uses the same streams to display the train information on a map: https://spoorkaart.mwnn.nl/ . 

To help you and to avoid having too many connections to the NS streaming servers, we have set up a service that collects the streams and pushes them to our Kafka instance. The related topics are: 

`ndovloketnl-arrivals`: For each arrival of a train in a station, describe the previous and next station, time of arrival (planned and actual), track number,...

`ndovloketnl-departures`: For each departure of a train from a station, describe the previous and next station, time of departure (planned and actual), track number,...

`ndovloketnl-gps`: For each train, describe the current location, speed, bearing.

The events are serialized in JSON (actually converted from XML), with property names in their original language (Dutch). Google Translate can help you understand them, but we will provide you with some useful mappings.

---
## Set up environment

Run the cells below before running the other cells of this notebook. Run them again whenever you need to recreate a Spark context. Pay particular attention to your `username` setting, and make sure that it is properly set to your user name, both locally and on the remote Spark driver.

Configure your spark settings:
1. name your spark application as `"YOUR_GASPAR_homework_4"`.
2. make the required kafka jars available on the remote Spark driver.

<div class="alert alert-block alert-warning"><b>Any application without a proper name would be promptly killed.</b></div>

In [1]:
%%configure -f
{"conf": {
    "spark.app.name": "misik_homework_4",
    "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1,org.apache.kafka:kafka_2.11:1.0.1"
}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5702,application_1589299642358_0189,pyspark,idle,Link,Link,
5720,application_1589299642358_0207,pyspark,shutting_down,Link,Link,
5722,application_1589299642358_0209,pyspark,idle,Link,Link,
5728,application_1589299642358_0215,pyspark,busy,Link,Link,
5736,application_1589299642358_0223,pyspark,idle,Link,Link,
5737,application_1589299642358_0224,pyspark,idle,Link,Link,
5738,application_1589299642358_0225,pyspark,idle,Link,Link,
5739,application_1589299642358_0226,pyspark,busy,Link,Link,
5741,application_1589299642358_0228,pyspark,idle,Link,Link,
5742,application_1589299642358_0229,pyspark,idle,Link,Link,


Create a new session unless one was already created above (check for `✔` in current session)

In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5716,application_1589299642358_0203,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7faf6bf735d0>

Set `username` to your GASPAR both locally and on the Spark driver.

In [2]:
%%local
import os
username = os.environ['JUPYTERHUB_USER']

In [3]:
%%send_to_spark -i username -t str -n username

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5745,application_1589299642358_0232,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Successfully passed 'username' as 'username' to Spark kernel

---

## Create a Kafka client

In [4]:
from pykafka import KafkaClient
from pykafka.common import OffsetType

ZOOKEEPER_QUORUM = 'iccluster044.iccluster.epfl.ch:2181,'\
                   'iccluster054.iccluster.epfl.ch:2181,'\
                   'iccluster059.iccluster.epfl.ch:2181'

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

## Streams from Kafka

In [5]:
# Define the checkpoint folder
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
print('checkpoint created at hdfs:///user/{}/checkpoint/'.format(username))

checkpoint created at hdfs:///user/misik/checkpoint/

In [6]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a StreamingContext with a batch interval of 10 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-departures': 1})

For now, let's just print the content of the streams. Note: the output may be shown after you run `ssc.stop`.

In [34]:
arrival_stream.pprint()
departure_stream.pprint()

ssc.start()
ssc.awaitTermination(timeout=10)

In [35]:
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2020-05-13 21:08:00
-------------------------------------------
(None, u'{"ns1:PutReisInformatieBoodschapIn": {"@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", "ns2:ReisInformatieProductDAS": {"@TimeStamp": "2020-05-13T18:58:28.275Z", "@Versie": "6.1", "ns2:RIPAdministratie": {"ns2:ReisInformatieProductID": "2005132058282500002", "ns2:AbonnementId": "55", "ns2:ReisInformatieTijdstip": "2020-05-13T18:58:00.000Z"}, "ns2:DynamischeAankomstStaat": {"ns2:RitId": "7277", "ns2:RitDatum": "2020-05-13", "ns2:RitStation": {"ns2:StationCode": "GND", "ns2:Type": "0", "ns2:KorteNaam": "Hardinxvld", "ns2:MiddelNaam": "Hardinxveld-G.", "ns2:LangeNaam": "Hardinxveld-Giessendam", "ns2:UICCode": "8400295"}, "ns2:TreinAankomst": {"ns2:TreinNummer": "7277", "ns2:TreinSoort": {"@Code": "ST", "#text": "Stoptrein"}, "ns2:TreinStatus": "5", "ns2:Vervoerder": "R-net",

You will need to adjust the batch interval (10 seconds here) according to the processing times. Use the Spark UI to check that batches are not accumulating.

---

# Part III - Live stopping time (20 points / 50)

In this part, we will have a look at the two other streams, namely `ndovloketnl-arrivals` and `ndovloketnl-departures`. Each time a train arrives at or leaves a station, a message is generated. Let's have a look at the content.

In [7]:
import json
from pykafka.common import OffsetType

example_arrivals = client.topics[b'ndovloketnl-arrivals'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
print(json.dumps(json.loads(example_arrivals.value), indent=2))

{
  "ns1:PutReisInformatieBoodschapIn": {
    "@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", 
    "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", 
    "ns2:ReisInformatieProductDAS": {
      "ns2:RIPAdministratie": {
        "ns2:ReisInformatieTijdstip": "2020-04-17T10:54:00.000Z", 
        "ns2:ReisInformatieProductID": "2004171254422300001", 
        "ns2:AbonnementId": "55"
      }, 
      "@TimeStamp": "2020-04-17T10:54:42.098Z", 
      "@Versie": "6.1", 
      "ns2:DynamischeAankomstStaat": {
        "ns2:RitId": "6945", 
        "ns2:RitStation": {
          "ns2:LangeNaam": "Utrecht Vaartsche Rijn", 
          "ns2:MiddelNaam": "Vaartsche Rijn", 
          "ns2:StationCode": "UTVR", 
          "ns2:Type": "1", 
          "ns2:UICCode": "8400606", 
          "ns2:KorteNaam": "VaartscheR"
        }, 
        "ns2:TreinAankomst": {
          "ns2:TreinHerkomst": [
            {
              "ns2:LangeNaam": "Den Haag Centraal", 
   

In [8]:
example_departures = client.topics[b'ndovloketnl-departures'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
print(json.dumps(json.loads(example_departures.value), indent=2))

{
  "ns1:PutReisInformatieBoodschapIn": {
    "@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:5", 
    "ns2:ReisInformatieProductDVS": {
      "ns2:RIPAdministratie": {
        "ns2:ReisInformatieTijdstip": "2020-04-23T15:32:00.000Z", 
        "ns2:ReisInformatieProductID": "2004231730412200003", 
        "ns2:AbonnementId": "54"
      }, 
      "@TimeStamp": "2020-04-23T15:30:41.815Z", 
      "@Versie": "6.2", 
      "ns2:DynamischeVertrekStaat": {
        "ns2:RitId": "7363", 
        "ns2:RitStation": {
          "ns2:LangeNaam": "Maarssen", 
          "ns2:MiddelNaam": "Maarssen", 
          "ns2:StationCode": "MAS", 
          "ns2:Type": "0", 
          "ns2:UICCode": "8400419", 
          "ns2:KorteNaam": "Maarssen"
        }, 
        "ns2:RitDatum": "2020-04-23", 
        "ns2:Trein": {
          "ns2:VertrekRichting": "B", 
          "ns2:VertrekTijd": [
            {
              "@InfoStatus": "Gepland", 
              "#text": "2020-04-23T15:32:00.000Z"
        

We can see that the messages have the following structure:

```
{
  'ns1:PutReisInformatieBoodschapIn': {
    'ns2:ReisInformatieProductDVS' or 'ns2:ReisInformatieProductDAS': {
      'ns2:DynamischeVertrekStaat' or 'ns2:DynamischeAankomstStaat': {
          'ns2:RitStation': <station_info>,
          'ns2:Trein' or 'ns2:TreinAankomst': {
              'ns2:VertrekTijd' or 'ns2:AankomstTijd': [<planned_and_actual_times>],
              'ns2:TreinNummer': <train_number>,
              'ns2:TreinSoort': <kind_of_train>,
              ...
          }
           
      }
    }
  }
}
```

We can also see that the train stations have a long name `ns2:LangeNaam`, a medium name `ns2:MiddelNaam`, a short name `ns2:KorteNaam`, a station code `ns2:StationCode` and a numerical ID `ns2:UICCode`. Information about times, tracks, direction, etc. sometimes appears twice: once with the status `Gepland` (planned, according to the schedule) and once with the status `Actueel` (the actual measured value). 
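
As a rough illustration of the structure described above, the sketch below walks a hand-written, heavily shortened departure message and splits the time entries by `@InfoStatus`. The sample message, its values, and the helper name `times_by_status` are made up for this example; only the key names come from the messages shown earlier.

```python
# Hand-written, shortened departure message (illustrative values only).
sample_departure = {
    "ns1:PutReisInformatieBoodschapIn": {
        "ns2:ReisInformatieProductDVS": {
            "ns2:DynamischeVertrekStaat": {
                "ns2:RitStation": {"ns2:UICCode": "8400419", "ns2:LangeNaam": "Maarssen"},
                "ns2:Trein": {
                    "ns2:TreinNummer": "7363",
                    "ns2:VertrekTijd": [
                        {"@InfoStatus": "Gepland", "#text": "2020-04-23T15:32:00.000Z"},
                        {"@InfoStatus": "Actueel", "#text": "2020-04-23T15:33:00.000Z"},
                    ],
                },
            }
        }
    }
}


def times_by_status(entries):
    """Map each '@InfoStatus' ('Gepland' or 'Actueel') to its '#text' value."""
    return {e.get("@InfoStatus"): e.get("#text") for e in (entries or [])}


# Walk down to the train block, then split its time entries by status.
train = (sample_departure["ns1:PutReisInformatieBoodschapIn"]
         ["ns2:ReisInformatieProductDVS"]
         ["ns2:DynamischeVertrekStaat"]
         ["ns2:Trein"])
times = times_by_status(train["ns2:VertrekTijd"])
planned, actual = times.get("Gepland"), times.get("Actueel")
```

Arrival messages would be walked the same way, with the `DAS`, `DynamischeAankomstStaat`, `TreinAankomst` and `AankomstTijd` keys in place of their departure counterparts.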

**Question III.a. (5/20)** We want to compute the time a train stays at a station and get a real-time histogram for a given time window. To begin with, you need to write some parsing functions that will allow you to get information from the data streams. We have prepared one function, `parse_train_dep`, for the stream `ndovloketnl-departures`; it returns a Key-Value pair.

In [9]:
import json

def parse_train_dep(s):
    obj = json.loads(s)
    tn = obj.get('ns1:PutReisInformatieBoodschapIn', {}).get('ns2:ReisInformatieProductDVS', {}).get('ns2:DynamischeVertrekStaat', {}).get('ns2:Trein', {}).get("ns2:TreinNummer")
    st = obj.get('ns1:PutReisInformatieBoodschapIn', {}).get('ns2:ReisInformatieProductDVS', {}).get('ns2:DynamischeVertrekStaat', {}).get('ns2:RitStation', {}).get("ns2:UICCode")
    if tn and st:
        return [("{}-{}".format(tn, st), obj)]
    else:
        return []

__Q (1/5)__ Please check the function `parse_train_dep` above. Explain how we construct the Key and the Value, and why we construct them in this way.

__Answer__: The function `parse_train_dep` takes a message from the departures topic as input and extracts the train number and the station's UIC code. It returns the message as a key-value pair of the form ('Train ID-Station ID', message), wrapped in a list that is empty when either field is missing. The key identifies one stop of one train at one station, so that we can later match the arrival and departure messages for that stop across the arrival and departure streams and extract both times.
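
To make the role of the key concrete, here is a small sketch in plain Python (no Spark) that pairs arrival and departure records sharing the same `train-station` key. The helper `make_key` and the toy records are hypothetical; the key format mirrors the one used in `parse_train_dep`.

```python
def make_key(train_number, uic_code):
    """Build the same 'train-station' key format used by parse_train_dep."""
    return "{}-{}".format(train_number, uic_code)


# Toy (key, value) records standing in for parsed messages.
arrivals = [(make_key("7363", "8400419"), "arrival msg A"),
            (make_key("6945", "8400606"), "arrival msg B")]
departures = [(make_key("7363", "8400419"), "departure msg A")]

# An inner join on the key keeps only stops seen in both streams.
departures_by_key = dict(departures)
joined = [(key, (arr, departures_by_key[key]))
          for key, arr in arrivals if key in departures_by_key]
```

A key made of the train number alone would mix the stops of one train at different stations, and a key made of the station alone would mix different trains.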

__Q (2/5)__ Take `parse_train_dep` as an example and write the function `parse_train_arr` for the stream `ndovloketnl-arrivals`. Make sure they have the same output format.

In [10]:
def parse_train_arr(s):
    obj = json.loads(s)
    tn = obj.get('ns1:PutReisInformatieBoodschapIn', {}).get('ns2:ReisInformatieProductDAS', {}).get('ns2:DynamischeAankomstStaat', {}).get('ns2:TreinAankomst', {}).get("ns2:TreinNummer")
    st = obj.get('ns1:PutReisInformatieBoodschapIn', {}).get('ns2:ReisInformatieProductDAS', {}).get('ns2:DynamischeAankomstStaat', {}).get('ns2:RitStation', {}).get("ns2:UICCode")
    if tn and st:
        return [("{}-{}".format(tn, st), obj)]
    else:
        return []

__Q (2/5)__ Another parsing function you may need is `get_actual_time`, which will allow you to extract the actual time from the fields of time information, which are `ns2:AankomstTijd` in the arrival stream and `ns2:VertrekTijd` in the departure stream. 

__Note:__ These two fields may be empty and they may not contain the actual time information. In both cases the function should return `None`.

In [11]:
from datetime import datetime
import dateutil.parser

def get_actual_time(tab):
    # Return None when the field is missing or empty, or when no entry is marked 'Actueel'.
    actual_time = None
    for time in tab or []:
        if time.get('@InfoStatus') == u'Actueel':
            # Parse the matching entry itself rather than a fixed position in the list.
            actual_time = dateutil.parser.parse(time.get('#text'))
    return actual_time

In [12]:
# Get the field of time in the departure stream
example_json = json.loads(example_departures.value)
tab = example_json.get('ns1:PutReisInformatieBoodschapIn', {}).get("ns2:ReisInformatieProductDVS", {}).get("ns2:DynamischeVertrekStaat", {}).get("ns2:Trein", {}).get("ns2:VertrekTijd",{})

In [13]:
# Example results from `get_actual_time`
get_actual_time(tab)

datetime.datetime(2020, 4, 23, 15, 32, tzinfo=tzutc())
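
The edge cases named in the note for this question can be checked without a cluster. The sketch below is a standalone variant, not the notebook's `get_actual_time`: the name `actual_time_or_none` is hypothetical, it parses with `datetime.strptime` instead of `dateutil` (so the result carries no timezone), and it assumes timestamps always look like `2020-04-23T15:32:00.000Z`.

```python
from datetime import datetime


def actual_time_or_none(entries):
    """Return the 'Actueel' time as a naive datetime, or None if absent."""
    for entry in entries or []:
        if entry.get("@InfoStatus") == "Actueel":
            # Assumed fixed format with milliseconds and a trailing 'Z'.
            return datetime.strptime(entry["#text"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return None


# Illustrative inputs: one with only the planned time, one with both.
planned_only = [{"@InfoStatus": "Gepland", "#text": "2020-04-23T15:32:00.000Z"}]
both = planned_only + [{"@InfoStatus": "Actueel", "#text": "2020-04-23T15:33:10.000Z"}]
```

Calling `actual_time_or_none` on `None`, on an empty list, or on `planned_only` yields `None`; only `both` yields a datetime.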

**Question III.b. (5/20)** Create two Spark streams from the arrivals and departures where the records are in the form (Key, Value) using `parse_train_dep` and  `parse_train_arr`. 

**Methodology:** Create two streams from the arrival and departure streams such that their records have the ('Train ID-Station ID', message) form, by applying the parsing functions to each message with `flatMap`.
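
The parsing functions return a list with zero or one element, which is why `flatMap` suits them: empty lists simply vanish from the result. The following plain-Python sketch imitates that behaviour with `itertools.chain`; `parse_or_skip` and the `"id"` field are hypothetical stand-ins for the real parsers and message layout.

```python
import json
from itertools import chain


def parse_or_skip(raw):
    """Return [(key, obj)] for a usable message, [] otherwise."""
    obj = json.loads(raw)
    key = obj.get("id")
    return [(key, obj)] if key else []


raw_messages = ['{"id": "7363-8400419"}', '{"other": 1}', '{"id": "6945-8400606"}']

# Rough equivalent of flatMap(parse_or_skip): concatenate the returned lists.
flat = list(chain.from_iterable(parse_or_skip(m) for m in raw_messages))
keys = [k for k, _ in flat]
```

With a plain `map`, the unusable message would instead show up as an empty list in the output and would have to be filtered out in a separate step.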

In [14]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a StreamingContext with a batch interval of 10 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-departures': 1})

In [15]:
#Apply flatMap on both streams: take the value of each Kafka record (the second element of the tuple) and apply the parse functions defined above.
arr_stream = arrival_stream.flatMap(lambda x: parse_train_arr(x[1])) 
dep_stream = departure_stream.flatMap(lambda x: parse_train_dep(x[1]))


arr_stream.pprint()
dep_stream.pprint()

ssc.start()
ssc.awaitTermination(timeout=10)

In [16]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2020-05-13 22:50:10
-------------------------------------------
('15883-8400060', {u'ns1:PutReisInformatieBoodschapIn': {u'@xmlns:ns1': u'urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1', u'@xmlns:ns2': u'urn:ndov:cdm:trein:reisinformatie:data:4', u'ns2:ReisInformatieProductDAS': {u'ns2:RIPAdministratie': {u'ns2:ReisInformatieTijdstip': u'2020-05-13T20:46:00.000Z', u'ns2:ReisInformatieProductID': u'2005132245322100002', u'ns2:AbonnementId': u'55'}, u'@TimeStamp': u'2020-05-13T20:45:32.664Z', u'@Versie': u'6.1', u'ns2:DynamischeAankomstStaat': {u'ns2:RitId': u'15883', u'ns2:RitStation': {u'ns2:LangeNaam': u'Amsterdam Muiderpoort', u'ns2:MiddelNaam': u'Muiderpoort', u'ns2:StationCode': u'ASDM', u'ns2:Type': u'0', u'ns2:UICCode': u'8400060', u'ns2:KorteNaam': u'Muiderprt'}, u'ns2:TreinAankomst': {u'ns2:TreinHerkomst': [{u'ns2:LangeNaam': u'Amsterdam Centraal', u'@InfoStatus': u'Gepland', u'ns2:MiddelNaam': u'Amsterdam 

**Question III.c. (5/20)** 
Every 20 seconds, we want a list of the trains that departed from any train station after staying there for 5 minutes or less. Apply a window with a 20s sliding interval to the arrival and departure streams. Join the two streams such that trains staying for 5 minutes or less (± 20 seconds error due to the sliding interval) at any station are caught by the join in the RDD window of the joined stream (you can ignore late messages). 

__Hint:__
- Check [here](https://spark.apache.org/docs/2.3.1/streaming-programming-guide.html#window-operations) for windowed computations in Spark Streaming.
- Use the methods [reduceByKeyAndWindow](https://spark.apache.org/docs/2.3.1/api/python/pyspark.streaming.html?highlight=reducebykey#pyspark.streaming.DStream.reduceByKeyAndWindow) and [join](https://spark.apache.org/docs/2.3.1/api/python/pyspark.streaming.html?highlight=reducebykey#pyspark.streaming.DStream.join) on DStream objects.
- Both windows should have `slideDuration` of 20s
- You have to pick the sizes of windows `windowDuration` carefully. The sizes can be different: 
    - The trains staying for 5 minutes or less (± 20 seconds error due to sliding interval) must be in the joined stream.
    - The same stay (i.e. one train at one station) is caught in the joined stream once and only once.
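
One way to reason about candidate window sizes is to simulate them in plain Python before touching the stream. The sketch below assumes that a window evaluated at time `t` sees every message received in the interval `(t - windowDuration, t]` and that messages are received at their event time; the function `times_caught` and its parameters are hypothetical helpers, not part of Spark.

```python
def times_caught(arrival, departure, arr_window, dep_window, slide=20, horizon=1200):
    """Count the window evaluations in which both events of one stay are visible.

    All arguments are in seconds; windows are evaluated at slide, 2*slide, ...
    """
    count = 0
    for t in range(slide, horizon + 1, slide):
        arr_visible = t - arr_window < arrival <= t
        dep_visible = t - dep_window < departure <= t
        if arr_visible and dep_visible:
            count += 1
    return count


# A 300 s stay (arrival at t=5 s, departure at t=305 s).
short_stay = times_caught(5, 305, arr_window=320, dep_window=20)
# A 400 s stay, which should fall outside the join.
long_stay = times_caught(5, 405, arr_window=320, dep_window=20)
```

Calling `times_caught` with other window sizes shows how often the same stay would reappear in consecutive evaluations, which helps when checking the "once and only once" requirement.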

In [46]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a StreamingContext with a batch interval of 20 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 20)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-departures': 1})

arr_stream = arrival_stream.flatMap(lambda x: parse_train_arr(x[1]))
dep_stream = departure_stream.flatMap(lambda x: parse_train_dep(x[1]))

In [47]:
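#Keep one message per key within each window. reduceByKeyAndWindow takes
#(func, invFunc, windowDuration, slideDuration), so invFunc is passed explicitly as None.
#windowDuration is 320s for the arrival stream and 300s for the departure stream; slideDuration is 20s for both.
windowed_arr_stream = arr_stream.reduceByKeyAndWindow(lambda x, y: x, None, 320, 20)
windowed_dep_stream = dep_stream.reduceByKeyAndWindow(lambda x, y: x, None, 300, 20)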

#Join both departure and arrival streams on key
joinedStream = windowed_arr_stream.join(windowed_dep_stream)

joinedStream.pprint()

ssc.start()
ssc.awaitTermination(timeout=10)

In [48]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2020-05-13 21:11:20
-------------------------------------------
('32273-8400591', ({u'ns1:PutReisInformatieBoodschapIn': {u'@xmlns:ns1': u'urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1', u'@xmlns:ns2': u'urn:ndov:cdm:trein:reisinformatie:data:4', u'ns2:ReisInformatieProductDAS': {u'ns2:RIPAdministratie': {u'ns2:ReisInformatieTijdstip': u'2020-05-13T19:07:00.000Z', u'ns2:ReisInformatieProductID': u'2005132109052300002', u'ns2:AbonnementId': u'55'}, u'@TimeStamp': u'2020-05-13T19:09:05.771Z', u'@Versie': u'6.1', u'ns2:DynamischeAankomstStaat': {u'ns2:RitId': u'32273', u'ns2:RitStation': {u'ns2:LangeNaam': u'Tegelen', u'ns2:MiddelNaam': u'Tegelen', u'ns2:StationCode': u'TG', u'ns2:Type': u'0', u'ns2:UICCode': u'8400591', u'ns2:KorteNaam': u'Tegelen'}, u'ns2:TreinAankomst': {u'ns2:TreinHerkomst': [{u'ns2:LangeNaam': u'Nijmegen', u'@InfoStatus': u'Gepland', u'ns2:MiddelNaam': u'Nijmegen', u'ns2:StationCode': u'NM', u'n

**Discussion**: As we can see, the `reduceByKeyAndWindow` method removed duplicates in the messages, so that a stay of a train is in the stream once. We have tested different windowDurations and decided to choose an arrival_stream Window of 320s, which means that we capture trains that arrived slightly before a 'normal' 300s Window, they wouldn't be captured due to the slig

**Question III.d. (5/20)** On the joined stream, compute the length of each stay (you can round to the minute) and produce a stream of histograms. You don't need to plot them, a value/count array is enough, like:
```
-------------------------------------------
Time: 2018-05-17 11:10:00
-------------------------------------------
(0.0, 110)
(8.0, 2)
(4.0, 3)

-------------------------------------------
Time: 2018-05-17 11:10:20
-------------------------------------------
(0.0, 46)
(4.0, 2)
(1.0, 5)
```

**Methodology**: Use the `get_actual_time` function to extract the actual arrival and departure times, compute the difference for each joined record, and build a histogram of the stay lengths in minutes.
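
The arithmetic behind the histogram can be tried on a few hand-made stays first. This is a minimal sketch in plain Python: the timestamps are invented, `stay_minutes` is a hypothetical helper, and `collections.Counter` stands in for what `countByValue()` does on a single batch.

```python
from collections import Counter
from datetime import datetime


def stay_minutes(arrival, departure):
    """Whole minutes between two datetimes (rounded down), as a float."""
    return (departure - arrival).total_seconds() // 60


# Invented (arrival, departure) pairs.
stays = [
    (datetime(2020, 5, 13, 23, 30, 0), datetime(2020, 5, 13, 23, 30, 40)),  # 40 s
    (datetime(2020, 5, 13, 23, 31, 0), datetime(2020, 5, 13, 23, 32, 30)),  # 90 s
    (datetime(2020, 5, 13, 23, 31, 0), datetime(2020, 5, 13, 23, 31, 59)),  # 59 s
]

# Count how many stays fall into each whole-minute bucket.
histogram = Counter(stay_minutes(a, d) for a, d in stays)
```

Here two stays land in the `0.0` bucket and one in the `1.0` bucket, which matches the `(value, count)` layout of the expected output.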


In [52]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a StreamingContext with a batch interval of 20 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 20)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, { 'ndovloketnl-departures': 1})

arr_stream = arrival_stream.flatMap(lambda x: parse_train_arr(x[1]))
dep_stream = departure_stream.flatMap(lambda x: parse_train_dep(x[1]))

In [53]:
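#Keep one message per key within each window. reduceByKeyAndWindow takes
#(func, invFunc, windowDuration, slideDuration), so invFunc is passed explicitly as None.
#windowDuration is 320s for the arrival stream and 300s for the departure stream; slideDuration is 20s for both.
windowed_arr_stream = arr_stream.reduceByKeyAndWindow(lambda x, y: x, None, 320, 20)
windowed_dep_stream = dep_stream.reduceByKeyAndWindow(lambda x, y: x, None, 300, 20)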

#Join both departure and arrival streams on key
joinedStream = windowed_arr_stream.join(windowed_dep_stream)

In [54]:
#We chose to create a dict and use a map function on the stream, so that we can extract the actual arrival
#and departure times from the joined stream with get_actual_time.
stream_time = joinedStream.map(lambda x: {
        'act_arrival_time': get_actual_time(x[1][0].get('ns1:PutReisInformatieBoodschapIn').get("ns2:ReisInformatieProductDAS")
                                     .get("ns2:DynamischeAankomstStaat").get("ns2:TreinAankomst").get("ns2:AankomstTijd")),
        'act_departure_time': get_actual_time(x[1][1].get('ns1:PutReisInformatieBoodschapIn')
                                     .get("ns2:ReisInformatieProductDVS").get("ns2:DynamischeVertrekStaat")
                                     .get("ns2:Trein").get("ns2:VertrekTijd"))}    
)

In [55]:
#Define a function that computes the stay between arrival and departure, rounded down to full minutes.
def get_stay_time(record):
    #Select the actual arrival and departure times by key
    arrival_time = record['act_arrival_time']
    departure_time = record['act_departure_time']
    #When both times are present, compute the difference in whole minutes.
    if arrival_time and departure_time:
        stay_time = (departure_time - arrival_time).total_seconds() // 60
        #Keep only stays between 0 and 5 minutes.
        if 0 <= stay_time <= 5:
            return [stay_time]
    #Missing time or stay out of range: return an empty list so that flatMap skips the record
    return []

In [56]:
histograms_stream = stream_time.flatMap(get_stay_time).countByValue()

In [57]:
histograms_stream.pprint()

In [58]:
ssc.start() 
ssc.awaitTermination(timeout=60)

In [59]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2020-05-13 23:33:00
-------------------------------------------
(0.0, 10)
(1.0, 3)

-------------------------------------------
Time: 2020-05-13 23:34:00
-------------------------------------------
(0.0, 11)
(2.0, 1)
(1.0, 2)

-------------------------------------------
Time: 2020-05-13 23:34:20
-------------------------------------------
(0.0, 27)
(4.0, 1)
(1.0, 3)

-------------------------------------------
Time: 2020-05-13 23:34:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-13 23:35:00
-------------------------------------------

-------------------------------------------
Time: 2020-05-13 23:35:20
-------------------------------------------

-------------------------------------------
Time: 2020-05-13 23:35:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-13 23:36:00
-------------------------------------------

-----