In [None]:
import os
import pandas as pd
import json
from datetime import datetime
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from tqdm import tqdm

# reading the first json from the realtime data folder
parent_folder = 'washington_GTFS_data 2024.1.29-2024.2.11/washington_GTFS_data - Copy/output/' # smaller dataset
# parent_folder = 'washington_GTFS_data 2024.2.12-2024.3.24\washington_GTFS_data 2.12-3.24/' # bigger dataset

trip_updates = 'RAIL_RT_TRIP_UPDATES/'

file_path = parent_folder + trip_updates + '2024_05_02_03_35_00.json'
try:
	with open(file_path, 'r') as file:
		tripJson = json.load(file)
except FileNotFoundError:
	print(f"File not found: {file_path}")

# extracting the links data from static data
links = pd.read_csv('static/links.csv')
stopTimes = pd.read_csv('static/stop_times.csv')
uniqueLinks = pd.read_csv('static/unique_links.csv')


In [2]:
tripJson

{'header': {'gtfs_realtime_version': '2.0',
  'incrementality': 0,
  'timestamp': 1707104092},
 'entity': [{'id': '0',
   'trip_update': {'trip': {'trip_id': '5179511_19757',
     'start_time': '22:38:00',
     'start_date': '20240204',
     'schedule_relationship': 0,
     'route_id': 'RED',
     'direction_id': 0},
    'stop_time_update': [{'stop_sequence': 1,
      'departure': {'time': 1707104280, 'uncertainty': 0},
      'stop_id': 'PF_A03_1',
      'schedule_relationship': 0},
     {'stop_sequence': 2,
      'arrival': {'time': 1707104328, 'uncertainty': 0},
      'stop_id': 'PF_A02_C',
      'schedule_relationship': 0},
     {'stop_sequence': 3,
      'arrival': {'time': 1707104453, 'uncertainty': 0},
      'stop_id': 'PF_A01_1',
      'schedule_relationship': 0},
     {'stop_sequence': 4,
      'arrival': {'time': 1707104542, 'uncertainty': 0},
      'stop_id': 'PF_B01_1',
      'schedule_relationship': 0},
     {'stop_sequence': 5,
      'arrival': {'time': 1707104643, 'uncert

In [None]:
def get_trip_data(tripJson: dict) -> pd.DataFrame:
    data = []
    for entity in tripJson.get('entity', []):  # Safely get 'entity' list
        trip_update = entity.get('trip_update', {})
        stop_time_updates = trip_update.get('stop_time_update', [])
        
        for stop_time_update in stop_time_updates:
            # Extract arrival and departure times
            arrival_time = stop_time_update.get('arrival', {}).get('time', None)
            departure_time = stop_time_update.get('departure', {}).get('time', None)
            
            # Ensure the timestamps are numeric and within a valid range
            if isinstance(arrival_time, (int, float)):
                if arrival_time < 0 or arrival_time > 2**31 - 1:  # Limit is 2038
                    arrival_time = None
            else:
                arrival_time = None
            
            if isinstance(departure_time, (int, float)):
                if departure_time < 0 or departure_time > 2**31 - 1:  # Limit is 2038
                    departure_time = None
            else:
                departure_time = None
            
            # Append the cleaned data
            data.append({
                'trip_id': trip_update.get('trip', {}).get('trip_id', None),
                'stop_id': stop_time_update.get('stop_id', None),
                'stop_sequence': stop_time_update.get('stop_sequence', None),
                'arrival_time': arrival_time,
                'arrival_uncertainty': stop_time_update.get('arrival', {}).get('uncertainty', None),
                'departure_time': departure_time,
                'departure_uncertainty': stop_time_update.get('departure', {}).get('uncertainty', None),
            })
    
    # Create a DataFrame from the cleaned data
    currentStopTimes = pd.DataFrame(data)
    
    return currentStopTimes

def calculateLinkParams(currentStopTimes: pd.DataFrame, lengthdf: pd.DataFrame) -> pd.DataFrame:
    currentStopTimes = currentStopTimes.sort_values(by=['trip_id', 'stop_sequence'])
    next_stop_times = currentStopTimes.shift(-1)

    mask = currentStopTimes['trip_id'] == next_stop_times['trip_id']
    link_data = currentStopTimes[mask].copy()
    link_data['end_stop'] = next_stop_times['stop_id']
    link_data['end_sequence'] = next_stop_times['stop_sequence']
    link_data['end_time'] = next_stop_times['arrival_time']

    # Ensure the trips are in sequence order
    link_data = link_data[link_data['stop_sequence'] == link_data['end_sequence'] - 1]

    # Ensure start_stop and end_stop are not the same
    link_data = link_data[link_data['stop_id'] != link_data['end_stop']]

    link_data['start_time'] = link_data['departure_time'].fillna(link_data['arrival_time'])
    link_data['real_time_taken [mins]'] = (link_data['end_time'] - link_data['start_time']) / 60

    # Drop rows where the time taken is negative
    link_data = link_data[link_data['real_time_taken [mins]'] >= 0]

    link_data = link_data.merge(lengthdf, left_on=['stop_id', 'end_stop'], right_on=['start_stop', 'end_stop'], how='left')
    link_data['real_speed [km/h]'] = link_data['length'] / (link_data['real_time_taken [mins]'] / 60)
    
    return link_data[['trip_id', 'stop_id', 'end_stop', 'stop_sequence', 'end_sequence', 'real_time_taken [mins]', 'start_time', 'end_time', 'length', 'real_speed [km/h]']].rename(columns={'stop_id': 'start_stop', 'stop_sequence': 'start_sequence'})

def calculateLinkKpis(currentLinks: pd.DataFrame, uniqueLinks: pd.DataFrame, stopTimes: pd.DataFrame) -> pd.DataFrame:
    for index, row in uniqueLinks.iterrows():
        link = currentLinks[(currentLinks['start_stop'] == row['start_stop']) & (currentLinks['end_stop'] == row['end_stop'])]

        mean_speed = link['real_speed [km/h]'].mean()
        covariance_speed = link['real_speed [km/h]'].std() / mean_speed if mean_speed else 0
        buffer_speed = link['real_speed [km/h]'].quantile(0.95) - link['real_speed [km/h]'].quantile(0.5)

        mean_time = link['real_time_taken [mins]'].mean()
        covariance_time = link['real_time_taken [mins]'].std() / mean_time if mean_time else 0
        buffer_time = link['real_time_taken [mins]'].quantile(0.95) - link['real_time_taken [mins]'].quantile(0.5)

        no_of_trips = len(link)
        uniqueLinks.loc[index, 'no_of_trips'] = no_of_trips

        uniqueLinks.loc[index, 'mean_speed [km/h]'] = mean_speed
        uniqueLinks.loc[index, 'covariance_speed [%]'] = covariance_speed
        uniqueLinks.loc[index, 'buffer_speed [km/h]'] = buffer_speed
        uniqueLinks.loc[index, 'mean_time [mins]'] = mean_time
        uniqueLinks.loc[index, 'covariance_time [%]'] = covariance_time
        uniqueLinks.loc[index, 'buffer_time [mins]'] = buffer_time

        trips = stopTimes[(stopTimes['stop_id'] == row['end_stop']) & (stopTimes['stop_sequence'] == row['end_sequence'])].copy()
        trips['departure_time'] = trips['departure_time'].fillna(trips['arrival_time'])
        headways = trips['departure_time'].diff().dropna().abs() / 60

        if not headways.empty:
            mean_headway = headways.mean()
            covariance_headway = headways.std() / mean_headway if mean_headway else 0
            buffer_headway = headways.quantile(0.95) - headways.quantile(0.5)
            uniqueLinks.loc[index, 'mean_headway [mins]'] = mean_headway
            uniqueLinks.loc[index, 'covariance_headway [%]'] = covariance_headway
            uniqueLinks.loc[index, 'buffer_headway [mins]'] = buffer_headway
        else:
            uniqueLinks.loc[index, 'mean_headway [mins]'] = None
            uniqueLinks.loc[index, 'covariance_headway [%]'] = None
            uniqueLinks.loc[index, 'buffer_headway [mins]'] = None

    return uniqueLinks


In [4]:
currentStopTimes = get_trip_data(tripJson)
currentStopTimes

Unnamed: 0,trip_id,stop_id,stop_sequence,arrival_time,arrival_uncertainty,departure_time,departure_uncertainty
0,5179511_19757,PF_A03_1,1,,,1.707104e+09,0.0
1,5179511_19757,PF_A02_C,2,1.707104e+09,0.0,,
2,5179511_19757,PF_A01_1,3,1.707104e+09,0.0,,
3,5179511_19757,PF_B01_1,4,1.707105e+09,0.0,,
4,5179511_19757,PF_B02_1,5,1.707105e+09,0.0,,
...,...,...,...,...,...,...,...
2345,5178210_19757,PF_E06_C,5,1.707104e+09,0.0,,
2346,5178210_19757,PF_E05_C,6,1.707104e+09,0.0,,
2347,5178210_19757,PF_E04_C,7,1.707104e+09,0.0,,
2348,5178210_19757,PF_E03_C,8,1.707104e+09,0.0,,


In [5]:
uniqueLinks

Unnamed: 0,start_stop,end_stop,start_sequence,end_sequence,length,mean_speed [km/h],covariance_speed [%],buffer_speed [km/h],mean_time [mins],covariance_time [%],buffer_time [mins],mean_headway [mins],covariance_headway [%],buffer_headway [mins]
0,PF_A15_C,PF_A14_C,1,2,2.6710,40.065000,1.774532e-16,0.0,4.0,0.0,0.0,13.954708,5.889077,9.00
1,PF_A14_C,PF_A13_C,2,3,2.1048,42.096000,1.688916e-16,0.0,3.0,0.0,0.0,13.954708,5.889077,9.00
2,PF_A13_C,PF_A12_C,3,4,1.0988,32.964000,0.000000e+00,0.0,2.0,0.0,0.0,13.954708,5.889077,9.00
3,PF_A12_C,PF_A11_C,4,5,1.4536,29.072000,0.000000e+00,0.0,3.0,0.0,0.0,13.954708,5.889077,9.00
4,PF_A11_C,PF_A10_C,5,6,2.1612,32.418000,2.193121e-16,0.0,4.0,0.0,0.0,13.954708,5.889077,9.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
197,PF_N11_C,PF_N10_C,2,3,2.8733,34.479600,0.000000e+00,0.0,5.0,0.0,0.0,40.306034,3.493899,41.85
198,PF_N10_C,PF_N09_C,3,4,2.0799,31.198628,4.425913e-05,0.0,4.0,0.0,0.0,40.306034,3.493899,41.85
199,PF_N09_C,PF_N08_C,4,5,1.7224,34.447983,5.344577e-06,0.0,3.0,0.0,0.0,40.306034,3.493899,41.85
200,PF_N08_C,PF_N07_C,5,6,1.3477,26.954000,5.260984e-16,0.0,3.0,0.0,0.0,41.942149,3.397802,48.00


In [6]:
lengthdf = uniqueLinks[['start_stop','end_stop', 'length']]
lengthdf

Unnamed: 0,start_stop,end_stop,length
0,PF_A15_C,PF_A14_C,2.6710
1,PF_A14_C,PF_A13_C,2.1048
2,PF_A13_C,PF_A12_C,1.0988
3,PF_A12_C,PF_A11_C,1.4536
4,PF_A11_C,PF_A10_C,2.1612
...,...,...,...
197,PF_N11_C,PF_N10_C,2.8733
198,PF_N10_C,PF_N09_C,2.0799
199,PF_N09_C,PF_N08_C,1.7224
200,PF_N08_C,PF_N07_C,1.3477


In [7]:
currentLinks = calculateLinkParams(currentStopTimes, lengthdf)
currentLinks

Unnamed: 0,trip_id,start_stop,end_stop,start_sequence,end_sequence,real_time_taken [mins],start_time,end_time,length,real_speed [km/h]
0,5177216_19757,PF_N12_C,PF_N11_C,1,2.0,16.316667,1.707099e+09,1.707100e+09,1.9602,7.208090
1,5177216_19757,PF_N11_C,PF_N10_C,2,3.0,18.150000,1.707100e+09,1.707101e+09,2.8733,9.498512
2,5177216_19757,PF_N10_C,PF_N09_C,3,4.0,17.150000,1.707100e+09,1.707101e+09,2.0799,7.276618
3,5177216_19757,PF_N09_C,PF_N08_C,4,5.0,16.500000,1.707100e+09,1.707101e+09,1.7224,6.263273
4,5177216_19757,PF_N08_C,PF_N07_C,5,6.0,16.383333,1.707100e+09,1.707101e+09,1.3477,4.935626
...,...,...,...,...,...,...,...,...,...,...
1889,NR397,PF_E06_C,PF_E07_1,8,9.0,0.233333,1.707105e+09,1.707105e+09,1.9827,509.837143
1890,NR397,PF_E08_1,PF_E09_C,10,11.0,0.116667,1.707105e+09,1.707105e+09,1.8653,959.297143
1891,NR409,PF_D09_C,PF_D10_C,1,2.0,1.450000,1.707104e+09,1.707104e+09,0.9767,40.415172
1892,NR409,PF_D10_C,PF_D11_1,2,3.0,2.750000,1.707104e+09,1.707104e+09,1.1506,25.104000


In [8]:
# calculating the current unique links
currentUniqueLinks = currentLinks.drop_duplicates(subset=['start_stop', 'end_stop'])[['start_stop', 'end_stop', 'start_sequence', 'end_sequence']]
currentUniqueLinks.reset_index(drop=True, inplace=True)
currentUniqueLinks

Unnamed: 0,start_stop,end_stop,start_sequence,end_sequence
0,PF_N12_C,PF_N11_C,1,2.0
1,PF_N11_C,PF_N10_C,2,3.0
2,PF_N10_C,PF_N09_C,3,4.0
3,PF_N09_C,PF_N08_C,4,5.0
4,PF_N08_C,PF_N07_C,5,6.0
...,...,...,...,...
188,PF_B07_C,PF_B08_C,11,12.0
189,PF_B08_C,PF_B09_C,12,13.0
190,PF_B09_C,PF_B10_C,13,14.0
191,PF_B10_C,PF_B11_C,14,15.0


In [9]:
kpis = calculateLinkKpis(currentLinks, currentUniqueLinks, currentStopTimes)
kpis

Unnamed: 0,start_stop,end_stop,start_sequence,end_sequence,no_of_trips,mean_speed [km/h],covariance_speed [%],buffer_speed [km/h],mean_time [mins],covariance_time [%],buffer_time [mins],mean_headway [mins],covariance_headway [%],buffer_headway [mins]
0,PF_N12_C,PF_N11_C,1,2.0,8.0,30.433000,0.326967,6.888473,5.114583,0.886959,8.297500,28.283333,0.605340,31.861667
1,PF_N11_C,PF_N10_C,2,3.0,8.0,30.335604,0.292593,5.402877,6.825000,0.672771,8.372500,28.008333,0.598547,31.728333
2,PF_N10_C,PF_N09_C,3,4.0,8.0,27.177913,0.299309,0.457696,5.787500,0.793900,8.662500,27.897917,0.599512,31.739167
3,PF_N09_C,PF_N08_C,4,5.0,8.0,29.249796,0.320220,1.896799,4.845833,0.972072,8.715000,27.916667,0.594854,31.423333
4,PF_N08_C,PF_N07_C,5,6.0,8.0,27.084074,0.337169,1.562740,4.395833,1.102538,9.054167,27.864583,0.588418,31.311667
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
188,PF_B07_C,PF_B08_C,11,12.0,10.0,28.938107,0.203502,0.000000,3.220000,0.352920,2.367500,10.644444,0.806063,13.956667
189,PF_B08_C,PF_B09_C,12,13.0,10.0,30.142390,0.183188,0.955844,3.726667,0.307871,2.255000,10.583333,0.815176,13.956667
190,PF_B09_C,PF_B10_C,13,14.0,10.0,34.543962,0.233826,6.570690,3.151667,0.376924,2.225000,10.529630,0.827487,13.956667
191,PF_B10_C,PF_B11_C,14,15.0,8.0,30.805600,0.215966,3.928914,3.658333,0.363259,2.372500,10.587037,0.818695,13.956667


In [10]:
write_lock = threading.Lock()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def process_file(filename):
    file_path = parent_folder + trip_updates + filename
    try:
        with open(file_path, 'r') as file:
            tripJson = json.load(file)
    except FileNotFoundError:
        logging.error(f"File not found: {file_path}")
        return

    # Extract datetime from the file name
    datetime_str = filename.split('.')[0]
    try:
        file_datetime = datetime.strptime(datetime_str, '%Y_%d_%m_%H_%M_%S')
    except Exception as e:
        logging.error(f"Error parsing datetime from {filename}: {e}")
        return

    logging.info(f"Processing file: {file_datetime}")

    currentStopTimes = get_trip_data(tripJson)
    if currentStopTimes.empty:
        logging.info("No valid data found in file.")
        return

    currentLinks = calculateLinkParams(currentStopTimes, lengthdf)
    currentUniqueLinks = currentLinks.drop_duplicates(subset=['start_stop', 'end_stop'])[['start_stop', 'end_stop', 'start_sequence', 'end_sequence']]
    currentUniqueLinks.reset_index(drop=True, inplace=True)
    kpis = calculateLinkKpis(currentLinks, currentUniqueLinks, currentStopTimes)
    
    # Add datetime column
    kpis['datetime'] = file_datetime
    
    preset_columns = [
        'start_stop', 'end_stop', 'start_sequence', 'end_sequence', 'no_of_trips',
        'mean_speed [km/h]', 'covariance_speed [%]', 'buffer_speed [km/h]',
        'mean_time [mins]', 'covariance_time [%]', 'buffer_time [mins]',
        'mean_headway [mins]', 'covariance_headway [%]', 'buffer_headway [mins]',
        'datetime'
    ]
    
    for _, row in kpis.iterrows():

        link_file = f"{output_folder}{row['start_stop']}_{row['end_stop']}.csv"
        df = row.to_frame().T

        # Reorder and preset columns to avoid misalignment
        df = df.reindex(columns=preset_columns)

        if os.path.exists(link_file):
            df.to_csv(link_file, index=False, mode='a', header=False)
        else:  
            df.to_csv(link_file, index=False)

output_folder = 'output-updated/'
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

files = [f for f in os.listdir(parent_folder + trip_updates) if f.endswith('.json')]

with ThreadPoolExecutor(max_workers=500) as executor:
    futures = [executor.submit(process_file, filename) for filename in files]
    for future in tqdm(as_completed(futures)):
        if future.exception():
            logging.error(f"Error: {future.exception()}")

2025-02-20 23:05:22,420 - INFO - Processing file: 2024-02-01 00:00:00
2025-02-20 23:05:22,431 - INFO - Processing file: 2024-02-01 00:01:00
2025-02-20 23:05:22,439 - INFO - Processing file: 2024-02-01 00:02:00
2025-02-20 23:05:22,452 - INFO - Processing file: 2024-02-01 00:04:00
2025-02-20 23:05:22,465 - INFO - Processing file: 2024-02-01 00:03:00
2025-02-20 23:05:22,523 - INFO - Processing file: 2024-02-01 00:05:00
2025-02-20 23:05:22,548 - INFO - Processing file: 2024-02-01 00:07:00
2025-02-20 23:05:22,548 - INFO - Processing file: 2024-02-01 00:06:00
2025-02-20 23:05:22,560 - INFO - Processing file: 2024-02-01 00:08:00
2025-02-20 23:05:22,575 - INFO - Processing file: 2024-02-01 00:09:00
2025-02-20 23:05:22,576 - INFO - Processing file: 2024-02-01 00:10:00
2025-02-20 23:05:22,599 - INFO - Processing file: 2024-02-01 00:11:00
2025-02-20 23:05:22,621 - INFO - Processing file: 2024-02-01 00:12:00
2025-02-20 23:05:22,623 - INFO - Processing file: 2024-02-01 00:13:00
2025-02-20 23:05:22,

: 

: 

In [None]:
print("Done!")

Done!
