In [3]:
import pandas as pd
from sqlalchemy import create_engine

In [4]:
import os

# Database connection used by every cell below.
# The URL can be supplied through the PROJECT1_DATABASE_URL environment
# variable so credentials do not have to live in the notebook; the literal
# below is kept only as a backward-compatible local-development default.
engine = create_engine(
    os.environ.get(
        "PROJECT1_DATABASE_URL",
        "postgresql://postgres:123456@localhost:5432/Project1",
    )
)


In [None]:

import re
import datetime as dt
import pandas as pd
import numpy as np
import sqlalchemy as sa
from sqlalchemy import inspect
from sqlalchemy.types import Integer, BigInteger, Date

# ----------------------------
# Config
# ----------------------------
# Paging controls for the extraction loop: each query reads `batch_size` rows
# starting at `offset`, and the loop stops once `offset` reaches `total_rows`
# (or earlier, when a query returns no rows).
batch_size = 100000
offset = 0
total_rows = 2000000

# Schema-qualified source (read) and target (write) tables.
source_schema = 'bronze1'
source_table = 'raw'
target_schema = 'silver1'
target_table = 'cleaned'

# Source columns pulled from the raw table, in SELECT order.
# NOTE(review): the names are interpolated into the SQL unquoted, so this
# presumably relies on the database folding identifiers to the stored case —
# confirm against the bronze table definition.
selected_cols = [
    '_STATE', 'FMONTH', 'IDATE', 'IMONTH', 'IDAY', 'IYEAR', 'DISPCODE', 'SEQNO', '_PSU',
    'CTELENUM', 'PVTRESD1', 'COLGHOUS', 'STATERES', 'CELLFON3', 'LADULT', 'NUMADULT',
    'NUMMEN', 'NUMWOMEN', 'CTELNUM1', 'CELLFON2', 'CADULT', 'PVTRESD2', 'CCLGHOUS',
    'CSTATE', 'LANDLINE', 'HHADULT', 'GENHLTH', 'PHYSHLTH', 'MENTHLTH', 'POORHLTH',
    'HLTHPLN1', 'PERSDOC2', 'MEDCOST', 'CHECKUP1', 'BPHIGH4', 'BPMEDS', 'BLOODCHO',
    'CHOLCHK', 'TOLDHI2', 'CVDINFR4', 'CVDCRHD4', 'CVDSTRK3', 'ASTHMA3', 'ASTHNOW',
    'CHCSCNCR', 'CHCOCNCR', 'CHCCOPD1', 'HAVARTH3', 'ADDEPEV2', 'CHCKIDNY'
]
# Comma-separated column list spliced into the SELECT statement.
cols_str = ', '.join(selected_cols)

# Maps lower-cased source column names to the friendly names used in the
# target table. Keys are lower case because clean_batch lower-cases the
# DataFrame columns before applying this mapping. Entries that map a name to
# itself leave that column's name unchanged.
rename_map = {
    '_state': 'state',
    'fmonth': 'survey_month',
    'idate': 'survey_date',
    'imonth': 'month',
    'iday': 'day',
    'iyear': 'year',
    'dispcode': 'disposition_code',
    'seqno': 'sequence_number',
    '_psu': 'primary_sampling_unit',
    'ctelenum': 'telephone_number',
    'pvtresd1': 'private_residence',
    'colghous': 'college_house',
    'stateres': 'state_residence',
    'cellfon3': 'cell_phone',
    'ladult': 'num_living_adults',
    'numadult': 'num_of_adults',
    'nummen': 'num_of_men',
    'numwomen': 'num_of_women',
    'ctelnum1': 'ctelnum1',
    'cellfon2': 'cellfon2',
    'cadult': 'cadult',
    'pvtresd2': 'private_residence_2',
    'cclghous': 'college_house_2',
    'cstate': 'cstate',
    'landline': 'landline',
    'hhadult': 'household_adults',
    'genhlth': 'general_health',
    'physhlth': 'physical_health_days',
    'menthlth': 'mental_health_days',
    'poorhlth': 'poor_health_days',
    'hlthpln1': 'has_health_plan',
    'persdoc2': 'has_personal_doctor',
    'medcost': 'medical_cost_issue',
    'checkup1': 'last_checkup',
    'bphigh4': 'high_blood_pressure',
    'bpmeds': 'bp_meds',
    # NOTE(review): if this is CDC BRFSS data, BLOODCHO appears to be "ever had
    # cholesterol checked" rather than "high cholesterol" — confirm against the
    # codebook before relying on this name downstream.
    'bloodcho': 'high_cholesterol',
    'cholchk': 'cholesterol_check',
    # NOTE(review): in the BRFSS codebook TOLDHI2 appears to be "ever told
    # cholesterol is high", not a diabetes diagnosis — confirm; renaming would
    # also require updating dtype_map and any downstream consumers.
    'toldhi2': 'diagnosed_diabetes',
    'cvdinfr4': 'had_heart_attack',
    'cvdcrhd4': 'had_coronary_heart_disease',
    'cvdstrk3': 'had_stroke',
    'asthma3': 'has_asthma',
    'asthnow': 'asthma_now',
    'chcscncr': 'had_skin_cancer',
    'chcocncr': 'had_other_cancer',
    'chccopd1': 'has_copd',
    'havarth3': 'has_arthritis',
    'addepev2': 'has_depression',
    'chckidny': 'had_kidney_disease'
}

# Target table dtypes
# Keyed by the renamed (target) column names. The loop below narrows this to
# the kept columns and passes it to DataFrame.to_sql as `dtype=`; its key order
# also defines the column order of the written table. Every column is an
# integer type except `survey_date`.
dtype_map = {
    'state': Integer(),
    'survey_month': Integer(),
    'survey_date': Date(),
    'month': Integer(),
    'day': Integer(),
    'year': Integer(),
    'disposition_code': Integer(),
    'sequence_number': BigInteger(),
    'primary_sampling_unit': BigInteger(),
    'telephone_number': Integer(),
    'private_residence': Integer(),
    'college_house': Integer(),
    'state_residence': Integer(),
    'cell_phone': Integer(),
    'num_living_adults': Integer(),
    'num_of_adults': Integer(),
    'num_of_men': Integer(),
    'num_of_women': Integer(),
    'ctelnum1': Integer(),
    'cellfon2': Integer(),
    'cadult': Integer(),
    'private_residence_2': Integer(),
    'college_house_2': Integer(),
    'cstate': Integer(),
    'landline': Integer(),
    'household_adults': Integer(),
    'general_health': Integer(),
    'physical_health_days': Integer(),
    'mental_health_days': Integer(),
    'poor_health_days': Integer(),
    'has_health_plan': Integer(),
    'has_personal_doctor': Integer(),
    'medical_cost_issue': Integer(),
    'last_checkup': Integer(),
    'high_blood_pressure': Integer(),
    'bp_meds': Integer(),
    'high_cholesterol': Integer(),
    'cholesterol_check': Integer(),
    'diagnosed_diabetes': Integer(),
    'had_heart_attack': Integer(),
    'had_coronary_heart_disease': Integer(),
    'had_stroke': Integer(),
    'has_asthma': Integer(),
    'asthma_now': Integer(),
    'had_skin_cancer': Integer(),
    'had_other_cancer': Integer(),
    'has_copd': Integer(),
    'has_arthritis': Integer(),
    'has_depression': Integer(),
    'had_kidney_disease': Integer(),
}
# Names of every target column declared as Integer/BigInteger; clean_batch
# casts these to pandas' nullable Int64 dtype.
target_int_cols = [c for c, t in dtype_map.items() if isinstance(t, (Integer, BigInteger))]

# --- Global controls ---
# Columns whose null percentage in the first cleaned batch exceeds THRESHOLD
# are dropped from every batch, except those listed in PROTECT_COLS.
THRESHOLD = 50.0
PROTECT_COLS = {'month', 'day', 'year', 'survey_date'}
# Module-level state mutated by the processing loop:
GLOBAL_DROP_COLS = None   # computed once from first cleaned batch
TABLE_INITIALIZED = False # we will REPLACE the table on the first write with kept columns

# ----------------------------
# Helpers (robust parsing & casting)
# ----------------------------
# Matches the textual repr of a bytes literal, e.g. "b'01022015'" or
# 'b"2015"', capturing the payload between the matching quotes as `val`.
BYTES_RE = re.compile(r"""^\s*b\s*(?P<quote>['"])(?P<val>.*)(?P=quote)\s*$""", re.DOTALL)


def decode_bytes_literal(x):
    """Normalise a raw cell value that may be bytes or a stringified bytes literal.

    * ``bytes``  -> decoded as UTF-8 (undecodable bytes are dropped).
    * ``str``    -> stripped; if it looks like ``b'...'`` / ``b"..."`` the quoted
      payload is returned (also stripped), otherwise the stripped string.
    * anything else (numbers, ``None``, NaN, ...) is returned unchanged.
    """
    if isinstance(x, bytes):
        # errors='ignore' means decoding cannot raise, so no try/except needed.
        return x.decode('utf-8', errors='ignore')
    if isinstance(x, str):
        s = x.strip()
        m = BYTES_RE.match(s)
        if m:
            return m.group('val').strip()
        return s
    return x

def extract_digits(series: pd.Series, max_len: int) -> pd.Series:
    """Return the first run of 1..max_len digits found in each value.

    Values are stringified, stripped and passed through decode_bytes_literal
    before matching; rows without any digit yield an empty string.
    """
    as_text = series.astype(str).str.strip()
    decoded = as_text.map(decode_bytes_literal)
    pattern = rf'(?P<d>\d{{1,{max_len}}})'
    digits = decoded.str.extract(pattern, expand=True)['d']
    return digits.fillna('')

def to_int_nullable(series: pd.Series) -> pd.Series:
    """Convert a Series to pandas' nullable ``Int64`` dtype.

    Values that cannot be parsed as numbers, are infinite, or are not whole
    numbers become ``pd.NA``. The result keeps the input's index and name so
    it can be assigned straight back into the originating DataFrame without
    misaligning rows.

    Args:
        series: Values to convert (strings, numbers, or nullable numerics).

    Returns:
        An ``Int64`` Series aligned with ``series``.
    """
    # Already integral (numpy int or nullable Int): cast directly. This keeps
    # full 64-bit precision and copes with pd.NA, neither of which survives a
    # round-trip through float64.
    if pd.api.types.is_integer_dtype(series.dtype):
        return series.astype('Int64')

    numeric = pd.to_numeric(series, errors='coerce')
    if pd.api.types.is_integer_dtype(numeric.dtype):
        return numeric.astype('Int64')

    as_float = numeric.astype('float64')
    rounded = as_float.round()
    # Absolute tolerance only: the default relative tolerance of np.isclose
    # would treat e.g. 100000.5 as "close" to 100000 and silently truncate it.
    integral = np.isfinite(as_float) & np.isclose(
        as_float.to_numpy(), rounded.to_numpy(), rtol=0.0, atol=1e-8
    )
    # Non-integral, NaN and +/-inf entries become NaN here and NA after the cast.
    return rounded.where(integral).astype('Int64')

def parse_mmddyyyy_to_date(series: pd.Series) -> pd.Series:
    """Parse MMDDYYYY-style values into ``datetime.date`` objects.

    Each value is decoded/stringified, its first run of 6-8 digits is taken and
    left-padded with zeros to 8 characters, then parsed with ``%m%d%Y``.
    Values with no such digit run, or that fail to parse, come back as NaT.
    """
    text = series.map(decode_bytes_literal).astype(str).str.strip()
    digits = text.str.extract(r'(?P<digits>\d{6,8})', expand=True)['digits']
    digits = digits.fillna('')
    # Pad only the non-empty entries; empty strings stay empty and parse to NaT.
    padded = digits.mask(digits.ne(''), digits.str.zfill(8))
    parsed = pd.to_datetime(padded, format='%m%d%Y', errors='coerce')
    return parsed.dt.date

def clean_batch(df: pd.DataFrame) -> pd.DataFrame:
    """Clean one raw batch and rename its columns to the target names.

    Steps:
      1. Drop rows that are entirely null and renumber the remaining rows.
      2. Lower-case column names and strip whitespace from text columns.
      3. Parse the interview month/day/year parts to nullable integers and
         the interview date to ``datetime.date``.
      4. Rename columns via ``rename_map``.
      5. Cast every integer-typed target column to nullable ``Int64``.

    Args:
        df: Raw batch as read from the source table.

    Returns:
        The cleaned DataFrame. An empty input (or one that is empty after
        dropping all-null rows) is returned as-is, without renaming.
    """
    if df.empty:
        return df
    df = df.dropna(how='all')
    if df.empty:
        return df

    # Renumber rows 0..n-1. Dropping rows leaves gaps in the index, and the
    # per-column assignments below align on index labels, so a gapped index
    # could otherwise pair values with the wrong rows.
    df = df.reset_index(drop=True)

    df.columns = [c.lower() for c in df.columns]
    for c in df.select_dtypes(include=['object']).columns:
        df[c] = df[c].astype(str).str.strip()

    for c in ['idate', 'imonth', 'iday', 'iyear']:
        if c in df.columns:
            df[c] = df[c].map(decode_bytes_literal)

    # Date parts: keep at most the expected number of digits, then cast.
    for c, width in (('imonth', 2), ('iday', 2), ('iyear', 4)):
        if c in df.columns:
            df[c] = to_int_nullable(extract_digits(df[c], width))

    if 'idate' in df.columns:
        df['idate'] = parse_mmddyyyy_to_date(df['idate'])

    df = df.rename(columns=rename_map)

    # One pass over every integer target column (this covers the identifier
    # and telephone columns as well as the already-parsed date parts).
    for col in target_int_cols:
        if col in df.columns:
            df[col] = to_int_nullable(df[col])

    if 'survey_date' in df.columns:
        df['survey_date'] = pd.to_datetime(df['survey_date'], errors='coerce').dt.date

    return df

# ----------------------------
# Global drop & schema utilities
# ----------------------------
def compute_global_drop_cols(df: pd.DataFrame, threshold_pct: float = THRESHOLD) -> list[str]:
    """List the columns of ``df`` whose null percentage exceeds ``threshold_pct``.

    Columns named in PROTECT_COLS are never listed. The decision is printed
    before the list is returned.
    """
    null_pct = df.isnull().mean() * 100
    to_drop = []
    for column in null_pct.index:
        if column in PROTECT_COLS:
            continue
        if null_pct[column] > threshold_pct:
            to_drop.append(column)
    print(f"[GLOBAL] Will drop {len(to_drop)} columns with >{threshold_pct}% nulls:", to_drop)
    return to_drop

def fill_month_day_year_from_survey_date(df: pd.DataFrame) -> pd.DataFrame:
    """Populate ``month``/``day``/``year`` from ``survey_date`` where missing.

    A part column that does not exist is created from the parsed date; an
    existing one only has its null entries filled. ``df`` is modified in place
    and returned. Without a ``survey_date`` column nothing changes.
    """
    if 'survey_date' not in df.columns:
        return df

    parsed = pd.to_datetime(df['survey_date'], errors='coerce')
    for part in ('month', 'day', 'year'):
        derived = getattr(parsed.dt, part).astype('Int64')
        if part in df.columns:
            missing = df[part].isna()
            df.loc[missing, part] = derived
        else:
            df[part] = derived
    return df

def ensure_target_schema(df: pd.DataFrame, kept_cols: list[str]) -> pd.DataFrame:
    """
    Return df restricted to kept_cols, in that order.

    Any kept column missing from df is first added (in place) as an all-null
    column: plain None for survey_date, nullable Int64 for integer columns,
    and an untyped NA column otherwise.
    """
    date_parts = {'month', 'day', 'year'}
    id_cols = ['sequence_number', 'primary_sampling_unit']
    n_rows = len(df)

    for name in kept_cols:
        if name in df.columns:
            continue
        if name == 'survey_date':
            df[name] = [None] * n_rows
            continue
        is_integer_col = (
            name in date_parts
            or name in id_cols
            or (name in dtype_map and isinstance(dtype_map[name], (Integer, BigInteger)))
        )
        if is_integer_col:
            df[name] = pd.Series([pd.NA] * n_rows, dtype='Int64')
        else:
            df[name] = pd.Series([pd.NA] * n_rows)

    # Subset and order
    return df[kept_cols]

def fill_nulls_dtype_aware(df: pd.DataFrame) -> pd.DataFrame:
    """Replace remaining nulls with a per-dtype placeholder, in place.

    Numeric columns get 0, boolean columns False, datetime64 columns
    1970-01-01, ``survey_date`` the date 1970-01-01, and every other object
    column the string "0". Returns the same DataFrame.
    """
    def _fill(columns, value):
        # Skip the assignment entirely when no column of that kind exists.
        if columns:
            df[columns] = df[columns].fillna(value)

    _fill(list(df.select_dtypes(include=['number']).columns), 0)
    _fill(list(df.select_dtypes(include=['bool']).columns), False)

    timestamp_cols = [
        name for name in df.columns
        if pd.api.types.is_datetime64_any_dtype(df[name])
    ]
    _fill(timestamp_cols, pd.Timestamp('1970-01-01'))

    if 'survey_date' in df.columns:
        missing_date = df['survey_date'].isna()
        if missing_date.any():
            df.loc[missing_date, 'survey_date'] = dt.date(1970, 1, 1)

    text_cols = [
        name for name in df.select_dtypes(include=['object']).columns
        if name != 'survey_date'
    ]
    _fill(text_cols, "0")
    return df

# ----------------------------
# Engine should be defined earlier in your script, e.g.:
# engine = sa.create_engine("postgresql+psycopg2://user:pass@host:port/dbname")
# ----------------------------

# Batch ETL driver: page through the source table, clean each page, and write
# it to the target table. The first written batch decides which columns are
# kept and re-creates the target table; later batches are appended.
print("Target DB:", str(getattr(engine, 'url', 'engine (url not available)')))
print(f"Writing to {target_schema}.{target_table}")

rows_total_written = 0

# dtype map for kept columns (set after first batch)
dtype_map_kept = None
kept_cols = None

while offset < total_rows:
    # Page through the source with LIMIT/OFFSET.
    # NOTE(review): paging is only consistent if ORDER BY yields a total order;
    # if SEQNO is not unique in the source table, pages may overlap or skip
    # rows — confirm, or add a tie-breaker column.
    query = f"""
        SELECT {cols_str}
        FROM {source_schema}.{source_table}
        ORDER BY SEQNO
        LIMIT {batch_size} OFFSET {offset}
    """
    batch = pd.read_sql_query(query, engine)

    # An empty page means the source is exhausted before total_rows was reached.
    if batch.empty:
        print(f"No rows returned at offset {offset}. Stopping.")
        break

    # Row counts before/after cleaning are kept only for the progress message.
    before = len(batch)
    batch = clean_batch(batch)
    after_clean = len(batch)

    # Ensure month/day/year populated
    batch = fill_month_day_year_from_survey_date(batch)

    # Decide global drop on the FIRST cleaned batch only
    if GLOBAL_DROP_COLS is None:
        GLOBAL_DROP_COLS = compute_global_drop_cols(batch, threshold_pct=THRESHOLD)

        # Kept columns follow dtype_map's key order, which therefore becomes
        # the column order of the target table.
        kept_cols = [c for c in dtype_map.keys() if c not in GLOBAL_DROP_COLS]
        # Protect core columns even if they were mistakenly in drop list
        for p in PROTECT_COLS:
            if p not in kept_cols and p in dtype_map:
                kept_cols.append(p)

        # Build kept dtype map
        dtype_map_kept = {c: dtype_map[c] for c in kept_cols if c in dtype_map}
        print("[GLOBAL] Kept columns:", kept_cols)

    # Apply global drop to this batch
    if GLOBAL_DROP_COLS:
        cols_to_drop_now = [c for c in GLOBAL_DROP_COLS if c in batch.columns]
        if cols_to_drop_now:
            batch = batch.drop(columns=cols_to_drop_now)

    # Enforce stable schema with kept columns only
    batch = ensure_target_schema(batch, kept_cols=kept_cols)

    # Fill remaining nulls with 0 (dtype-aware)
    batch = fill_nulls_dtype_aware(batch)

    # Write: on first batch, REPLACE table to physically remove dropped columns in SQL
    if not TABLE_INITIALIZED:
        write_mode = 'replace'   # re-create table schema with kept columns only
        # The flag is flipped before the write is attempted, so it is already
        # True if the to_sql call below raises.
        TABLE_INITIALIZED = True
        print(f"[INIT] Creating/replacing {target_schema}.{target_table} with kept columns only...")
    else:
        write_mode = 'append'

    # engine.begin() wraps each batch write in its own transaction.
    with engine.begin() as conn:
        batch.to_sql(
            name=target_table,
            con=conn,
            schema=target_schema,
            if_exists=write_mode,
            index=False,
            method='multi',
            chunksize=100000,
            dtype=dtype_map_kept
        )

    rows_total_written += len(batch)
    print(f"Offset {offset}: fetched {before} -> cleaned {after_clean} -> written {len(batch)}. Total written: {rows_total_written:,}.")
    # Advance by the page size (not by rows written) so OFFSET tracks source rows.
    offset += batch_size

print("✅ Silver layer processing completed!")
print(f"Total rows written: {rows_total_written:,}")

# Post-run check: list the database's schemas and, when the target schema
# exists, the tables it contains.
insp = inspect(engine)
schemas = insp.get_schema_names()
print("Available schemas:", schemas)
if target_schema not in schemas:
    print('Target schema not available')
else:
    print(
        f"Tables in schema '{target_schema}':",
        insp.get_table_names(schema=target_schema),
    )


Target DB: postgresql://postgres:***@localhost:5432/Project1
Writing to silver1.cleaned
[GLOBAL] Will drop 12 columns with >50.0% nulls: ['college_house', 'num_living_adults', 'ctelnum1', 'cellfon2', 'cadult', 'private_residence_2', 'college_house_2', 'cstate', 'landline', 'household_adults', 'bp_meds', 'asthma_now']
[GLOBAL] Kept columns: ['state', 'survey_month', 'survey_date', 'month', 'day', 'year', 'disposition_code', 'sequence_number', 'primary_sampling_unit', 'telephone_number', 'private_residence', 'state_residence', 'cell_phone', 'num_of_adults', 'num_of_men', 'num_of_women', 'general_health', 'physical_health_days', 'mental_health_days', 'poor_health_days', 'has_health_plan', 'has_personal_doctor', 'medical_cost_issue', 'last_checkup', 'high_blood_pressure', 'high_cholesterol', 'cholesterol_check', 'diagnosed_diabetes', 'had_heart_attack', 'had_coronary_heart_disease', 'had_stroke', 'has_asthma', 'had_skin_cancer', 'had_other_cancer', 'has_copd', 'has_arthritis', 'has_depress