
# **AirQ_Part2_xxx** — Assignment 2 (ETL + OLAP with Atoti)

- This notebook orchestrates Assignment 2.
- All SQL must live in external `.sql` files under `ddl/`, `etl/`, and `sql/`. 
- All MDX must live in external `.mdx` files under `mdx/`.

**Final folder layout (per‑group, self‑contained)**

```
BI_Projects/
  DWH2_xxx/
    csv/       # 15 OLTP CSV files
    ddl/       # DDL only (staging, warehouse)
    etl/       # ETL steps: a2_etl*.sql files
    mdx/       # MDX queries in .mdx files: a2_q{NN}_{A|B}.mdx
    mdx_out/   # CSV files with the results of MDX queries
    pdf/       # PDF files with dashboard exports: a2_q{NN}.pdf
    sql/       # SQL queries in .sql files: a2_q{NN}_{A|B}.sql
    sqldump/   # Export produced by pg_dump
    AirQ_Part2_xxx.ipynb
    group_xxx.txt
    Report_Part2_Group_xxx.pdf
```
> Replace `xxx` in your file names with your **three‑digit** group number.
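The naming rules above can be checked mechanically. Below is a minimal sketch, assuming the patterns are exactly `a2_q{NN}_{A|B}.{sql,mdx,csv}` with a two-digit `NN`. The helpers `check_group` and `parse_query_file` are hypothetical and not part of the assignment template. Note that the SQL cells in section 7 load names such as `sql/a2_q01.sql`, which this pattern would flag.

```python
import re

# Patterns transcribed from the folder layout above (assumption: NN is always two digits)
GROUP_RE = re.compile(r"[0-9]{3}")                                   # e.g. "080"
QUERY_FILE_RE = re.compile(r"a2_q([0-9]{2})_([AB])\.(sql|mdx|csv)")  # sql/, mdx/, mdx_out/


def check_group(group: str) -> str:
    """Return the group id unchanged, or raise ValueError if it is not exactly three digits."""
    if not GROUP_RE.fullmatch(group):
        raise ValueError(f"group must be three digits such as '007', got {group!r}")
    return group


def parse_query_file(name: str):
    """Return (question number, variant, extension), or None if the name breaks the convention."""
    match = QUERY_FILE_RE.fullmatch(name)
    if match is None:
        return None
    return int(match.group(1)), match.group(2), match.group(3)
```

For example, `[p.name for p in Path("sql").glob("*.sql") if parse_query_file(p.name) is None]` lists the files that break the convention.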


## Contents
1. Configuration & preflight (group, paths)  
2. Database connection
3. Reset and create staging schema (`stg2_xxx`) from DDL file 
4. Load CSVs into `stg2_xxx` (order-sensitive)  
5. Reset and create warehouse (`dwh2_xxx`) from DDL file 
6. ETL runner (executes `etl/a2_etl*.sql`)  
7. SQL queries
8. Atoti setup and build the OLAP cube (scaffold)
9. Define hierarchies and measures
10. MDX queries
11. Batch executor: run all .mdx → CSV (+ an index)
12. Create database dump
13. Submission checklist


## 1) Configuration & preflight

In [1]:
# === Parameters ===
XXX = "080"                      # three-digit group number, e.g. "007"

VERBOSE_SQL = False             # print progress when running .sql files
LOAD_ORDER_CSV = []             # or fill later
LOAD_ORDER_DATA = []            # or fill later

In [2]:
import re, time
import shutil, subprocess, os
import json, hashlib

from pathlib import Path
from getpass import getpass
from urllib.parse import quote_plus
from datetime import datetime, timezone

import pandas as pd
import sqlalchemy as sa
import sqlparse
from sqlalchemy import create_engine, text   # `engine` is not imported: the name is used for the Engine below

import atoti as tt

Welcome to Atoti 0.9.9!



In [3]:
!pip show atoti

Name: atoti
Version: 0.9.9
Summary: Explore metrics across hundreds of dimensions, analyze live data at its most granular level and perform what-if simulations at unparalleled speed
Home-page: https://www.atoti.io
Author: 
Author-email: ActiveViam <dev@atoti.io>
License: 
Location: /opt/miniconda3/envs/dwh/lib/python3.12/site-packages
Requires: atoti-client, atoti-server, jdk4py
Required-by: 


In [4]:
# === Paths ===
root_dir = Path.cwd()
csv_dir = root_dir / "csv"
ddl_dir = root_dir / "ddl"
etl_dir = root_dir / "etl"
mdx_dir = root_dir / "mdx"
mdx_out_dir = root_dir / "mdx_out"
sql_dir = root_dir / "sql"
sqldump_dir = root_dir / "sqldump"

SCHEMA_STG = f"stg2_{XXX}"
SCHEMA_DWH = f"dwh2_{XXX}"

# files we expect in the ddl subfolder
STG2_RESET  = ddl_dir / f"airq_reset_stg2_{XXX}.sql"
STG2_CREATE = ddl_dir / f"airq_create_stg2_{XXX}.sql"
DWH2_RESET  = ddl_dir / f"airq_reset_dwh2_{XXX}.sql"
DWH2_CREATE = ddl_dir / f"airq_create_dwh2_{XXX}.sql"

print("CSV dir:", csv_dir)
print("DDL dir:", ddl_dir)
print("ETL dir:", etl_dir)
print("MDX dir:", mdx_dir)
print("MDX_out dir:", mdx_out_dir)
print("SQL dir:", sql_dir)
print("SQLdump dir:", sqldump_dir)

CSV dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/csv
DDL dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/ddl
ETL dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/etl
MDX dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/mdx
MDX_out dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/mdx_out
SQL dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/sql
SQLdump dir: /Users/philippmoessner/Desktop/Business Intelligence/tuwien_bi/DWH2_080/sqldump
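This section is called *preflight*, but the cell above only prints paths. A small check like the following would fail fast before any SQL runs. It is a sketch: the helper `preflight` is hypothetical, and it assumes the folder layout and the four DDL file names defined above.

```python
from pathlib import Path

# Input folders from the layout in the notebook header (mdx_out/ and sqldump/ are created later by the notebook)
REQUIRED_DIRS = ("csv", "ddl", "etl", "mdx", "sql")


def preflight(root: Path, group: str) -> list[str]:
    """Return human-readable problems; an empty list means the inputs look complete."""
    problems = [f"missing folder: {d}/" for d in REQUIRED_DIRS if not (root / d).is_dir()]
    # the four DDL scripts: airq_{reset|create}_{stg2|dwh2}_{group}.sql
    for kind in ("stg2", "dwh2"):
        for action in ("reset", "create"):
            ddl = root / "ddl" / f"airq_{action}_{kind}_{group}.sql"
            if not ddl.is_file():
                problems.append(f"missing DDL file: ddl/{ddl.name}")
    n_csv = len(list((root / "csv").glob("*.csv")))
    if n_csv != 15:
        problems.append(f"expected 15 CSV files in csv/, found {n_csv}")
    return problems
```

Usage: `for problem in preflight(root_dir, XXX): print("!!", problem)`.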



## 2) Make database connection


In [5]:
# === Minimal config & connect ===
DB_USER = f"grp_{XXX}"
DB_NAME = "airq"
DB_HOST = "localhost"
DB_PORT = "5432"

# the password is prompted once per run; leave it empty if pg_hba.conf allows password-less access (e.g. trust)
pw = getpass(f"Password for {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME} (leave empty if not needed): ")
DSN = f"postgresql+psycopg2://{DB_USER}:{quote_plus(pw)}@{DB_HOST}:{DB_PORT}/{DB_NAME}" if pw \
      else f"postgresql+psycopg2://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

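def _mask_dsn(dsn: str) -> str:
    """Return the DSN with the password replaced by *** (for printing only)."""
    try:
        # use the module explicitly: the name `engine` is rebound to the Engine object below
        return sa.engine.make_url(dsn).render_as_string(hide_password=True)
    except Exception:
        # in a raw string the back-reference is \1; r"\\1" would print a literal "\1"
        return re.sub(r"://([^:@]+)(?::[^@]*)?@", r"://\1:***@", dsn)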

engine = create_engine(DSN, future=True, pool_pre_ping=True)
print("Connecting via:", _mask_dsn(DSN))

with engine.begin() as conn:
    # best-effort: set the role if it exists; the SAVEPOINT keeps a failed SET ROLE
    # from aborting the outer transaction (PostgreSQL would then reject the next query)
    try:
        with conn.begin_nested():
            conn.exec_driver_sql(f"SET ROLE grp_{XXX}")
        print(f"SET ROLE grp_{XXX} ✓")
    except Exception as e:
        print(f"(no SET ROLE: {e.__class__.__name__})")
    who = conn.exec_driver_sql("select current_user").scalar_one()
    print("current_user:", who)


Connecting via: postgresql+psycopg2://\1:***@localhost:5432/airq
SET ROLE grp_080 ✓
current_user: grp_080
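The `Connecting via:` line above prints a literal `\1` instead of the user name. Two things combine to cause this. By the time `_mask_dsn` runs, the name `engine` refers to the `Engine` object, which has no `make_url`, so the regex fallback is used. In that fallback, the raw string `r"://\\1:***@"` contains a doubled backslash, which produces a literal backslash. Below is a stdlib-only sketch of building and masking the DSN; the function names are hypothetical.

```python
import re
from urllib.parse import quote_plus


def build_dsn(user: str, host: str, port: str, db: str, password: str = "") -> str:
    """Assemble a psycopg2 URL; quote_plus escapes '@', ':', '/' and similar characters in the password."""
    auth = f"{user}:{quote_plus(password)}" if password else user
    return f"postgresql+psycopg2://{auth}@{host}:{port}/{db}"


def mask_dsn(dsn: str) -> str:
    """Replace the password with *** if there is one; a password-less DSN is returned unchanged."""
    # r"\1" re-inserts group 1 ("://user"); r"\\1" would emit the two characters "\1"
    return re.sub(r"(://[^:@/]+):[^@]*@", r"\1:***@", dsn)
```

The mask only applies when a `user:password@` part is present, so a DSN without a password prints as-is instead of gaining a fake `:***`.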


In [6]:
def run_sqlscript(
    path: str | Path,
    *,
    engine,
    progress: bool = True,      # progress/verbosity- show progress OR keep output quiet
    add_search_path: bool = False,
    schema_dwh: str | None = None,
    schema_stg: str | None = None,
    title: str | None = None,      # optional title
    strip_psql_meta: bool = True,  # psql meta stripping
):
    """
    Execute all statements in a .sql file.
    - Returns the LAST result set as a pandas.DataFrame if any statement returns rows; else None.
    - Set progress=False to suppress progress/header prints (great for check scripts).
    """

    raw = Path(path).read_text(encoding="utf-8")

    # Strip psql meta-commands (e.g., \i, \set) if requested
    if strip_psql_meta:
        raw = "\n".join(
            line for line in raw.splitlines()
            if not line.lstrip().startswith("\\")
        )

    # Optional search_path prologue
    prologue = ""
    if add_search_path:
        schs = [s for s in (schema_dwh, schema_stg) if s]
        if schs:
            prologue = f"SET search_path TO {', '.join(schs)};\n"

    script = prologue + raw
    stmts = [s.strip() for s in sqlparse.split(script) if s and s.strip(" ;\n\t")]

    if progress:
        hdr = f"▶ {title}" if title else "▶ Running SQL script"
        print(f"{hdr}: {path} ({len(stmts)} statements)")
    t0 = time.time()

    last_df = None
    with engine.begin() as conn:
        for i, stmt in enumerate(stmts, start=1):
            if not stmt:
                continue
            start = time.time()
            try:
                if progress:
                    preview = " ".join(stmt.split())[:120]
                    print(f"  {i:>3}: {preview} ...")

                cursor = conn.exec_driver_sql(stmt)

                if cursor.returns_rows:
                    rows = cursor.fetchall()
                    cols = cursor.keys()
                    last_df = pd.DataFrame(rows, columns=cols)

                if progress:
                    print(f"       OK ({time.time() - start:.3f}s)")

            except Exception as e:
                # Raise with a helpful preview even when progress=False
                preview = " ".join(stmt.split())[:160]
                raise RuntimeError(
                    f"SQL error in statement #{i}: {preview}"
                ) from e

    if progress:
        print(f"✅ Done in {time.time() - t0:.2f}s")

    return last_df
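`run_sqlscript` wraps the whole file in `engine.begin()`, so a file either commits completely or not at all. It also keeps only the last result set. To try that pattern without a PostgreSQL server, here is a sketch that uses the standard-library `sqlite3` module as a stand-in (function names are hypothetical). `sqlite3.complete_statement` serves roughly the purpose that `sqlparse.split` serves above: semicolons inside string literals and comments do not end a statement. SQLite DDL is transactional as well, so a script that fails halfway leaves no table behind.

```python
import sqlite3


def split_sql(script: str) -> list[str]:
    """Split on ';' only where sqlite3 considers the accumulated text a complete statement."""
    statements, buf = [], ""
    for piece in script.split(";"):
        buf += piece + ";"
        if sqlite3.complete_statement(buf):
            if buf.strip(" ;\n\t"):          # skip empty statements such as a trailing ';'
                statements.append(buf.strip())
            buf = ""
    return statements


def run_script(conn: sqlite3.Connection, script: str):
    """Run every statement in one transaction; return the rows of the last statement that produced rows.

    The connection must be opened with isolation_level=None so BEGIN/COMMIT/ROLLBACK are explicit.
    """
    last_rows = None
    conn.execute("BEGIN")
    try:
        for stmt in split_sql(script):
            cur = conn.execute(stmt)
            if cur.description is not None:  # the statement returned a result set
                last_rows = cur.fetchall()
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")             # all-or-nothing, like engine.begin()
        raise
    return last_rows
```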

## 3) Reset and create **staging schema** (`stg2_xxx`) from DDL file

In [7]:
print(f"== STAGING-ONLY RESET: stg2_{XXX} ==")
try:
    for p in (STG2_RESET, STG2_CREATE):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== STAGING-ONLY RESET: stg2_080 ==


## 4) Load CSV → `stg2_xxx` with pandas `.to_sql()`

In [8]:
def load_folder_to_stg(
    folder_name: str,
    engine,
    SCHEMA_STG: str,
    load_order=None,
    if_exists: str = "append",
    chunksize: int = 20000,
):
    # root_dir (the project root) is defined in section 1
    src_dir = Path(root_dir) / folder_name
    if not src_dir.exists():
        raise FileNotFoundError(f"Folder not found: {src_dir}")

    def load_one(name: str):
        path = src_dir / f"{name}.csv"
        if not path.exists():
            print("Missing CSV:", path.name)
            return 0
        df = pd.read_csv(
            path,
            na_values=["\\N"],
            keep_default_na=False,
            low_memory=False,
        )
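        # Heuristic: columns whose names end in "from", "to" or "at" are parsed as
        # YYYY-MM-DD dates; values that do not match the format become NaT (NULL)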
        for col in df.columns:
            col_l = col.lower()
            if col_l.endswith(("from", "to", "at")):
                df[col] = pd.to_datetime(df[col], format="%Y-%m-%d", errors="coerce").dt.date
        # Write
        df.to_sql(
            name,
            con=engine,
            schema=SCHEMA_STG,
            if_exists=if_exists,
            index=False,
            method="multi",
            chunksize=chunksize,
        )
        print(f"Loaded {len(df):,} rows → {SCHEMA_STG}.{name}")
        return len(df)

    if not load_order:
        discovered = sorted([p.stem for p in src_dir.glob("*.csv")])
        print("No order set yet. CSVs found:", discovered)
        return

    t0 = time.time()
    total = 0
    for name in load_order:
        total += load_one(name)
    print(f"⏱️ Total load time: {time.time() - t0:.2f} seconds · {total:,} rows")

In [9]:
# Load the 15 original CSV files in foreign-key order (parent tables before the tables that reference them)
LOAD_ORDER_CSV = ["tb_servicetype","tb_role","tb_employee","tb_country","tb_city","tb_readingmode",
                  "tb_alert","tb_param","tb_paramalert","tb_sensortype","tb_paramsensortype","tb_sensordevice",
                  "tb_weather","tb_serviceevent","tb_readingevent"]

load_folder_to_stg("csv", engine, SCHEMA_STG, load_order=LOAD_ORDER_CSV, if_exists="append")

Loaded 24 rows → stg2_080.tb_servicetype
Loaded 16 rows → stg2_080.tb_role
Loaded 484 rows → stg2_080.tb_employee
Loaded 20 rows → stg2_080.tb_country
Loaded 36 rows → stg2_080.tb_city
Loaded 8 rows → stg2_080.tb_readingmode
Loaded 4 rows → stg2_080.tb_alert
Loaded 30 rows → stg2_080.tb_param
Loaded 120 rows → stg2_080.tb_paramalert
Loaded 12 rows → stg2_080.tb_sensortype
Loaded 115 rows → stg2_080.tb_paramsensortype
Loaded 627 rows → stg2_080.tb_sensordevice
Loaded 26,316 rows → stg2_080.tb_weather
Loaded 22,720 rows → stg2_080.tb_serviceevent
Loaded 985,573 rows → stg2_080.tb_readingevent
⏱️ Total load time: 94.53 seconds · 1,036,105 rows
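`LOAD_ORDER_CSV` is written by hand. If the staging DDL declares foreign keys, an order can also be derived with `graphlib.TopologicalSorter` from the standard library. The dependency map below is an assumption guessed from the table names, not read from the DDL, so verify it before relying on it. The hand-written list above happens to satisfy every edge in it.

```python
from graphlib import TopologicalSorter

# Hypothetical FK dependencies guessed from the table names: child -> {parents it references}
ASSUMED_DEPENDENCIES = {
    "tb_employee": {"tb_role"},
    "tb_city": {"tb_country"},
    "tb_paramalert": {"tb_param", "tb_alert"},
    "tb_paramsensortype": {"tb_param", "tb_sensortype"},
    "tb_sensordevice": {"tb_sensortype", "tb_city"},
    "tb_weather": {"tb_city"},
    "tb_serviceevent": {"tb_servicetype", "tb_employee", "tb_sensordevice"},
    "tb_readingevent": {"tb_sensordevice", "tb_param", "tb_readingmode"},
}


def load_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return an order in which every table follows all tables it references (raises CycleError on cycles)."""
    return list(TopologicalSorter(dependencies).static_order())
```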


## 5) Reset and create **warehouse** (`dwh2_xxx`) from DDL file

In [10]:
print(f"== DWH-ONLY RESET: dwh2_{XXX} ==")
try:
    for p in (DWH2_RESET, DWH2_CREATE):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== DWH-ONLY RESET: dwh2_080 ==



## 6) SQL-first ETL — run all files in etl/

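We execute **all** files matching `etl/a2_etl*.sql` in lexicographic order, each file in its own transaction. Every ETL file must begin with `SET search_path TO dwh2_xxx, stg2_xxx;`. Because the order is lexicographic, `a2_etl10.sql` would run before `a2_etl2.sql`; zero-pad the step numbers (`a2_etl02.sql`) to keep the intended order.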
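The `SET search_path` rule can be enforced before the runner touches the database. The sketch below uses a hypothetical helper, `check_etl_prologue`. It skips leading blank lines and `--` comment lines (block comments are not handled), then expects exactly the required statement for the given group.

```python
import re
from pathlib import Path


def check_etl_prologue(etl_dir: Path, group: str) -> list[str]:
    """Return names of a2_etl*.sql files whose first statement is not the required SET search_path."""
    required = re.compile(
        rf"\s*SET\s+search_path\s+TO\s+dwh2_{re.escape(group)}\s*,\s*stg2_{re.escape(group)}\s*;",
        re.IGNORECASE,
    )
    offenders = []
    for path in sorted(etl_dir.glob("a2_etl*.sql")):
        # ignore blank lines and "--" comment lines when looking for the first statement
        code_lines = [
            line for line in path.read_text(encoding="utf-8").splitlines()
            if line.strip() and not line.lstrip().startswith("--")
        ]
        if not required.match("\n".join(code_lines)):
            offenders.append(path.name)
    return offenders
```

`check_etl_prologue(etl_dir, XXX)` returns the offending file names; an empty list means every step file starts correctly.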



In [12]:
steps = sorted(etl_dir.glob("a2_etl*.sql"))
if not steps:
    print("No ETL step files found in etl/ (expected a2_etl*.sql).")
else:
    for s in steps:
        run_sqlscript(s, engine=engine, progress=VERBOSE_SQL)

## 7) SQL queries

In [None]:
# Business question Q01
# Exceeded days of param "PM2" in Q1 of 2024 by country 
df = run_sqlscript("sql/a2_q01.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

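|   | country_name | january | february | march |
|---|---|---|---|---|
| 0 | Austria | 14 | 6 | 8 |
| 1 | Belgium | 6 | 2 | 4 |
| 2 | Croatia | 6 | 2 | 5 |
| 3 | Czech Republic | 7 | 3 | 5 |
| 4 | Denmark | 1 | 6 | 7 |
| 5 | Finland | 2 | 2 | 1 |
| 6 | France | 16 | 14 | 14 |
| 7 | Germany | 25 | 18 | 27 |
| 8 | Greece | 9 | 7 | 11 |
| 9 | Hungary | 7 | 3 | 7 |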


In [12]:
# Business question Q05
# Total data volume per param category recorded in 2023 and 2024
df = run_sqlscript("sql/a2_q05.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | category | data volume (kb) 2023 | data volume (kb) 2024 |
|---|---|---|---|
| 0 | Volatile Organic Compound | 4962229 | 4974177 |
| 1 | Biological | 6328259 | 6298621 |
| 2 | Gas | 5599952 | 5597083 |
| 3 | Particulate matter | 5257962 | 5267659 |
| 4 | Heavy Metal | 4961428 | 4955113 |


In [14]:
# Business question Q09
# Number of reading events per quarter of 2024 for the top 10 countries by total reading events
df = run_sqlscript("sql/a2_q09.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | country_name | events_q1 | events_q2 | events_q3 | events_q4 |
|---|---|---|---|---|---|
| 0 | Turkey | 22618 | 22383 | 22562 | 22277 |
| 1 | Russia | 17562 | 17851 | 18006 | 17648 |
| 2 | Germany | 13509 | 13244 | 13718 | 13759 |
| 3 | United Kingdom | 10797 | 10802 | 11045 | 11075 |
| 4 | France | 8845 | 9043 | 9050 | 8942 |
| 5 | Austria | 7287 | 7168 | 7302 | 7284 |
| 6 | Italy | 6593 | 6617 | 6695 | 6648 |
| 7 | Sweden | 5080 | 5025 | 5256 | 5275 |
| 8 | Czech Republic | 3965 | 4138 | 4110 | 4144 |
| 9 | Greece | 3153 | 3191 | 3267 | 3201 |


In [80]:
# Business question Q13
# Number of exceeded (level none or yellow) days for Q1 in 2023 per city
df = run_sqlscript("sql/a2_q13.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

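|   | city_name | exceed_days_jan | exceed_days_feb | exceed_days_mar |
|---|---|---|---|---|
| 0 | Amsterdam | 11 | 11 | 17 |
| 1 | Ankara | 77 | 60 | 23 |
| 2 | Athens | 0 | 0 | 3 |
| 3 | Barcelona | 13 | 4 | 3 |
| 4 | Belgrade | 16 | 5 | 6 |
| 5 | Berlin | 9 | 1 | 3 |
| 6 | Brno | 12 | 30 | 37 |
| 7 | Brussels | 46 | 38 | 61 |
| 8 | Budapest | 19 | 27 | 38 |
| 9 | Copenhagen | 7 | 7 | 4 |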


In [81]:
# Business question Q15
# Number of exceeded days for all Eastern European countries in 2023 and 2024
df = run_sqlscript("sql/a2_q15.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | country_name | exceed_days_23 | exceed_days_24 |
|---|---|---|---|
| 0 | Belarus | 1537 | 1474 |
| 1 | Serbia | 798 | 804 |
| 2 | Greece | 3194 | 3128 |
| 3 | Turkey | 12015 | 11798 |
| 4 | Russia | 12093 | 12074 |


In [None]:
# Business question Q31 (example)
df = run_sqlscript("sql/a2_q31.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

In [None]:
# Business question Q32 (example)
df = run_sqlscript("sql/a2_q32.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

In [None]:
# Business question Q33 (example)
df = run_sqlscript("sql/a2_q33.sql", engine=engine, progress=VERBOSE_SQL)
display(df)
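The `.sql` files themselves are not shown here, but result shapes like Q01's (one row per country, one column per month) are typically produced by conditional aggregation. Below is a self-contained sketch on a toy SQLite table; the table `exceed_day` and its columns are hypothetical, not the warehouse schema. In PostgreSQL the same pivot could be written with `EXTRACT(MONTH FROM day)` or `COUNT(*) FILTER (WHERE ...)`.

```python
import sqlite3

# Toy stand-in for a daily exceedance table; table and column names are hypothetical
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE exceed_day (country TEXT, day TEXT, exceeded INTEGER);
    INSERT INTO exceed_day VALUES
        ('Austria', '2024-01-03', 1), ('Austria', '2024-01-04', 1),
        ('Austria', '2024-02-10', 1), ('Austria', '2024-03-15', 0),
        ('Belgium', '2024-03-01', 1), ('Belgium', '2023-03-01', 1);
""")

# One row per country, one column per month of Q1 2024: SUM over CASE turns months into columns
PIVOT_SQL = """
SELECT country,
       SUM(CASE WHEN strftime('%m', day) = '01' THEN exceeded ELSE 0 END) AS january,
       SUM(CASE WHEN strftime('%m', day) = '02' THEN exceeded ELSE 0 END) AS february,
       SUM(CASE WHEN strftime('%m', day) = '03' THEN exceeded ELSE 0 END) AS march
FROM exceed_day
WHERE day BETWEEN '2024-01-01' AND '2024-03-31'
GROUP BY country
ORDER BY country;
"""
rows = conn.execute(PIVOT_SQL).fetchall()
print(rows)  # [('Austria', 2, 1, 0), ('Belgium', 0, 0, 1)]
```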

## 8) Atoti setup and build cube (scaffold)

In [15]:
os.environ.pop("JAVA_HOME", None)  # let Atoti use its own JDK via jdk4py

# Start a new Atoti session
session = tt.Session.start()

# URL to the Atoti web app
session.url

'http://localhost:51011'

In [16]:
def upsert_table(session, name, df, *, keys=None, defaults=None, dtypes=None):
    if name in session.tables.keys():
        t = session.tables[name]
        t.drop()  # delete all rows, keep schema
        if defaults:  # non-nullability even for existing tables
            for col, val in defaults.items():
                t[col].default_value = val  # set after creation too
        t.load(df)
    else:
        t = session.read_pandas(
            df,
            table_name=name,
            keys=keys or (),
            default_values=defaults or {},   # set at creation time
            data_types=dtypes or {},
        )
    return t

In [17]:
# Load star-schema tables to DataFrames
df_time   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_timemonth", engine)
df_city   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_city", engine)
df_param  = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_param", engine)
df_alert  = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_alertpeak", engine)
df_fact   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.ft_param_city_month", engine)

time_store  = upsert_table(session, "dim_timemonth", df_time,
                           keys=["month_key"],
                           defaults={"year_num": 0, "quarter_num": 0, "month_name": "Unknown"},
                           dtypes={"year_num": "int", "quarter_num": "int"})

city_store  = upsert_table(session, "dim_city", df_city,
                           keys=["city_key"],
                           defaults={"region_name": "Unknown", "country_name": "Unknown", "city_name": "Unknown"})

param_store = upsert_table(session, "dim_param", df_param,
                           keys=["param_key"],
                           defaults={"purpose": "Unknown", "category": "Unknown", "param_name": "Unknown"})

ap_store    = upsert_table(session, "dim_alertpeak", df_alert,
                           keys=["alertpeak_key"],
                           defaults={"alert_level_name": "None"})

fact_store = upsert_table(session, "ft_param_city_month", df_fact, 
                          keys=["ft_pcm_key"], 
                          defaults={"month_key": 0, "city_key": 0, "param_key": 0, "alertpeak_key": 1000,  # FKs
                                    "reading_events_count": 0, "devices_reporting_count": 0, 
                                    "data_volume_kb_sum": 0, "recordedvalue_avg": 0.0, "recordedvalue_p95": 0.0, 
                                    "exceed_days_any": 0, "data_quality_avg": 0.0, "missing_days": 0, 
                                   }, 
                          dtypes={"month_key": "int", "city_key": "int", 
                                  "param_key": "int", "alertpeak_key": "int", 
                                  "reading_events_count": "int", "devices_reporting_count": "int", 
                                  "data_volume_kb_sum": "int", "recordedvalue_avg": "float", 
                                  "recordedvalue_p95": "float", "exceed_days_any": "int", 
                                  "data_quality_avg": "float", "missing_days": "int", 
                                 },
                         )

# Define joins once per fresh session - can re-run the cell without redefining joins
if not getattr(session, "_airq_joins_done", False):
    fact_store.join(time_store,   fact_store["month_key"]     == time_store["month_key"])
    fact_store.join(city_store,   fact_store["city_key"]      == city_store["city_key"])
    fact_store.join(param_store,  fact_store["param_key"]     == param_store["param_key"])
    fact_store.join(ap_store,     fact_store["alertpeak_key"] == ap_store["alertpeak_key"])
    session._airq_joins_done = True

# Create or reuse the cube
cube_name = "AirQ Cube"
cube = (
    session.cubes[cube_name]
    if cube_name in session.cubes.keys()
    else session.create_cube(fact_store, cube_name, mode="manual")
)

# Access cube components
m, h, l = cube.measures, cube.hierarchies, cube.levels

cube
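Before loading into Atoti, it can pay to check that every fact foreign key has a matching dimension key. As far as we understand Atoti's join behaviour, unmatched fact rows are shown under the target table's default values (here mostly "Unknown"), so orphans would quietly end up there rather than raise an error. A plain-Python sketch; the helper `orphan_foreign_keys` is hypothetical and accepts lists or pandas Series.

```python
def orphan_foreign_keys(fact_columns, dim_keys):
    """Map each FK name to the sorted FK values that have no matching dimension key.

    fact_columns: {fk_name: iterable of FK values}, e.g. {"city_key": df_fact["city_key"]}
    dim_keys:     {fk_name: iterable of PK values}, e.g. {"city_key": df_city["city_key"]}
    """
    orphans = {}
    for fk, values in fact_columns.items():
        missing = set(values) - set(dim_keys[fk])
        if missing:
            orphans[fk] = sorted(missing)
    return orphans
```

In this notebook it could be called as `orphan_foreign_keys({"city_key": df_fact["city_key"], "param_key": df_fact["param_key"]}, {"city_key": df_city["city_key"], "param_key": df_param["param_key"]})`; an empty dict means every key resolves.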

## 9) Define hierarchies and measures
Define explicit hierarchies in Atoti:

1) Time: Year → Quarter → Month,
2) Geo: Region → Country → City,
3) Param: Purpose → Category → Param,
4) Alert: Level (sorted by rank).

In [18]:
# Hierarchies: Time, Geo, Param, Alert
h["Time"] = [
    time_store["year_num"],
    time_store["quarter_num"],
    time_store["month_name"],
]
h["Geo"] = [
    city_store["region_name"],
    city_store["country_name"],
    city_store["city_name"],
]
h["Param"] = [
    param_store["purpose"],
    param_store["category"],
    param_store["param_name"],
]
h["Alert"] = [
    ap_store["alert_level_name"],
]

# Measures
m["Reading Events"] = tt.agg.sum(fact_store["reading_events_count"])        # fully additive
m["Devices Reporting"] = tt.agg.sum(fact_store["devices_reporting_count"])  # if this is a distinct count per cell, sums double-count devices across months/params
m["Data Volume (KB)"] = tt.agg.sum(fact_store["data_volume_kb_sum"])        # fully additive
m["Missing Days"] = tt.agg.sum(fact_store["missing_days"])                  # fully additive
m["Exceed Days (any)"] = tt.agg.sum(fact_store["exceed_days_any"])          # fully additive
m["Avg Recorded Value"] = tt.agg.mean(fact_store["recordedvalue_avg"])      # non-additive: unweighted mean of cell averages
m["P95 Recorded Value"] = tt.agg.mean(fact_store["recordedvalue_p95"])      # non-additive: mean of cell P95s, not a true P95
m["Avg Data Quality"] = tt.agg.mean(fact_store["data_quality_avg"])         # non-additive: unweighted mean of cell averages


In [19]:
# order months as in calendar, not alphabetically
month_lvl = cube.hierarchies["Time"]["month_name"]
month_lvl.order = tt.CustomOrder(first_elements=["Jan","Feb","Mar","Apr","May","Jun",
                                                 "Jul","Aug","Sep","Oct","Nov","Dec"])

In [20]:
# order alert levels from least to most harmful
alert_lvl = cube.hierarchies["Alert"]["alert_level_name"]
alert_lvl.order = tt.CustomOrder(first_elements=["None", "Yellow", "Orange", "Red", "Crimson"])

In [21]:
cube

In [22]:
print("\nHierarchies and their levels:")
for h_name, hierarchy in cube.hierarchies.items():
    level_names = [getattr(level, "name", str(level)) for level in hierarchy]
    print(f" - {h_name} → levels: {level_names}")

print("\Measures:")
for m in cube.measures.keys():
    print("  -", m)    


Hierarchies and their levels:
 - ('dim_timemonth', 'Time') → levels: ['year_num', 'quarter_num', 'month_name']
 - ('dim_param', 'Param') → levels: ['purpose', 'category', 'param_name']
 - ('dim_alertpeak', 'Alert') → levels: ['alert_level_name']
 - ('dim_city', 'Geo') → levels: ['region_name', 'country_name', 'city_name']
\Measures:
  - Avg Data Quality
  - P95 Recorded Value
  - Avg Recorded Value
  - Devices Reporting
  - contributors.COUNT
  - Exceed Days (any)
  - Reading Events
  - Data Volume (KB)
  - update.TIMESTAMP
  - Missing Days


  print("\Measures:")


## 10) MDX queries

In [23]:
# MDX cell magic: let us write MDX code like this:
#   %%mdx
#   SELECT ... FROM [AirQ Cube]
#
# Requirements: a live `session` from atoti and the cube already created.

from IPython.core.magic import register_cell_magic
from IPython.display import display

@register_cell_magic
def mdx(line, cell):
    """Run MDX in this cell and display a DataFrame.
    Usage:
        %%mdx
        SELECT ...
        FROM [AirQ Cube]
    """
    q = cell.strip()
    df = session.query_mdx(q)   # Atoti returns levels on index, measures as columns
    return df                   # shown as the cell's output; afterwards also available as _


In [72]:
%%mdx
-- 3. For PM10 in 2024, show the total Exceed Days (any) by City. Return one row per city and a
-- single column with the total number of exceedance days for that year.

SELECT
    { [Measures].[Exceed Days (any)] } ON COLUMNS,
    [dim_city].[Geo].[city_name].Members ON ROWS
FROM
    [AirQ Cube]
WHERE (
    [dim_timemonth].[Time].[year_num].&[2024],
    [dim_param].[Param].[param_name].&[PM10]
)

| region_name | country_name | city_name | Exceed Days (any) |
|---|---|---|---|
| Central Europe | Austria | Graz | 111.0 |
| Central Europe | Austria | Salzburg | 74.0 |
| Central Europe | Austria | Vienna | 77.0 |
| Central Europe | Croatia | Zagreb | 93.0 |
| Central Europe | Czech Republic | Brno | 64.0 |
| Central Europe | Czech Republic | Prague | 142.0 |
| Central Europe | Germany | Berlin | 126.0 |
| Central Europe | Germany | Hamburg | 66.0 |
| Central Europe | Germany | Leipzig | 5.0 |
| Central Europe | Germany | Munich | 55.0 |


In [74]:
%%mdx
-- 7. For parameter PM10, show Avg Recorded Value and P95 Recorded Value by Country for 2023.
-- Return Countries on rows and two columns—Avg Recorded Value and P95 Recorded Value for the year 2023.

SELECT
    { [Measures].[Avg Recorded Value], [Measures].[P95 Recorded Value] } ON COLUMNS,
    [dim_city].[Geo].[country_name].Members ON ROWS
FROM
    [AirQ Cube]
WHERE (
    [dim_timemonth].[Time].[year_num].&[2023],
    [dim_param].[Param].[param_name].&[PM10]
)

| region_name | country_name | Avg Recorded Value | P95 Recorded Value |
|---|---|---|---|
| Central Europe | Austria | 81.95 | 157.92 |
| Central Europe | Croatia | 79.75 | 172.92 |
| Central Europe | Czech Republic | 79.1 | 158.75 |
| Central Europe | Germany | 80.1 | 155.59 |
| Central Europe | Hungary | 79.43 | 155.64 |
| Central Europe | Poland | 76.11 | 152.34 |
| Eastern Europe | Belarus | 74.54 | 151.62 |
| Eastern Europe | Greece | 95.07 | 185.79 |
| Eastern Europe | Russia | 78.04 | 169.22 |
| Eastern Europe | Serbia | 70.63 | 120.25 |


In [None]:
%%mdx
-- 11. For 2024, show Exceed Days (any) by Region for Param Category = ‘Gas’. Return Regions on rows 
-- and one column with Exceed Days (any) for the year 2024, filtered to Category = Gas.

SELECT
    { [Measures].[Exceed Days (any)] } ON COLUMNS,
    [dim_city].[Geo].[region_name].Members ON ROWS
FROM
    [AirQ Cube]
WHERE (
    [dim_timemonth].[Time].[year_num].&[2024],
    [dim_param].[Param].[category].&[Gas]
)

| region_name | Exceed Days (any) |
|---|---|
| Central Europe | 827.0 |
| Eastern Europe | 759.0 |
| Unknown | |
| Western Europe | 938.0 |


In [77]:
%%mdx

-- 19. For 2024, show Missing Days and Data Volume (KB) totals by City in Central Europe. Return
-- Cities in Central Europe on rows and two columns - Missing Days and Data Volume (KB) - for the year 2024.

SELECT
    { [Measures].[Missing Days], [Measures].[Data Volume (KB)] } ON COLUMNS,
    [dim_city].[Geo].[city_name].Members ON ROWS
FROM(
  SELECT ( [dim_timemonth].[Time].[year_num].&[2024] ) ON 0 FROM (
    SELECT (
      FILTER(
        [dim_city].[Geo].[region_name].Members,
        ANCESTOR(
          [dim_city].[Geo].CurrentMember,
          [dim_city].[Geo].[region_name]
        ).Name = "Central Europe"
      )
    ) ON 0 FROM [AirQ Cube]
  )
)

| region_name | country_name | city_name | Missing Days | Data Volume (KB) |
|---|---|---|---|---|
| Central Europe | Austria | Graz | 3638 | 435302 |
| Central Europe | Austria | Salzburg | 4672 | 444499 |
| Central Europe | Austria | Vienna | 2723 | 724262 |
| Central Europe | Croatia | Zagreb | 4714 | 436861 |
| Central Europe | Czech Republic | Brno | 3751 | 267548 |
| Central Europe | Czech Republic | Prague | 3501 | 630191 |
| Central Europe | Germany | Berlin | 1757 | 1067404 |
| Central Europe | Germany | Hamburg | 5024 | 434344 |
| Central Europe | Germany | Leipzig | 5194 | 399402 |
| Central Europe | Germany | Munich | 3644 | 648363 |


In [79]:
%%mdx
-- 28. For 2024, list the Top 10 Countries by Missing Days. Return the 10 countries with the highest
-- totals on rows (highest → lowest) and one column with Missing Days for 2024.

SELECT
  { [Measures].[Missing Days] } ON COLUMNS,
  TOPCOUNT(
    NONEMPTY(
      [dim_city].[Geo].[country_name].Members,
      [Measures].[Missing Days]
    ),
    10, [Measures].[Missing Days]
  ) ON ROWS
FROM [AirQ Cube]
WHERE (
  [dim_timemonth].[Time].[year_num].&[2024]
)

| region_name | country_name | Missing Days |
|---|---|---|
| Central Europe | Germany | 19126 |
| Central Europe | Austria | 11033 |
| Western Europe | France | 10299 |
| Eastern Europe | Russia | 9299 |
| Central Europe | Czech Republic | 7252 |
| Western Europe | Sweden | 6949 |
| Western Europe | Italy | 6484 |
| Western Europe | United Kingdom | 5512 |
| Central Europe | Croatia | 4714 |
| Western Europe | Finland | 4573 |


### 10.1) Business question Q31 (example)

In [56]:
%%mdx

-- 31. For parameter O3, list the Top 10 Cities by P95 Recorded Value for 2023.
-- Return the 10 cities with the highest values on rows (highest → lowest) and one column with P95 Recorded Value for 2023.
SELECT
  { [Measures].[P95 Recorded Value] } ON COLUMNS,
  TOPCOUNT(
    NONEMPTY(
      [dim_city].[Geo].[city_name].Members,
      [Measures].[P95 Recorded Value]
    ),
    10, [Measures].[P95 Recorded Value]
  ) ON ROWS
FROM [AirQ Cube]
WHERE (
  [dim_timemonth].[Time].[year_num].&[2023],
  [dim_param].[Param].[param_name].&[O3]
)

| region_name | country_name | city_name | P95 Recorded Value |
|---|---|---|---|
| Central Europe | Czech Republic | Prague | 152.58 |
| Central Europe | Germany | Hamburg | 150.75 |
| Eastern Europe | Greece | Athens | 142.47 |
| Western Europe | United Kingdom | London | 132.38 |
| Eastern Europe | Turkey | Istanbul | 131.14 |
| Western Europe | United Kingdom | Edinburgh | 126.17 |
| Western Europe | Denmark | Copenhagen | 124.67 |
| Central Europe | Germany | Stuttgart | 119.63 |
| Central Europe | Austria | Salzburg | 119.14 |
| Eastern Europe | Russia | Kazan | 117.25 |


### 10.2) Business question Q32 (example)

In [66]:
%%mdx 

-- 32. For 2024, show Data Volume (KB) by City for category ‘Volatile Organic Compound’, and list the Top 10 cities.
-- Return the Top 10 cities on rows (highest -> lowest) and one column with Data Volume (KB) for 2024, limited to the Volatile Organic Compound category.
SELECT
  { [Measures].[Data Volume (KB)] } ON COLUMNS,
  TOPCOUNT(
    NONEMPTY([dim_city].[Geo].[city_name].Members, [Measures].[Data Volume (KB)]),
    10, [Measures].[Data Volume (KB)]
  ) ON ROWS
FROM (
  SELECT ( [dim_timemonth].[Time].[year_num].&[2024] ) ON 0 FROM (
    SELECT (
      FILTER(
        [dim_param].[Param].[param_name].Members,
        ANCESTOR(
          [dim_param].[Param].CurrentMember,
          [dim_param].[Param].[category]
        ).Name = "Volatile Organic Compound"
      )
    ) ON 0 FROM [AirQ Cube]
  )
)

| region_name | country_name | city_name | Data Volume (KB) |
|---|---|---|---|
| Eastern Europe | Turkey | Istanbul | 803441 |
| Western Europe | United Kingdom | London | 330327 |
| Eastern Europe | Russia | Moscow | 257108 |
| Central Europe | Germany | Berlin | 224658 |
| Eastern Europe | Russia | St. Petersburg | 215185 |
| Western Europe | France | Paris | 196835 |
| Western Europe | Italy | Rome | 188638 |
| Eastern Europe | Russia | Ufa | 174256 |
| Western Europe | Denmark | Copenhagen | 148863 |
| Central Europe | Austria | Vienna | 146300 |


### 10.3) Business question Q33 (example)

In [67]:
%%mdx

-- 33. For parameter PM4 in 2024, return for each Country the Month with the highest Avg Data Quality.
-- Return one row per Country × Month (the month with the highest Avg Data Quality in 2024) and one column with Avg Data Quality.
SELECT
  { [Measures].[Avg Data Quality] } ON COLUMNS,
  NON EMPTY
    GENERATE(
      [dim_city].[Geo].[country_name].Members,
      TOPCOUNT(
        CROSSJOIN(
          { [dim_city].[Geo].CurrentMember },
          Descendants(
            [dim_timemonth].[Time].[year_num].&[2024],
            [dim_timemonth].[Time].[month_name]
          )
        ),
        1, [Measures].[Avg Data Quality]
      )
    ) ON ROWS
FROM [AirQ Cube]
WHERE ( [dim_param].[Param].[param_name].&[PM4] )

| region_name | country_name | year_num | quarter_num | month_name | Avg Data Quality |
|---|---|---|---|---|---|
| Central Europe | Austria | 2024 | 3 | Sep | 3.58 |
| Central Europe | Croatia | 2024 | 2 | Jun | 3.6 |
| Central Europe | Czech Republic | 2024 | 2 | Apr | 3.24 |
| Central Europe | Germany | 2024 | 3 | Sep | 3.12 |
| Central Europe | Hungary | 2024 | 3 | Aug | 3.43 |
| Central Europe | Poland | 2024 | 1 | Jan | 3.29 |
| Eastern Europe | Belarus | 2024 | 3 | Sep | 3.24 |
| Eastern Europe | Greece | 2024 | 3 | Aug | 3.12 |
| Eastern Europe | Russia | 2024 | 2 | Jun | 3.29 |
| Eastern Europe | Serbia | 2024 | 2 | Apr | 3.4 |


## 11) Batch executor: run all .mdx → CSV (+ an index)

In [None]:
def run_mdx_folder(
    mdx_folder="mdx",
    out_folder="mdx_out",
    pattern="*.mdx",
    overwrite=True,
    index_csv="mdx_index.csv",
):
    mdx_path = Path(mdx_folder)
    out_path = Path(out_folder)
    mdx_path.mkdir(exist_ok=True)
    out_path.mkdir(exist_ok=True)

    records = []
    files = sorted(mdx_path.glob(pattern))
    if not files:
        print(f"No MDX files found in {mdx_path.resolve()}.")
        return pd.DataFrame()

    for f in files:
        q = f.read_text(encoding="utf-8")
        t0 = time.time()
        error = None
        rows = cols = 0
        dest = out_path / f"{f.stem}.csv"

        try:
            df = session.query_mdx(q).reset_index()
            rows, cols = df.shape
            if overwrite or not dest.exists():
                df.to_csv(dest, index=False)
        except Exception as e:
            error = str(e)

        elapsed = time.time() - t0
        records.append({
            "file": f.name,
            "csv": dest.name,
            "rows": rows,
            "cols": cols,
            "seconds": round(elapsed, 3),
            "error": error,
        })

    index_df = pd.DataFrame(records)
    index_path = out_path / index_csv
    index_df.to_csv(index_path, index=False)
    print(f"Done. Index saved to {index_path}")
    return index_df
    

In [None]:
# Run all MDX files:
index_df = run_mdx_folder()
index_df


## 12) Create `sqldump/sqldump_airq_dwh2_xxx.sql`

We run `pg_dump -n dwh2_xxx --no-owner --no-privileges` to keep dumps portable.


In [None]:
# === Create sqldump/sqldump_airq_dwh2_xxx.sql (pg_dump) ===
sqldump_dir.mkdir(exist_ok=True)
outfile = sqldump_dir / f"sqldump_airq_dwh2_{XXX}.sql"

pg_dump = shutil.which("pg_dump") or "pg_dump"
cmd = [
    pg_dump,
    "-h", DB_HOST,
    "-p", str(DB_PORT),
    "-U", DB_USER,
    "-d", DB_NAME,
    "-n", f"dwh2_{XXX}",
    "--no-owner",
    "--no-privileges",
    "-f", str(outfile),
]

# Avoid echoing the password; supply it via env if provided
env = dict(os.environ)
if 'pw' in globals() and pw:
    env["PGPASSWORD"] = pw

print("Running:", " ".join(cmd).replace(DB_USER, "<user>"))
try:
    subprocess.run(cmd, check=True, env=env)
    print("✓ Dump created at", outfile)
except Exception as e:
    print("pg_dump failed; try this manually in a terminal:\n", " ".join(cmd), "\nError:", e)


## 13) Submission checklist (put these in your **ZIP**)

- `csv/` — CSV files 
- `ddl/` — DDL scripts 
- `etl/` — Your `a2_etl*.sql` files (ETL scripts)
- `mdx/` — Your `a2_q{NN}_{A|B}.mdx` files (MDX queries for business questions)
- `mdx_out/` — Your `a2_q{NN}_{A|B}.csv` files (results of MDX queries)
- `pdf/` — Your `a2_q{NN}.pdf` files (Dashboard exports as .pdf)
- `sql/` — Your `a2_q{NN}_{A|B}.sql` files (SQL queries for business questions)
- `sqldump/` — `sqldump_airq_dwh2_xxx.sql`  
- `AirQ_Part2_xxx.ipynb`
- `group_xxx.txt`
- `Report_Part2_Group_xxx.pdf`
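`run_mdx_folder` writes `mdx_out/<stem>.csv` only for queries that ran without an error, so a missing CSV points to a failed or never-run query. A last check before zipping, sketched as the hypothetical helper `missing_mdx_results`, lists `.mdx` files under `mdx/` without a matching result CSV under `mdx_out/`.

```python
from pathlib import Path


def missing_mdx_results(root: Path) -> list[str]:
    """List a2_q*.mdx files under mdx/ that have no CSV with the same stem under mdx_out/."""
    produced = {p.stem for p in (root / "mdx_out").glob("*.csv")}
    return sorted(p.name for p in (root / "mdx").glob("a2_q*.mdx") if p.stem not in produced)
```

Usage: `missing_mdx_results(root_dir)` should return an empty list before submission.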
