## Longer term delay analysis

The data used in this analisys ranges from 2021.01.01 to 2022.09.30.
All trains and their positions as well as their potential delays are
sampled every minute, resulting in ~10GB data. This dataset does not
contain the cause of the delays, but is better suited for analysing
trends in delays over a "long" period of time.

In [None]:
import pandas as pd
import numpy as np

from datetime import datetime, timedelta
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
import dask

from custom_loader import Loader
from tqdm import tqdm

import re
import os

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

import plotly.express as px
import holoviews as hv
import holoviews.operation.datashader as hd

In [None]:
def immutable_sort(list_to_sort:list) -> list:
    res = list_to_sort.copy()
    res.sort()
    return res

def epoch_to_date(day_since_epoch:int) ->  datetime:
    return datetime(1970,1,1) + timedelta(days = day_since_epoch)

## Setting up the connection

The data is stored in Cassandra db, which is well suited to store large amounts of data.
This data was scraped by u/gaborath on reddit, who graciously gave us this sample. He has
a cool [website](https://mav-stat.info) on the same topic.

In [None]:
with open('cassandra-credentials.txt','r') as f:
    user = f.readline().strip()
    pw = f.readline().strip()
port=11352

In [None]:
dask_cassandra_loader = Loader()
keyspace = 'mav'
cluster = ['vm.niif.cloud.bme.hu']

dask_cassandra_loader.connect_to_cassandra(cluster,
                                           keyspace,
                                           username=user,
                                           password=pw, 
                                           port=port,
                                           timeout=60
                                          )
dask_cassandra_loader.connect_to_local_dask()

## Distribution of delays

The train journeys are binned by mean delays from 0 (non inclusive) to 1000 minutes by 5 minute increments.
The resulting distribution can be seen below. (only 0 to 250 displayed for clarity)
The most frequent delays are from 0-5 and 5-10 minutes by a large margin. This is good news, 
but it is clear that delays that are hour or longer are very frequent. This is probably the reason behind
the notoriety of MAV.

In [None]:
bins = list(range(0,1000,5))
delays_binned = None
#epoch range: 18628-19296 
for i in tqdm(range(18628,19296,5)):
    try:
        table = dask_cassandra_loader.load_cassandra_table('train_data',
                                                 ['elviraid', 'delay',],
                                                           [],
                                                 [('epoch', [i,i+1,i+2,i+3,i+4])],
                                                 force=False)
        if table.data is None:
            continue
        df = table.data.groupby('elviraid').agg({'delay':'mean'}).reset_index()
        df = df['delay'].map_partitions(pd.cut, bins)
        if delays_binned is None:
            tmp = df.compute()
            tmp = tmp.groupby(tmp).size()
            delays_binned = tmp
        else:
            tmp = df.compute()
            tmp = tmp.groupby(tmp).size()
            delays_binned = delays_binned + tmp
    except Exception as e:
        print(e)

In [None]:
plot_df = pd.DataFrame({'x':delays_binned.index,'y':delays_binned})
plot_df['x'] = plot_df['x'].astype(str)
plot_df.to_csv('data/delays_binned.csv')

In [None]:
plot_df = pd.read_csv('data/delays_binned.csv').head(50)
fig = px.histogram(plot_df,x='x', y='y', title  = 'distribution of mean train delays')
fig.update_yaxes(type='log', title='count, logarithmic')
fig.update_xaxes(title='delay group (minutes)')
fig

## The mean delays for each route

Determining the mean delays for each route can be useful for diagnosing various issues, including infrastructure problems, management issues, and failures in collaboration with other railway companies. We recommend rescheduling routes with high average delays or addressing the underlying problems in order to improve efficiency.

In [None]:
cumul = None
to_fetch = 5
for i in tqdm(range(18628,19296,to_fetch)):
    try:
        table = dask_cassandra_loader.load_cassandra_table('train_data',
                                                 ['relation', 'delay',],
                                                           [],
                                                 [('epoch', list(range(i,i+to_fetch)))],
                                                 force=False)
        if table.data is None:
            continue
        df = table.data.groupby('relation').agg({'delay':'mean'})
        if cumul is None:
            cumul = df.compute().reset_index()
            cumul['delay'] = np.where(cumul['delay'].isna(),0,cumul['delay'])
        else:
            tmp = df.compute().reset_index()
            tmp['delay'] = np.where(tmp['delay'].isna(),0,tmp['delay'])
            cumul = pd.concat([cumul, tmp]).groupby(by='relation').mean()
    except Exception as e:
        print(e)

In [None]:
mean_delay_route = cumul.reset_index()
mean_delay_route['relation'] = mean_delay_route['relation'].apply(lambda x: x.split(' - '))
mean_delay_route['relation'] = mean_delay_route['relation'].apply(immutable_sort)
mean_delay_route['relation'] = mean_delay_route['relation'].astype(str)
mean_delay_route = mean_delay_route.groupby('relation').mean().reset_index()
mean_delay_route = mean_delay_route.sort_values(by=['delay'], ascending=[False])
mean_delay_route.to_csv('data/mean_delay_route.csv')

**It is important to note that the mean delays for routes were analyzed without considering directionality, meaning that a route from A to B was treated the same as a route from B to A.**

The top 10 routes with the highest average delays are almost all routes with a destination abroad. This is likely due to these routes being the longest in duration, rather than any issues with collaboration. This is supported by the fact that the destinations abroad are not located in a single country. In addition to this, the Budapest-Keleti to Sopron route, which does not lead abroad, is also in the top 10. However, this route is one of the longest routes in Hungary.

In [None]:
plot_df = pd.read_csv('data/mean_delay_route.csv', index_col = 0).head(10)
print(plot_df)
fig = px.bar(plot_df, x='relation', y='delay', title='Mean delays for each route (Top 10)')
fig.update_yaxes(title = 'mean delay (min)')
fig.update_xaxes(title = 'route')
fig

## Observing seasonality in delays

By creating a time series based on the mean delay per journey, we are able to observe
seasonility in delays. This can help diagnose the shortcomings of the current
system when it comes to weather conditions.

In [None]:
cumul = None
for i in tqdm(range(18628,19296,5)):
    table = dask_cassandra_loader.load_cassandra_table('train_data',
                                             ['epoch', 'elviraid', 'delay',],
                                                       [],
                                             [('epoch', [i,i+1,i+2,i+3,i+4])],
                                             force=False)
    if table.data is None:
        continue
    df = table.data.groupby(['epoch','elviraid']).agg({'delay':'mean'})
    df['is_delayed'] = df['delay'].map_partitions(lambda x: x > 1)
    df = df.reset_index(0)
    df = df.groupby(['epoch','is_delayed']).size().compute().reset_index(0).rename(columns={0:'count'})
    if cumul is None:
        cumul = df
    else:
        cumul = pd.concat([cumul,df])

In [None]:
delay_percentage = cumul.reset_index().pivot(index='epoch',columns=['is_delayed'])
delay_percentage = delay_percentage['count']
delay_percentage.columns = delay_percentage.columns.ravel()
delay_percentage = delay_percentage.rename(columns={False:'not_delayed_count',True:'delayed_count'})
delay_percentage['delayed_percentage'] = (delay_percentage['delayed_count'] / (delay_percentage['delayed_count']+delay_percentage['not_delayed_count']))*100
delay_percentage.to_csv('data/delay_percentage.csv')

Although MAV only includes delays of 5-6 minutes or more in their statistics on punctual trains, we wanted to examine the metric using a different definition of punctuality, specifically considering delays of one minute or more. As expected, the percentage of punctual trains was significantly higher under this definition. To smooth out any fluctuations in the data, we also calculated the 30-day moving average. While the metric remains somewhat noisy, it is clear that trains are most punctual in the spring, with approximately 70-80% of journeys still being on time. This may be due to the favorable weather conditions during this season.

In [None]:
plot_df = pd.read_csv('data/delay_percentage.csv')
plot_df['epoch'] = plot_df['epoch'].apply(epoch_to_date)
plot_df['sma30'] = plot_df['delayed_percentage'].rolling(30).mean()
fig = px.line(plot_df, x='epoch', y = ['delayed_percentage','sma30'],
              title = 'Percentage of journeys with mean delays longer 1 minute')
fig.update_yaxes(title = 'percentage of delayed trains')
fig.update_xaxes(title = 'date')

In [None]:
auth_provider = PlainTextAuthProvider(username=user, password=pw)
cluster = Cluster(contact_points=['vm.niif.cloud.bme.hu'], port=port,
    auth_provider=auth_provider)
session = cluster.connect(keyspace)
session.row_factory = dict_factory

In [None]:
min_epoch = 18628
max_epoch = 19296
epochs = []
daily_mean_delays = []
daily_max_delays = []

for epoch in tqdm(range(min_epoch,max_epoch)):
    success = False
    while not success:
        try:
            sql_query = f"SELECT epoch, avg(cast(delay as FLOAT)) AS mean, max(delay) as max  FROM train_data WHERE epoch = {epoch} AND delay < 500 GROUP BY epoch ALLOW FILTERING"
            dict = session.execute(sql_query).one()
            success = True
            if dict is None:
                continue
            daily_mean_delays.append(dict['mean'])
            daily_max_delays.append(dict['max'])
            epochs.append(epoch)
        except Exception as e:
            print(e)
            
df = pd.DataFrame({'epoch': pd.Series(data=epochs),
                   'mean_delay': pd.Series(data=daily_mean_delays),
                    'max_delay': pd.Series(data=daily_max_delays)})

df.to_csv('data/daily_mean_and_average_delay.csv', index = False)

The mean delays per journey also showed a similar pattern. We found this metric to be somewhat noisy as well, so we again calculated the 30-day moving average to better identify trends. From this analysis, we observed elevated mean delays during the summer months. While we are not experts on the subject, it is possible that this may be due to the summer heat, which can cause metal rails and electrical wires to expand and potentially affect the operation of the trains.

In [None]:
plot_df = pd.read_csv('data/daily_mean_and_average_delay.csv')
plot_df['epoch'] = plot_df['epoch'].apply(epoch_to_date)
plot_df['sma30'] = plot_df['mean_delay'].rolling(30).mean()

fig = px.line(plot_df, x='epoch', y=['mean_delay', 'sma30'], title='Daily mean delay')
fig.update_yaxes(title = 'delay (min)')
fig.update_xaxes(title = 'date')
fig

To determine the weather conditions that lead to the longest delays, we calculated the maximum delays for each day. As before, we applied the 30-day moving average to smooth out the data. However, even with the use of the moving average, we were unable to discern any trends in the maximum daily delays. This may suggest that the longest delays are not caused by weather-related factors. However, we were able to spot some inconsistencies in our data. Some periods are missing entries. These can be identified by looking for unusually long straight lines between two dates that are "far" apart. Most notably from the 27th of Feburary 2022 to the 30th of March 2022. As we mentioned earlier, we received this data from u/gaborauth and we don't know the cause of the missing data.

In [None]:
plot_df = pd.read_csv('data/daily_mean_and_average_delay.csv')
plot_df['epoch'] = plot_df['epoch'].apply(epoch_to_date)
plot_df['sma30'] = plot_df['max_delay'].rolling(30).mean()

fig = px.line(plot_df, x='epoch', y=['max_delay', 'sma30'], title='Daily max delay')
fig.update_yaxes(title = 'delay (min)')
fig.update_xaxes(title = 'date')
fig

As the daily data required a 30 day moving average to be more easily interpreted, we calculated the monthly mean delay in addition to the daily data. The resulting plot confirmed our initial observations of higher delays in the summer months and lower delays in the spring. This indicates that MAV should prioritize preparation for summer weather in order to reduce the average delay per journey.

In [None]:
plot_df = pd.read_csv('data/daily_mean_and_average_delay.csv').drop(columns=['max_delay'])
plot_df['epoch'] = plot_df['epoch'].apply(epoch_to_date)
plot_df = plot_df.set_index('epoch')
plot_df = plot_df.resample('M').mean().reset_index()
plot_df

fig = px.line(plot_df, x='epoch', y='mean_delay', title='Monthly mean delay per journey')
fig.update_yaxes(title = 'delay (min)')
fig.update_xaxes(title = 'date')
fig

## Trains with high average delays

Trains with high average delays might be in bad condition, suggesting they need to be
serviced or retired entirely. However, high average delays might be caused by factors
outside the trains' conditions, which is why we suggest that this data should not be taken
out of context and should be examined in conjunction with the routes that have high delays.

A short investigation into these trains' conditions could reveal the real causes of the delays.

In [None]:
cumul = None
#19296
for i in tqdm(range(18628,19296,5)):
    table = dask_cassandra_loader.load_cassandra_table('train_data',
                                             ['trainnumber', 'delay','elviraid'],
                                                       [],
                                             [('epoch', [i,i+1,i+2,i+3,i+4])],
                                             force=False)
    if table.data is None:
        continue
    df = table.data.groupby(['trainnumber','elviraid']).agg({'delay':'mean'})
    df = df.reset_index(0).compute()
    if cumul is None:
        cumul = df
    else:
        cumul = pd.concat([cumul,df])

In [None]:
delays_per_train = cumul.groupby('trainnumber').agg({'elviraid':'count','delay':'mean'})
delays_per_train = delays_per_train.sort_values(by=['delay'],ascending=[False])
delays_per_train.to_csv('data/delays_per_train.csv')

In [None]:
plot_df = pd.read_csv('data/delays_per_train.csv')
plot_df = plot_df[plot_df['elviraid']>10].head(10).reset_index()
print(plot_df)
fig = px.bar(plot_df, x='trainnumber', y='delay', title='Mean delays for each train')
fig.update_yaxes(title = 'mean delay (min)')
fig.update_xaxes(title = 'train number')
fig

## Delay map

By analyzing the locations with the highest mean delays, we identified potential railway tracks that may require servicing. This information was obtained by collecting the trains' locations along with the delay data, over an extended period of time by u/gaborauth. However, it is worth noting that the cause of the delays was not recorded, so it is possible that some locations may have high average delays due to factors such as faulty trains or other external factors. Therefore, we recommend that MAV conduct their own internal analysis of track conditions in order to address the delays at these locations.

In [None]:
cumul = None
#19296
for i in tqdm(range(18628,19296,5)):
    try:
        table = dask_cassandra_loader.load_cassandra_table('train_data',
                                                 ['lat', 'delay','lon'],
                                                           [],
                                                 [('epoch', [i,i+1,i+2,i+3,i+4])],
                                                 force=False)
        if table.data is None:
            continue
        df = table.data[table.data['delay']<500]
        if cumul is None:
            cumul = df
        else:
            cumul = dd.concat([cumul,df]).compute()
    except Exception as e:
        print(e)
cumul

In [None]:
import datashader as ds, colorcet
from bokeh.plotting import figure, show
from bokeh.models import LinearColorMapper, BasicTicker, ColorBar
import datashader.transfer_functions as tf
%opts Image [colorbar=True](cmap=colorcet.fire)
hv.extension("bokeh", 
             #'matplotlib'
            ) 
shaded = hd.datashade(hv.Points(cumul, ['lon', 'lat']), cmap=colorcet.fire, aggregator=ds.mean('delay'))
dmap = hd.dynspread(shaded, threshold=0.5, max_px=4).opts(bgcolor='black', xaxis=None, yaxis=None, width=1000, height=600)
dmap

In [None]:
from mpl_toolkits.axes_grid1 import ImageGrid

fig = plt.figure(figsize=(9, 9))

grid = ImageGrid(fig, 111, nrows_ncols=(1, 1), axes_pad=0.5, share_all=False,
                 cbar_location="right", cbar_mode="each", cbar_pad="2%")

artist0 = dsshow(cumul, ds.Point('lon', 'lat'), ds.mean('delay'),norm='eq_hist', ax=grid[0])

cbar = plt.colorbar(artist0, cax=grid.cbar_axes[0], extend='both')
cbar.minorticks_on()
grid[0].set_title('Train locations color-coded by average delay (minutes)')
plt.savefig('plots/delay-color-coded1.png')

Unfortunately, we were unable to include a colorbar in the first map. The map displays the average delay at each location, with brighter colors indicating higher average delays. It is worth noting that some locations with very high average delays appear outside of the track limits, which is due to inaccurate location reporting. To improve visualization, histogram equalization was applied to the data, which may distort the perception of track condition severity as smaller values appear closer to larger values.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
img = mpimg.imread('plots/delay-color-coded.png')
f = plt.figure()
f.set_figwidth(20)
f.set_figheight(12)
imgplot = plt.imshow(img)
plt.show()

In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
img = mpimg.imread('plots/delay-color-coded-mpl.png')
f = plt.figure()
f.set_figwidth(20)
f.set_figheight(12)
imgplot = plt.imshow(img)
plt.show()