In [None]:
import psycopg2
import subprocess
import os
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor


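############################
# NOTE: You must have a ~/.pgpass entry so pg_restore can connect non-interactively (no password prompt)
# Run the following:
# > touch ~/.pgpass
# > nano ~/.pgpass
#       in the file, add a line of the form hostname:port:database:username:password, e.g.:  localhost:5432:postgres:postgres:mypassword
#       (host, database and user must match db_params below; "*" matches any value)
# > chmod 600 ~/.pgpass
############################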

In [None]:
############################
# Configure
############################

# postgres config: passed to psycopg2.connect(); the pg_restore commands also read user and dbname from it
db_params = {
    "dbname": "postgres",
    "user": "postgres",
    "host": "localhost"
}

# export to subdirectory (in this working directory)
subdir = "Files"

# dump path, in a format pg_restore can read (custom, directory or tar archive).
# "~" is expanded here because only a shell expands it, and only when it is unquoted.
backup_file_path = os.path.expanduser("~/Downloads/usaspending-db_20230708")


In [None]:
# make sure the export subdirectory exists; exist_ok avoids the check-then-create race
os.makedirs(subdir, exist_ok=True)


In [None]:
# this function takes a query that outputs queries and runs each one sequentially
def nested_query_processor(query):
    """Run ``query``, then execute every SQL statement it returns, in one transaction.

    ``query`` must return rows whose first column is a complete SQL statement
    (for example a generated ``drop index ...;`` command). The generated statements
    run in order on the same connection. The transaction commits when all of them
    succeed; if any statement raises, the whole batch is rolled back and the
    exception propagates.

    psycopg2's ``with connection`` block only ends the transaction and does not
    close the connection, so the connection is closed explicitly in ``finally``.
    """
    conn = psycopg2.connect(**db_params)
    try:
        with conn:  # commit on success, roll back on exception
            with conn.cursor() as cur:
                cur.execute(query)
                # materialize first: executing on the same cursor replaces its result set
                generated_statements = [row[0] for row in cur.fetchall()]
                for statement in generated_statements:
                    cur.execute(statement)
    finally:
        conn.close()


In [None]:
# get all TOC lines mentioning TABLE from the dump (used to build chunks later).
# pg_restore runs without a shell, so the archive path needs no quoting ("~" is expanded
# explicitly). Filtering in Python returns [] when nothing matches; grep exits 1 in that case,
# which made check_output raise.
cmd_list_tables = ["pg_restore", "--list", os.path.expanduser(backup_file_path)]
list_tables = [line for line in subprocess.check_output(cmd_list_tables).decode().splitlines() if "TABLE" in line]


In [None]:
# restore just the schema, including index, sequence & function definitions.
# Arguments are passed as a list (no shell), so the path needs no quoting; "~" is expanded explicitly.
# -h targets the same server psycopg2 connects to (db_params["host"]);
# --if-exists makes the --clean DROP statements tolerate objects that don't exist yet.
cmd_restore_schema = [
    "pg_restore", "--no-owner", "--role=postgres", "--clean", "--if-exists", "--schema-only",
    "-h", db_params["host"],
    "-U", db_params["user"],
    "-d", db_params["dbname"],
    os.path.expanduser(backup_file_path),
]
subprocess.run(cmd_restore_schema)

# pg_restore may still print warnings that look like errors.
# The cell output ends with "CompletedProcess(...)"; returncode=0 means pg_restore reported no errors.

In [None]:
# now we have the full scaffolding, remove any constraints.
# The catalog pg_constraint is read directly: information_schema.constraint_table_usage
# reports the *referenced* table for foreign keys (so "alter table <that table> drop
# constraint if exists <fk>" silently does nothing) and omits CHECK constraints entirely.
# Each generated statement is executed by nested_query_processor.

remove_constraints = """
    select
        format('alter table %s drop constraint if exists %I cascade;', con.conrelid::regclass, con.conname)
    from pg_catalog.pg_constraint con
    join pg_catalog.pg_namespace n on n.oid = con.connamespace
    where con.conrelid <> 0                           -- table constraints only, not domain constraints
        and con.contype in ('c', 'f', 'p', 'u', 'x')  -- check, foreign key, primary key, unique, exclusion
        and con.coninhcount = 0                       -- inherited copies are dropped along with their parent constraint
        and n.nspname not in ('pg_catalog','information_schema')
        and n.nspname !~ '^pg_toast'::text
    order by (con.contype = 'f') desc                 -- drop foreign keys before the keys they reference
    ;"""

nested_query_processor(remove_constraints)

In [None]:
# Remove indexes that other indexes depend on first.
# The query emits one "drop index if exists <schema>.<index> cascade;" per relation c1 that:
#   - is the referenced object (refobjid) of a pg_depend row whose dependent object c2 is an
#     index (relkind 'i'), and
#   - is itself listed in pg_indexes (so c1 is an index, not a table),
# skipping pg_catalog, information_schema and pg_toast* schemas. n2 is joined but unused.
# NOTE(review): these are presumably parent indexes of partitioned tables, whose per-partition
# child indexes cannot be dropped on their own; confirm against the dump.
# todo: build a topological ordering of the dependency graph to handle nested dependencies

remove_d = """
    select 
        distinct 
        format('drop index if exists %I.%I cascade;', n1.nspname, c1.relname) as com
    from
        pg_catalog.pg_depend d
    join pg_catalog.pg_class c1 on c1.oid = d.refobjid
    join pg_catalog.pg_class c2 on c2.oid = d.objid
    join pg_catalog.pg_namespace n1 on c1.relnamespace = n1.oid
    join pg_catalog.pg_namespace n2 on c2.relnamespace = n2.oid
    join pg_catalog.pg_indexes as i on n1.nspname = i.schemaname and c1.relname = i.indexname
    where  c2.relkind = 'i' -- this will only include index objects
        and c1.relkind not in ('r')
        and n1.nspname not in ('pg_catalog','information_schema')
        and n1.nspname !~ '^pg_toast'::text
    ;"""

# run each generated DROP INDEX statement in a single transaction
nested_query_processor(remove_d)

In [None]:
# Remove all remaining indexes.
# The query emits one "drop index if exists <schema>.<index> cascade;" for every index in
# pg_index outside pg_catalog, information_schema and pg_toast* schemas, EXCEPT indexes
# that some pg_constraint row still references through conindid (the left join /
# "cons.oid is null" anti-join). Those constraint-backed indexes are expected to have been
# removed already by the constraint-dropping cell that runs earlier in this notebook.

remove_i = """
    select
        format('drop index if exists %I.%I cascade;', n.nspname, c_ind.relname)
    from pg_catalog.pg_index ind
    join pg_catalog.pg_class c_ind on c_ind.oid = ind.indexrelid
    join pg_catalog.pg_namespace n on n.oid = c_ind.relnamespace
    left join pg_catalog.pg_constraint cons on cons.conindid = ind.indexrelid
    where 1=1
        and n.nspname not in ('pg_catalog','information_schema')
        and n.nspname !~ '^pg_toast'::text
        and cons.oid is null
    ;"""

# run each generated DROP INDEX statement in a single transaction
nested_query_processor(remove_i)

In [None]:
# main process to restore table, export to csv, gzip, delete csv, truncate table

def _parse_table_data_entry(toc_line):
    """Return ``(schema, table)`` for a ``TABLE DATA`` line of ``pg_restore --list``, else ``None``.

    Such lines look like ``<dump id>; <catalog oid> <oid> TABLE DATA <schema> <table> <owner>``.
    Shorter lines and other entry types return ``None`` instead of raising IndexError.
    """
    parts = toc_line.split()
    if len(parts) >= 7 and parts[3] == "TABLE" and parts[4] == "DATA":
        return parts[5], parts[6]
    return None


def _restore_table_data(table_schema, table_name):
    """Load one table's rows from the dump into the table created by the schema-only restore.

    A non-zero pg_restore exit status is printed, not raised, so the remaining tables still
    get processed (pg_restore may also exit non-zero after errors it chose to ignore).
    """
    cmd = ["pg_restore", "--no-owner", "--role=postgres", "--data-only"]
    if db_params.get("host"):
        cmd += ["-h", db_params["host"]]  # same server psycopg2 connects to
    cmd += [
        "-U", db_params["user"],
        "-d", db_params["dbname"],
        "-n", table_schema,  # without -n, -t matches same-named tables in every schema
        "-t", table_name,
        os.path.expanduser(backup_file_path),  # no shell is involved, so expand "~" here
    ]
    status = subprocess.run(cmd).returncode
    if status != 0:
        print(f"WARNING: pg_restore exited with status {status} for {table_schema}.{table_name}")


def _gzip_and_split(csv_file_path, part_prefix, part_size="1000m"):
    """Stream ``gzip -c csv_file_path`` into ``split -b part_size - part_prefix``.

    Returns True only if BOTH gzip and split exited with status 0. (A shell pipeline
    would report only split's status, hiding a gzip failure.)
    """
    gzip_proc = subprocess.Popen(["gzip", "-c", csv_file_path], stdout=subprocess.PIPE)
    split_status = None
    try:
        split_status = subprocess.run(
            ["split", "-b", part_size, "-", part_prefix], stdin=gzip_proc.stdout
        ).returncode
    finally:
        # release our read end so gzip gets SIGPIPE instead of blocking if split stopped early
        gzip_proc.stdout.close()
        gzip_status = gzip_proc.wait()
    return gzip_status == 0 and split_status == 0


def process_tables(list_tables):
    """Restore, export, compress and truncate every ``TABLE DATA`` entry in ``list_tables``.

    For each table, in order:
      1. load its rows from the dump (``pg_restore --data-only``),
      2. ``COPY`` them to ``<subdir>/<schema>_<table>.csv``,
      3. gzip the CSV and split it into ``<schema>_<table>.csv.gz_part_*`` pieces,
      4. delete the CSV, truncate the table and commit.
    If step 3 fails, the CSV and the table rows are kept and the next table is processed.

    Parameters
    ----------
    list_tables : iterable of str
        Lines from ``pg_restore --list``; lines that are not ``TABLE DATA`` entries are skipped.
    """
    entries = [entry for entry in map(_parse_table_data_entry, list_tables) if entry is not None]
    if not entries:
        return  # nothing to export in this chunk, so don't open a database connection

    from psycopg2 import sql  # identifier quoting for COPY / TRUNCATE

    conn = psycopg2.connect(**db_params)
    try:
        with conn, conn.cursor() as cur:
            for table_schema, table_name in entries:
                full_table_name = f"{table_schema}.{table_name}"
                table_ident = sql.SQL("{}.{}").format(sql.Identifier(table_schema), sql.Identifier(table_name))

                print(f"Restoring Table: {full_table_name} @ ", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                _restore_table_data(table_schema, table_name)

                file_stem = full_table_name.replace('.', '_')
                csv_file_path = os.path.join(subdir, f"{file_stem}.csv")
                gz_file_path = os.path.join(subdir, f"{file_stem}.csv.gz")

                # binary mode: psycopg2 writes the COPY output bytes as-is, with no locale re-encoding
                copy_stmt = sql.SQL("COPY {} TO STDOUT WITH CSV HEADER").format(table_ident)
                with open(csv_file_path, 'wb') as f:
                    cur.copy_expert(copy_stmt.as_string(cur), f)

                if not _gzip_and_split(csv_file_path, f"{gz_file_path}_part_"):
                    # the CSV is now the only complete export: do not delete it or truncate the table
                    print(f"ERROR: compressing {csv_file_path} failed; keeping the CSV and the table rows")
                    continue

                os.remove(csv_file_path)

                cur.execute(sql.SQL("truncate table {}").format(table_ident))
                print(f"{full_table_name} Completed @ ", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

                conn.commit()
    finally:
        conn.close()

In [None]:
# split the TOC lines into fixed-size chunks; each worker thread takes one chunk at a time
tables_at_a_time = 10  # concurrency: number of worker threads (also used as the chunk size)
chunks = []
for start in range(0, len(list_tables), tables_at_a_time):
    chunks.append(list_tables[start:start + tables_at_a_time])


In [None]:
# execute main process
# thread pool to process chunks of tables concurrently.
# Futures are collected explicitly: executor.map returns a lazy iterator, and if it is
# never consumed, any exception raised inside process_tables is silently lost.
with ThreadPoolExecutor(max_workers=tables_at_a_time) as executor:
    futures = [executor.submit(process_tables, chunk) for chunk in chunks]

# leaving the `with` block waits for every chunk; now report every failure, then raise the first
chunk_errors = [(index, fut.exception()) for index, fut in enumerate(futures) if fut.exception() is not None]
for chunk_index, err in chunk_errors:
    print(f"Chunk {chunk_index} failed: {err!r}")
if chunk_errors:
    raise chunk_errors[0][1]

# pg_restore may print warnings while this runs (e.g. for objects that already exist);
# those are generally harmless, but check the per-table output above.