In [1]:
import concurrent.futures
import os
import time
import zipfile
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from psycopg2 import sql
from sqlalchemy import create_engine, types


In [2]:
# PostgreSQL connection parameters.
# Each setting can be overridden with a GTFS_DB_* environment variable so that real
# credentials never have to be written into the notebook; the fallbacks are the
# local-development values this notebook has always used.
db_user = os.environ.get("GTFS_DB_USER", "postgres")
db_password = os.environ.get("GTFS_DB_PASSWORD", "postgres")
db_host = os.environ.get("GTFS_DB_HOST", "localhost")
db_port = int(os.environ.get("GTFS_DB_PORT", 5432))
db_name = os.environ.get("GTFS_DB_NAME", "gtfs")  # Replace with the actual database name

# Create the database URL.
# User and password are percent-encoded: characters such as '@', ':' or '/' in a
# password would otherwise be parsed as part of the URL structure.
db_url = (
    f"postgresql://{quote_plus(db_user)}:{quote_plus(db_password)}"
    f"@{db_host}:{db_port}/{db_name}"
)

# Create an SQLAlchemy engine to connect to the database
engine = create_engine(db_url)

In [3]:
class TablePattern:
    """Ties a table name, version id and branch id to a file stem.

    The stem has the form ``<table_name>-<version_id>-<branch_id>``. An instance can be
    built either from the three parts (the stem is derived) or from an existing stem
    (the parts are parsed back out). Either way all four attributes are populated:
    ``file_name``, ``table_name``, ``version_id`` and ``branch_id``.

    Args:
        table_name: Table name; used only when ``file_name`` is None.
        version_id: Version identifier; used only when ``file_name`` is None.
        branch_id: Branch identifier; used only when ``file_name`` is None.
        file_name: Existing stem (without extension) to parse. When given, the other
            three arguments are ignored.

    Raises:
        ValueError: If ``file_name`` does not contain at least two ``-`` separators.
    """

    def __init__(self, table_name = None, version_id = None, branch_id = None, file_name : str = None) -> None:
        if file_name is None:
            # Keep the parts as attributes too, so both construction paths expose
            # the same set of attributes.
            self.table_name = table_name
            self.version_id = version_id
            self.branch_id = branch_id
            self.file_name = f"{table_name}-{version_id}-{branch_id}"
        else:
            # Split from the right: the last two fields are version and branch, so any
            # extra '-' stays in the table name instead of silently dropping a piece.
            parts = file_name.rsplit('-', 2)
            if len(parts) != 3:
                raise ValueError(
                    f"expected '<table_name>-<version_id>-<branch_id>', got {file_name!r}"
                )
            self.file_name = file_name
            self.table_name, self.version_id, self.branch_id = parts


In [4]:


dfs = []

# The per-table CSV exports below are written to ./output; create the folder up
# front so the first to_csv call cannot fail on a missing directory.
os.makedirs('output', exist_ok=True)

# Extract data from the nested ZIP files.
# Every archive found under ./downloads is scanned for folder entries; each folder is
# treated as one branch and is expected to hold a nested google_transit.zip whose
# *.txt members are loaded as tables.
for dirpath, dirnames, filenames in os.walk('downloads'):
    for filename in filenames:
        gtfs_zip_path = os.path.join(dirpath, filename)
        # Stray non-archive files would make ZipFile raise BadZipFile.
        if not zipfile.is_zipfile(gtfs_zip_path):
            continue
        # The folder that directly contains the archive names the version.
        version_id = gtfs_zip_path.split(os.sep)[-2]

        with zipfile.ZipFile(gtfs_zip_path, 'r') as gtfs_zip:
            member_names = gtfs_zip.namelist()
            known_members = set(member_names)
            for item in member_names:
                if not item.endswith('/'):  # only directory entries identify a branch
                    continue
                branch_id = item.strip('/')
                google_transit_zip_path = f"{branch_id}/google_transit.zip"
                # A folder without a nested feed is skipped instead of aborting the
                # whole run with a KeyError from gtfs_zip.open.
                if google_transit_zip_path not in known_members:
                    continue

                with gtfs_zip.open(google_transit_zip_path) as google_transit_file:
                    with zipfile.ZipFile(google_transit_file, 'r') as transit_zip:
                        for nested_file_name in transit_zip.namelist():
                            if not nested_file_name.endswith('.txt'):
                                continue

                            table_name = nested_file_name.removesuffix('.txt')
                            print(version_id, branch_id, table_name)

                            # keep_default_na=False: empty fields stay empty strings
                            # instead of being converted to NaN.
                            with transit_zip.open(nested_file_name) as nested_file:
                                dfx = pd.read_csv(nested_file, keep_default_na=False, low_memory=False)

                            obj = TablePattern(table_name = table_name, version_id = version_id, branch_id = branch_id)
                            dfx.to_csv(f"output/{obj.file_name}.csv", index=False)
                            dfs.append({
                                'table_name': table_name,
                                'version_id': version_id,
                                'branch_id': branch_id,
                                'length': len(dfx),
                                'df': dfx
                            })

                print(branch_id)
        print(version_id)
# 14m 45s

20220403_025040 1 agency
20220403_025040 1 routes
20220403_025040 1 trips
20220403_025040 1 stops
20220403_025040 1 calendar
20220403_025040 1 calendar_dates
20220403_025040 1 shapes
20220403_025040 1 stop_times
1
20220403_025040 10 agency
20220403_025040 10 routes
20220403_025040 10 trips
20220403_025040 10 stops
20220403_025040 10 calendar
20220403_025040 10 calendar_dates
20220403_025040 10 shapes
20220403_025040 10 stop_times
10
20220403_025040 11 agency
20220403_025040 11 routes
20220403_025040 11 trips
20220403_025040 11 stops
20220403_025040 11 calendar
20220403_025040 11 calendar_dates
20220403_025040 11 shapes
20220403_025040 11 stop_times
11
20220403_025040 2 agency
20220403_025040 2 routes
20220403_025040 2 trips
20220403_025040 2 stops
20220403_025040 2 calendar
20220403_025040 2 calendar_dates
20220403_025040 2 shapes
20220403_025040 2 stop_times
2
20220403_025040 3 agency
20220403_025040 3 routes
20220403_025040 3 trips
20220403_025040 3 stops
20220403_025040 3 calendar
2

In [None]:
# Rebuild `dfs` from the CSV files in the output folder.
# Each file stem is decoded back into table name, version id and branch id.
dfs = []
for dirpath, dirnames, filenames in os.walk('output'):
    for filename in filenames:
        if not filename.endswith('.csv'):
            continue
        stem = filename[:-len('.csv')]
        pattern = TablePattern(file_name=stem)
        table_name, version_id, branch_id = pattern.table_name, pattern.version_id, pattern.branch_id
        print(table_name, branch_id, version_id, filename)
        csv_path = os.path.join(dirpath, filename)
        frame = pd.read_csv(csv_path, keep_default_na=False, low_memory=False)
        record = {
            'table_name': table_name,
            'version_id': version_id,
            'branch_id': branch_id,
            'length': len(frame),
            'df': frame,
        }
        dfs.append(record)
# 3m 50s

In [6]:

# Turn the list of per-file records into a DataFrame: one row per record, with the
# record keys ('table_name', 'version_id', 'branch_id', 'length', 'df') as columns.
# NOTE: this rebinds `dfs` from a list to a DataFrame. Cells that use `dfs` as a plain
# list (`dfs.sort(...)`, `for dfx in dfs: dfx['df']`) need the list form, so they
# cannot run after this cell without rebuilding `dfs` first.
dfs = pd.DataFrame(dfs)


In [None]:
def _temp(row):
    """Write the row's ``branch_id`` into a ``branch_id`` column of the row's DataFrame.

    Args:
        row: One row of ``dfs``; reads ``row['branch_id']`` and mutates the DataFrame
            stored in ``row['df']`` in place.
    """
    row['df']['branch_id'] = row['branch_id']


# Plain loop instead of DataFrame.apply: the helper is called only for its side
# effect, so there is no result worth collecting (apply would build and display a
# Series of None values).
for _, row in dfs.iterrows():
    _temp(row)

In [12]:
# concat all df with same table_name and branch_id
# For every (table_name, branch_id) group the DataFrames stored in the 'df' column are
# concatenated; the result is only displayed, it is not assigned to a name.
# NOTE(review): the recorded run of this cell ended in a MemoryError. Everything is
# materialised in one expression, so the full data set has to fit in memory a second
# time. Consider handling one group at a time. TODO confirm what the result is used for.
dfs.groupby(['table_name', 'branch_id']).apply(lambda x: pd.concat(x['df'].tolist())).reset_index()

MemoryError: Unable to allocate 655. MiB for an array with shape (85800783,) and data type int64

In [None]:
def _nest_rows(df, key, order_col):
    """Collapse ``df`` to one row per distinct ``key`` value.

    Args:
        df: Table to collapse; must contain the columns ``key`` and ``order_col``.
        key: Column to group by; it is kept as a regular column in the result.
        order_col: Column used to sort the rows inside each group.

    Returns:
        DataFrame with two columns: ``key`` and one column named after all other
        column names joined by ``'__'``. Each cell of that column holds the group's
        rows, sorted by ``order_col``, as a list of per-row value lists.
    """
    col_name = '__'.join(df.drop([key], axis=1).columns)

    def _pack(group):
        # errors='ignore': newer pandas releases stop passing the grouping column to
        # the applied function, in which case there is nothing left to drop.
        payload = group.drop(columns=[key], errors='ignore')
        return payload.sort_values(by=order_col).to_dict('tight')['data']

    return df.groupby([key]).apply(_pack).to_frame(name=col_name).reset_index()


# Tables that get nested, mapped to (grouping column, ordering column).
_NESTED_TABLES = {
    'shapes': ('shape_id', 'shape_pt_sequence'),
    'stop_times': ('trip_id', 'stop_sequence'),
}

# Expects `dfs` to be the list of record dicts (iterating a DataFrame would yield
# column names instead of records).
for dfx in dfs:
    df : pd.DataFrame = dfx['df']
    nesting = _NESTED_TABLES.get(dfx['table_name'])
    if nesting is not None:
        df = _nest_rows(df, *nesting)
        # NOTE(review): the nested frame is only bound to the local name `df`; it is
        # never written back to dfx['df'], so later cells still see the flat table.
        # Left as is on purpose: storing it would change the row counts the database
        # cell compares against. TODO confirm the intended behaviour.

# 1m 30s

In [None]:

# Handle the smallest tables first.
# list.sort: `dfs` has to be the list of record dicts here, not a DataFrame.
dfs.sort(key=lambda x: x['length'], reverse=False)

# Folder for the per-table CSVs written at the end of this cell.
os.makedirs('output_all', exist_ok=True)

# Connect to PostgreSQL.
# psycopg2's `with conn:` only ends the transaction (commit on success, rollback on
# error); it does not close the connection, hence the explicit try/finally.
conn = psycopg2.connect(
    host=db_host,
    port=db_port,
    database=db_name,
    user=db_user,
    password=db_password
)
try:
    with conn, conn.cursor() as cursor:

        def insert_to_pg(df : pd.DataFrame, version_id, branch_id, table_name, debug_message=''):
            """Clear stale rows of one (version_id, branch_id) slice of ``table_name``.

            Counts the rows stored for the slice; if some exist but their number
            differs from ``len(df)``, they are deleted and the delete is committed.
            Despite the name, nothing is inserted: the insert step is currently
            disabled in this notebook.

            Args:
                df: Table contents for the slice; only its length is used.
                version_id: Value matched against the ``version_id`` column.
                branch_id: Value matched against the ``branch_id`` column.
                table_name: Target table. It is quoted as an identifier, so it must
                    match the table's name exactly, including case.
                debug_message: Free text echoed in the START/END log lines.
            """
            start_time = time.perf_counter()
            print('START', debug_message, start_time)

            # Identifier and values are passed separately from the SQL text so that
            # neither can break out of the statement.
            table = sql.Identifier(table_name)
            slice_key = (str(version_id), str(branch_id))

            # Count the number of rows in the PostgreSQL table where the version_id and branch_id match
            count_query = sql.SQL(
                "SELECT COUNT(*) FROM {} WHERE version_id = %s AND branch_id = %s"
            ).format(table)
            cursor.execute(count_query, slice_key)
            count = cursor.fetchone()[0]
            print('COUNT', table_name, version_id, branch_id, count, len(df))

            if count > 0 and count != len(df):
                # Delete the rows where the version_id and branch_id match
                delete_query = sql.SQL(
                    "DELETE FROM {} WHERE version_id = %s AND branch_id = %s"
                ).format(table)
                cursor.execute(delete_query, slice_key)
                conn.commit()
                print('DELETE', table_name, version_id, branch_id, count, len(df))

            end_time = time.perf_counter()
            print('END', debug_message, end_time)
            print('Time elapsed', end_time-start_time)

        # table_name -> all frames of that table, across versions and branches
        tables = {}

        for counter, dfx in enumerate(dfs):
            table_name = dfx['table_name']
            version_id = dfx['version_id']
            branch_id = dfx['branch_id']
            frame = dfx['df']
            if frame is None:
                continue
            tables.setdefault(table_name, []).append(frame)
            debug_message = f"Loop {counter} / {len(dfs)}"
            insert_to_pg(frame, version_id, branch_id, table_name, debug_message)
finally:
    conn.close()

# Written only after the database step finished without error; the connection is
# already closed, so it is not held open during the (slow) CSV export.
for table_name, df_list in tables.items():
    pd.concat(df_list).to_csv(f"output_all/{table_name}.csv", index=False)
