In [None]:
# CONFIGURATION
from pathlib import Path
import json, sqlite3, datetime, traceback, zipfile, tempfile, shutil

# Finding repo root for group reproduction
def find_repo_root(start: Path) -> Path:
    cur = start.resolve()
    for p in [cur, *cur.parents]:
        if (p / ".git").exists():
            return p
    return start

try:
    REPO_ROOT = find_repo_root(Path(__file__).resolve().parent)
except NameError:
    REPO_ROOT = find_repo_root(Path.cwd())

# Paths for dataset
DATASET_NAME  = "spider"       # change for dataset name
DATASET_DIR   = REPO_ROOT / "data" / DATASET_NAME
DATASET_ZIP   = REPO_ROOT / "data" / f"{DATASET_NAME}.zip"

RUN_STAMP      = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "runs" / DATASET_NAME

# Controls
MAX_DATABASES      = 100   # 0/None for all
ROWS_PER_TABLE     = 50
SKIP_IF_EXISTS     = True  # set False to overwrite
AUTO_EXTRACT       = True  # only used when EXTRACT_TO_TEMP is False
EXTRACT_TO_TEMP    = True  # preferred: avoids leaving extracted files in repo
CLEANUP_AFTER_RUN  = True

TMP_DIR = None


# Temporary extraction directory
if EXTRACT_TO_TEMP and DATASET_ZIP.exists():
    TMP_DIR = Path(tempfile.mkdtemp(prefix=f"{DATASET_NAME}_"))
    
    with zipfile.ZipFile(DATASET_ZIP, "r") as zf:
        zf.extractall(TMP_DIR)
    
    # Point dataset dir at the temp extraction
    DATASET_DIR = TMP_DIR / DATASET_NAME
else:
    # Only extract into repo if not using temp
    if AUTO_EXTRACT and DATASET_ZIP.exists() and not DATASET_DIR.exists():
        
        with zipfile.ZipFile(DATASET_ZIP, "r") as zf:
            zf.extractall(DATASET_DIR.parent)

# Compute DB_ROOT AFTER extraction so we can detect a 'database' subfolder correctly
DB_ROOT_CANDIDATE = DATASET_DIR / "database"
DB_ROOT = DB_ROOT_CANDIDATE if DB_ROOT_CANDIDATE.exists() else DATASET_DIR


In [8]:

# List tables in the database
def list_tables(conn: sqlite3.Connection) -> list[str]:
    cur = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
    )
    return [r[0] for r in cur.fetchall()]

# Get columns for a table 
def get_table_columns(conn: sqlite3.Connection, table: str) -> list[dict]:
    # Returns list of column dicts: {"name", "type", "notnull", "default", "is_pk"}.
    cols = []
    for cid, name, ctype, notnull, dflt_value, pk in conn.execute(f"PRAGMA table_info('{table}')"):
        cols.append({
            "name": name,
            "type": ctype,
            "notnull": bool(notnull),
            "default": dflt_value,
            "is_pk": bool(pk),
        })
    return cols

# find primary key columns
def find_pk_columns(cols: list[dict]) -> list[str]:
    pks = [c["name"] for c in cols if c.get("is_pk")]
    return pks

def get_foreign_keys(conn: sqlite3.Connection, table: str) -> list[dict]:
    # Returns list of FKs as dicts with from_table, from_column, to_table, to_column.
    fks = []
    for row in conn.execute(f"PRAGMA foreign_key_list('{table}')"):
        # columns based on SQLite docs: id, seq, table, from, to, on_update, on_delete, match
        _, _, ref_table, from_col, to_col, *_ = row
        fks.append({
            "from_table": table,
            "from_column": from_col,
            "to_table": ref_table,
            "to_column": to_col,
        })
    return fks

# Sample rows from a table
def sample_rows(conn: sqlite3.Connection, table: str, n: int) -> list[dict]:
    cur = conn.execute(f"SELECT * FROM '{table}' LIMIT {int(n)}")
    colnames = [d[0] for d in cur.description]
    rows = []
    for r in cur.fetchall():
        rows.append({c: v for c, v in zip(colnames, r)})
    return rows

# Counts how many times each column has null values
def summarize_nulls(rows: list[dict]) -> dict:
    if not rows:
        return {}
    cols = rows[0].keys()
    out = {c: 0 for c in cols}
    for r in rows:
        for c in cols:
            if r.get(c) is None:
                out[c] += 1
    return out


In [9]:
# scans a dataset folder and finds every sqlite file giving them readable IDs with the exact path
def enumerate_sqlite_dbs(db_root: Path) -> list[tuple[str, Path]]:
    paths = sorted(db_root.rglob("*.sqlite"))
    if not paths:
        raise FileNotFoundError(f"No .sqlite files found under {db_root}")
    pairs = []
    for p in paths:
        db_id = p.parent.name or p.stem
        pairs.append((db_id, p))
    return pairs


In [None]:

# Main function that handles a single SQLite database end-to-end
# This will extract the schema, sample data, and validation results from the database.

def process_one_db(db_id: str, db_path: Path, out_root: Path) -> None:
    # Create output directory for this database
    OUT_DIR = out_root / db_id
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    
    schema_path  = OUT_DIR / "schema.json"
    samples_path = OUT_DIR / "samples.jsonl"
    val_path     = OUT_DIR / "validation.log"
    

    # Skip processing if outputs already exist
    if SKIP_IF_EXISTS and schema_path.exists() and samples_path.exists() and val_path.exists():
        print(f"Already exists, skipping: {OUT_DIR}")
        return

    conn = None

    try:
        # Open DB in read-only mode
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row

        # 1. Discover tables in the database
        tables = list_tables(conn)

        # 2. Build schema representation (tables, columns, PKs, FKs)
        schema = {"db_id": db_id, "tables": [], "foreign_keys": []}
        table_to_columns = {}
        for t in tables:
            cols = get_table_columns(conn, t)
            table_to_columns[t] = cols
            schema["tables"].append({
                "name": t,
                "columns": cols,
                "primary_key": find_pk_columns(cols),
            })
            schema["foreign_keys"].extend(get_foreign_keys(conn, t))

        # 3. Sample rows from each table (for JSONL training / debugging)
        samples = {t: sample_rows(conn, t, ROWS_PER_TABLE) for t in tables}


        # 4. Validation checks (foreign keys, nulls, primary keys)
        issues = []
        for fk in schema["foreign_keys"]:
            from_cols = {c["name"] for c in table_to_columns.get(fk["from_table"], [])}
            to_cols   = {c["name"] for c in table_to_columns.get(fk["to_table"], [])}
            if fk["from_column"] not in from_cols:
                issues.append(f"Missing from_column {fk['from_table']}.{fk['from_column']}")
            if fk["to_column"] not in to_cols:
                issues.append(f"Missing to_column {fk['to_table']}.{fk['to_column']}")
        null_summary = {t: summarize_nulls(rows) for t, rows in samples.items()}

        # Primary key summary for validation.log
        pk_summary = {t: [c["name"] for c in cols if c.get("is_pk")]
              for t, cols in table_to_columns.items()}
        pk_count   = sum(len(pks) for pks in pk_summary.values())


        # 5. Save schema.json
        with open(schema_path, "w", encoding="utf-8") as f:
            json.dump(schema, f, indent=2, ensure_ascii=False)



        # 6. Save samples.jsonl
        with open(samples_path, "w", encoding="utf-8") as f:
            for t, rows in samples.items():
                for r in rows:
                    f.write(json.dumps({"table": t, "row": r}, ensure_ascii=False) + "\n")

        # 7. Save validation.log
        with open(val_path, "w", encoding="utf-8") as f:
            f.write(f"DB ID: {db_id}\n")
            f.write(f"Run:   {RUN_STAMP}\n\n")
            f.write(f"Tables: {len(tables)}\n")
            f.write(f"Columns: {sum(len(c) for c in table_to_columns.values())}\n")
            f.write(f"PKs: {pk_count}\n") 
            f.write(f"FKs: {len(schema['foreign_keys'])}\n\n")

            # Check for structural issues
            if issues:
                f.write("issues:\n")
                for i in issues: f.write(f"- {i}\n")
            else:
                f.write("No structural issues found.\n")

            # Primary key summary
            f.write("\nPrimary keys by table:\n")
            for t, pks in pk_summary.items():
                pks = pk_summary[t]
                f.write(f"- {t}: {pks if pks else 'None'}\n")

            # Null summary
            f.write("\nNull summary:\n")
            f.write(json.dumps(null_summary, indent=2, ensure_ascii=False))




    except Exception as e:
        traceback.print_exc()

    finally:
        # Close connection aggressively to release Windows file locks
        try:
            if conn is not None:
                conn.close()
        except Exception:
            pass
        import gc; gc.collect()


In [11]:
# Final orchestration
def main():
    print(f"Dataset name:    {DATASET_NAME}")
    print(f"Dataset dir:     {DATASET_DIR}")
    print(f"DB root:       {DB_ROOT}")
    print(f"Artifacts dir: {ARTIFACTS_ROOT}")

    # Safety check: dataset folder must exist 
    if not DATASET_DIR.exists():
        raise FileNotFoundError(f"Dataset dir {DATASET_DIR} does not exist.")

    # Create artifacts folder if it doesn't exist
    ARTIFACTS_ROOT.mkdir(parents=True, exist_ok=True)

    dbs = enumerate_sqlite_dbs(DB_ROOT)
    if not dbs:
        raise RuntimeError(f"No SQLite DBs found under {DB_ROOT}")

    # Optionally limit the number of databases processed
    if MAX_DATABASES and MAX_DATABASES > 0:
        # db_id = identifier (usually folder name)
        # db_path = actual .sqlite file path
        dbs = dbs[:MAX_DATABASES]

    # Process each database one by one alphabetically
    for db_id, db_path in dbs:
        process_one_db(db_id, db_path, ARTIFACTS_ROOT)

    print("\ndone.")









In [None]:
if __name__ == "__main__":
    try:
        main()
    finally:
        # Optional cleanup for temp extraction 
        if CLEANUP_AFTER_RUN and TMP_DIR is not None:
            shutil.rmtree(TMP_DIR, ignore_errors=True)


Dataset name:    spider
Dataset dir:     C:\Users\felix\AppData\Local\Temp\spider_6ygax40k\spider
DB root:       C:\Users\felix\AppData\Local\Temp\spider_6ygax40k\spider\database
Artifacts dir: C:\Users\felix\Videos\Documents\GitHub\project_rdmstokgs\artifacts\runs\spider

done.
