In [4]:
import os
import re
import json
import math
from datetime import datetime, date
import pandas as pd
from scipy import stats
from dotenv import load_dotenv
import matplotlib.pyplot as plt
from collections import defaultdict
from sqlalchemy import create_engine, MetaData, inspect, text

In [5]:
load_dotenv()  # loads variables from .env

user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
dbname = os.getenv("DB_NAME")

engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}')


### Creating json

In [None]:
import json
from sqlalchemy import create_engine, inspect, text
from datetime import datetime, date

# --- Configure DB ---
inspector = inspect(engine)

# --- Convert datetime/date to string recursively ---
def convert_datetimes(obj):
    if isinstance(obj, dict):
        return {k: convert_datetimes(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_datetimes(v) for v in obj]
    elif isinstance(obj, (datetime, date)):
        return obj.isoformat()
    else:
        return obj

# --- Recursive schema application ---
def apply_schema(row_dict, schema, conn, visited=None):
    if visited is None:
        visited = set()
    expanded = {}
    current_table = schema.get("_table")
    
    # Getting foreign keys of current table
    fk_map = {fk['constrained_columns'][0]: (fk['referred_table'], fk['referred_columns'][0])
              for fk in inspector.get_foreign_keys(current_table)}

    for key, subschema in schema.items():
        if key == "_table":
            continue

        if isinstance(subschema, dict) and subschema:
            # Nested object: checks foreign key
            fk_col = None
            fk_table, fk_ref_col = None, None

            for col, (ftable, fcol) in fk_map.items():
                if col.lower() == key.lower() or col.lower().startswith(key.replace('_id','').lower()):
                    fk_col = col
                    fk_table, fk_ref_col = ftable, fcol
                    break

            if fk_col and fk_col in row_dict and row_dict[fk_col] is not None:
                fk_val = row_dict[fk_col]
                if (fk_table, fk_val) in visited:
                    expanded[key] = {k:{} for k in subschema if k != "_table"}
                else:
                    visited.add((fk_table, fk_val))
                    q = text(f'SELECT * FROM "{fk_table}" WHERE "{fk_ref_col}" = :val')
                    foreign_row = conn.execute(q, {"val": fk_val}).fetchone()
                    foreign_dict = dict(foreign_row._mapping) if foreign_row else {}
                    subschema["_table"] = fk_table
                    expanded[key] = apply_schema(foreign_dict, subschema, conn, visited)
            else:
                # Foreign key missing or NULL -> empty structure
                expanded[key] = {k:{} for k in subschema if k != "_table"}
        else:
            # Leaf node
            expanded[key] = row_dict.get(key)
    return expanded

# --- Depopulated strict schema for export ---
SCHEMA = {
    "_table": "master_inprocessinspectionreading",
    "id": {},
    "created_at": {},
    "updated_at": {},
    "is_active": {},
    "actual_readings": [
        {
            "accepted": {},
            "rejected": {}
        }
    ],
    "po_no": {},
    "created_by_id": {
        "first_name": {},
        "middle_name": {},
        "last_name": {},
        "email": {},
        "phone_number": {},
        "plant_id": {
            "plant_id": {},
            "plant_name": {},
            "plant_location_1": {},
            "plant_location_2": {}
        },
        "role_id": {
            "id": {},
            "created_at": {},
            "updated_at": {},
            "is_active": {},
            "name": {},
            "description": {}
        }
    },
    "insp_schedule_id_id": {
        "id": {},
        "LSL": {},
        "target_value": {},
        "USL": {},
        "sampling_required": {},
        "sample_size": {},
        "inspection_frequency": {},
        "inspection_method": {},
        "recording_type": {},
        "likely_defects_classification": {},
        "remarks": {},
        "building_id": {
            "id": {},
            "is_active": {},
            "building_id": {},
            "building_name": {},
            "sub_section": {},
            "plant_id": {}
        },
        "item_code_id": {
            "id": {},
            "is_active": {},
            "item_code": {},
            "item_description": {},
            "unit": {},
            "item_type": {},
            "end_store": {},
            "building_id": {
                "id": {},
                "is_active": {},
                "building_id": {},
                "building_category": {},
                "building_name": {},
                "sub_section": {},
                "plant_id": {}
            }
        },
        "production_machine_id_id": {},
        "qc_machine_id_id": {
            "id": {},
            "created_at": {},
            "updated_at": {},
            "is_active": {},
            "machine_id": {},
            "machine_name": {},
            "machine_make": {},
            "machine_model": {},
            "is_digital": {},
            "machine_type": {},
            "plant_id": {
                "id": {},
                "created_at": {},
                "updated_at": {},
                "is_active": {},
                "plant_id": {},
                "plant_name": {},
                "plant_location_1": {},
                "plant_location_2": {}
            }
        },
        "operation_id": {
            "id": {},
            "is_active": {},
            "operation_id": {},
            "operation_name": {},
            "operation_description": {},
            "plant_id": {
                "id": {},
                "is_active": {},
                "plant_id": {},
                "plant_name": {},
                "plant_location_1": {},
                "plant_location_2": {}
            }
        },
        "inspection_parameter_id": {
            "id": {},
            "is_active": {},
            "inspection_parameter_id": {},
            "inspection_parameter": {},
            "parameter_description": {}
        }
    }
}

# --- Export function ---
def export(primary_table):
    with engine.connect() as conn:
        rows = conn.execute(text(f'SELECT * FROM "{primary_table}"')).fetchall()
        result = []
        for row in rows:
            row_dict = dict(row._mapping)
            row_dict["_table"] = primary_table
            expanded = apply_schema(row_dict, SCHEMA, conn)
            expanded = convert_datetimes(expanded)
            result.append(expanded)
    with open("schema.json", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(f"Exported {len(result)} rows")
    return result

# --- Run ---
if __name__ == "__main__":
    export("master_inprocessinspectionreading")

Exported 14 rows
