In [1]:
import json
import pandas as pd
from pandas import json_normalize
import pyarrow
import fastparquet 

import os
import configparser
import datetime as dt
from datetime import datetime
import pytz
from os.path import exists
import time

In [2]:
# Method to read config file settings
def read_config(Config_File):
    config = configparser.ConfigParser()
    config.read(Config_File)
    return config

configurations = read_config("configurations.ini")

In [3]:
#Function to check if JSON file is valid
def is_json(j_file):
  try:
    json.load(j_file)
  except ValueError as e:
    return False
  return True

In [4]:
# Function to bulk convert transit view jsons to csv
def parse_transitview_rt(start_date_time_str, stop_date_time_str):
    start_date_time = datetime.strptime(start_date_time_str, '%m/%d/%Y %H:%M')
    stop_date_time = datetime.strptime(stop_date_time_str, '%m/%d/%Y %H:%M')

    start = time.time()
    f_count = 0
    errors = ""
    
    # delta time
    delta = dt.timedelta(minutes=1)
    
    to_proc = stop_date_time - start_date_time + delta
    to_proc = int(to_proc.total_seconds() / 60)
    print('Processing feed files from ' + str(start_date_time) + ' to ' + str(stop_date_time))
    print('estimated # of files to process = ' + str(to_proc)) 
        
    # iterate over range of date / time
    while (start_date_time <= stop_date_time):

        #generate the file name to open
        folder = configurations['common_settings']['transitviewall_history_data_root'] + "\\" \
            + start_date_time.strftime("%#m") + '\\' \
            + start_date_time.strftime("%#d") + '\\' \
            + start_date_time.strftime("%H") + '\\' \
            + start_date_time.strftime("%M") + '\\' 
        
        file = folder + 'feed.json'

        #load the file name into mem
        if exists(file):
            f_count +=1
            
            with open(file) as json_file:
                if is_json(json_file):
                    json_file.seek(0)
                    data = json.load(json_file)
                else:
                    errors = errors + 'JSON file is not valid or empty: {}\\feed.json'.format(folder) + "\n"
                    start_date_time += delta
                    continue

            routes = dict(data['routes'][0])
            gtfs_rt_file = pd.DataFrame()

            # load json into flat dictionary
            if "routes" in data:
                # go over all routes in a json file
                for route in routes.keys():
                    gtfs_rt_route = json_normalize(data['routes'], record_path=[route])
                    gtfs_rt_route['route'] = route
                    gtfs_rt_route['file'] = start_date_time
                    gtfs_rt_route['timestamp'] = [datetime.fromtimestamp(int(val), pytz.timezone("UTC")) for val in gtfs_rt_route['timestamp']]
                    # create a single DF for file
                    gtfs_rt_file = pd.concat([gtfs_rt_file, gtfs_rt_route])

            else:
                errors = errors + 'entity key is not present in file: {}\\feed.json'.format(folder) + "\n"
                
            gtfs_rt_file.sort_index(axis=1, inplace=True)
            csv_f = folder + 'feed.csv'
            gtfs_rt_file.to_csv(csv_f, index=False)

        else:
            # if file is missing print a log
            errors = errors + 'JSON file does not exist: {}\\feed.json'.format(folder) + "\n"
        
        end = time.time()
        print('processed ' + str(f_count) + ' files (' + str(round((f_count/to_proc)*100, 2)) + '%) in ' +\
              str(int(end-start)) + 'sec (~time remaining: ' + str(int((to_proc-f_count)*(end-start)/f_count)) +\
              'sec) --- now @ file: ' + str(start_date_time), flush=True, end='\r')
        
        # increemnt datetime by 1 minute
        start_date_time += delta
    
    if(not errors):
        print('\n\nNO ERRORS')
    else:
        print('\n\nErrors:')
        print(errors)

In [30]:
parse_transitview_rt("10/25/2022 00:00", "10/25/2022 13:30")

Processing feed files from 2022-10-25 00:00:00 to 2022-10-25 13:30:00
estimated # of files to process = 811
processed 806 files (99.38%) in 756sec (~time remaining: 4sec) --- now @ file: 2022-10-25 13:30:0000

Errors:
JSON file does not exist: D:\DSCI CAPSTONE\GITHUB (DREXEL)\DATA SHAREPOINT\transitviewall-historical-2022\10\25\13\26\\feed.json
JSON file does not exist: D:\DSCI CAPSTONE\GITHUB (DREXEL)\DATA SHAREPOINT\transitviewall-historical-2022\10\25\13\27\\feed.json
JSON file does not exist: D:\DSCI CAPSTONE\GITHUB (DREXEL)\DATA SHAREPOINT\transitviewall-historical-2022\10\25\13\28\\feed.json
JSON file does not exist: D:\DSCI CAPSTONE\GITHUB (DREXEL)\DATA SHAREPOINT\transitviewall-historical-2022\10\25\13\29\\feed.json
JSON file does not exist: D:\DSCI CAPSTONE\GITHUB (DREXEL)\DATA SHAREPOINT\transitviewall-historical-2022\10\25\13\30\\feed.json



In [9]:
# Function to merge csv feed files
def merge_transitview_rt(start_date_time_str, stop_date_time_str):
    start_date_time = datetime.strptime(start_date_time_str, '%m/%d/%Y %H:%M')
    stop_date_time = datetime.strptime(stop_date_time_str, '%m/%d/%Y %H:%M')
    
    start = time.time()
    f_count = 0
    
    # delta time
    delta = dt.timedelta(minutes=1)
    
    csv_list = []
    print('reading in CSVs')
    # iterate over range of date / time
    while (start_date_time <= stop_date_time):

        #generate the file name to open
        folder = configurations['common_settings']['transitviewall_history_data_root'] + "\\" \
            + start_date_time.strftime("%#m") + '\\' \
            + start_date_time.strftime("%#d") + '\\' \
            + start_date_time.strftime("%H") + '\\' \
            + start_date_time.strftime("%M") + '\\' 
        
        file = folder + 'feed.csv'

        if exists(file):
            csv_list.append(pd.read_csv(file))
            
        f_count +=1
        end = time.time()
        print('read ' + str(f_count) + ' files in ' + str(int(end-start)) + ' sec --- now @ file: ' + str(start_date_time), flush=True, end='\r')
            
        # increemnt datetime by 1 minute
        start_date_time += delta
    
    print('\nmerging into single data frame')
    csv_merged = pd.concat(csv_list, ignore_index=True)
    print(str(round(time.time()-start,0))+' sec elapsed')
    
    indexDrop = csv_merged[(csv_merged['late'] == 998) | (csv_merged['late'] == 999)].index
    csv_merged.drop(indexDrop , inplace=True)
    csv_merged['file'] = pd.to_datetime(csv_merged['file'])
    csv_merged['timestamp'] = pd.to_datetime(csv_merged['timestamp'])
    
    print('exporting to single parquet file')
    csv_merged.to_parquet('data\kh_transitview.parquet', compression='gzip')
    print(str(round(time.time()-start,0))+' sec elapsed')

In [10]:
merge_transitview_rt("9/01/2022 00:00", "9/05/2022 23:59")

reading in CSVs
read 7200 files in 93 sec --- now @ file: 2022-09-05 23:59:00
merging into single data frame
100.0 sec elapsed
exporting to single parquet file
114.0 sec elapsed
