In [5]:
#!/usr/bin/env python3
"""
Fetch CSVs listed by a Postgres query and copy them to a target folder.

Workflow
--------
1) Run the provided query to get file names (e.g., `source_file`) that are missing.
2) Search a source directory (optionally recursive) for matching files.
3) Copy them to the destination directory (where your pipeline scripts run).
4) Log what was found/missing.

Notes
-----
- Matching is case-insensitive on filenames.
- If a filename already exists at the destination, a suffix "__1", "__2", ... is added.
- By default this copies; change COPY_MODE to "move" if you prefer moving.
"""

import datetime as dt
import os
import shutil
from pathlib import Path
from typing import List, Set, Dict

import pandas as pd
from sqlalchemy import create_engine, text

# ============== CONFIG ==============
# Postgres connection string. Set the PG_CONN_STR environment variable to point
# at a different server (e.g. a remote host) without editing this file or
# keeping extra credentials in source; the fallback is the local instance.
PG_CONN_STR = os.environ.get(
    "PG_CONN_STR",
    "postgresql+psycopg2://postgres:BANA650@localhost:5432/postgres",  # LOCAL POSTGRES
)

# Your query (returns one column named source_file).
# NOT EXISTS is used rather than NOT IN: with NOT IN, a single NULL source_file
# in the staging table makes the predicate NULL for every row, so the query
# would silently return no missing files at all.
MISSING_FILES_SQL = """
SELECT m.source_file
FROM public.hospital_metadata_test AS m
WHERE m.source_file IS NOT NULL
  AND NOT EXISTS (
      SELECT 1
      FROM public.hospital_charges_staging AS s
      WHERE s.source_file = m.source_file
  )
"""

# Where to look for the CSVs you want to copy FROM
SEARCH_DIR = Path(r"C:\Users\gio12\Downloads\HOSPITAL FILES")  # <-- change

# Where to copy/move the CSVs TO (your pipeline working folder)
DEST_DIR = Path(r"C:\Users\gio12\Desktop\New folder")    # <-- change

# File extension(s) to look for. Usually ["*.csv"] is enough.
GLOBS = ["*.csv"]

# Search recursively?
RECURSIVE = True

# "copy" or "move"
COPY_MODE = "copy"

# Dry run (True = only print/log, don't actually copy/move)
DRY_RUN = False

# Write a simple CSV report to the current working directory?
WRITE_REPORT = True
# ====================================


def fetch_missing_filenames() -> List[str]:
    """
    Return the list of missing source_file values from Postgres.

    Runs MISSING_FILES_SQL against the database at PG_CONN_STR and returns the
    ``source_file`` column as stripped strings. NULLs and values that are empty
    after stripping are dropped, since they cannot name a file.

    Raises
    ------
    RuntimeError
        If the query result has no column named ``source_file``.
    """
    engine = create_engine(PG_CONN_STR)
    try:
        with engine.begin() as conn:
            df = pd.read_sql(text(MISSING_FILES_SQL), conn)
    finally:
        # The engine is created per call; dispose it so its connection pool
        # does not linger (matters when the cell is re-run in a notebook).
        engine.dispose()

    if "source_file" not in df.columns:
        raise RuntimeError("Query must return a column named 'source_file'.")

    # Drop nulls, strip spaces, and discard names that end up blank.
    stripped = (str(value).strip() for value in df["source_file"].dropna())
    return [name for name in stripped if name]


def build_search_index(search_dir: Path, globs: List[str], recursive: bool) -> Dict[str, Path]:
    """
    Build a case-insensitive index of filename -> full Path for quick lookups.

    Parameters
    ----------
    search_dir : directory to scan.
    globs      : glob patterns to match (e.g. ["*.csv"]); processed in order.
    recursive  : if True, descend into subdirectories (rglob); otherwise only
                 the top level of search_dir is scanned (glob).

    Returns
    -------
    Dict mapping the lower-cased filename to the Path that was kept for it.

    If multiple files share the same (case-insensitive) name, the FIRST one
    seen wins and later ones are skipped with a console warning. Matches are
    visited in sorted path order per pattern, so the winner is the same on
    every run instead of depending on filesystem enumeration order.
    """
    index: Dict[str, Path] = {}
    for pattern in globs:
        matches = search_dir.rglob(pattern) if recursive else search_dir.glob(pattern)
        for path in sorted(matches):
            # Patterns can also match directories (e.g. a folder named "x.csv").
            if not path.is_file():
                continue
            key = path.name.lower()
            kept = index.setdefault(key, path)
            # The same path may show up again via another pattern; only warn
            # when a genuinely different file collides on the name.
            if kept != path:
                print(f"[WARN] Duplicate filename seen with different paths:\n"
                      f"       Keeping: {kept}\n"
                      f"       Skipping: {path}")
    return index


def uniquify(dest_path: Path) -> Path:
    """
    Return a path that does not exist yet, based on dest_path.

    dest_path itself is returned when it is free; otherwise the first free
    sibling named "<stem>__1<suffix>", "<stem>__2<suffix>", ... is returned.
    """
    candidate = dest_path
    counter = 0
    # Keep bumping the numeric tag until the candidate name is unused.
    while candidate.exists():
        counter += 1
        candidate = dest_path.with_name(
            f"{dest_path.stem}__{counter}{dest_path.suffix}"
        )
    return candidate


def ensure_dir(p: Path) -> None:
    """Create directory p along with any missing parents; an existing directory is fine."""
    p.mkdir(exist_ok=True, parents=True)


def main() -> None:
    """
    Run the fetch-and-copy workflow using the module-level configuration.

    Steps: query Postgres for missing filenames, index SEARCH_DIR, copy or
    move every match into DEST_DIR (per COPY_MODE), print a summary and,
    when WRITE_REPORT is set, write a CSV report to the current working
    directory. Problems are reported on the console; nothing is raised.

    With DRY_RUN enabled the planned operations are only printed and the
    filesystem is left untouched (DEST_DIR is not created either).
    """
    mode = COPY_MODE.lower()
    if mode not in ("copy", "move"):
        # Fail fast instead of silently treating a typo as "copy".
        print(f"[ERROR] COPY_MODE must be 'copy' or 'move', got: {COPY_MODE!r}")
        return

    print("[INFO] Fetching missing filenames from Postgres ...")
    try:
        missing = fetch_missing_filenames()
    except Exception as e:  # top-level boundary: report and stop
        print(f"[ERROR] Query failed: {e}")
        return

    if not missing:
        print("[INFO] No missing files returned by the query. Nothing to do.")
        return

    print(f"[INFO] {len(missing)} filenames returned. Building search index in: {SEARCH_DIR}")
    if not SEARCH_DIR.exists():
        print(f"[ERROR] SEARCH_DIR does not exist: {SEARCH_DIR}")
        return

    index = build_search_index(SEARCH_DIR, GLOBS, RECURSIVE)
    print(f"[INFO] Indexed {len(index)} files with extensions {GLOBS}")

    # A dry run must not touch the filesystem, so only create the
    # destination when files will really be transferred.
    if not DRY_RUN:
        ensure_dir(DEST_DIR)

    copied: List[str] = []
    moved: List[str] = []
    not_found: List[str] = []
    errors: List[tuple] = []
    seen: Set[str] = set()

    for fname in missing:
        key = fname.strip().lower()
        # Matching is case-insensitive, so names differing only by case (or
        # exact repeats from the query) refer to the same file. Handle each
        # once; otherwise it would be copied again under a "__1" name, or
        # fail in move mode because the source is already gone.
        if key in seen:
            continue
        seen.add(key)

        src = index.get(key)
        if src is None:
            print(f"   • NOT FOUND: {fname}")
            not_found.append(fname)
            continue

        dest = uniquify(DEST_DIR / src.name)
        try:
            if DRY_RUN:
                print(f"   • DRY-RUN {COPY_MODE.upper()}: {src}  ->  {dest}")
            elif mode == "move":
                shutil.move(str(src), str(dest))
                moved.append(str(dest))
                print(f"   • MOVED: {src.name} -> {dest}")
            else:
                shutil.copy2(str(src), str(dest))
                copied.append(str(dest))
                print(f"   • COPIED: {src.name} -> {dest}")
        except Exception as e:
            # Best effort: record the failure and carry on with the next file.
            print(f"   • ERROR copying/moving {src} -> {dest}: {e}")
            errors.append((str(src), str(dest), str(e)))

    # Summary
    print("\n[SUMMARY]")
    print(f"  Copied : {len(copied)}")
    print(f"  Moved  : {len(moved)}")
    print(f"  Missing: {len(not_found)}")
    print(f"  Errors : {len(errors)}")

    if WRITE_REPORT:
        ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        report = Path.cwd() / f"fetch_missing_csvs_report_{ts}.csv"
        rows = []
        rows += [{"status": "copied", "path": p} for p in copied]
        rows += [{"status": "moved", "path": p} for p in moved]
        rows += [{"status": "missing", "path": name} for name in not_found]
        rows += [
            {"status": "error", "path": f"{src} -> {dst}", "error": err}
            for src, dst, err in errors
        ]
        # Fixed column list keeps the report schema identical whether or not
        # any errors occurred (and valid even when there are no rows).
        pd.DataFrame(rows, columns=["status", "path", "error"]).to_csv(report, index=False)
        print(f"[INFO] Report written: {report}")


# Run the workflow only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


[INFO] Fetching missing filenames from Postgres ...
[INFO] 50 filenames returned. Building search index in: C:\Users\gio12\Downloads\HOSPITAL FILES
[INFO] Indexed 255 files with extensions ['*.csv']
   • COPIED: 201852902_shorepoint-health-port-charlotte_standardcharges.csv -> C:\Users\gio12\Desktop\New folder\201852902_shorepoint-health-port-charlotte_standardcharges.csv
   • COPIED: 203329716_kindred-hospital-the-palm-beaches_standardcharges.csv -> C:\Users\gio12\Desktop\New folder\203329716_kindred-hospital-the-palm-beaches_standardcharges.csv
   • COPIED: 203329727_kindred-hospital-melbourne_standardcharges.csv -> C:\Users\gio12\Desktop\New folder\203329727_kindred-hospital-melbourne_standardcharges.csv
   • COPIED: 32-0583104_HCA-FLORIDA-UNIVERSITY-HOSPITAL_standardcharges.csv -> C:\Users\gio12\Desktop\New folder\32-0583104_HCA-FLORIDA-UNIVERSITY-HOSPITAL_standardcharges.csv
   • COPIED: 35-1611050_HCA-FLORIDA-ENGLEWOOD-HOSPITAL_standardcharges.csv -> C:\Users\gio12\Desktop\New fo