# Data Retrieval

For the sake of completeness, we demonstrate which kinds of data can be downloaded from a given PostgreSQL database. Valid credentials for the database are, of course, required.

The following modules shall be utilised:

In [None]:
import json
import os
import subprocess
from urllib.parse import quote

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, MetaData, select, inspect, text

Furthermore, we define the following path constants:

In [None]:
# Absolute path of the repository root, as reported by git. This works from
# any working directory inside the repo and raises CalledProcessError
# otherwise.
root_dir_ = subprocess.run(
    ["git", "rev-parse", "--show-toplevel"],
    stdout=subprocess.PIPE,
    text=True,
    check=True,
).stdout
ROOT_DIR = root_dir_.strip()  # git terminates its output with a newline

# Top-level data directory of the repo.
DATA_DIR = os.path.join(ROOT_DIR, "data")

# Backup tree: one subdirectory each for table data, metadata and DDL.
BKP_DIR = os.path.join(DATA_DIR, "bkp")
PARQUET_DIR, META_DIR, DDL_DIR = (
    os.path.join(BKP_DIR, leaf) for leaf in ("data", "metadata", "ddl")
)

As an additional preparatory step, we may create the subdirectories designated for storing our downloads and define auxiliary functions.

In [None]:
# Create the three backup subdirectories; directories that already exist are
# left untouched.
for backup_subdir in (PARQUET_DIR, META_DIR, DDL_DIR):
    os.makedirs(backup_subdir, exist_ok=True)

In [None]:
def safe_filename(schema: str, table: str, extension: str) -> str:
    """Build the flat file name ``<schema>_<table>.<extension>``.

    Dots and path separators occurring in ``schema`` or ``table`` are
    replaced by underscores, so that a quoted identifier such as ``"a/b"``
    cannot point the output into another directory.

    Args:
        schema: Name of the schema the object lives in.
        table: Name of the object (table, view, function, ...).
        extension: File extension without leading dot; appended unchanged,
            so it may itself contain dots (e.g. ``"view.sql"``).

    Returns:
        The file name (no directory part).
    """
    unsafe_chars = str.maketrans({".": "_", "/": "_", "\\": "_"})
    core = f"{schema}.{table}".translate(unsafe_chars)
    return f"{core}.{extension}"

We load the credentials required for database access from the `.env` file into Python.

In [None]:
# Values from `.env` must win over variables that already exist in the
# process environment: a name such as USER is pre-set by the shell on Unix,
# and the default `override=False` would silently keep the OS value.
load_dotenv(override=True)

# Raw connection settings exactly as read from the environment.
DB_CONFIG = {
    "scheme": os.getenv("SCHEME"),
    "database": os.getenv("DATABASE"),
    "user": os.getenv("USER"),
    "password": os.getenv("PASSWORD"),
    "host": os.getenv("HOST"),
    "port": os.getenv("PORT"),
}

# Fail early with a readable message instead of building a URI that contains
# the literal text "None". The environment variable names are the upper-cased
# dictionary keys.
missing = [key.upper() for key, value in DB_CONFIG.items() if value is None]
if missing:
    raise RuntimeError(
        "Missing database settings in the environment/.env file: "
        + ", ".join(missing)
    )

# Percent-encode user and password so that characters such as "@", ":" or
# "/" cannot be mistaken for URI delimiters.
DB_URI = "{scheme}://{user}:{password}@{host}:{port}/{database}".format(
    **{
        **DB_CONFIG,
        "user": quote(DB_CONFIG["user"], safe=""),
        "password": quote(DB_CONFIG["password"], safe=""),
    }
)

We proceed to establish a connection with the database.

In [None]:
# Engine for the database described by DB_URI; the cells below open all of
# their connections through it.
db = create_engine(DB_URI)
# Inspector bound to the engine; used below to read column, primary-key,
# foreign-key and index metadata per table.
inspector = inspect(db)

This step concludes our preparations, and we are now ready to commence the retrieval of various data.

## Schemas



In [None]:
# Every schema visible in information_schema except the two catalog schemas.
schema_query = """
    SELECT schema_name 
        FROM information_schema.schemata
        WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
"""

with db.connect() as conn:
    result = conn.execute(text(schema_query))
    # Each row has a single column: the schema name.
    schemas = [schema_name for (schema_name,) in result]

# Report what was found, one schema per line.
print("Schemas found:")
for schema in schemas:
    print("  - " + schema)

## Tables, Columns, PKs, FKs, Indexes, Constraints

In [None]:
# Maps each schema name to the list of table names reflected from it; the
# last cell of the notebook writes this mapping to schema_inventory.json.
schema_inventory = {}

# The identifiers are passed as bind parameters and quoted by the server
# (quote_ident), so mixed-case names or names containing quotes resolve
# correctly instead of being interpolated into the SQL text.
# NOTE(review): pg_get_tabledef() does not appear to be part of stock
# PostgreSQL - confirm it exists on the target server. The DDL step is
# best-effort and only reports a failure.
ddl_query = text(
    "SELECT pg_get_tabledef("
    "CAST(quote_ident(:schema) || '.' || quote_ident(:table) AS regclass))"
)

for schema in schemas:
    schema_inventory[schema] = []
    metadata = MetaData()
    try:
        metadata.reflect(bind=db, schema=schema)
    except Exception as e:
        # A schema that cannot be reflected is reported and skipped; its
        # inventory entry stays an empty list.
        print(f"Schema {schema} reflection failed:")
        print(f"  {e}")
        continue

    for table in metadata.tables.values():
        table_name = table.name
        fq_name = f"{schema}.{table_name}"
        # Recorded before the export, so tables whose export fails are still
        # listed in the inventory.
        schema_inventory[schema].append(table_name)
        print(f"📦 Exporting {fq_name}...")

        try:
            with db.connect() as conn:
                # 1) Table contents -> data/<schema>_<table>.parquet
                df = pd.read_sql(select(table), conn)
                df.to_parquet(
                    os.path.join(
                        PARQUET_DIR,
                        safe_filename(schema, table_name, "parquet"),
                    ),
                    compression="snappy",
                )

                # 2) Structural metadata -> metadata/<schema>_<table>.parquet
                columns = inspector.get_columns(table_name, schema=schema)
                pk = inspector.get_pk_constraint(table_name, schema=schema)
                fks = inspector.get_foreign_keys(table_name, schema=schema)
                indexes = inspector.get_indexes(table_name, schema=schema)

                metadata_info = {
                    "schema": schema,
                    "table_name": table_name,
                    "columns": columns,
                    "primary_key": pk,
                    "foreign_keys": fks,
                    "indexes": indexes,
                }

                # Nested lists/dicts are stored as JSON strings so that the
                # one-row frame has only scalar cells; default=str covers
                # values json cannot encode natively (e.g. column types).
                flat_metadata_info = {
                    k: v if not isinstance(v, (list, dict)) else json.dumps(v, default=str)
                    for k, v in metadata_info.items()
                }

                meta_df = pd.DataFrame([flat_metadata_info])
                meta_df.to_parquet(
                    os.path.join(META_DIR, safe_filename(schema, table_name, "parquet")),
                    engine="pyarrow",
                    compression="snappy",
                )

                # 3) DDL -> ddl/<schema>_<table>.sql (best effort). A failure
                # here must not discard the data and metadata already
                # written, but it is reported instead of being swallowed.
                try:
                    ddl = conn.execute(
                        ddl_query,
                        {"schema": schema, "table": table_name},
                    ).scalar()
                    if ddl:
                        ddl_path = os.path.join(
                            DDL_DIR,
                            safe_filename(schema, table_name, "sql"),
                        )
                        with open(ddl_path, "w", encoding="utf-8") as f:
                            f.write(ddl)
                except Exception as ddl_error:
                    print(f"⚠️ Could not export DDL for {fq_name}: {ddl_error}")

        except Exception as e:
            print(f"⚠️ Failed to export {fq_name}: {e}")

## (Materialised) Views

In [None]:
# Definitions of ordinary views outside the two catalog schemas.
views_query = """
    SELECT schemaname, viewname, definition
        FROM pg_views
        WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
"""

# Definitions of materialised views; the name column is aliased to
# `viewname` so both result sets have the same shape.
matviews_query = """
    SELECT schemaname, matviewname 
        AS viewname, definition
        FROM pg_matviews
"""


with db.connect() as conn:
    views = conn.execute(text(views_query)).fetchall()
    matviews = conn.execute(text(matviews_query)).fetchall()

# One file per view: ddl/<schema>_<view>.view.sql. The encoding is fixed to
# UTF-8 so definitions containing non-ASCII characters are written the same
# way on every platform instead of depending on the locale default.
for s, v, d in [*views, *matviews]:
    path = os.path.join(DDL_DIR, safe_filename(s, v, "view.sql"))
    with open(path, "w", encoding="utf-8") as f:
        f.write(d)

## Functions and Procedures

In [None]:
# Source of every function/procedure outside the two catalog schemas.
# Aggregates are excluded because pg_get_functiondef() raises an error for
# them, which would abort the whole query. The ORDER BY keeps overloads of
# one function adjacent and in a stable order.
funcs_query = """
    SELECT n.nspname AS schema, p.proname AS name, pg_get_functiondef(p.oid) AS definition
        FROM pg_proc p
        JOIN pg_namespace n
        ON p.pronamespace = n.oid
        WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
        AND NOT EXISTS (SELECT 1 FROM pg_aggregate a WHERE a.aggfnoid = p.oid)
        ORDER BY n.nspname, p.proname, p.oid
"""

with db.connect() as conn:
    funcs = conn.execute(text(funcs_query)).fetchall()

# Overloaded functions share schema and name and therefore the same target
# file. Collect all definitions per file first so that later overloads do
# not overwrite earlier ones.
definitions_by_path = {}
for schema, name, definition in funcs:
    fname = os.path.join(DDL_DIR, safe_filename(schema, name, "function.sql"))
    definitions_by_path.setdefault(fname, []).append(definition)

for fname, definitions in definitions_by_path.items():
    # A file holding a single definition contains exactly that definition;
    # several overloads are separated by a semicolon so they stay distinct
    # statements.
    with open(fname, "w", encoding="utf-8") as f:
        f.write(";\n\n".join(definitions))

## Triggers

In [None]:
# One row per trigger/event combination, ordered by the table it fires on.
triggers_query = """
    SELECT event_object_schema AS schema, event_object_table AS table, trigger_name, action_timing, event_manipulation, action_statement
        FROM information_schema.triggers
        ORDER BY event_object_schema, event_object_table
"""

with db.connect() as conn:
    result = conn.execute(text(triggers_query))
    trigger_columns = list(result.keys())
    triggers = result.fetchall()

# Passing the column names explicitly keeps the schema of the parquet file
# intact even when the database has no triggers at all (an empty list of
# dicts alone would produce a frame without any columns).
df = pd.DataFrame(
    [dict(row._mapping) for row in triggers],
    columns=trigger_columns,
)
df.to_parquet(os.path.join(META_DIR, "triggers.parquet"), compression="snappy")

## Sequences

In [None]:
# Definition parameters of every sequence visible in information_schema.
sequences_query = """
    SELECT sequence_schema, sequence_name, data_type, start_value, minimum_value, maximum_value, increment
        FROM information_schema.sequences
"""

with db.connect() as conn:
    result = conn.execute(text(sequences_query))
    sequence_columns = list(result.keys())
    sequences = result.fetchall()

# Explicit column names: an empty result must still produce a parquet file
# with the expected columns rather than a frame without any columns.
df = pd.DataFrame(
    [dict(row._mapping) for row in sequences],
    columns=sequence_columns,
)
df.to_parquet(os.path.join(META_DIR, "sequences.parquet"), compression="snappy")

## Privileges and Grants

In [None]:
# Table-level privileges per grantee as exposed by information_schema.
grants_query = """
    SELECT grantee, table_schema, table_name, privilege_type
    FROM information_schema.role_table_grants
"""

with db.connect() as conn:
    result = conn.execute(text(grants_query))
    grant_columns = list(result.keys())
    grants = result.fetchall()

# Explicit column names: an empty result must still produce a parquet file
# with the expected columns rather than a frame without any columns.
df = pd.DataFrame(
    [dict(row._mapping) for row in grants],
    columns=grant_columns,
)
df.to_parquet(os.path.join(META_DIR, "grants.parquet"), compression="snappy")

## Table Stats

In [None]:
# Live/dead tuple counters and total on-disk size for every user table.
stats_query = """
    SELECT schemaname, relname, n_live_tup, n_dead_tup, pg_total_relation_size(relid) AS total_bytes
        FROM pg_stat_user_tables
"""

with db.connect() as conn:
    result = conn.execute(text(stats_query))
    stats_columns = list(result.keys())
    stats = result.fetchall()

# Explicit column names: a database without user tables must still produce a
# parquet file with the expected columns rather than a column-less frame.
df = pd.DataFrame(
    [dict(row._mapping) for row in stats],
    columns=stats_columns,
)
df.to_parquet(os.path.join(META_DIR, "table_stats.parquet"), compression="snappy")

## Global Inventory

In [None]:
# Persist the schema -> table-names mapping collected during the table
# export as pretty-printed JSON next to the backup subdirectories.
inventory_path = os.path.join(BKP_DIR, "schema_inventory.json")
with open(inventory_path, "w") as inventory_file:
    inventory_file.write(json.dumps(schema_inventory, indent=2))