# ETL Pipeline Tutorial with DigitalHub

This notebook demonstrates how to build an end-to-end ETL (Extract, Transform, Load) pipeline using the DigitalHub SDK. We'll work with traffic data from the City of Bologna's open data portal, process it through multiple transformation stages, and deploy the result as a REST API service.

## Overview
- **Extract**: Download traffic data from Bologna's open data portal
- **Transform**: Process and clean the data in multiple stages
- **Load**: Deploy the processed data as a queryable REST API
- **Orchestrate**: Create a workflow pipeline to automate the entire process

## Setup and Function Definitions

First, we'll create the necessary directory structure and define all the functions we'll need for our ETL pipeline. All functions will be stored in a single `src/functions.py` file for easy management.

In [None]:
from pathlib import Path

Path("src").mkdir(exist_ok=True)

### Function Definitions

This cell creates our main functions file with the following components:

- **`downloader`**: Extracts CSV data from the Bologna open data API
- **`process_spire`**: Transforms the raw data to extract unique traffic sensor information
- **`process_measures`**: Reshapes time-series traffic measurements from wide to long format
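- **`init_context` & `serve`**: API serving functions: `init_context` loads the processed data item once when the service starts, and `serve` answers each request with a page of records

The three job functions (`downloader`, `process_spire`, `process_measures`) are decorated with `@handler` to integrate with the DigitalHub runtime. The decorator's `outputs` argument names the data item each returned DataFrame is saved as (`dataset`, `dataset-spire`, `dataset-measures`). `init_context` and `serve` are plain functions used by the serving runtime.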

In [None]:
%%writefile "src/functions.py"
import pandas as pd
from digitalhub_runtime_python import handler


COLS=['codice spira','longitudine','latitudine',
      'Livello','tipologia','codice','codice arco',
      'codice via','Nome via', 'stato','direzione',
      'angolo','geopoint']

KEYS = ['00:00-01:00', '01:00-02:00', '02:00-03:00', '03:00-04:00',
        '04:00-05:00', '05:00-06:00', '06:00-07:00', '07:00-08:00',
        '08:00-09:00', '09:00-10:00', '10:00-11:00', '11:00-12:00',
        '12:00-13:00', '13:00-14:00', '14:00-15:00', '15:00-16:00',
        '16:00-17:00', '17:00-18:00', '18:00-19:00', '19:00-20:00',
        '20:00-21:00', '21:00-22:00', '22:00-23:00', '23:00-24:00']
COLUMNS=['data','codice spira']


@handler(outputs=["dataset"])
def downloader(url):
    # The export URL requests ";" as the CSV delimiter
    return url.as_df(file_format='csv', sep=";")


@handler(outputs=["dataset-spire"])
def process_spire(di):
    df = di.as_df()
    return df.groupby(['codice spira']).first().reset_index()[COLS]


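@handler(outputs=["dataset-measures"])
def process_measures(di):
    df = di.as_df()
    rdf = df[COLUMNS + KEYS]
    ls = []
    for key in KEYS:
        # "01:00-02:00" -> "01:00": the start of each hourly slot becomes the timestamp
        k = key.split("-")[0]
        # .copy() avoids pandas' SettingWithCopyWarning when adding columns below
        xdf = rdf[COLUMNS + [key]].copy()
        xdf['time'] = xdf['data'] + ' ' + k
        xdf['value'] = xdf[key]
        ls.append(xdf[['time', 'codice spira', 'value']])
    # Stack the 24 per-hour frames into one long table with a fresh index
    return pd.concat(ls, ignore_index=True)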


def init_context(context, dataitem):
    di = context.project.get_dataitem(dataitem)
    df = di.as_df()
    setattr(context, "df", df)


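def serve(context, event):
    # init_context attaches the DataFrame to the context at startup
    df = getattr(context, "df", None)

    if df is None:
        return ""

    # mock REST API: pagination settings come from the request's query parameters
    fields = event.fields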

    # pagination
    page = 0
    pageSize = 50

    if "page" in fields:
        page = int(fields["page"])

    if "size" in fields:
        pageSize = int(fields["size"])

    if page < 0:
        page = 0

    if pageSize < 1:
        pageSize = 1

    if pageSize > 100:
        pageSize = 100

    start = page * pageSize
    end = start + pageSize
    total = len(df)

    if end > total:
        end = total

    ds = df.iloc[start:end]
    # "data" holds the page as a JSON string (records orientation), not a list
    records = ds.to_json(orient="records")

    return {"data": records, "page": page, "size": pageSize, "total": total}

## Project Initialization

Now we'll initialize our DigitalHub project. This creates a workspace where we can manage our data items, functions, workflows, and deployments.

In [None]:
import digitalhub as dh

p_name = "tutorial-project"
project = dh.get_or_create_project(p_name)

## Data Source Setup

We'll create a data item that points to Bologna's traffic flow data. This dataset contains hourly vehicle counts for 2023 from traffic sensors (*spire*, inductive loop detectors in the road) throughout the city.

In [None]:
url = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
di = project.new_dataitem(name="url-data-item", kind="table", path=url)
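The export URL requests a semicolon-delimited file (`delimiter=%3B`), which is why `downloader` reads it with `sep=";"`. As a quick sanity check, the sketch below decodes the URL's query string with the standard library and then reads a tiny sample in that format. The sample row (sensor `S001`, count `12`) is made up for illustration and is not taken from the real dataset.

```python
import csv
import io
from urllib.parse import parse_qs, urlparse

export_url = (
    "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/"
    "rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv"
    "?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
)

# parse_qs percent-decodes the values: %3B -> ";" and %2F -> "/"
params = parse_qs(urlparse(export_url).query)
delimiter = params["delimiter"][0]
print(delimiter)               # ;
print(params["timezone"][0])   # Europe/Rome

# Hypothetical one-row sample using a few of the real column names
sample = "data;codice spira;00:00-01:00\n2023-01-01;S001;12\n"
rows = list(csv.DictReader(io.StringIO(sample), delimiter=delimiter))
print(rows[0]["codice spira"], rows[0]["00:00-01:00"])  # S001 12
```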

## Step 1: Data Extraction

In the first step of the ETL pipeline, we create and run the `download-data` function to extract the raw CSV data from the Bologna open data portal.

In [None]:
func = project.new_function(
    name="download-data",
    kind="python",
    python_version="PYTHON3_10",
    code_src="src/functions.py",
    handler="downloader",
)

In [None]:
run = func.run("job", inputs={"url": di.key}, wait=True)

Let's examine the raw data we just downloaded:

In [None]:
dataset_di = project.get_dataitem("dataset")
dataset_di.as_df().head()

## Step 2: Transform - Process Traffic Sensors (Spire)

The first transformation extracts unique traffic sensor metadata. The raw data has one row per sensor per day, so each sensor's metadata (coordinates, street, direction, and so on) is repeated on every row. We group by sensor code (`codice spira`) and keep the first values for each sensor, leaving one clean record per sensor.
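Note that pandas' `groupby(...).first()` takes the first non-null value of each column within a group, and sorts the group keys by default. Ignoring the null-skipping detail, the idea reduces to "keep the first row seen for each key", sketched below in plain Python. The helper `first_per_key` and the rows are hypothetical, for illustration only.

```python
def first_per_key(rows, key):
    """Return the first row seen for each distinct value of `key`, sorted by key."""
    firsts = {}
    for row in rows:
        # setdefault stores the row only the first time its key appears
        firsts.setdefault(row[key], row)
    # Sort by key, mirroring groupby's default sort=True
    return [firsts[k] for k in sorted(firsts)]


# Hypothetical daily rows: the same sensor metadata repeats every day
rows = [
    {"data": "2023-01-01", "codice spira": "S2", "Nome via": "Via A"},
    {"data": "2023-01-01", "codice spira": "S1", "Nome via": "Via B"},
    {"data": "2023-01-02", "codice spira": "S2", "Nome via": "Via A"},
    {"data": "2023-01-02", "codice spira": "S1", "Nome via": "Via B"},
]

for sensor in first_per_key(rows, "codice spira"):
    print(sensor["codice spira"], sensor["Nome via"])
# S1 Via B
# S2 Via A
```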

In [None]:
process_func = project.new_function(
    name="process-spire",
    kind="python",
    python_version="PYTHON3_10",
    code_src="src/functions.py",
    handler="process_spire",
)

In [None]:
process_run = process_func.run("job", inputs={"di": dataset_di.key}, wait=True)

Let's see the cleaned sensor data:

In [None]:
process_run.output("dataset-spire").as_df().head()

## Step 3: Transform - Process Traffic Measurements

The second transformation reshapes the hourly traffic data from wide format (one row per sensor per day, with 24 hour columns) to long format (one row per sensor per hour, with `time` and `value` columns). This makes the data more suitable for analysis and API queries.
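The reshape performed by `process_measures` can be sketched in plain Python on a single hypothetical row. Only two of the 24 hour columns are used to keep it short, and `wide_to_long` is an illustrative name, not part of the tutorial code. Like `process_measures`, it loops over hour columns first and builds each timestamp from the date plus the start of the hourly slot.

```python
# Two of the 24 hourly columns, to keep the example short
HOUR_KEYS = ["00:00-01:00", "01:00-02:00"]


def wide_to_long(rows, hour_keys):
    """Turn one row per (day, sensor) into one row per (hour, sensor)."""
    out = []
    for key in hour_keys:  # same loop order as process_measures: hour first
        start = key.split("-")[0]  # "01:00-02:00" -> "01:00"
        for row in rows:
            out.append({
                "time": row["data"] + " " + start,
                "codice spira": row["codice spira"],
                "value": row[key],
            })
    return out


# Hypothetical wide row: one sensor, one day, two hourly counts
rows = [{"data": "2023-01-01", "codice spira": "S1",
         "00:00-01:00": 12, "01:00-02:00": 7}]

for record in wide_to_long(rows, HOUR_KEYS):
    print(record)
```

One wide row with 24 hour columns becomes 24 long rows. In pandas the same reshape could also be expressed with `DataFrame.melt` (`id_vars=COLUMNS`, `value_vars=KEYS`) followed by building the `time` column; the explicit loop in `process_measures` keeps each step visible.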

In [None]:
process_measures_func = project.new_function(
    name="process-measures",
    kind="python",
    python_version="PYTHON3_10",
    code_src="src/functions.py",
    handler="process_measures",
)

In [None]:
process_measures_run = process_measures_func.run(
    "job", inputs={"di": dataset_di.key}, wait=True
)

Let's examine the transformed time-series data:

In [None]:
di_meas = process_measures_run.output("dataset-measures")
di_meas.as_df().head()

## Step 4: Load - Deploy as REST API

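Now we'll deploy the processed measurements as a REST API service. The service loads the data item once at startup (`init_context`) and returns one page of records per request; the `page` (default 0) and `size` (default 50, clamped to 1..100) query parameters select which page.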
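The pagination arithmetic in `serve` can be isolated into a small pure function, which makes the clamping rules easy to check without a running service. `page_bounds` is a hypothetical helper written for illustration; it reproduces the same rules as `serve` (page at least 0, size clamped to 1..100, `end` capped at the row count).

```python
def page_bounds(total, page=0, size=50):
    """Clamp page/size as serve() does and return the iloc slice bounds."""
    page = max(page, 0)
    size = min(max(size, 1), 100)  # size is clamped to the range 1..100
    start = page * size
    end = min(start + size, total)
    return start, end, page, size


print(page_bounds(total=1000, page=5, size=10))    # (50, 60, 5, 10)
print(page_bounds(total=1000, page=-3, size=500))  # (0, 100, 0, 100)
print(page_bounds(total=120, page=1, size=100))    # (100, 120, 1, 100)
```

A page past the end (for example `page=2, size=100` with 120 rows) yields `start > end`, and `df.iloc[start:end]` then returns an empty frame rather than raising an error.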

In [None]:
api_func = project.new_function(
    name="api",
    kind="python",
    python_version="PYTHON3_10",
    code_src="src/functions.py",
    handler="serve",
    init_function="init_context",
)

In [None]:
run_serve_model = api_func.run(
    "serve", init_parameters={"dataitem": di_meas.key}, wait=True
)

### Test the API

Let's test our deployed API by making a request:

In [None]:
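import pandas as pd
from io import StringIO

# Request page 5 with 10 records per page
svc_url = f"http://{run_serve_model.status.service['url']}/?page=5&size=10"
res = run_serve_model.invoke(url=svc_url).json()
# "data" is a JSON string; pandas >= 2.1 deprecates passing literal JSON to read_json, so wrap it in StringIO
rdf = pd.read_json(StringIO(res["data"]), orient="records")
rdf.head()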
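Because `serve` returns the page as the output of `ds.to_json(...)`, the response is JSON containing a JSON *string* in its `data` field, so it has to be decoded twice. The sketch below shows this two-level decoding with the standard `json` module on a hand-built response (the record values are made up), and builds the query string with `urlencode` as an alternative to writing it by hand.

```python
import json
from urllib.parse import urlencode

query = urlencode({"page": 5, "size": 10})
print(query)  # page=5&size=10

# Hand-built body in the shape serve() returns; the record is hypothetical
records = [{"time": "2023-01-01 00:00", "codice spira": "S1", "value": 12}]
body = json.dumps({"data": json.dumps(records), "page": 5, "size": 10, "total": 1000})

res = json.loads(body)              # first decode: the response envelope
print(type(res["data"]).__name__)   # str
rows = json.loads(res["data"])      # second decode: the records themselves
print(rows[0]["value"])             # 12
```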

## Pipeline

Now let's create a workflow that orchestrates all these steps automatically. The pipeline is defined with Hera, the Python SDK for Argo Workflows, and has the following steps:

1. **A**: Download data
2. **B**: Process sensors (depends on A)
3. **C**: Process measurements (depends on A)
4. **D**: Deploy API (depends on C)

Together these dependencies form a DAG (directed acyclic graph). Since B and C both depend only on A, they can run in parallel once the download finishes.
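The execution order implied by `A >> [B, C]` and `C >> D` can be checked locally with the standard library's `graphlib` (Python 3.9+). This sketch groups the steps into "waves" of steps whose dependencies are all satisfied; it only illustrates the dependency structure and is not how Argo schedules the workflow.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on
deps = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"C"}}


def waves(graph):
    """Group steps into batches whose dependencies are all complete."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    batches = []
    while ts.is_active():
        ready = sorted(ts.get_ready())  # steps with all dependencies done
        batches.append(ready)
        ts.done(*ready)                 # mark the whole batch as finished
    return batches


print(waves(deps))  # [['A'], ['B', 'C'], ['D']]
```

Treating each wave as a barrier is a simplification: a real scheduler can start D as soon as C finishes, even if B is still running, since D does not depend on B.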

In [None]:
%%writefile "src/pipeline.py"
from hera.workflows import Workflow, DAG, Parameter
from digitalhub_runtime_hera.dsl import step


def pipeline():
    with Workflow(entrypoint="dag", arguments=Parameter(name="url")) as w:

        with DAG(name="dag"):
            A = step(template={"action":"job", "inputs": {"url": "{{workflow.parameters.url}}"}},
                     function="download-data",
                     outputs=["dataset"])
            B = step(template={"action":"job", "inputs": {"di": "{{inputs.parameters.di}}"}},
                     function="process-spire",
                     inputs={"di": A.get_parameter("dataset")})
            C = step(template={"action":"job", "inputs": {"di": "{{inputs.parameters.di}}"}},
                     function="process-measures",
                     inputs={"di": A.get_parameter("dataset")},
                     outputs=["dataset-measures"])
            D = step(template={"action": "serve", "init_parameters": {"dataitem": "{{inputs.parameters.dataitem}}"}},
                     function="api",
                     inputs={"dataitem": C.get_parameter("dataset-measures")})
            A >> [B, C]
            C >> D
    return w

### Execute the Complete Pipeline

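Finally, let's create and execute the complete ETL pipeline workflow. The workflow is first built with the `build` action and then run with the `pipeline` action, passing the source data item's key as the `url` parameter. This runs all the steps we performed manually above, automatically and in dependency order.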

In [None]:
workflow = project.new_workflow(
    name="pipeline", kind="hera", code_src="src/pipeline.py", handler="pipeline"
)
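# Build step: prepares the Hera workflow defined in src/pipeline.py
workflow.run("build", wait=True)
# Run the pipeline, passing the source data item's key as the "url" workflow parameter
wf_run = workflow.run("pipeline", parameters={"url": di.key}, wait=True)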