# Resale Flat Data Pipeline – Meltano, dbt, and Dagster

This notebook documents the full workflow you implemented for the resale flat project:

1. **Ingestion with Meltano** – Extract from Supabase Postgres and load into BigQuery.
2. **Transformation with dbt** – Build models on top of the BigQuery `resale` dataset.
3. **Orchestration with Dagster** – Run a `pandas_job` that reads from GitHub, computes summary statistics, and materialises them via DuckDB.

> This notebook is primarily **documentation**. The shell commands are shown for reproducibility, but they will only run correctly in your WSL `eltn` conda environment with Meltano, dbt, Dagster, and BigQuery credentials properly set up.

## 1. Environment

You worked inside a dedicated conda environment called `eltn` in WSL2:

```bash
conda activate eltn
```

Key tools and versions:
- Python 3.10
- Meltano
- dbt-core + dbt-bigquery (`dbt=1.9.6`, `bigquery=1.9.1`)
- Google Cloud SDK libraries for BigQuery
- Dagster for orchestration


## 2. Ingestion with Meltano

### 2.1 Project layout

The Meltano project lives in:

```bash
~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/meltnano-resale
```

### 2.2 `meltano.yml` – tap and target configuration

The key parts of `meltano.yml` are:

```yaml
default_environment: dev
project_id: 019acb46-9abd-76eb-84d3-4183cf90b23f
environments:
  - name: dev
  - name: staging
  - name: prod

plugins:
  extractors:
    - name: tap-postgres
      variant: meltanolabs
      pip_url: meltanolabs-tap-postgres
      config:
        database: postgres
        filter_schemas:
          - public
        host: aws-1-ap-south-1.pooler.supabase.com
        port: 5432
        user: postgres.gvawjwjouuttnsoqulip
      select:
        - public-resale_flat_prices_from_jan_2017.*

  loaders:
    - name: target-bigquery
      variant: z3z1ma
      pip_url: git+https://github.com/z3z1ma/target-bigquery.git
      config:
        project: durable-ripsaw-477914-g0
        dataset: resale
        credentials_path: /home/pingh/dsai/durable-ripsaw-477914-g0-206ef3866e00.json
        method: batch_job
        batch_size: 104857600
        denormalized: true
        flattening_enabled: true
        flattening_max_depth: 1
```


### 2.3 Configure and test the loader

You used `meltano config` to interactively set `target-bigquery` options such as:
- `denormalized = True`
- `flattening_enabled = True`
- `flattening_max_depth = 1`

To test the loader connection (this writes a small `meltano_test_stream` table in BigQuery):

```bash
cd ~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/meltnano-resale

# Test target-bigquery connection (creates a test table in BigQuery)
meltano config test target-bigquery
```

### 2.4 Run the Meltano ELT pipeline

To extract from Supabase Postgres and load into BigQuery you ran:

```bash
cd ~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/meltnano-resale

meltano elt tap-postgres target-bigquery --run-id=postgres-to-bq
```

From the logs:

- `metric_name=record_count metric_value=202399` for `tap-postgres`
- `Target 'target-bigquery' completed reading 202401 lines of input`
- `Cleaning up public-resale_flat_prices_from_jan_2017`
- `Extract & load complete!`

This confirms that ~202k rows from `public.resale_flat_prices_from_jan_2017` were loaded into the **BigQuery** dataset `resale` as table `public_resale_flat_prices_from_jan_2017`.
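
The tap reported 202399 records while the target read 202401 lines. A gap like this is what you would expect if the stream also carried non-record Singer messages such as `SCHEMA` and `STATE`, although the logs above do not confirm that directly. The sketch below is not Meltano code: it is a minimal illustration, using a hypothetical four-line sample stream, of how Singer message lines can be tallied by type.

```python
import json
from collections import Counter


def count_singer_messages(lines):
    """Tally Singer messages by their "type" field (SCHEMA, RECORD, STATE, ...)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        counts[json.loads(line)["type"]] += 1
    return counts


# Hypothetical sample stream (not taken from the real run):
# one SCHEMA message, two RECORD messages, one STATE message.
sample_stream = [
    '{"type": "SCHEMA", "stream": "demo", "schema": {"properties": {}}, "key_properties": []}',
    '{"type": "RECORD", "stream": "demo", "record": {"town": "BEDOK"}}',
    '{"type": "RECORD", "stream": "demo", "record": {"town": "YISHUN"}}',
    '{"type": "STATE", "value": {}}',
]

message_counts = count_singer_messages(sample_stream)
total_lines = sum(message_counts.values())
print(message_counts["RECORD"], "records in", total_lines, "lines")
```

In this sample the line count exceeds the record count by two, the same shape of gap seen in the logs.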

## 3. Transformation with dbt (BigQuery)

### 3.1 Project location

The dbt project is in:

```bash
~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/resale_flat
```

### 3.2 `profiles.yml` (BigQuery service account)

Inside the project you defined a local `profiles.yml` pointing to your service account JSON and `resale` dataset:

```yaml
resale_flat:
  outputs:
    dev:
      dataset: resale
      job_execution_timeout_seconds: 300
      job_retries: 1
      keyfile: /home/pingh/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/durable-ripsaw-477914-g0-206ef3866e00.json
      location: US
      method: service-account
      priority: interactive
      project: durable-ripsaw-477914-g0
      threads: 1
      type: bigquery
  target: dev
```


```bash
cd ~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/resale_flat

# Validate that dbt can connect to BigQuery
dbt debug
```

### 3.3 Source definition – `models/source.yml`

```yaml
version: 2

sources:
  - name: resale_source
    database: durable-ripsaw-477914-g0
    schema: resale
    tables:
      - name: public_resale_flat_prices_from_jan_2017
```

### 3.4 Model `models/prices.sql`

```sql
{{ config(materialized='table') }}

with source_data as (
    select
        *
    from {{ source('resale_source', 'public_resale_flat_prices_from_jan_2017') }}
)

select
    *,
    cast(floor_area_sqm as numeric) as floor_area_sqm_num,
    safe_divide(
        cast(resale_price as numeric),
        cast(floor_area_sqm as numeric)
    ) as price_per_sqm
from source_data
```
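
The `safe_divide` call matters because BigQuery's `SAFE_DIVIDE` returns `NULL` instead of raising an error when the divisor is zero. The following is a rough Python analogue of the `price_per_sqm` calculation, not part of the dbt project. The `rows` data is hypothetical, and it assumes the source columns arrive as strings, which is why they are cast first.

```python
from decimal import Decimal
from typing import Optional


def safe_divide(numerator: Optional[Decimal], denominator: Optional[Decimal]) -> Optional[Decimal]:
    """Mimic BigQuery SAFE_DIVIDE: return None (NULL) for NULL inputs or a zero divisor."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return numerator / denominator


# Hypothetical rows; values are assumed to be stored as strings in the source table.
rows = [
    {"resale_price": "450000", "floor_area_sqm": "90"},
    {"resale_price": "300000", "floor_area_sqm": "0"},  # zero area: division is undefined
]

for row in rows:
    row["price_per_sqm"] = safe_divide(
        Decimal(row["resale_price"]),
        Decimal(row["floor_area_sqm"]),
    )

print([row["price_per_sqm"] for row in rows])
```

The second row ends up with `None` rather than aborting the whole computation, which mirrors how the model keeps running when a row has a zero floor area.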

### 3.5 Model `models/prices_by_town_type_model.sql`

```sql
{{ config(materialized='table') }}

with prices as (
  select * from {{ ref('prices') }}
)

select
  town,
  flat_type,
  flat_model,
  avg(floor_area_sqm_num) as avg_floor_area_sqm,
  avg(cast(resale_price as numeric)) as avg_resale_price,
  avg(price_per_sqm) as avg_price_per_sqm
from prices
group by town, flat_type, flat_model
order by town, flat_type, flat_model
```
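
To make the aggregation concrete, here is a small standard-library sketch of the same group-and-average logic. It is an illustration only: the `prices` rows are hypothetical, and it assumes no `NULL` values (SQL `avg` skips `NULL`s, which this sketch does not handle).

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows shaped like the output of the `prices` model.
prices = [
    {"town": "BEDOK", "flat_type": "4 ROOM", "flat_model": "Model A",
     "resale_price": 400000.0, "price_per_sqm": 4000.0},
    {"town": "BEDOK", "flat_type": "4 ROOM", "flat_model": "Model A",
     "resale_price": 500000.0, "price_per_sqm": 5000.0},
    {"town": "ANG MO KIO", "flat_type": "3 ROOM", "flat_model": "New Generation",
     "resale_price": 300000.0, "price_per_sqm": 4500.0},
]

# GROUP BY town, flat_type, flat_model
groups = defaultdict(list)
for row in prices:
    groups[(row["town"], row["flat_type"], row["flat_model"])].append(row)

# AVG(...) per group, with sorted() standing in for ORDER BY on the group key
summary = [
    {
        "town": town,
        "flat_type": flat_type,
        "flat_model": flat_model,
        "avg_resale_price": mean(r["resale_price"] for r in group_rows),
        "avg_price_per_sqm": mean(r["price_per_sqm"] for r in group_rows),
    }
    for (town, flat_type, flat_model), group_rows in sorted(groups.items())
]

for line in summary:
    print(line)
```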


```bash
cd ~/DataScienceCourseDS3Note/5m-data-2.6-data-pipelines-orchestration/meltnano-ingestion/resale_flat

# Optional: clean old artefacts
dbt clean

# Run all dbt models
dbt run
```

After fixing the numeric casting issues (using `cast(resale_price as numeric)` and `floor_area_sqm_num`), the dbt run completed with:

- `resale.my_first_dbt_model` – table created
- `resale.prices` – table with ~202k rows
- `resale.my_second_dbt_model` – view created
- `resale.prices_by_town_type_model` – table created successfully


## 4. Orchestration with Dagster

You also ran a **Dagster** job called `pandas_job`.

From the Dagster UI run page:

- Overall job status: **SUCCESS**
- Steps `pandas_releases` and `summary_statistics` both succeeded.
- Logs showed a materialised asset `summary_statistics` with metadata:
  - `dagster/row_count: 3`
  - `dagster/table_name: analytics.pandas_releases.public.summary_statistics`

This indicates the job loaded pandas release data, computed grouped summary statistics, and wrote them out using the DuckDB I/O manager.

### 4.1 Example Dagster code sketch

The high-level structure of the Dagster code is similar to:

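```python
import pandas as pd
from dagster import Definitions, asset, define_asset_job


@asset
def pandas_releases() -> pd.DataFrame:
    # Load release data from GitHub (implementation omitted in this sketch)
    ...


@asset
def summary_statistics(pandas_releases: pd.DataFrame) -> pd.DataFrame:
    # Count releases per major version and pre-release flag
    summary = (
        pandas_releases
        .groupby(["major_version", "is_prerelease"])
        .size()
        .reset_index(name="count")
    )
    return summary


# Assets cannot be called like ops inside an @job body;
# define_asset_job builds a job from an asset selection instead.
pandas_job = define_asset_job(
    name="pandas_job",
    selection=["pandas_releases", "summary_statistics"],
)

# The DuckDB I/O manager resource is omitted here for brevity.
defs = Definitions(
    assets=[pandas_releases, summary_statistics],
    jobs=[pandas_job],
)
```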
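
To see what the `groupby(...).size().reset_index(name="count")` step produces without needing pandas or GitHub access, the sketch below reproduces the same counting with the standard library. The `releases` records are hypothetical; the real job loads them from GitHub.

```python
from collections import Counter

# Hypothetical release records using the column names from the sketch above.
releases = [
    {"major_version": 1, "is_prerelease": False},
    {"major_version": 2, "is_prerelease": False},
    {"major_version": 2, "is_prerelease": True},
    {"major_version": 2, "is_prerelease": False},
]

# Count rows per (major_version, is_prerelease) pair, like groupby(...).size()
group_sizes = Counter((r["major_version"], r["is_prerelease"]) for r in releases)

# One output row per group, sorted by group key as pandas does by default
summary_rows = [
    {"major_version": major, "is_prerelease": prerelease, "count": count}
    for (major, prerelease), count in sorted(group_sizes.items())
]

for row in summary_rows:
    print(row)
```

Each distinct combination becomes one row, so the number of output rows equals the number of groups. This is the kind of figure that the `dagster/row_count` metadata reports.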


## 5. Summary

You now have a complete modern data stack for this module:

1. **Meltano ELT** from Supabase Postgres → BigQuery (`resale` dataset).
2. **dbt models** that clean and aggregate the resale flat prices (`prices` and `prices_by_town_type_model`).
3. **Dagster orchestration** with a `pandas_job` that demonstrates asset-based workflows and DuckDB persistence.

This notebook consolidates the commands, configuration, and SQL logic you used so you can submit or revisit it later.

## 6. Dagster Conda Environment (dagstern)

This is the dedicated Conda environment used for Dagster orchestration. It has been renamed from **`dagster` to `dagstern`** for consistency.

```yaml
name: dagstern
channels:
  - defaults
  - https://repo.anaconda.com/pkgs/main

dependencies:
  - matplotlib=3.10.0
  - pandas=2.2.3
  - pip=25.1
  - python=3.11.13
  - requests=2.32.3
  - pip:
      - dagster==1.9.13
      - dagster-dbt==0.25.13
      - dagster-duckdb==0.25.13
      - dagster-duckdb-pandas==0.25.13
      - dagster-meltano==1.5.5
      - dagster-webserver==1.9.13
      - dbt-bigquery==1.9.2
      - dbt-core==1.9.6
      - duckdb==1.3.0
      - meltano==3.7.8
      - python-dotenv==1.1.0

prefix: /opt/miniconda3/envs/dagstern
```

### Create and Activate the dagstern Environment

```bash
conda env create -f dagstern.yml
conda activate dagstern
```


```bash
# Example activation command
conda activate dagstern

# Start Dagster UI
dagster dev
```
