# Setup

In [None]:
import digitalhub as dh
import pandas as pd
import requests
import os

In [None]:
# Name of the DigitalHub project; the handle below is used to register every
# function, data item and workflow in this notebook.
PROJECT = "demo-etl"
project = dh.get_or_create_project(PROJECT)

In [None]:
# Print the project object to confirm it is available.
print(project)

# Explore

In [None]:
# CSV export (';'-delimited, see delimiter=%3B) of the 2023 "rilevazione flusso
# veicoli tramite spire" open dataset, plus the local file to save it to.
URL = (
    "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/"
    "rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv"
    "?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
)
filename = "rilevazione-flusso-veicoli-tramite-spire-anno-2023.csv"

In [None]:
# Download the CSV export to a local file.
# - raise_for_status() stops on HTTP errors instead of silently saving an
#   error page as the CSV;
# - stream=True + iter_content writes the export in chunks rather than
#   buffering the whole body in memory;
# - the timeout (connect, read) keeps the cell from hanging forever.
with requests.get(URL, stream=True, timeout=(10, 300)) as r:
    r.raise_for_status()
    with open(filename, "wb") as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

In [None]:
# Load the downloaded export (';'-separated) and preview the first rows.
df = pd.read_csv(filename, delimiter=";")
df.head()

# Collect the data

In [None]:
# Directory holding the source files of the functions written below.
new_folder = "src"
# exist_ok=True makes this a no-op if the directory already exists and avoids
# the race between an exists() check and the creation.
os.makedirs(new_folder, exist_ok=True)

In [None]:
%%writefile "src/download-data.py"

from digitalhub_runtime_python import handler

@handler(outputs=["dataset"])
def downloader(url):
    """Read the ';'-delimited CSV behind the ``url`` data item as a DataFrame.

    Reading and returning the frame normalizes the remote file; the returned
    DataFrame is declared as the handler's ``dataset`` output.
    """
    return url.as_df(file_format="csv", sep=";")

In [None]:
# Register src/download-data.py as a Python 3.10 function whose handler is
# `downloader`.
func = project.new_function(
    name="download-data",
    kind="python",
    code_src="src/download-data.py",
    handler="downloader",
    python_version="PYTHON3_10",
)

In [None]:
# Register the export URL as a table data item so its key can be passed to
# the download function as an input.
URL = (
    "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/"
    "rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv"
    "?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
)
di = project.new_dataitem(
    name="url_data_item",
    kind="table",
    path=URL,
)

In [None]:
# Run the downloader locally as a job: `url` receives the data item key and
# the returned DataFrame is saved as the "dataset" data item.
run = func.run(
    action="job",
    inputs={"url": di.key},
    outputs={"dataset": "dataset"},
    local_execution=True,
)

In [None]:
# Fetch the "dataset" data item written by the download run and preview it.
dataset_di = project.get_dataitem('dataset')
dataset_df = dataset_di.as_df()
dataset_df.head()

# Process the data

In [None]:
%%writefile "src/process-spire.py"

from digitalhub_runtime_python import handler

# Per-sensor descriptive columns kept in the output (code, coordinates,
# street, status, direction, ...).
KEYS = [
    'codice spira', 'longitudine', 'latitudine', 'Livello', 'tipologia',
    'codice', 'codice arco', 'codice via', 'Nome via',
    'stato', 'direzione', 'angolo', 'geopoint',
]

@handler(outputs=["dataset-spire"])
def process(project, di):
    """Return one row per sensor ('codice spira') with the columns in KEYS.

    Args:
        project: unused here.
        di: data item with the raw measures; read with ``as_df()``.
    """
    measures = di.as_df()
    # groupby(...).first() keeps, per column, the first non-null value of
    # each sensor group; reset_index turns 'codice spira' back into a column.
    first_per_sensor = measures.groupby(['codice spira']).first()
    return first_per_sensor.reset_index()[KEYS]

In [None]:
# Register src/process-spire.py as a Python 3.10 function whose handler is
# `process`.
process_func = project.new_function(
    name="process-spire",
    kind="python",
    code_src="src/process-spire.py",
    handler="process",
    python_version="PYTHON3_10",
)

In [None]:
# Run process-spire locally on the downloaded dataset; the result is saved as
# the "dataset-spire" data item.
process_run = process_func.run(
    action="job",
    inputs={"di": dataset_di.key},
    outputs={"dataset-spire": "dataset-spire"},
    local_execution=True,
)

In [None]:
# Load the per-sensor table produced by process-spire and preview it.
spire_di = project.get_dataitem('dataset-spire')
spire_df = spire_di.as_df()
spire_df.head()

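Reshape the measures from wide format (one column per hourly slot) to long format (one row per sensor, date and hour), then check what the resulting dataset looks like: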

In [None]:
# Labels of the 24 hourly-slot columns of the raw dataset.
keys = [
    '00:00-01:00', '01:00-02:00', '02:00-03:00', '03:00-04:00',
    '04:00-05:00', '05:00-06:00', '06:00-07:00', '07:00-08:00',
    '08:00-09:00', '09:00-10:00', '10:00-11:00', '11:00-12:00',
    '12:00-13:00', '13:00-14:00', '14:00-15:00', '15:00-16:00',
    '16:00-17:00', '17:00-18:00', '18:00-19:00', '19:00-20:00',
    '20:00-21:00', '21:00-22:00', '22:00-23:00', '23:00-24:00',
]
# Keep only the date, the sensor code and the hourly values.
columns = ['data', 'codice spira', *keys]
rdf = dataset_df[columns]

In [None]:
# Reshape from wide (one column per hourly slot) to long (one row per input
# row and slot). Each frame is built from scratch rather than by adding
# columns to a slice of `rdf`, which raised pandas' SettingWithCopyWarning.
ls = []
for key in keys:
    # "HH:MM-HH:MM" -> slot start "HH:MM", appended to the date as timestamp.
    k = key.split("-")[0]
    vdf = pd.DataFrame(
        {
            "time": rdf["data"] + " " + k,
            "codice spira": rdf["codice spira"],
            "value": rdf[key],
        }
    )
    ls.append(vdf)

# ignore_index=True: without it every slot repeats the original row labels,
# leaving duplicate index values in the result.
edf = pd.concat(ls, ignore_index=True)
edf.head()

Wrap this transformation in a function, so it can run as a job like the previous steps:

In [None]:
%%writefile "src/process-measures.py"

from digitalhub_runtime_python import handler
import pandas as pd

KEYS = [
    '00:00-01:00', '01:00-02:00', '02:00-03:00', '03:00-04:00',
    '04:00-05:00', '05:00-06:00', '06:00-07:00', '07:00-08:00',
    '08:00-09:00', '09:00-10:00', '10:00-11:00', '11:00-12:00',
    '12:00-13:00', '13:00-14:00', '14:00-15:00', '15:00-16:00',
    '16:00-17:00', '17:00-18:00', '18:00-19:00', '19:00-20:00',
    '20:00-21:00', '21:00-22:00', '22:00-23:00', '23:00-24:00',
]
COLUMNS = ['data', 'codice spira']

@handler(outputs=["dataset-measures"])
def process(project, di):
    """Reshape the hourly measures from wide to long format.

    Each input row carries a date (``data``), a sensor code
    (``codice spira``) and one column per hourly slot in ``KEYS``. The result
    has one row per input row and slot, with columns ``time``
    ("<data> <slot start>"), ``codice spira`` and ``value``.

    Args:
        project: unused here.
        di: data item with the raw measures; read with ``as_df()``.

    Returns:
        The long-format DataFrame, with a fresh 0..n-1 index.
    """
    df = di.as_df()
    # Selecting the needed columns up front fails early if any is missing.
    rdf = df[COLUMNS + KEYS]
    frames = []
    for key in KEYS:
        # "HH:MM-HH:MM" -> slot start "HH:MM".
        start = key.split("-")[0]
        # Build a new frame instead of adding columns to a slice of `rdf`,
        # which raised pandas' SettingWithCopyWarning.
        frames.append(
            pd.DataFrame(
                {
                    "time": rdf["data"] + " " + start,
                    "codice spira": rdf["codice spira"],
                    "value": rdf[key],
                }
            )
        )
    # ignore_index=True: without it every slot repeats the original row
    # labels, leaving duplicate index values in the output dataset.
    return pd.concat(frames, ignore_index=True)

In [None]:
# Register src/process-measures.py as a Python 3.10 function whose handler is
# `process`.
process_measures_func = project.new_function(
    name="process-measures",
    kind="python",
    code_src="src/process-measures.py",
    handler="process",
    python_version="PYTHON3_10",
)

In [None]:
# Run process-measures locally on the downloaded dataset; the result is saved
# as the "dataset-measures" data item.
process_measures_run = process_measures_func.run(
    action="job",
    inputs={"di": dataset_di.key},
    outputs={"dataset-measures": "dataset-measures"},
    local_execution=True,
)

In [None]:
# Load the long-format measures produced by process-measures and preview them.
measures_di = project.get_dataitem('dataset-measures')
measures_df = measures_di.as_df()
measures_df.head()

# Workflow

We define a simple workflow that chains together the functions of the ETL steps seen so far: it first downloads the data, then runs the two processing steps, both of which depend only on the download:

In [None]:
%%writefile "src/pipeline.py"

from digitalhub_runtime_hera.dsl import step
from hera.workflows import DAG, Parameter, Workflow


def pipeline():
    """Build the ETL workflow as a hera ``Workflow``.

    The workflow takes one parameter, ``url`` (the notebook passes the key of
    the URL data item). ``download-data`` runs first; its ``dataset`` output
    is handed as ``di`` to ``process-spire`` and ``process-measures``, which
    both depend only on the download step.
    """
    di_input = "{{inputs.parameters.di}}"

    with Workflow(entrypoint="dag", arguments=Parameter(name="url")) as workflow:
        with DAG(name="dag"):
            download = step(
                template={
                    "action": "job",
                    "inputs": {"url": "{{workflow.parameters.url}}"},
                },
                function="download-data",
                outputs=["dataset"],
            )
            spire = step(
                template={"action": "job", "inputs": {"di": di_input}},
                function="process-spire",
                inputs={"di": download.get_parameter("dataset")},
            )
            measures = step(
                template={"action": "job", "inputs": {"di": di_input}},
                function="process-measures",
                inputs={"di": download.get_parameter("dataset")},
                outputs=["dataset-measures"],
            )
            # Fan-out: both processing steps run after the download.
            download >> [spire, measures]
    return workflow

In [None]:
# Register src/pipeline.py as a hera workflow with `pipeline` as entry point.
workflow = project.new_workflow(
    name="pipeline",
    kind="hera",
    code_src="src/pipeline.py",
    handler="pipeline",
)

In [None]:
# Run the workflow, passing the URL data item key as its "url" parameter.
workflow.run(parameters={"url": di.key})

# Expose dataset as API

In [None]:
%%writefile 'src/api.py'

def init_context(context, dataitem):
    """Load the data item named *dataitem* and keep it on the context.

    The DataFrame is stored as ``context.df``, where ``serve`` reads it.
    """
    context.df = context.project.get_dataitem(dataitem).as_df()

def _int_field(fields, name, default):
    """Return ``fields[name]`` as an int, or *default* if missing or malformed."""
    try:
        return int(fields[name])
    except (KeyError, TypeError, ValueError):
        return default


def serve(context, event):
    """Serve one page of the DataFrame loaded by ``init_context``.

    Query parameters, read from ``event.fields``:
      page -- zero-based page index (default 0; negative values become 0)
      size -- rows per page (default 50; clamped to the range 1..100)

    Returns ``""`` when no DataFrame is loaded. Otherwise returns a dict with
    the page rows serialized as a JSON records string under ``"data"``, plus
    the effective ``page`` and ``size`` and the ``total`` row count.
    """
    # getattr: an init that never set `df` is treated like a missing frame.
    df = getattr(context, "df", None)
    if df is None:
        return ""

    # Query params come straight from the HTTP request: malformed values fall
    # back to the defaults instead of failing the request with a ValueError.
    fields = event.fields or {}
    page = max(_int_field(fields, "page", 0), 0)
    page_size = min(max(_int_field(fields, "size", 50), 1), 100)

    total = len(df)
    start = page * page_size
    # iloc clamps slice bounds itself, so a page past the end yields "[]".
    rows = df.iloc[start:start + page_size]

    return {
        "data": rows.to_json(orient="records"),
        "page": page,
        "size": page_size,
        "total": total,
    }

In [None]:
# Register src/api.py as a Python 3.10 function: `serve` is the handler and
# `init_context` the init function.
api_func = project.new_function(
    name="api",
    kind="python",
    handler="serve",
    init_function="init_context",
    code_src="src/api.py",
    python_version="PYTHON3_10",
)

In [None]:
# init_context(context, dataitem) takes a required `dataitem` argument with no
# default, so the data item to expose must be supplied as an init parameter.
# "dataset-measures" is the table the Streamlit section plots
# (columns "codice spira" and "value").
run_serve_model = api_func.run(
    action="serve",
    init_parameters={"dataitem": "dataset-measures"},
)

# Test the endpoint

You can check the status of the API service with the following cell. Once the *service* attribute appears, the endpoint is ready to use.

In [None]:
# Refresh the run and show its service info (presumably empty while the
# service is still starting up).
run_serve_model.refresh().status.service

In [None]:
# Base URL of the deployed API, taken from the run's service status.
SERVICE_URL = f"http://{run_serve_model.status.to_dict()['service']['url']}"
SERVICE_URL

In [None]:
# Request page 5 (zero-based) with 10 rows per page. raise_for_status() makes
# HTTP errors fail here instead of as a confusing JSON decode error.
with requests.get(f"{SERVICE_URL}/", params={"page": 5, "size": 10}) as r:
    r.raise_for_status()
    res = r.json()

In [None]:
import io

# The API returns the page rows as a JSON string. Passing a literal JSON
# string to pd.read_json is deprecated (pandas >= 2.1), so wrap it in StringIO.
rdf = pd.read_json(io.StringIO(res['data']), orient='records')
rdf.head()

# Streamlit

In [None]:
# Save the fetched page for the Streamlit app. Use an explicit UTF-8 encoding
# (what pd.read_json assumes by default) instead of the platform default.
with open("result.json", "w", encoding="utf-8") as file:
    file.write(res['data'])

In [None]:
%%writefile 'streamlit-app.py'

import pandas as pd
import streamlit as st

# Page of records saved by the notebook from the API response.
rdf = pd.read_json("result.json", orient="records")

# Strip colons from column names, as they can cause issues with Streamlit.
rdf = rdf.rename(columns=lambda name: name.replace(":", ""))

st.write("My data")
st.line_chart(rdf, x="codice spira", y="value")

In [None]:
# Install Streamlit into the environment of the running kernel.
%pip install streamlit

In [None]:
# Launch the Streamlit app with usage statistics disabled; the server keeps
# running, so this cell does not return until the process is stopped.
!streamlit run streamlit-app.py --browser.gatherUsageStats false