In [None]:
%%writefile "functions.py"

import mlrun
import pandas as pd


@mlrun.handler(outputs=["dataset"])
def downloader(context, url: mlrun.DataItem):
    """Read the data item ``url`` into a pandas DataFrame.

    The item is read with ``format="parquet"``. The returned frame is
    registered by ``mlrun.handler`` as the ``dataset`` output.

    :param context: MLRun execution context (not used here).
    :param url: data item to read.
    :return: the DataFrame produced by ``url.as_df``.
    """
    return url.as_df(format="parquet")


@mlrun.handler(outputs=["dataset-spire"])
def process_spire(context, di: mlrun.DataItem):
    """Build a per-sensor attribute table, one row per ``codice spira``.

    :param context: MLRun execution context (not used here).
    :param di: data item holding the raw measures table; it must contain every
        column listed in ``COLS``.
    :return: DataFrame with exactly the ``COLS`` columns, in that order. With
        pandas' groupby defaults it has one row per distinct non-null
        ``codice spira``, sorted by that code.
    """
    COLS = ['codice spira', 'longitudine', 'latitudine', 'Livello', 'tipologia', 'codice', 'codice arco', 'codice via', 'Nome via', 'stato', 'direzione', 'angolo', 'geopoint']
    df = di.as_df()
    # Restrict to the attribute columns *before* grouping, so the hourly count
    # columns are not aggregated only to be thrown away afterwards.
    # Note: groupby().first() takes the first non-null value of each column
    # independently, so one output row may mix values from different input rows.
    # reset_index() puts 'codice spira' back in front, which matches COLS order.
    sdf = df[COLS].groupby(['codice spira']).first().reset_index()
    return sdf


@mlrun.handler(outputs=["dataset-measures"])
def process_measure(context, di: mlrun.DataItem):
    """Reshape the wide hourly-count table into a long table, one row per hourly slot.

    ``di`` must contain a ``data`` column, a ``codice spira`` column and the
    24 hourly columns ``'00:00-01:00'`` ... ``'23:00-24:00'``. For every slot,
    each input row yields:

    * ``time``: ``data + ' ' + <slot start>``, e.g. ``'<data> 07:00'``.
      ``data`` is presumably a date string; the concatenation needs str values.
    * ``codice spira``: copied unchanged.
    * ``value``: the count from that slot's column.

    :param context: MLRun execution context (not used here).
    :param di: data item holding the raw measures table.
    :return: the concatenated long DataFrame. Rows are ordered slot by slot
        (all 00:00 rows, then all 01:00 rows, ...). They keep the input's index
        labels, so each label appears once per slot.
    """
    # Hourly slot columns, '00:00-01:00' through '23:00-24:00'.
    KEYS = [f"{hour:02d}:00-{hour + 1:02d}:00" for hour in range(24)]
    COLUMNS = ['data', 'codice spira']

    df = di.as_df()
    rdf = df[COLUMNS + KEYS]
    frames = []
    for key in KEYS:
        start = key.split("-")[0]
        # Build each slot frame with .assign, which works on a copy. Setting
        # columns directly on a column-slice of rdf raises SettingWithCopyWarning.
        # The vectorized string concatenation leaves missing dates as NaN
        # instead of raising inside a Python lambda.
        slot = rdf[['codice spira']].assign(
            time=rdf['data'] + ' ' + start,
            value=rdf[key],
        )
        frames.append(slot[['time', 'codice spira', 'value']])
    return pd.concat(frames)

In [None]:
import digitalhub as dh

# Fetch the digitalhub project, creating it if it does not exist yet.
project = dh.get_or_create_project("project-mlrun")

# Register the Bologna open-data CSV export of the 2023 loop-detector
# ("spire") vehicle-flow dataset as a table dataitem.
url = (
    "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/"
    "rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv"
    "?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
)
url_dataitem = project.new_dataitem(
    name="url-dataitem",
    kind="table",
    path=url,
)

# Read the remote CSV (';'-separated, matching delimiter=%3B in the URL) into a
# pandas DataFrame, then upload it to MinIO with write_df. The value write_df
# returns is used below as the path of the uploaded copy.
df = url_dataitem.as_df(file_format="csv", sep=";")
s3_path = url_dataitem.write_df(df=df)

# Register the uploaded copy as its own table dataitem.
s3_dataitem = project.new_dataitem(
    name="table-spire",
    kind="table",
    path=s3_path,
)

In [None]:
# Create one MLRun function per handler in functions.py. The three functions
# differ only in name and handler, so they share one helper instead of three
# copy-pasted calls.
def new_mlrun_function(name, handler, source_file="functions.py", image="mlrun/mlrun"):
    """Create an MLRun function in ``project`` that runs ``handler`` from ``source_file``.

    :param name: name of the function to create in the project.
    :param handler: name of the handler defined in ``source_file``.
    :param source_file: file holding the handler code (default: functions.py).
    :param image: container image to run the function in (default: mlrun/mlrun).
    :return: whatever ``project.new_function`` returns.
    """
    return project.new_function(name=name,
                                kind="mlrun",
                                # Build a fresh dict per call so functions never share a mutable spec.
                                source={"source": source_file},
                                handler=handler,
                                image=image)


downloader_function = new_mlrun_function("mlrun-downloader", "downloader")
process_spire_function = new_mlrun_function("mlrun-process-spire", "process_spire")
process_measure_function = new_mlrun_function("mlrun-process-measure", "process_measure")

In [None]:
import os
# NOTE(review): DH_RUN_SECRET_NAME is set to an empty string before the workflow
# is created and run. Presumably this stops the digitalhub SDK from using a run
# secret during local execution; confirm against the SDK documentation.
os.environ["DH_RUN_SECRET_NAME"] = ""

# Register a KFP workflow whose pipeline definition is read from pipeline.py.
workflow = project.new_workflow(
    name="mlrun-workflow",
    kind="kfp",
    source={"source": "pipeline.py"},
)

In [None]:
# Run the workflow with local execution enabled. The key of the "table-spire"
# dataitem is passed as the `di_key` pipeline parameter.
run = workflow.run(
    parameters={"di_key": s3_dataitem.key},
    local_execution=True,
)