In [2]:
from sqlalchemy import text
from psql_connector import PsqlConnector

from os import environ as env
from dotenv import  load_dotenv
load_dotenv(dotenv_path='../.env')

True

In [4]:
# Init psql connector
psql_params = {
    "host": "localhost",
    "port": env["POSTGRES_PORT"],
    "user": env["POSTGRES_USER"],
    "password": env["POSTGRES_PASSWORD"],
    "database": env["POSTGRES_DB"],
}

# Setup psql connector
psql_connector = PsqlConnector(psql_params)

In [6]:
# Fetch schemas
schemas = []
with psql_connector.connect() as engine:
    with engine.connect() as cursor:
        sql_script = """
            SELECT schema_name
            FROM information_schema.schemata;
        """
        schemas = cursor.execute(text(sql_script)).fetchall()

        # Remove system schemas
        schemas = [
            schema[0]
            for schema in schemas
            if schema[0]
            not in [ "pg_toast", "pg_temp_1", "pg_toast_temp_1", "pg_catalog", "information_schema", ]
        ]

        # Remove schemas: data_load_simulation, integration, power_bi, reports, sequences
        schemas = [
            schema for schema in schemas if schema not in [ "data_load_simulation", "integration", "power_bi", "reports", "sequences", "website" ]
        ]

print(schemas)


['application', 'purchasing', 'sales', 'warehouse', 'public']


In [15]:
# Fetch attributes from schemas
attributes = set()

with psql_connector.connect() as engine:
    with engine.connect() as cursor:
        for schema in schemas:
            sql_script = f"""
                WITH rows AS (
	                SELECT  c.relname AS table_name,
			                a.attname AS attribute_name,
			                a.attnotnull AS is_attribute_null,
			                a.attnum AS attribute_num,
			                t.typname AS type_name
	                FROM    pg_catalog.pg_class c
		            JOIN    pg_catalog.pg_attribute a
		                ON  c."oid" = a.attrelid AND a.attnum >= 0
		            JOIN    pg_catalog.pg_type t
		                ON  t."oid" = a.atttypid
		            JOIN    pg_catalog.pg_namespace n
		                ON  c.relnamespace = n."oid"
	                WHERE   n.nspname = '{schema}'
		                AND c.relkind = 'r'
                ),
                agg AS (
	                SELECT rows.table_name, json_agg(rows ORDER BY attribute_num) AS attrs
	                FROM rows
	                GROUP BY rows.table_name
                )
                SELECT json_object_agg(agg.table_name, agg.attrs)
                FROM agg;
            """
            fetch_result = cursor.execute(text(sql_script)).fetchone()[0]
            # Loop through all keys in fetch_result
            for key in fetch_result.keys():
                table = fetch_result.get(key)
                for attrs in table:
                    # Add attrs.get("type_name") into attributes
                    if attrs.get("type_name") not in attributes:
                        attributes.add(attrs.get("type_name"))

print(attributes)

{'int8', 'numeric', 'bool', 'varchar', 'date', 'int4', 'text', 'timestamp', 'bytea', 'bpchar'}
