
# **AirQ_Part1_xxx.ipynb** — Assignment 1 Orchestration

- This notebook orchestrates Assignment 1.
- All SQL must live in external `.sql` files under `ddl/`, `etl/`, and `post/`. You can test them in DBeaver.
- The notebook only opens a DB connection, runs external `.sql` files, loads CSVs via Pandas, and produces the final dump.

**Final folder layout (per‑group, self‑contained)**

```
BI_Projects/
  DWH1_xxx/
    csv/       # 15 OLTP CSV files
    data/      # Tables X and Y CSVs (do NOT submit the 15 original CSVs; see submission checklist)
    ddl/       # DDL only (staging, X/Y, warehouse, optional reset)
    etl/       # SQL-first ETL steps: a1_etl01_...sql, a1_etl02_...sql, ...
    post/      # Post-ETL checks: a1_check01_...sql, a1_check02_...sql, ...
    prov/      # Your auto-generated provenance JSON-LD file
    sqldump/   # Export produced by pg_dump
    AirQ_Part1_xxx.ipynb
    AirQ_ERD_dwh_xxx.png / .pdf
    group_xxx.txt
    Report_Part1_Group_xxx.pdf
```
> Replace `xxx` in your file names with your **three‑digit** group number everywhere.
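
As a minimal sketch of the naming rule above, the helper below derives the group-specific names from a three-digit group number and rejects anything else. The function name `expected_names` and its dictionary keys are illustrative assumptions, not part of the assignment template.

```python
import re

def expected_names(group: str) -> dict[str, str]:
    """Return group-specific names for a three-digit group number (hypothetical helper)."""
    # [0-9] instead of \d so that only ASCII digits are accepted
    if not re.fullmatch(r"[0-9]{3}", group):
        raise ValueError(f"group must be exactly three digits, got {group!r}")
    return {
        "project_dir": f"DWH1_{group}",
        "notebook": f"AirQ_Part1_{group}.ipynb",
        "group_file": f"group_{group}.txt",
        "report": f"Report_Part1_Group_{group}.pdf",
        "schema_stg": f"stg_{group}",
        "schema_dwh": f"dwh_{group}",
    }

names = expected_names("007")
print(names["notebook"])
```

Calling `expected_names("7")` raises `ValueError`, which catches a missing zero-padding early.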


## Contents
1. Configuration & preflight (group, paths)
2. Database connection
3. Reset & create stg_xxx
4. Load CSVs into stg_xxx (order-sensitive)
5. Reset & create dwh_xxx (dimensions first, then facts)
6. ETL runner (executes `etl/a1_etl*.sql`)
7. Post-ETL checks (5–7 checks with pass/fail)
8. Create database dump
9. PROV-O (JSON-LD) provenance
10. Submission checklist



## 1) Configuration & preflight

In [1]:
# === Parameters ===
XXX = "006"               # three digits, e.g. "007"

VERBOSE_SQL = False             # print progress when running .sql files
LOAD_ORDER_CSV = []             # or fill later in Section 4
LOAD_ORDER_DATA = []            # or fill later in Section 4

In [2]:
import re, time
import shutil, subprocess, os
import json, hashlib

from pathlib import Path
from getpass import getpass
from urllib.parse import quote_plus
from datetime import datetime, timezone

import pandas as pd
import sqlparse
from sqlalchemy import create_engine, text  # the name `engine` is reserved for the Engine object created in Section 2

In [3]:
# === Toggles & paths ===
root_dir = Path.cwd()
csv_dir = root_dir / "csv"
data_dir = root_dir / "data"
ddl_dir = root_dir / "ddl"
etl_dir = root_dir / "etl"
post_dir = root_dir / "post"
sqldump_dir = root_dir / "sqldump"

SCHEMA_STG = f"stg_{XXX}"
SCHEMA_DWH = f"dwh_{XXX}"

# files we expect in the ddl subfolder
STG_RESET  = ddl_dir / f"airq_reset_stg_{XXX}.sql"
STG_CREATE = ddl_dir / f"airq_create_stg_{XXX}.sql"
STG_EXT    = ddl_dir / f"airq_create_ext_{XXX}.sql"
DWH_RESET  = ddl_dir / f"airq_reset_dwh_{XXX}.sql"
DWH_CREATE = ddl_dir / f"airq_create_dwh_{XXX}.sql"

print("CSV dir:", csv_dir)
print("Data dir:", data_dir)
print("DDL dir:", ddl_dir)
print("ETL dir:", etl_dir)
print("Postchecks dir:", post_dir)
print("SQLdump dir:", sqldump_dir)


CSV dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/csv
Data dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/data
DDL dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/ddl
ETL dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/etl
Postchecks dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/post
SQLdump dir: /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/sqldump



## 2) Database connection


In [4]:
# === Minimal config & connect ===
DB_USER = f"grp_{XXX}"
DB_NAME = "airq"
DB_HOST = "localhost"
DB_PORT = "5432"

# The password is requested once per run; leave it empty if your local pg_hba.conf allows trust/peer.
# `getpass` was already imported in the imports cell above.
pw = getpass(f"Password for {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME} (leave empty if not needed): ")
DSN = f"postgresql+psycopg2://{DB_USER}:{quote_plus(pw)}@{DB_HOST}:{DB_PORT}/{DB_NAME}" if pw \
      else f"postgresql+psycopg2://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

def _mask_dsn(dsn: str) -> str:
    """Return the DSN with any password replaced by ***."""
    try:
        # Import locally: the name `engine` is rebound to the Engine object below
        from sqlalchemy.engine.url import make_url
        return make_url(dsn).render_as_string(hide_password=True)
    except Exception:
        # Fallback: mask with a regex (\1 keeps the user name)
        return re.sub(r"://([^:@]+)(?::[^@]*)?@", r"://\1:***@", dsn)

engine = create_engine(DSN, future=True, pool_pre_ping=True)
print("Connecting via:", _mask_dsn(DSN))

with engine.begin() as conn:
    # best-effort: set the role if it exists; don't crash if not
    try:
        conn.exec_driver_sql(f"SET ROLE grp_{XXX}")
        print(f"SET ROLE grp_{XXX} ✓")
    except Exception as e:
        print(f"(no SET ROLE: {e.__class__.__name__})")
    who = conn.exec_driver_sql("select current_user").scalar_one()
    print("current_user:", who)


Password for grp_006@localhost:5432/airq (leave empty if not needed):  ········


Connecting via: postgresql+psycopg2://grp_006:***@localhost:5432/airq
SET ROLE grp_006 ✓
current_user: grp_006


In [5]:
def run_sqlscript(
    path: str,
    *,
    engine,
    progress: bool = True,      # print per-statement progress; set False to keep output quiet
    add_search_path: bool = False,
    schema_dwh: str | None = None,
    schema_stg: str | None = None,
    title: str | None = None,      # optional title
    strip_psql_meta: bool = True,  # psql meta stripping
):
    """
    Execute all statements in a .sql file.
    - Returns the LAST result set as a pandas.DataFrame if any statement returns rows; else None.
    - Set progress=False to suppress progress/header prints (great for check scripts).
    """

    raw = Path(path).read_text(encoding="utf-8")

    # Strip psql meta-commands (e.g., \i, \set) if requested
    if strip_psql_meta:
        raw = "\n".join(
            line for line in raw.splitlines()
            if not line.lstrip().startswith("\\")
        )

    # Optional search_path prologue
    prologue = ""
    if add_search_path:
        schs = [s for s in (schema_dwh, schema_stg) if s]
        if schs:
            prologue = f"SET search_path TO {', '.join(schs)};\n"

    script = prologue + raw
    stmts = [s.strip() for s in sqlparse.split(script) if s and s.strip(" ;\n\t")]

    if progress:
        hdr = f"▶ {title}" if title else "▶ Running SQL script"
        print(f"{hdr}: {path} ({len(stmts)} statements)")
    t0 = time.time()

    last_df = None
    with engine.begin() as conn:
        for i, stmt in enumerate(stmts, start=1):
            if not stmt:
                continue
            start = time.time()
            try:
                if progress:
                    preview = " ".join(stmt.split())[:120]
                    print(f"  {i:>3}: {preview} ...")

                cursor = conn.exec_driver_sql(stmt)

                if cursor.returns_rows:
                    rows = cursor.fetchall()
                    cols = cursor.keys()
                    last_df = pd.DataFrame(rows, columns=cols)

                if progress:
                    print(f"       OK ({time.time() - start:.3f}s)")

            except Exception as e:
                # Raise with a helpful preview even when progress=False
                preview = " ".join(stmt.split())[:160]
                raise RuntimeError(
                    f"SQL error in {path}, statement #{i}: {preview}"
                ) from e

    if progress:
        print(f"✅ Done in {time.time() - t0:.2f}s")

    return last_df


## 3) Reset and create **staging** (`stg_xxx`) from DDL files

In [6]:
print(f"== STAGING-ONLY RESET: stg_{XXX} ==")
try:
    for p in (STG_RESET, STG_CREATE, STG_EXT):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== STAGING-ONLY RESET: stg_006 ==



## 4) Load CSV → `stg_xxx` with Pandas `.to_sql()` (you must choose the correct order)

Because `stg_xxx` has foreign keys, **order matters**. Fill `LOAD_ORDER_CSV` with base filenames (no `.csv`).  
If you leave the list empty, the cell will just print what CSVs it found without loading anything.
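
One way to derive a foreign-key-safe order instead of working it out by hand is a topological sort. The sketch below uses the standard-library `graphlib` (Python 3.9+) on a hand-written excerpt of parent/child relationships; the `PARENTS` mapping and the helper name `fk_safe_order` are assumptions for illustration, and the authoritative foreign keys are the ones in your staging DDL.

```python
from graphlib import TopologicalSorter

# child table -> set of parent tables it references (illustrative excerpt only)
PARENTS = {
    "tb_country": set(),
    "tb_role": set(),
    "tb_city": {"tb_country"},
    "tb_employee": {"tb_role"},
    "tb_weather": {"tb_city"},
}

def fk_safe_order(parents: dict[str, set[str]]) -> list[str]:
    """Return table names so that every parent precedes its children."""
    # static_order() yields predecessors first and raises CycleError on circular references
    return list(TopologicalSorter(parents).static_order())

order = fk_safe_order(PARENTS)
print(order)
```

Several orders can be valid at once; any order in which each parent appears before its children loads without foreign-key violations.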


In [7]:
def load_folder_to_stg(
    folder_name: str,
    engine,
    SCHEMA_STG: str,
    load_order=None,
    if_exists: str = "append",
    chunksize: int = 20000,
):
    # root_dir is a notebook-level variable defined in Section 1; it is only read here
    src_dir = Path(root_dir) / folder_name
    if not src_dir.exists():
        raise FileNotFoundError(f"Folder not found: {src_dir}")

    def load_one(name: str):
        path = src_dir / f"{name}.csv"
        if not path.exists():
            print("Missing CSV:", path.name)
            return 0
        df = pd.read_csv(
            path,
            na_values=["\\N"],
            keep_default_na=False,
            low_memory=False,
        )
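        # Convert any column whose name ends in "from", "to" or "at" to DATE.
        # This is a suffix match only, so a non-date column with such a name would also go through the conversion.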
        for col in df.columns:
            col_l = col.lower()
            if col_l.endswith(("from", "to", "at")):
                df[col] = pd.to_datetime(df[col], format="%Y-%m-%d", errors="coerce").dt.date
        # Write
        df.to_sql(
            name,
            con=engine,
            schema=SCHEMA_STG,
            if_exists=if_exists,
            index=False,
            method="multi",
            chunksize=chunksize,
        )
        print(f"Loaded {len(df):,} rows → {SCHEMA_STG}.{name}")
        return len(df)

    if not load_order:
        discovered = sorted([p.stem for p in src_dir.glob("*.csv")])
        print("No order set yet. CSVs found:", discovered)
        return

    t0 = time.time()
    total = 0
    for name in load_order:
        total += load_one(name)
    print(f"⏱️ Total load time: {time.time() - t0:.2f} seconds · {total:,} rows")


In [8]:
# First, we load the original 15 CSV files in the correct order

LOAD_ORDER_CSV = [
    "tb_country",        # parent of city
    "tb_role",           # parent of employee
    "tb_servicetype",    # parent of serviceevent
    "tb_readingmode",    # parent of readingevent
    "tb_alert",          # parent of paramalert
    "tb_param",          # parent of paramalert & paramsensortype & readingevent
    "tb_sensortype",     # parent of paramsensortype & sensordevice
    "tb_city",           # child of country; parent of weather & sensordevice
    "tb_employee",       # child of role; parent of serviceevent
    "tb_paramalert",     # child of param + alert
    "tb_paramsensortype",# child of param + sensortype
    "tb_sensordevice",   # child of sensortype + city; parent of readingevent/serviceevent
    "tb_weather",        # child of city
    "tb_readingevent",   # child of sensordevice + param + readingmode
    "tb_serviceevent",   # child of servicetype + employee + sensordevice
]

load_folder_to_stg("csv", engine, SCHEMA_STG, load_order=LOAD_ORDER_CSV,  if_exists="append")

Loaded 20 rows → stg_006.tb_country
Loaded 16 rows → stg_006.tb_role
Loaded 24 rows → stg_006.tb_servicetype
Loaded 8 rows → stg_006.tb_readingmode
Loaded 4 rows → stg_006.tb_alert
Loaded 30 rows → stg_006.tb_param
Loaded 12 rows → stg_006.tb_sensortype
Loaded 36 rows → stg_006.tb_city
Loaded 484 rows → stg_006.tb_employee
Loaded 120 rows → stg_006.tb_paramalert
Loaded 115 rows → stg_006.tb_paramsensortype
Loaded 627 rows → stg_006.tb_sensordevice
Loaded 26,316 rows → stg_006.tb_weather
Loaded 985,573 rows → stg_006.tb_readingevent
Loaded 22,720 rows → stg_006.tb_serviceevent
⏱️ Total load time: 61.50 seconds · 1,036,105 rows


In [9]:
# Next, we load our "extra" tables X and Y

LOAD_ORDER_DATA = [
    "tb_environmental_campaign",
    "tb_campaign_city",
]

load_folder_to_stg("data", engine, SCHEMA_STG, load_order=LOAD_ORDER_DATA, if_exists="append")

Loaded 7 rows → stg_006.tb_environmental_campaign
Loaded 17 rows → stg_006.tb_campaign_city
⏱️ Total load time: 0.01 seconds · 24 rows



## 5) Reset and create **warehouse** (`dwh_xxx`) from DDL file

Run `ddl/airq_reset_dwh_xxx.sql` and `ddl/airq_create_dwh_xxx.sql` (dimensions first, then facts; include `etl_load_timestamp`).


In [10]:
print(f"== DWH-ONLY RESET: dwh_{XXX} ==")
try:
    for p in (DWH_RESET, DWH_CREATE):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== DWH-ONLY RESET: dwh_006 ==



## 6) SQL-first ETL — run all files in etl/

We execute **all** files matching `etl/a1_etl*.sql` in lexicographic order. Every ETL file must begin with `SET search_path TO dwh_xxx, stg_xxx;`  
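
The two rules above can be checked before running anything. The sketch below is one possible preflight, not part of the template: the helper name `files_missing_search_path` and the demo file names are hypothetical. It ignores blank lines and `--` line comments before the first statement, and it does not handle `/* ... */` block comments.

```python
import re
import tempfile
from pathlib import Path

def files_missing_search_path(etl_dir: Path, group: str) -> list[str]:
    """Return names of a1_etl*.sql files that do not start with the required SET search_path."""
    required = re.compile(
        rf"\s*SET\s+search_path\s+TO\s+dwh_{re.escape(group)}\s*,\s*stg_{re.escape(group)}\s*;",
        re.IGNORECASE,
    )
    missing = []
    for path in sorted(etl_dir.glob("a1_etl*.sql")):  # same ordering as the runner
        lines = path.read_text(encoding="utf-8").splitlines()
        # Drop blank lines and `--` line comments, then test the start of what remains
        body = "\n".join(ln for ln in lines if ln.strip() and not ln.lstrip().startswith("--"))
        if not required.match(body):
            missing.append(path.name)
    return missing

# Demo on throwaway files with hypothetical names
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp)
    (demo_dir / "a1_etl01_dims.sql").write_text(
        "-- load dimensions\nSET search_path TO dwh_007, stg_007;\nSELECT 1;\n",
        encoding="utf-8",
    )
    (demo_dir / "a1_etl02_facts.sql").write_text("SELECT 1;\n", encoding="utf-8")
    missing = files_missing_search_path(demo_dir, "007")

print(missing)

# Lexicographic order compares character by character, so unpadded step numbers sort unexpectedly
unpadded = sorted(["a1_etl2_facts.sql", "a1_etl10_checks.sql"])
print(unpadded)
```

The second print shows why zero-padded step numbers (`a1_etl01`, `a1_etl02`, ...) are the safer choice: without padding, step 10 would run before step 2.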



In [11]:
steps = sorted(etl_dir.glob("a1_etl*.sql"))
if not steps:
    print("No ETL step files found in etl/ (expected a1_etl*.sql).")
else:
    for s in steps:
        run_sqlscript(s, engine=engine, progress=VERBOSE_SQL)


## 7) Post‑ETL checks — run files from `post/`

Each check should be a small SQL query with `SET search_path TO dwh_xxx, stg_xxx;` and a clear, short interpretation.
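
To get a single pass/fail overview across all checks, the results can be collapsed into one summary. The sketch below assumes every check returns a `status_check` column with the value `OK` on success, as the example checks in this section do; the helper name `summarize_checks` and the demo rows are hypothetical.

```python
def summarize_checks(results: dict[str, list[dict]]) -> dict[str, bool]:
    """Map each check name to True when it returned rows and every row has status_check == 'OK'."""
    summary = {}
    for name, rows in results.items():
        summary[name] = bool(rows) and all(
            str(row.get("status_check", "")).upper() == "OK" for row in rows
        )
    return summary

# Hypothetical rows, shaped like df.to_dict("records") for a check result
demo = {
    "a1_check01": [{"dwh_count": 24, "stg_count": 24, "status_check": "OK"}],
    "a1_check02": [{"name_mismatches_or_missing": 3, "status_check": "FAIL"}],
}
summary = summarize_checks(demo)
for name, passed in summary.items():
    print(f"{name}: {'PASS' if passed else 'FAIL'}")
```

In the notebook, the rows for each check could come from `run_sqlscript(...).to_dict("records")`. An empty result counts as a failure here, which is a design choice you may want to change.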


7.1. Dimension **dim_servicetype** contains the same number of rows as table **tb_servicetype** -> OK

In [12]:
# Check 1 — example
df = run_sqlscript("post/a1_check01_example.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | dwh_count | stg_count | status_check | run_time |
|---|-----------|-----------|--------------|----------|
| 0 | 24 | 24 | OK | 2025-10-23 17:59:16 |


7.2. There are no mismatching names for **typename** between dimension **dim_servicetype** and staging table **tb_servicetype** -> OK

In [13]:
# Check 2 — example
df = run_sqlscript("post/a1_check02_example.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | name_mismatches_or_missing | status_check | run_time |
|---|----------------------------|--------------|----------|
| 0 | 0 | OK | 2025-10-23 17:59:16 |



## 8) Create `sqldump/sqldump_airq_dwh_xxx.sql`

We run `pg_dump -n dwh_xxx --no-owner --no-privileges` to keep dumps portable.
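
A quick sanity check on the resulting file is to list the tables it creates. The sketch below assumes a plain-text dump in which `CREATE TABLE` statements start at the beginning of a line and use schema-qualified names; the helper name `tables_in_dump` and the sample text are hypothetical.

```python
import re

def tables_in_dump(dump_sql: str, schema: str) -> list[str]:
    """List tables created in `schema` according to CREATE TABLE lines in a plain-text dump."""
    pattern = re.compile(
        rf'^CREATE TABLE "?{re.escape(schema)}"?\."?([A-Za-z_][A-Za-z0-9_]*)"?',
        re.MULTILINE,
    )
    return sorted(pattern.findall(dump_sql))

# Hypothetical excerpt shaped like a plain-text dump
sample = (
    "CREATE SCHEMA dwh_007;\n"
    "CREATE TABLE dwh_007.ft_reading_daily (\n"
    "    cnt_readings integer\n"
    ");\n"
    "CREATE TABLE dwh_007.dim_timeday (\n"
    "    day_date date\n"
    ");\n"
)
tables = tables_in_dump(sample, "dwh_007")
print(tables)
```

After the dump cell has run, `tables_in_dump(outfile.read_text(encoding="utf-8"), f"dwh_{XXX}")` should list your dimension and fact tables. Note that this reads the whole dump into memory.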


In [14]:
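import os, shutil

# On macOS with Homebrew, pg_dump may live in libpq's bin folder; prepend it to PATH only if it exists
libpq_bin = "/opt/homebrew/opt/libpq/bin"
if os.path.isdir(libpq_bin):
    os.environ["PATH"] = libpq_bin + os.pathsep + os.environ.get("PATH", "")
print("which pg_dump now:", shutil.which("pg_dump"))

# === Create sqldump/sqldump_airq_dwh_xxx.sql (pg_dump) ===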
sqldump_dir.mkdir(exist_ok=True)
outfile = sqldump_dir / f"sqldump_airq_dwh_{XXX}.sql"

pg_dump = shutil.which("pg_dump") or "pg_dump"
cmd = [
    pg_dump,
    "-h", DB_HOST,
    "-p", str(DB_PORT),
    "-U", DB_USER,
    "-d", DB_NAME,
    "-n", f"dwh_{XXX}",
    "--no-owner",
    "--no-privileges",
    "-f", str(outfile),
]

# Avoid echoing the password; supply it via env if provided
env = dict(os.environ)
if 'pw' in globals() and pw:
    env["PGPASSWORD"] = pw

print("Running:", " ".join(cmd).replace(DB_USER, "<user>"))
try:
    subprocess.run(cmd, check=True, env=env)
    print("✓ Dump created at", outfile)
except Exception as e:
    print("pg_dump failed; try this manually in a terminal:\n", " ".join(cmd), "\nError:", e)


which pg_dump now: /opt/homebrew/opt/libpq/bin/pg_dump
Running: /opt/homebrew/opt/libpq/bin/pg_dump -h localhost -p 5432 -U <user> -d airq -n dwh_006 --no-owner --no-privileges -f /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/sqldump/sqldump_airq_dwh_006.sql
✓ Dump created at /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/sqldump/sqldump_airq_dwh_006.sql


## 9) PROV-O (JSON-LD) — lightweight provenance

In [15]:
# === PROV-O - student inputs (fill these) ===

# 1) Names (required)
STUDENT_A = "Halilovic, Kerim"
STUDENT_B = "Lukic, Nikola"

# 2) Fact table names in dwh_006 (required)
# Student A owns Fact 1; Student B owns Fact 2
FACT1_NAME = "ft_reading_daily"
FACT2_NAME = "ft_service_event"

# 3) One-sentence grain per fact (required)
FACT1_GRAIN = "One row per sensor device, per measured parameter, per day."
FACT2_GRAIN = "One row per service event as recorded in OLTP."

# 4) 2–4 measures per fact (required). Mark ≥1 as fully additive.
FACT1_MEASURES = [
    {"name": "cnt_readings", "agg": "SUM",  "fully_additive": True},
    {"name": "sum_data_volume_bytes", "agg": "SUM",  "fully_additive": True},
    {"name": "avg_data_quality", "agg": "AVG"},
    {"name": "avg_value", "agg": "AVG"},
    {"name": "cnt_exceed_yellow", "agg": "SUM", "fully_additive": True},
]
FACT2_MEASURES = [
    {"name": "service_cost_eur", "agg": "SUM",  "fully_additive": True},
    {"name": "service_duration_min", "agg": "SUM",  "fully_additive": True},
    {"name": "service_quality_score", "agg": "AVG"},
    {"name": "underqualified_flag", "agg": "SUM", "fully_additive": True},
]

# 5) OLTP sources per fact (DO NOT list time dimensions here).
# List the OLTP snapshot tables that feed each fact and the NON-time dimensions you actually used.
FACT1_SOURCES_OLTP = {
    "fact": ["tb_readingevent", "tb_paramalert", "tb_alert"],
    "dimensions": {
        "dim_device": ["tb_sensordevice", "tb_city", "tb_country", "tb_sensortype"],
        "dim_parameter": ["tb_param"],
        "dim_sensor_type": ["tb_sensortype"],
        "dim_reading_mode": ["tb_readingmode"],
        "dim_campaign": ["tb_environmental_campaign", "tb_campaign_city"],
    },
    "why": "Student A owns Fact 1; these OLTP tables supply its grain, measures and lookups."
}
FACT2_SOURCES_OLTP = {
    "fact": ["tb_serviceevent"],
    "dimensions": {
        "dim_device": ["tb_sensordevice", "tb_city", "tb_country", "tb_sensortype"],
        "dim_service_type": ["tb_servicetype"],
        "dim_technician_role": ["tb_employee", "tb_role"],
        "dim_campaign": ["tb_environmental_campaign", "tb_campaign_city"],
    },
    "why": "Student B owns Fact 2; these inputs determine its grain and lookups."
}

# 6) Time dimensions (generated; no OLTP lineage here).
# Provide ONE or TWO names following the required pattern: dim_time<granularity>
TIME_DIMS = [
    "dim_timeday",
]

In [16]:
# Outputs: prov/prov_airq_dwh_XXX.jsonld
# Expects: root_dir, csv_dir, etl_dir, XXX (group number as a string), and optionally SQLAlchemy `engine`.

def _iso_utc_now():
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

def _sha256(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def _file_iri(p: Path) -> str:
    rel = os.path.relpath(p, root_dir).replace(os.sep, "/")
    return f"file:{rel}"

def _csv_path_for(name: str):
    """Resolve an OLTP table/CSV name to a CSV Path in csv_dir (case-insensitive)."""
    n = (name or "").strip()
    if not n:
        return None
    base = n.lower().replace(".csv", "")
    candidates = [f"{base}.csv", f"tb_{base}.csv"]
    lowers = {p.name.lower(): p for p in csv_dir.glob("*.csv")}
    for cand in candidates:
        if cand in lowers:
            return lowers[cand]
    return None  # not found; record logical IRI instead

GROUP = str(globals().get("XXX", "000"))
now = _iso_utc_now()
schema = f"dwh_{GROUP}"

# --- Validate inputs (minimal) ---
# for s in (STUDENT_A, STUDENT_B):
#     assert isinstance(s, str) and s.strip() and "Surname" not in s, "Fill in STUDENT_A / STUDENT_B."
for nm in (FACT1_NAME, FACT2_NAME):
    assert isinstance(nm, str) and nm.startswith("ft_"), "FACT names should start with ft_."
for g in (FACT1_GRAIN, FACT2_GRAIN):
    assert isinstance(g, str) and len(g.split()) >= 4, "Provide a short grain sentence per fact."
for arr in (FACT1_MEASURES, FACT2_MEASURES):
    assert isinstance(arr, list) and 1 <= len(arr) <= 6, "List 1–6 measures per fact."
assert isinstance(TIME_DIMS, list) and 1 <= len(TIME_DIMS) <= 2, "Define 1 or 2 time dimensions in TIME_DIMS."
for tname in TIME_DIMS:
    assert isinstance(tname, str), "Each time dimension name must be a string."
    assert tname.startswith("dim_time"), "Time dimension names must start with 'dim_time'."
    gran = tname[len("dim_time"):]  # suffix after the required prefix
    assert gran and gran.isalpha(), "Provide a non-empty alphabetic granularity after 'dim_time' (e.g., 'day','month')."
# If two dims, require different granularities (case-insensitive)
if len(TIME_DIMS) == 2:
    g1 = TIME_DIMS[0][len("dim_time"):].lower()
    g2 = TIME_DIMS[1][len("dim_time"):].lower()
    assert g1 != g2, "If you define two time dimensions, their granularities must differ (e.g., day vs month)."

# === CSV entities from declared OLTP sources (no time dims here) ===
declared = set()
def _collect_from_mapping(mapping):
    if not mapping:
        return
    for n in mapping.get("fact", []): declared.add(n)
    for _, sources in mapping.get("dimensions", {}).items():
        for n in sources: declared.add(n)

_collect_from_mapping(FACT1_SOURCES_OLTP)
_collect_from_mapping(FACT2_SOURCES_OLTP)

csv_entities, csv_index = [], {}
for name in sorted({n.strip() for n in declared if str(n).strip()}):
    p = _csv_path_for(name)
    if p and p.exists():
        iri = _file_iri(p)
        csv_entities.append({"@id": iri, "@type": "prov:Entity",
                             "dct:title": p.name, "dct:format": "text/csv",
                             "contentHash": _sha256(p)})
        csv_index[name] = iri
    else:
        iri = f"oltp://{name.lower().replace('.csv','')}"
        csv_entities.append({"@id": iri, "@type": "prov:Entity",
                             "dct:title": name, "dct:format": "text/csv"})
        csv_index[name] = iri

# === ETL scripts as prov:Plan entities (hash for reproducibility) ===
etl_list  = globals().get("ETL_SCRIPTS_RUN")
etl_paths = [Path(s) for s in etl_list] if etl_list else sorted(etl_dir.glob("*.sql"))
etl_entities = [{"@id": _file_iri(p), "@type": ["prov:Entity", "prov:Plan"],
                 "dct:title": p.name, "dct:format": "text/sql",
                 "contentHash": _sha256(p)} for p in etl_paths]

# === DWH outputs discovered from the database (row counts if engine is available) ===
output_entities, output_index = [], {}
def _add_output_entity(tname: str, cnt: int | None):
    iri = f"db://airq/{schema}/{tname}"
    ent = {"@id": iri, "@type": "prov:Entity", "dct:title": tname}
    if cnt is not None: ent["rowCount"] = int(cnt)
    output_entities.append(ent)
    output_index[tname] = iri

try:
    if "engine" in globals():
        from sqlalchemy import text
        with engine.connect() as conn:
            rows = conn.execute(
                text("""select table_name
                          from information_schema.tables
                         where table_schema = :s and table_type='BASE TABLE'
                         order by table_name"""), {"s": schema}
            ).fetchall()
            for (tname,) in rows:
                cnt = conn.execute(text(f'select count(*) from "{schema}"."{tname}"')).scalar()
                _add_output_entity(tname, cnt)
    else:
        # offline: facts + declared time dims must appear in provenance
        for t in [FACT1_NAME, FACT2_NAME] + TIME_DIMS:
            _add_output_entity(t, None)
except Exception as e:
    print("Note: could not enumerate DWH tables / counts:", e)
    for t in [FACT1_NAME, FACT2_NAME] + TIME_DIMS:
        if t not in output_index:
            _add_output_entity(t, None)

# === Activity for this pipeline run ===
activity_id = f"urn:airq:etl:group:{GROUP}:{now}"
activity = {
    "@id": activity_id,
    "@type": "prov:Activity",
    "dct:description": f"Assignment 1 ETL run for {schema}",
    "prov:startedAtTime": now,
    "prov:endedAtTime": now,
    "prov:used": [e["@id"] for e in (csv_entities + etl_entities)],
    "prov:generated": [e["@id"] for e in output_entities],
}

# === Agents and ownership roles (A owns Fact1; B owns Fact2) ===
human_agents = [
    {"@id": "urn:person:A", "@type": "prov:Agent", "dct:title": STUDENT_A},
    {"@id": "urn:person:B", "@type": "prov:Agent", "dct:title": STUDENT_B},
]
software_agents = [
    {"@id": "urn:software:jupyter",  "@type": "prov:SoftwareAgent", "dct:title": "Jupyter Notebook"},
    {"@id": "urn:software:postgres", "@type": "prov:SoftwareAgent", "dct:title": "PostgreSQL"},
    {"@id": "urn:software:os",       "@type": "prov:SoftwareAgent", "dct:title": os.name},
]
activity["prov:wasAssociatedWith"] = [a["@id"] for a in (human_agents + software_agents)]
activity["prov:qualifiedAssociation"] = [
    {"@type": "prov:Association", "prov:agent": "urn:person:A", "prov:hadRole": f"owner:{FACT1_NAME}"},
    {"@type": "prov:Association", "prov:agent": "urn:person:B", "prov:hadRole": f"owner:{FACT2_NAME}"},
]

# === Derivations from declared OLTP sources -> DWH facts/dims (time dims not included) ===
derivations = []
def _add_derivations(target_table: str, mapping: dict):
    tgt_iri = output_index.get(target_table)
    if not tgt_iri: return
    # fact-level lineage
    for src in mapping.get("fact", []):
        src_iri = csv_index.get(src)
        if src_iri:
            derivations.append({
                "@id": f"{tgt_iri}#derivation-{Path(str(src)).stem}",
                "@type": "prov:Derivation",
                "prov:generatedEntity": tgt_iri,
                "prov:usedEntity": src_iri,
                "dct:description": mapping.get("why", "")
            })
    # dimension-level lineage (non-time dims only)
    for dim_name, sources in mapping.get("dimensions", {}).items():
        dim_iri = output_index.get(dim_name) or tgt_iri
        for src in sources:
            src_iri = csv_index.get(src)
            if src_iri:
                derivations.append({
                    "@id": f"{dim_iri}#derivation-{dim_name}-{Path(str(src)).stem}",
                    "@type": "prov:Derivation",
                    "prov:generatedEntity": dim_iri,
                    "prov:usedEntity": src_iri,
                    "dct:description": f"{dim_name} ← {Path(str(src)).name}"
                })

_add_derivations(FACT1_NAME, FACT1_SOURCES_OLTP)
_add_derivations(FACT2_NAME, FACT2_SOURCES_OLTP)

# === Minimal metadata on fact entities: grain + measures ===
for tname, grain, measures in [
    (FACT1_NAME, FACT1_GRAIN, FACT1_MEASURES),
    (FACT2_NAME, FACT2_GRAIN, FACT2_MEASURES),
]:
    iri = output_index.get(tname)
    if not iri: continue
    for ent in output_entities:
        if ent["@id"] == iri:
            ent["dct:description"] = f"Grain: {grain}"
            ent["measures"] = measures
            break

# === Assemble JSON-LD and write ===
doc = {
    "@context": {
        "prov": "http://www.w3.org/ns/prov#",
        "dct":  "http://purl.org/dc/terms/",
        "xsd":  "http://www.w3.org/2001/XMLSchema#"
    },
    "@graph": csv_entities + etl_entities + output_entities + [activity]
              + human_agents + software_agents + derivations,
}

prov_dir = (root_dir / "prov")
prov_dir.mkdir(exist_ok=True)
outfile = prov_dir / f"prov_airq_dwh_{GROUP}.jsonld"
with outfile.open("w", encoding="utf-8") as f:
    json.dump(doc, f, ensure_ascii=False, indent=2)

print("✓ Wrote", outfile)
print("Preview (first ~24 lines):\n")
for i, line in enumerate(json.dumps(doc, indent=2).splitlines()):
    if i >= 24: break
    print(line)


✓ Wrote /Users/kerimhalilovic/Documents/GitHub/bi-2025W-ass1/prov/prov_airq_dwh_006.jsonld
Preview (first ~24 lines):

{
  "@context": {
    "prov": "http://www.w3.org/ns/prov#",
    "dct": "http://purl.org/dc/terms/",
    "xsd": "http://www.w3.org/2001/XMLSchema#"
  },
  "@graph": [
    {
      "@id": "file:csv/tb_alert.csv",
      "@type": "prov:Entity",
      "dct:title": "tb_alert.csv",
      "dct:format": "text/csv",
      "contentHash": "71010ed19fe40c7b53cb87b0f7d8e4e32b79f9c472a37030b076e4aa7f3c1682"
    },
    {
      "@id": "oltp://tb_campaign_city",
      "@type": "prov:Entity",
      "dct:title": "tb_campaign_city",
      "dct:format": "text/csv"
    },
    {
      "@id": "file:csv/tb_city.csv",
      "@type": "prov:Entity",
      "dct:title": "tb_city.csv",



## 10) Submission checklist (put these in your **ZIP**)

- `csv/` — 15 "original" OLTP snapshot CSVs
- `data/` — Your **Table X** and **Table Y** CSVs
- `ddl/` — Your DDL scripts 
- `etl/` — Your `a1_etl*.sql` files (ETL scripts)
- `post/` — Your `a1_check*.sql` files (5–7 concise checks)
- `prov/` — Your auto-generated provenance JSON-LD file
- `sqldump/` — `sqldump_airq_dwh_xxx.sql`  
- `AirQ_Part1_xxx.ipynb`
- `AirQ_ERD_dwh_xxx.png|pdf`
- `group_xxx.txt`
- `Report_Part1_Group_xxx.pdf`
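
The checklist above can be turned into a small packaging check that reports what is still missing before you zip the folder. This is a sketch under the assumption that the names follow the checklist exactly; the helper name `missing_items` is hypothetical, and it only checks presence, not content.

```python
import tempfile
from pathlib import Path

def missing_items(root: Path, group: str) -> list[str]:
    """Return checklist entries (folders and files) that are not present under `root`."""
    required_dirs = ["csv", "data", "ddl", "etl", "post", "prov", "sqldump"]
    required_files = [
        f"sqldump/sqldump_airq_dwh_{group}.sql",
        f"AirQ_Part1_{group}.ipynb",
        f"group_{group}.txt",
        f"Report_Part1_Group_{group}.pdf",
    ]
    missing = [d + "/" for d in required_dirs if not (root / d).is_dir()]
    missing += [f for f in required_files if not (root / f).is_file()]
    # The ERD may be submitted as either .png or .pdf
    if not any((root / f"AirQ_ERD_dwh_{group}.{ext}").is_file() for ext in ("png", "pdf")):
        missing.append(f"AirQ_ERD_dwh_{group}.png|pdf")
    return missing

# Demo on a throwaway, deliberately incomplete layout
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for d in ("csv", "data", "ddl", "etl", "post", "prov"):
        (root / d).mkdir()
    (root / "group_007.txt").write_text("demo", encoding="utf-8")
    missing = missing_items(root, "007")

print(missing)
```

In the notebook, `missing_items(root_dir, XXX)` returning an empty list means every checklist entry exists.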

