In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import date, datetime
from pathlib import Path
import multiprocessing
from multiprocessing import Lock, Pool, Process, cpu_count
import queue  # imported for using queue.Empty exception
import merging  # project module that provides merge_func and unify_date_time_format


#### Advanced data preprocessing.

##### Reshaping the dataset's structure.
 
After some basic preprocessing (adding three new features and cleaning the data, mainly by handling rare and NaN values), it's time to change the way I think about preprocessing the dataset. So far, all of my efforts were aimed at making the dataset clean and feature-relevant (keeping only the features I will need).

However, another important thing to consider is the shape of the user's input. How would the model predict the delay for, say, a journey from Eindhoven Centraal to Breda with a stop at Tilburg? With the dataset in its current state, my algorithm would base its prediction on every trip from the given station, even trips to and from unrelated cities such as Maastricht or Zwolle.

Since my data is already separated by journeys and stop ids, my plan is to go further and make each entry represent a part of a journey rather than a particular stop.
For example, a new dataset entry would have the following features: departure city, arrival city, departure time, arrival time, total delay, and so on.
Simply put, I would merge every pair of neighboring entries, so that the model can make a prediction based on specific journeys from one city to another.
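
The merging idea described above can be sketched with a `groupby` and a `shift`. This is only a minimal illustration, not the code in `merging.merge_func` (which is not shown in this notebook): the helper name `stops_to_parts` and the tiny `demo` frame are hypothetical, and the delay rule (arrival delay plus departure delay at the departure stop) is an assumption.

```python
import pandas as pd


def stops_to_parts(stops: pd.DataFrame) -> pd.DataFrame:
    """Pair every stop with the next stop of the same journey (hypothetical helper)."""
    stops = stops.sort_values(["Journey id", "Stop id"])
    # For each row, look up the following stop within the same journey.
    nxt = stops.groupby("Journey id").shift(-1)
    parts = pd.DataFrame({
        "Journey id": stops["Journey id"],
        "Departure station code": stops["Station code"],
        "Arrival station code": nxt["Station code"],
        "Departure time": stops["Departure time"],
        "Arrival time": nxt["Arrival time"],
        # Assumption: delay of a part = arrival + departure delay at the departure stop.
        "Total journey delay": stops["Arrival delay"] + stops["Departure delay"],
        "Part number": stops["Stop id"],
    })
    # The last stop of a journey has no following stop, so it starts no part.
    return parts.dropna(subset=["Arrival station code"]).reset_index(drop=True)


# Made-up demo data in the shape of the stop-level dataset.
demo = pd.DataFrame({
    "Journey id": [1, 1, 1, 2, 2],
    "Stop id": [1, 2, 3, 1, 2],
    "Station code": ["RTD", "DT", "GV", "UT", "ASD"],
    "Arrival time": ["02:00", "02:13", "02:21", "02:10", "02:44"],
    "Departure time": ["02:00", "02:13", "02:23", "02:16", "02:46"],
    "Arrival delay": [0.0, 3.0, 2.0, 0.0, 1.0],
    "Departure delay": [4.0, 4.0, 1.0, 4.0, 0.0],
})
parts = stops_to_parts(demo)
print(parts)
```

Each journey with n stops yields n - 1 parts, so the five demo stops above become three journey parts.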

Let's take a look at the dataset once more.

In [2]:
basic_info = pd.read_csv('preprocessed_data/main_preprocessed_dataset.csv')
basic_info.head(6)

Unnamed: 0,Journey id,Date,Train type,Railroad company,Train number,Station code,Station name,Arrival time,Arrival delay,Departure time,Departure delay,Departure cancelled,Stop id,Is_weekend,Is_holiday
0,7914217,2022-01-01,Intercity,NS,1410,RTD,Rotterdam Centraal,2022-01-01 02:00:00+01:00,0.0,2022-01-01T02:00:00+01:00,4.0,False,1,True,True
1,7914217,2022-01-01,Intercity,NS,1410,DT,Delft,2022-01-01T02:13:00+01:00,3.0,2022-01-01T02:13:00+01:00,4.0,False,2,True,True
2,7914217,2022-01-01,Intercity,NS,1410,GV,Den Haag HS,2022-01-01T02:21:00+01:00,2.0,2022-01-01T02:23:00+01:00,1.0,False,3,True,True
3,7914217,2022-01-01,Intercity,NS,1410,LEDN,Leiden Centraal,2022-01-01T02:36:00+01:00,0.0,2022-01-01T02:45:00+01:00,0.0,False,4,True,True
4,7914217,2022-01-01,Intercity,NS,1410,SHL,Schiphol Airport,2022-01-01T03:01:00+01:00,0.0,2022-01-01T03:03:00+01:00,0.0,False,5,True,True
5,7914217,2022-01-01,Intercity,NS,1410,ASD,Amsterdam Centraal,2022-01-01T03:17:00+01:00,0.0,2022-01-01T03:23:00+01:00,0.0,False,6,True,True


First things first: let's get rid of the "Date" column and merge the arrival delay and departure delay columns together.


In [3]:
basic_info.drop("Date", inplace = True, axis = 1)

In [4]:
# basic_info['Total delay'] = basic_info['Arrival delay'] + basic_info['Departure delay']

In [5]:
# Keep only the first 1,000,000 rows
basic_info = basic_info.head(1000000)
len(basic_info)

1000000

In [6]:
num_processes = cpu_count()
grouped_data = basic_info.groupby('Journey id')
args_list = [(journey_id, group) for journey_id, group in grouped_data]

In [7]:
# Process each journey group in parallel with Pool.map;
# the with-block shuts the pool down on exit
with Pool(num_processes - 1) as pool:
    results = pool.map(merging.merge_func, args_list)

results = pd.concat(results)

In [8]:
disruptions = pd.read_csv('preprocessed_data/disruptions_preprocessed.csv')
disruptions.head()

Unnamed: 0,rdt_lines,rdt_station_names,rdt_station_codes,cause_group,start_time,end_time,duration_minutes
0,Den Haag HS - Rotterdam Centraal,"Delft,Delft Campus,Den Haag HS,Den Haag Moerwi...","DT, DTCP, GV, GVMW, RSW",external,2022-01-01 05:45:33,2022-01-01 06:34:58,49.0
1,"'s-Hertogenbosch - Utrecht Centraal, Amsterdam...","Abcoude,Amsterdam Amstel,Amsterdam Bijlmer Are...","AC, ASA, ASB, ASD, ASDM, ASHD, BKL, CL, DVD, G...",engineering work,2022-01-01 06:23:54,2022-01-01 13:59:14,455.0
2,Eindhoven - Venlo,"Blerick,Deurne,Horst-Sevenum,Venlo","BR, DN, HRT, VL",accidents,2022-01-01 07:31:39,2022-01-01 11:26:38,235.0
3,Heerlen - Sittard,"Geleen Oost,Hoensbroek,Heerlen,Nuth,Spaubeek,S...","GLN, HB, HRL, NH, SBK, SN, STD",engineering work,2022-01-01 16:06:23,2022-01-01 16:46:33,40.0
4,"Amersfoort - Amsterdam Centraal, Amersfoort - ...","Naarden-Bussum,Weesp","NDB, WP",engineering work,2022-01-01 16:17:56,2022-01-01 17:18:47,61.0


#### Next step: Merging journeys with disruptions. 
Now that the whole dataset is conveniently split into journeys and journey parts, an open-source disruptions dataset can be used to find out whether a disruption took place on one or more journey parts. This would not have been possible with the old dataset: most disruptions (such as engineering works or rolling stock problems) take place on the tracks between stations, so both the departure and the arrival station must be known to match them accurately.

Let's write code that reads every station in the rdt_station_codes feature and compares it against the information in the main dataset.
##### The logic is simple: 
1) Create a new feature called Disruptions.
2) Check whether the disruption ended after the train departed and started before it arrived.
3) Check whether rdt_station_codes contains both the departure station and the arrival station.
4) If all requirements are satisfied, the Disruptions value is True.
5) Otherwise, it is False.
   
I do understand that this is not entirely correct either, since some disruptions are limited to a single station. However, the code below can easily be changed in the future, so I'll stick with this system for now.
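
As a rough sketch of the logic above, the check for a single journey part could look like the following. It is an illustration under stated assumptions rather than the notebook's final function: the name `has_disruption` and the `demo_disruptions` frame are hypothetical, the time columns are assumed to already hold `pd.Timestamp` values, and station codes are matched as exact tokens (a design choice made here so that, for example, `GVC` does not match `GV`).

```python
import pandas as pd


def has_disruption(disruptions, departure, arrival, departure_time, arrival_time):
    """Return True if a disruption covers both stations and overlaps the journey part."""
    # Turn "DT, DTCP, GV" into {"DT", "DTCP", "GV"} so codes are compared exactly.
    code_sets = disruptions["rdt_station_codes"].str.split(",").apply(
        lambda codes: {code.strip() for code in codes}
    )
    covers_both = code_sets.apply(lambda codes: departure in codes and arrival in codes)
    # Interval overlap: the disruption ends after departure and starts before arrival.
    overlaps = (disruptions["end_time"] > departure_time) & (
        disruptions["start_time"] < arrival_time
    )
    return bool((covers_both & overlaps).any())


# Made-up demo frame in the shape of the disruptions dataset.
demo_disruptions = pd.DataFrame({
    "rdt_station_codes": ["DT, DTCP, GV, GVMW, RSW", "BR, DN, HRT, VL"],
    "start_time": pd.to_datetime(["2022-01-01 05:45:33", "2022-01-01 07:31:39"]),
    "end_time": pd.to_datetime(["2022-01-01 06:34:58", "2022-01-01 11:26:38"]),
})

during = has_disruption(
    demo_disruptions, "DT", "GV",
    pd.Timestamp("2022-01-01 06:13:00"), pd.Timestamp("2022-01-01 06:21:00"),
)
before = has_disruption(
    demo_disruptions, "DT", "GV",
    pd.Timestamp("2022-01-01 02:13:00"), pd.Timestamp("2022-01-01 02:21:00"),
)
other_station = has_disruption(
    demo_disruptions, "GVC", "GV",
    pd.Timestamp("2022-01-01 06:24:00"), pd.Timestamp("2022-01-01 06:27:00"),
)
print(during, before, other_station)
```

Comparing real timestamps instead of strings keeps the overlap test independent of how the dates happen to be formatted.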

##### Additional preprocessing - Unified date-time format in the main dataset and the disruptions dataset
As you can see, the two datasets currently store dates in different formats. To compare the 'Departure time' feature against the 'end_time' feature, I have to make sure both are stored in one unified format.
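
One possible way to unify the formats is to parse the ISO strings with their UTC offset and then drop the time zone, as in the minimal sketch below. The helper name `to_naive_local` is hypothetical, and the sketch assumes that the disruption timestamps are recorded in Dutch local time (`Europe/Amsterdam`), which the dataset itself does not state.

```python
import pandas as pd


def to_naive_local(series: pd.Series, tz: str = "Europe/Amsterdam") -> pd.Series:
    """Parse offset-aware ISO strings and return naive local timestamps (hypothetical helper)."""
    # utc=True handles winter (+01:00) and summer (+02:00) offsets in one column.
    parsed = pd.to_datetime(series, utc=True)
    # Convert to local wall-clock time, then drop the time zone information.
    return parsed.dt.tz_convert(tz).dt.tz_localize(None)


departures = pd.Series(["2022-01-01T02:13:00+01:00", "2022-07-01T02:13:00+02:00"])
local = to_naive_local(departures)

# A disruption end time in the format used by the disruptions dataset.
disruption_end = pd.Timestamp("2022-01-01 06:34:58")
print(local)
print(local.iloc[0] < disruption_end)
```

Once both datasets hold naive timestamps of the same kind, comparisons such as `end_time > departure_time` work on actual times rather than on text.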


In [9]:
results.head()

Unnamed: 0,Journey id,Train type,Railroad company,Train number,Departure station code,Arrival station code,Departure station name,Arrival station name,Total journey delay,Departure time,Arrival time,Is weekend,Is holiday,Part number,Cancelled
0,7914217,Intercity,NS,1410,RTD,DT,Rotterdam Centraal,Delft,4.0,2022-01-01T02:00:00+01:00,2022-01-01T02:13:00+01:00,True,True,1,False
1,7914217,Intercity,NS,1410,DT,GV,Delft,Den Haag HS,7.0,2022-01-01T02:13:00+01:00,2022-01-01T02:21:00+01:00,True,True,2,False
2,7914217,Intercity,NS,1410,GV,LEDN,Den Haag HS,Leiden Centraal,3.0,2022-01-01T02:23:00+01:00,2022-01-01T02:36:00+01:00,True,True,3,False
3,7914217,Intercity,NS,1410,LEDN,SHL,Leiden Centraal,Schiphol Airport,0.0,2022-01-01T02:45:00+01:00,2022-01-01T03:01:00+01:00,True,True,4,False
4,7914217,Intercity,NS,1410,SHL,ASD,Schiphol Airport,Amsterdam Centraal,0.0,2022-01-01T03:03:00+01:00,2022-01-01T03:17:00+01:00,True,True,5,False


In [10]:
# Store the disruption timestamps as 'YYYY-MM-DD HH:MM:SS' strings
disruptions["start_time"] = pd.to_datetime(disruptions["start_time"]).dt.strftime('%Y-%m-%d %H:%M:%S')
disruptions["end_time"] = pd.to_datetime(disruptions["end_time"]).dt.strftime('%Y-%m-%d %H:%M:%S')


In [11]:
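def check_for_disruptions(departure, arrival, departure_time, arrival_time):
    """Return True if any disruption covers both stations and overlaps the journey part."""
    # Station masks: note that `in` is a substring test on the comma-separated code string
    mask_departure = disruptions['rdt_station_codes'].apply(lambda x: departure in x)
    mask_arrival = disruptions['rdt_station_codes'].apply(lambda x: arrival in x)
    # Time mask: the disruption ends after departure and starts before arrival.
    # These are string comparisons, so all four timestamps must share one format.
    mask_time = (disruptions['end_time'] > departure_time) & (disruptions['start_time'] < arrival_time)

    # Combine the masks using logical AND
    result = mask_departure & mask_arrival & mask_time

    return result.any()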

In [12]:
df_split = np.array_split(results, num_processes - 1)



In [13]:
with Pool(num_processes - 1) as pool:
    # Apply merging.unify_date_time_format to each chunk in parallel with Pool.map
    merged = pool.map(merging.unify_date_time_format, df_split)

In [14]:
df = pd.concat(merged)
df['Disruptions'] = df.apply(lambda row: check_for_disruptions(row['Departure station code'], row['Arrival station code'], row['Departure time'], row['Arrival time']), axis=1)

In [15]:
cancelled = df[df['Cancelled'] == True]
cancelled.head(20)

Unnamed: 0,Journey id,Train type,Railroad company,Train number,Departure station code,Arrival station code,Departure station name,Arrival station name,Total journey delay,Departure time,Arrival time,Is weekend,Is holiday,Part number,Cancelled,Disruptions
2,7914227,Intercity,NS,1422,DT,GV,Delft,Den Haag HS,0.0,2022-01-01 05:12:00,2022-01-01T05:21:00+01:00,True,True,3,True,True
0,7914234,Sprinter,NS,5110,RTD,SDM,Rotterdam Centraal,Schiedam Centrum,0.0,2022-01-01 05:36:00,2022-01-01T05:41:00+01:00,True,True,1,True,False
1,7914234,Sprinter,NS,5110,SDM,DTCP,Schiedam Centrum,Delft Campus,0.0,2022-01-01 05:41:00,2022-01-01T05:47:00+01:00,True,True,2,True,False
2,7914234,Sprinter,NS,5110,DTCP,DT,Delft Campus,Delft,0.0,2022-01-01 05:47:00,2022-01-01T05:50:00+01:00,True,True,3,True,True
3,7914234,Sprinter,NS,5110,DT,RSW,Delft,Rijswijk,0.0,2022-01-01 05:50:00,2022-01-01T05:54:00+01:00,True,True,4,True,True
4,7914234,Sprinter,NS,5110,RSW,GVMW,Rijswijk,Den Haag Moerwijk,0.0,2022-01-01 05:54:00,2022-01-01T05:57:00+01:00,True,True,5,True,True
5,7914234,Sprinter,NS,5110,GVMW,GV,Den Haag Moerwijk,Den Haag HS,0.0,2022-01-01 05:57:00,2022-01-01T06:01:00+01:00,True,True,6,True,True
2,7914243,Intercity,NS,1426,DT,GV,Delft,Den Haag HS,0.0,2022-01-01 06:13:00,2022-01-01T06:21:00+01:00,True,True,3,True,True
0,7914256,Sprinter,NS,5123,GVC,GV,Den Haag Centraal,Den Haag HS,0.0,2022-01-01 06:24:00,2022-01-01T06:27:00+01:00,True,True,1,True,False
1,7914256,Sprinter,NS,5123,GV,GVMW,Den Haag HS,Den Haag Moerwijk,0.0,2022-01-01 06:29:00,2022-01-01T06:32:00+01:00,True,True,2,True,True


In [16]:
delays = df[df['Total journey delay'] > 0]
disruptions_true = df[df['Disruptions'] == True]

print(len(disruptions_true))
percentage = (len(disruptions_true) / len(df)) * 100
print("There's " + str(percentage) + " % of journeys with disruptions")

In [17]:
print(len(cancelled))
percentage = (len(cancelled) / len(df)) * 100
print("There's " + str(percentage) + " % of journeys with cancellations")

33218
There's 3.748000081237885 % of journeys with cancellations


In [18]:
print(len(delays))
percentage = (len(delays) / len(df)) * 100
print("There's " + str(percentage) + " % of journeys with delays")

226792
There's 25.58903108026077 % of journeys with delays


In [19]:
df.head(50)

Unnamed: 0,Journey id,Train type,Railroad company,Train number,Departure station code,Arrival station code,Departure station name,Arrival station name,Total journey delay,Departure time,Arrival time,Is weekend,Is holiday,Part number,Cancelled,Disruptions
0,7914217,Intercity,NS,1410,RTD,DT,Rotterdam Centraal,Delft,4.0,2022-01-01 02:00:00,2022-01-01T02:13:00+01:00,True,True,1,False,False
1,7914217,Intercity,NS,1410,DT,GV,Delft,Den Haag HS,7.0,2022-01-01 02:13:00,2022-01-01T02:21:00+01:00,True,True,2,False,True
2,7914217,Intercity,NS,1410,GV,LEDN,Den Haag HS,Leiden Centraal,3.0,2022-01-01 02:23:00,2022-01-01T02:36:00+01:00,True,True,3,False,False
3,7914217,Intercity,NS,1410,LEDN,SHL,Leiden Centraal,Schiphol Airport,0.0,2022-01-01 02:45:00,2022-01-01T03:01:00+01:00,True,True,4,False,False
4,7914217,Intercity,NS,1410,SHL,ASD,Schiphol Airport,Amsterdam Centraal,0.0,2022-01-01 03:03:00,2022-01-01T03:17:00+01:00,True,True,5,False,False
0,7914219,Intercity,NS,1409,UT,ASD,Utrecht Centraal,Amsterdam Centraal,4.0,2022-01-01 02:16:00,2022-01-01T02:44:00+01:00,True,True,1,False,True
1,7914219,Intercity,NS,1409,ASD,SHL,Amsterdam Centraal,Schiphol Airport,2.0,2022-01-01 02:46:00,2022-01-01T03:01:00+01:00,True,True,2,False,False
2,7914219,Intercity,NS,1409,SHL,LEDN,Schiphol Airport,Leiden Centraal,0.0,2022-01-01 03:05:00,2022-01-01T03:21:00+01:00,True,True,3,False,False
3,7914219,Intercity,NS,1409,LEDN,GV,Leiden Centraal,Den Haag HS,0.0,2022-01-01 03:23:00,2022-01-01T03:37:00+01:00,True,True,4,False,False
4,7914219,Intercity,NS,1409,GV,DT,Den Haag HS,Delft,0.0,2022-01-01 03:38:00,2022-01-01T03:45:00+01:00,True,True,5,False,True


In [20]:
df.to_csv('preprocessed_data/disruptions_and_main_dataset.csv', index=False)