# DSLab Homework4 - More trains (PART III)

## Hand-in Instructions:
- __Due: 11.05.2021 23:59:59 CET__
- your project must be private
- git push your final verion to the master branch of your group's Renku repository before the due date
- check if Dockerfile, environment.yml and requirements.txt are properly written
- add necessary comments and discussion to make your codes readable

## NS Streams
For this homework, you will be working with the real-time streams of the NS, the train company of the Netherlands. You can see an example webpage that uses the same streams to display the train information on a map: https://spoorkaart.mwnn.nl/ . 

To help you and avoid having too many connections to the NS streaming servers, we have setup a service that collects the streams and pushes them to our Kafka instance. The related topics are: 

`ndovloketnl-arrivals`: For each arrival of a train in a station, describe the previous and next station, time of arrival (planned and actual), track number,...

`ndovloketnl-departures`: For each departure of a train from a station, describe the previous and next station, time of departure (planned and actual), track number,...

`ndovloketnl-gps`: For each train, describe the current location, speed, bearing.

The events are serialized in JSON (actually converted from XML), with properties in their original language. Google translate could help you understand all of them, but we will provide you with some useful mappings.

---
**PART III is in PySpark kernel**

In [1]:
%%local
ipython = get_ipython()
print('Current kernel: {}'.format(ipython.kernel.kernel_info['implementation']))

Current kernel: PySpark


---
## Set up environment

Run the following cells below before running the other cells of this notebook. Run them whenever you need to recreate a Spark context. Pay particular attention to your `username` settings, and make sure that it is properly set to your user name, both locally and on the remote Spark Driver.

Configure your spark settings:
1. name your spark application as `"<your_gaspar_id>-homework4"`.
2. make the required kafka jars available on the remote Spark driver.

In [2]:
%%local
import os
import json

username = os.environ['JUPYTERHUB_USER']

configuration = dict(
    name = "{}-homework4".format(username),
    executorMemory = "1G",
    executorCores = 2,
    numExecutors = 2,
    conf = {
        "spark.jars.packages":"org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2,org.apache.kafka:kafka_2.11:1.0.1"
    }
)

ipython = get_ipython()
ipython.run_cell_magic('configure', line="-f", cell=json.dumps(configuration))

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3065,application_1618324153128_2643,pyspark,error,Link,Link,,
3086,application_1618324153128_2664,pyspark,idle,Link,Link,,
3091,application_1618324153128_2669,pyspark,idle,Link,Link,,
3092,application_1618324153128_2670,pyspark,idle,Link,Link,,
3093,application_1618324153128_2671,pyspark,idle,Link,Link,,


Create a new session unless one was already created above (check for `✔` in current session)

In [3]:
# Initialize spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3096,application_1618324153128_2674,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Send `username` to the Spark driver.

In [4]:
%%send_to_spark -i username -t str -n username

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'username' as 'username' to Spark kernel

In [5]:
print("You are {} on the Spark driver.".format(username))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

You are cadei on the Spark driver.

---

## Create a Kafka client

In [6]:
from pykafka import KafkaClient
from pykafka.common import OffsetType

ZOOKEEPER_QUORUM = 'iccluster040.iccluster.epfl.ch:2181,' \
                   'iccluster064.iccluster.epfl.ch:2181,' \
                   'iccluster065.iccluster.epfl.ch:2181'

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Working on data streams is often times more complex compared to using static datasets, so we will first look at how to create static RDDs for easy prototyping.

You can find below a function that creates a static RDD from a Kafka topic.

In [7]:
from itertools import islice

def simple_create_rdd(topic, from_offset, to_offset):
    """Create an RDD from topic with offset in [from_offset, to_offest)."""
    
    consumer = client.topics[topic].get_simple_consumer(
        auto_offset_reset=OffsetType.EARLIEST if from_offset == 0 else from_offset - 1,
        reset_offset_on_start=True
    )
    
    return sc.parallelize((msg.offset, msg.value) for msg in islice(consumer, to_offset - from_offset))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

To check this function, we need to retrieve valid offsets from Kafka.

In [8]:
topic = client.topics[b'ndovloketnl-arrivals']
topic.earliest_available_offsets()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{0: OffsetPartitionResponse(offset=[0], err=0)}

Now, we can for example retrieve the first 1000 messages from the topic `ndovloketnl-arrivals`.

In [9]:
offset = topic.earliest_available_offsets()[0].offset[0]
rdd = simple_create_rdd(b'ndovloketnl-arrivals', offset, offset+1000)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
rdd.first()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(0, '{"ns1:PutReisInformatieBoodschapIn": {"@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", "ns2:ReisInformatieProductDAS": {"@TimeStamp": "2021-04-26T11:18:31.081Z", "@Versie": "6.1", "ns2:RIPAdministratie": {"ns2:ReisInformatieProductID": "2104261318312400002", "ns2:AbonnementId": "55", "ns2:ReisInformatieTijdstip": "2021-04-26T11:20:00.000Z"}, "ns2:DynamischeAankomstStaat": {"ns2:RitId": "37542", "ns2:RitDatum": "2021-04-26", "ns2:RitStation": {"ns2:StationCode": "WS", "ns2:Type": "1", "ns2:KorteNaam": "Winschoten", "ns2:MiddelNaam": "Winschoten", "ns2:LangeNaam": "Winschoten", "ns2:UICCode": "8400696"}, "ns2:TreinAankomst": {"ns2:TreinNummer": "37542", "ns2:TreinSoort": {"@Code": "ST", "#text": "Stoptrein"}, "ns2:TreinStatus": "2", "ns2:LijnNummer": "RS6", "ns2:Vervoerder": "Arriva", "ns2:TreinHerkomst": [{"@InfoStatus": "Gepland", "ns2:StationCode": "GN", "ns2:Type": "5", "ns2:KorteNaam"

In [11]:
rdd.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1000

## Streams from Kafka

In [12]:
# Define the checkpoint folder
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
print('checkpoint created at hdfs:///user/{}/checkpoint/'.format(username))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

checkpoint created at hdfs:///user/cadei/checkpoint/

In [13]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a StreamingContext with two working thread and batch interval of 5 seconds.
# Each time you stop a StreamingContext, you will need to recreate it.
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)

group_id = 'ns-{0}'.format(username)

# Input streams
arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

For now, let's just print the content of the streams. Because we set the batch interval as 10 seconds and the timeout also as 10 seconds, you are supposed to see exact one batch from each stream, like:
```
-------------------------------------------
Time: 2021-04-27 10:11:50
-------------------------------------------
<ONE_BATCH_OF_ARRIVAL_STREAM>
...
-------------------------------------------
Time: 2021-04-27 10:11:50
-------------------------------------------
<ONE_BATCH_OF_DEPARTURE_STREAM>
...
```
**Note:** the output may be shown after you run `ssc.stop`.

In [14]:
arrival_stream.pprint(num=2) # print the first 2 messages
departure_stream.pprint(num=2) # print the first 2 messages

ssc.start()
ssc.awaitTermination(timeout=10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------------------------------------
Time: 2021-05-11 22:39:40
-------------------------------------------
(None, u'{"ns1:PutReisInformatieBoodschapIn": {"@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", "ns2:ReisInformatieProductDAS": {"@TimeStamp": "2021-05-11T20:39:33.331Z", "@Versie": "6.1", "ns2:RIPAdministratie": {"ns2:ReisInformatieProductID": "2105112239332400004", "ns2:AbonnementId": "55", "ns2:ReisInformatieTijdstip": "2021-05-11T20:40:00.000Z"}, "ns2:DynamischeAankomstStaat": {"ns2:RitId": "4380", "ns2:RitDatum": "2021-05-11", "ns2:RitStation": {"ns2:StationCode": "ALMB", "ns2:Type": "0", "ns2:KorteNaam": "Buiten", "ns2:MiddelNaam": "Buiten", "ns2:LangeNaam": "Almere Buiten", "ns2:UICCode": "8400081"}, "ns2:TreinAankomst": {"ns2:TreinNummer": "4380", "ns2:TreinSoort": {"@Code": "SPR", "#text": "Sprinter"}, "ns2:TreinStatus": "2", "ns2:Vervoerder": "NS", "ns2:TreinHerkomst": [

In [15]:
ssc.stop(stopSparkContext=False, stopGraceFully=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

You will need to adjust the batch interval (10 seconds here) in accordance with the processing times. Use the spark UI to check if batches are not accumulating.

---

# Part III - Live stopping time (20 points)

In this part, we will have a look at the two other streams, namely `ndovloketnl-arrivals` and `ndovloketnl-departures`. Each time a train arrives at or leaves a station, a message is generated. Let's have a look at the content.

In [16]:
import json
from pykafka.common import OffsetType

example_arrivals = client.topics[b'ndovloketnl-arrivals'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
print(json.dumps(json.loads(example_arrivals.value), indent=2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{
  "ns1:PutReisInformatieBoodschapIn": {
    "@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1", 
    "@xmlns:ns2": "urn:ndov:cdm:trein:reisinformatie:data:4", 
    "ns2:ReisInformatieProductDAS": {
      "ns2:RIPAdministratie": {
        "ns2:ReisInformatieTijdstip": "2021-04-26T11:20:00.000Z", 
        "ns2:ReisInformatieProductID": "2104261318312400002", 
        "ns2:AbonnementId": "55"
      }, 
      "@TimeStamp": "2021-04-26T11:18:31.081Z", 
      "@Versie": "6.1", 
      "ns2:DynamischeAankomstStaat": {
        "ns2:RitId": "37542", 
        "ns2:RitStation": {
          "ns2:LangeNaam": "Winschoten", 
          "ns2:MiddelNaam": "Winschoten", 
          "ns2:StationCode": "WS", 
          "ns2:Type": "1", 
          "ns2:UICCode": "8400696", 
          "ns2:KorteNaam": "Winschoten"
        }, 
        "ns2:TreinAankomst": {
          "ns2:TreinHerkomst": [
            {
              "ns2:LangeNaam": "Groningen", 
              "@InfoStatus":

In [17]:
example_departures = client.topics[b'ndovloketnl-departures'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
print(json.dumps(json.loads(example_departures.value), indent=2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{
  "ns1:PutReisInformatieBoodschapIn": {
    "@xmlns:ns1": "urn:ndov:cdm:trein:reisinformatie:messages:5", 
    "ns2:ReisInformatieProductDVS": {
      "ns2:RIPAdministratie": {
        "ns2:ReisInformatieTijdstip": "2021-04-28T12:07:00.000Z", 
        "ns2:ReisInformatieProductID": "2104281408062300003", 
        "ns2:AbonnementId": "54"
      }, 
      "@TimeStamp": "2021-04-28T12:08:06.847Z", 
      "@Versie": "6.2", 
      "ns2:DynamischeVertrekStaat": {
        "ns2:RitId": "31241", 
        "ns2:RitStation": {
          "ns2:LangeNaam": "Delden", 
          "ns2:MiddelNaam": "Delden", 
          "ns2:StationCode": "DDN", 
          "ns2:Type": "0", 
          "ns2:UICCode": "8400169", 
          "ns2:KorteNaam": "Delden"
        }, 
        "ns2:RitDatum": "2021-04-28", 
        "ns2:Trein": {
          "ns2:VertrekRichting": "B", 
          "ns2:VertrekTijd": [
            {
              "@InfoStatus": "Gepland", 
              "#text": "2021-04-28T12:07:00.000Z"
            }

We can see that the messages have the following structure:

```
{
  'ns1:PutReisInformatieBoodschapIn': {
    'ns2:ReisInformatieProductDVS' or 'ns2:ReisInformatieProductDAS': {
      'ns2:DynamischeVertrekStaat' or 'ns2:DynamischeAankomstStaat': {
          'ns2:RitStation': <station_info>,
          'ns2:Trein' or 'ns2:TreinAankomst': {
              'ns2:VertrekTijd' or 'ns2:AankomstTijd': [<planned_and_actual_times>],
              'ns2:TreinNummer': <train_number>,
              'ns2:TreinSoort': <kind_of_train>,
              ...
          }
           
      }
    }
  }
}
```

We can see also that the train stations have a long name `ns2:LangeNaam`, a medium name `ns2:MiddelNaam`, a short name `ns2:KorteNaam`, a station code `ns2:StationCode` and a kind of nummerical ID `ns2:UICCode`. When giving information about times, tracks, direction,... you will find sometimes the information twice with the status `Gepland` (which means planned, according to the schedule) and `Actueel`(which means the actual measured value). 

### a) Parse - 5/20 

We want to compute the time a train stays at a station and get a real-time histogram for a given time window. To begin with, you need to write some parsing functions that will allow you to get information from the data streams. We have prepare one function `parse_train_dep` for the stream `ndovloketnl-departures`, which returns a Key-Value pair.

In [18]:
import json

def parse_train_dep(s):
    obj = json.loads(s)
    tn = (obj.get('ns1:PutReisInformatieBoodschapIn', {})
             .get('ns2:ReisInformatieProductDVS', {})
             .get('ns2:DynamischeVertrekStaat', {})
             .get('ns2:Trein', {})
             .get("ns2:TreinNummer"))
    st = (obj.get('ns1:PutReisInformatieBoodschapIn', {})
             .get('ns2:ReisInformatieProductDVS', {})
             .get('ns2:DynamischeVertrekStaat', {})
             .get('ns2:RitStation', {})
             .get("ns2:UICCode"))
    if tn and st:
        return [("{}-{}".format(tn, st), obj)]
    else:
        return []

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
parse_train_dep(example_departures.value)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('31241-8400169', {u'ns1:PutReisInformatieBoodschapIn': {u'@xmlns:ns1': u'urn:ndov:cdm:trein:reisinformatie:messages:5', u'ns2:ReisInformatieProductDVS': {u'ns2:RIPAdministratie': {u'ns2:ReisInformatieTijdstip': u'2021-04-28T12:07:00.000Z', u'ns2:ReisInformatieProductID': u'2104281408062300003', u'ns2:AbonnementId': u'54'}, u'@TimeStamp': u'2021-04-28T12:08:06.847Z', u'@Versie': u'6.2', u'ns2:DynamischeVertrekStaat': {u'ns2:RitId': u'31241', u'ns2:RitStation': {u'ns2:LangeNaam': u'Delden', u'ns2:MiddelNaam': u'Delden', u'ns2:StationCode': u'DDN', u'ns2:Type': u'0', u'ns2:UICCode': u'8400169', u'ns2:KorteNaam': u'Delden'}, u'ns2:RitDatum': u'2021-04-28', u'ns2:Trein': {u'ns2:VertrekRichting': u'B', u'ns2:VertrekTijd': [{u'@InfoStatus': u'Gepland', u'#text': u'2021-04-28T12:07:00.000Z'}, {u'@InfoStatus': u'Actueel', u'#text': u'2021-04-28T12:07:00.000Z'}], u'ns2:RangeerBeweging': u'N', u'ns2:PresentatieTreinVertrekSpoor': {u'ns2:Uitingen': {u'ns2:Uiting': u'1'}}, u'ns2:NietInstappen': u

__TODO - 1/5__ Please check the function `parse_train_dep` above. Explain how we construct the Key and the Value, and why we construct them in this way.

**Answer:** In the above function we contruct the key by extracting from the parsed json both the `TreinNummer` and `UICCode` of the current stop station. To extract them we iteratively get the appropriate dictionnary from the parsed JSON, each time returning default empty dictionnary if the key is not found. If the given JSON is missing one of these two attributes we return an empty array, if we managed to extract both `TreinNummer` and `UICCode` we concatenate them (with a ´-´ in between) to form a key and we add the entire parsed json object as value.  
This construction allows to keep all informations contained in our parsed JSON and to index it with a key that is suited to compute the time a train stays at a station (as we have both information concatenated as a unique key).

__TODO - 2/5__  Take `parse_train_dep` as an example and write the function `parse_train_arr` for the stream `ndovloketnl-arrivals`. Make sure they have the same output format.

In [20]:
def parse_train_arr(s):
    obj = json.loads(s)
    tn = (obj.get('ns1:PutReisInformatieBoodschapIn', {})
             .get('ns2:ReisInformatieProductDAS', {})
             .get('ns2:DynamischeAankomstStaat', {})
             .get('ns2:TreinAankomst', {})
             .get("ns2:TreinNummer"))
    st = (obj.get('ns1:PutReisInformatieBoodschapIn', {})
             .get('ns2:ReisInformatieProductDAS', {})
             .get('ns2:DynamischeAankomstStaat', {})
             .get('ns2:RitStation', {})
             .get("ns2:UICCode"))
    if tn and st:
        return [("{}-{}".format(tn, st), obj)]
    else:
        return []

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
parse_train_arr(example_arrivals.value)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('37542-8400696', {u'ns1:PutReisInformatieBoodschapIn': {u'@xmlns:ns1': u'urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1', u'@xmlns:ns2': u'urn:ndov:cdm:trein:reisinformatie:data:4', u'ns2:ReisInformatieProductDAS': {u'ns2:RIPAdministratie': {u'ns2:ReisInformatieTijdstip': u'2021-04-26T11:20:00.000Z', u'ns2:ReisInformatieProductID': u'2104261318312400002', u'ns2:AbonnementId': u'55'}, u'@TimeStamp': u'2021-04-26T11:18:31.081Z', u'@Versie': u'6.1', u'ns2:DynamischeAankomstStaat': {u'ns2:RitId': u'37542', u'ns2:RitStation': {u'ns2:LangeNaam': u'Winschoten', u'ns2:MiddelNaam': u'Winschoten', u'ns2:StationCode': u'WS', u'ns2:Type': u'1', u'ns2:UICCode': u'8400696', u'ns2:KorteNaam': u'Winschoten'}, u'ns2:TreinAankomst': {u'ns2:TreinHerkomst': [{u'ns2:LangeNaam': u'Groningen', u'@InfoStatus': u'Gepland', u'ns2:MiddelNaam': u'Groningen', u'ns2:StationCode': u'GN', u'ns2:Type': u'5', u'ns2:UICCode': u'8400263', u'ns2:KorteNaam': u'Groningen'}, {u'ns2:LangeNaam': u'Groni

__TODO - 2/5__ Another parsing function you will need later is `get_actual_time`, which will allow you to extract the **actual** time from the fields of time information, which are `ns2:AankomstTijd` in the arrival stream and `ns2:VertrekTijd` in the departure stream. 

__Note:__ These two fields may be empty and they may not contain the actual time information. In both cases the function should return `None`.

In [22]:
import datetime

def get_actual_time(tab): 
    # As the given lab might not always be ordered in the same way, we iterate over its elements
    for field in tab:
        # Check if the field corresponds to actual time and has attribute #text
        if(field.get('@InfoStatus', None)=='Actueel' and field.get('#text', None) ):
            act = field.get('#text')
            # Check the date text is non empty, if not returns the corresponding datetime (also take milliseconds into account)
            if(len(act)>1): return datetime.datetime.strptime(act[:-1]+"000", '%Y-%m-%dT%H:%M:%S.%f')
    return None

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Get the field of time in the departure stream
example_dep_json = json.loads(example_departures.value)
example_dep_tab = (example_dep_json.get('ns1:PutReisInformatieBoodschapIn', {})
                                   .get("ns2:ReisInformatieProductDVS", {})
                                   .get("ns2:DynamischeVertrekStaat", {})
                                   .get("ns2:Trein", {})
                                   .get("ns2:VertrekTijd",{}))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
assert get_actual_time(example_dep_tab) == datetime.datetime(2021, 4, 28, 12, 7)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Get the field of time in the arrival stream
example_arr_json = json.loads(example_arrivals.value)
example_arr_tab = (example_arr_json.get('ns1:PutReisInformatieBoodschapIn', {})
                                   .get("ns2:ReisInformatieProductDAS", {})
                                   .get("ns2:DynamischeAankomstStaat", {})
                                   .get("ns2:TreinAankomst", {})
                                   .get("ns2:AankomstTijd",{}))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
assert get_actual_time(example_arr_tab) == datetime.datetime(2021, 4, 26, 11, 20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
example_arr_json.get('ns1:PutReisInformatieBoodschapIn').get("ns2:ReisInformatieProductDAS", {}).get("ns2:DynamischeAankomstStaat", {}).get("ns2:TreinAankomst", {}).get("ns2:AankomstTijd",{})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[{u'@InfoStatus': u'Gepland', u'#text': u'2021-04-26T11:20:00.000Z'}, {u'@InfoStatus': u'Actueel', u'#text': u'2021-04-26T11:20:00.000Z'}]

### b) Transform - 5/20

Create two Spark streams from the arrivals and departures where the records are in the form (Key, Value) using `parse_train_dep` and  `parse_train_arr`. 

In [28]:
# Redefines the checkpoint folder in case the intro cells were not all run
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
print('checkpoint created at hdfs:///user/{}/checkpoint/'.format(username))

# Init new StreamingContext, sets checkpoints & group
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)
group_id = 'ns-{0}'.format(username)

# Init input streams
raw_arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
raw_departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})

# Apply our parsing on the streams 
arrival_stream = raw_arrival_stream.flatMap(lambda x: parse_train_arr(x[1]))
departure_stream = raw_departure_stream.flatMap(lambda x: parse_train_dep(x[1]))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

checkpoint created at hdfs:///user/cadei/checkpoint/

In [29]:
# TODO: Can be removed from final submission
arrival_stream.pprint(num=2) # print the first 2 messages
departure_stream.pprint(num=2) # print the first 2 messages

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
ssc.start()
ssc.awaitTermination(timeout=10)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------------------------------------
Time: 2021-05-11 22:39:50
-------------------------------------------
('3074-8400309', {u'ns1:PutReisInformatieBoodschapIn': {u'@xmlns:ns1': u'urn:ndov:cdm:trein:reisinformatie:messages:dynamischeaankomststaat:1', u'@xmlns:ns2': u'urn:ndov:cdm:trein:reisinformatie:data:4', u'ns2:ReisInformatieProductDAS': {u'ns2:RIPAdministratie': {u'ns2:ReisInformatieTijdstip': u'2021-05-11T20:40:00.000Z', u'ns2:ReisInformatieProductID': u'2105112239412400004', u'ns2:AbonnementId': u'55'}, u'@TimeStamp': u'2021-05-11T20:39:41.375Z', u'@Versie': u'6.1', u'ns2:DynamischeAankomstStaat': {u'ns2:RitId': u'3074', u'ns2:RitStation': {u'ns2:LangeNaam': u'Heiloo', u'ns2:MiddelNaam': u'Heiloo', u'ns2:StationCode': u'HLO', u'ns2:Type': u'0', u'ns2:UICCode': u'8400309', u'ns2:KorteNaam': u'Heiloo'}, u'ns2:TreinAankomst': {u'ns2:TreinHerkomst': [{u'ns2:LangeNaam': u'Nijmegen', u'@InfoStatus': u'Gepland', u'ns2:MiddelNaam': u'Nijmegen', u'ns2:StationCode': u'NM', u'ns2:Ty

In [31]:
ssc.stop(stopSparkContext=False, stopGraceFully=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### c) Window and Join - 5/20

Every 20 seconds, we want to have a list of trains that had departed from any train station after staying for 5 minutes or less at the station. Apply a window of 20s sliding interval on arrival and departure streams. Join two streams such that trains staying for 5 minutes or less (± 20 seconds error due to sliding interval) at any station are caught in the RDD window of the joined stream (you can ignore late messages). 

__Note:__
- Check [here](https://spark.apache.org/docs/2.3.2/streaming-programming-guide.html#window-operations) for windowed computations in Spark Streaming.
- Use the methods [reduceByKeyAndWindow](https://spark.apache.org/docs/2.3.2/api/python/pyspark.streaming.html?highlight=reducebykey#pyspark.streaming.DStream.reduceByKeyAndWindow) and [join](https://spark.apache.org/docs/2.3.2/api/python/pyspark.streaming.html?highlight=reducebykey#pyspark.streaming.DStream.join) on DStream objects.
- Both windows should have `slideDuration` of 20s
- You have to pick the sizes of windows `windowDuration` carefully. The sizes can be different: 
    - The trains staying for 5 minutes or less (± 20 seconds error due to sliding interval) must be in the joined stream.
    - A same stay (i.e. one train at one station) is caught in the joined stream once and only once.

In [32]:

def to_time_arrival(x):   
    example_arr_tab = (x.get('ns1:PutReisInformatieBoodschapIn', {})
                                   .get("ns2:ReisInformatieProductDAS", {})
                                   .get("ns2:DynamischeAankomstStaat", {})
                                   .get("ns2:TreinAankomst", {})
                                   .get("ns2:AankomstTijd",{}))
    return get_actual_time(example_arr_tab)

def to_time_departure(x):   
    example_arr_tab = (x.get('ns1:PutReisInformatieBoodschapIn', {})
                                   .get("ns2:ReisInformatieProductDVS", {})
                                   .get("ns2:DynamischeVertrekStaat", {})
                                   .get("ns2:Trein", {})
                                   .get("ns2:VertrekTijd",{}))
    return get_actual_time(example_arr_tab)

def time(x):
    key, values = x
    departure, arrival = values
    diff = departure['time']-arrival['time']
    
    if (diff.total_seconds()/60) <= 5:
        return [(key,int(diff.total_seconds()/60))]
    else:
        return []

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Init new StreamingContext, sets checkpoints & group
ssc = StreamingContext(sc, 20)
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
ssc.checkpoint(checkpoint)
group_id = 'ns-{0}'.format(username)

# Init input streams
raw_arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
raw_departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})

# Apply our parsing on the streams 
arrival_stream = raw_arrival_stream.flatMap(lambda x: parse_train_arr(x[1]))
departure_stream = raw_departure_stream.flatMap(lambda x: parse_train_dep(x[1]))

arrival_stream = arrival_stream.map(lambda x: (x[0],{'time' : to_time_arrival(x[1])}))
departure_stream = departure_stream.map(lambda x: (x[0],{'time' : to_time_departure(x[1])}))

arrival_window = arrival_stream.reduceByKeyAndWindow(lambda x,y : x, lambda x, y: x, 5*60+20, 20)
departure_window = departure_stream.reduceByKeyAndWindow(lambda x,y : x, lambda x, y: x, 20, 20)

joined = departure_window.join(arrival_window).flatMap(time)

joined.pprint(num=30)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
ssc.start()
ssc.awaitTermination(timeout=20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------------------------------------
Time: 2021-05-11 22:40:00
-------------------------------------------

In [35]:
ssc.stop(stopSparkContext=False, stopGraceFully=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Answer
We want to capture trains that stayed in the station for 5 minutes or less, and only count them once. We set the window duration of arrivals at 5 minutes and 20 seconds, the window duration of departures at 20 seconds and the slide duration at 20 seconds. Doing that we effectively have a tumbling window that can capture a train only once. Adding 20 seconds to the arrivals assure that we do not make error due to the sliding interval, and we need to check in the _time()_ function if the stay was indeed less than 5 minutes. With a tumbling window we assure that we capture each train only once. 


![title](diagram.png)

### d) Histogram - 5/20

On the joined stream, compute the length of each stay (you can round to the minute) and produce a stream of histograms. You don't need to plot them, a value/count array is enough, like:
```
-------------------------------------------
Time: 2018-05-17 11:10:00
-------------------------------------------
(0.0, 110)
(4.0, 3)
(8.0, 2) # introduced by late messages

-------------------------------------------
Time: 2018-05-17 11:10:20
-------------------------------------------
(0.0, 46)
(4.0, 2)
(1.0, 5)

```

In [36]:
# Init new StreamingContext, sets checkpoints & group
ssc = StreamingContext(sc, 20)
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
ssc.checkpoint(checkpoint)
group_id = 'ns-{0}'.format(username)

# Init input streams
raw_arrival_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-arrivals': 1})
raw_departure_stream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, group_id, {'ndovloketnl-departures': 1})

# Apply our parsing on the streams 
arrival_stream = raw_arrival_stream.flatMap(lambda x: parse_train_arr(x[1]))
departure_stream = raw_departure_stream.flatMap(lambda x: parse_train_dep(x[1]))

arrival_stream = arrival_stream.map(lambda x: (x[0],{'time' : to_time_arrival(x[1])}))
departure_stream = departure_stream.map(lambda x: (x[0],{'time' : to_time_departure(x[1])}))

arrival_window = arrival_stream.reduceByKeyAndWindow(lambda x,y : x, lambda x, y: x, 5*60, 20)
departure_window = departure_stream.reduceByKeyAndWindow(lambda x,y : x, lambda x, y: x, 20, 20)

joined = departure_window.join(arrival_window).flatMap(time)

histograms_stream = joined.map(lambda x: x[1]).countByValueAndWindow(20,20)
histograms_stream.pprint(num=6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
ssc.start() 
ssc.awaitTermination(timeout=60)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-------------------------------------------
Time: 2021-05-11 22:40:40
-------------------------------------------
(0, 22)
(1, 8)
(2, 6)

-------------------------------------------
Time: 2021-05-11 22:41:00
-------------------------------------------
(0, 11)
(1, 5)
(2, 1)

-------------------------------------------
Time: 2021-05-11 22:41:20
-------------------------------------------
(0, 35)
(1, 6)
(2, 5)
(3, 1)

In [38]:
ssc.stop(stopSparkContext=False, stopGraceFully=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…