In [1]:
import pandas as pd
import requests
import os
import json

## 1. Data Exploration

### 1.1. Download data
Download data from the API, and load it into a pandas dataframe.

In [2]:
# Historical parking-availability export of the Bologna open-data portal
# (semicolon-delimited CSV, timestamps requested in UTC via the query string).
URL = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/disponibilita-parcheggi-storico/exports/csv?lang=it&timezone=UTC&use_labels=true&delimiter=%3B"

df = pd.read_csv(URL, sep=";")
# 'coordinate' holds "lat, lon" in a single string: split it and store both halves
# as numbers (not strings), consistent with the platform `downloader` function.
df[['lat', 'lon']] = df['coordinate'].str.split(', ', expand=True).apply(pd.to_numeric)
# Drop the columns that are not needed and switch to snake_case column names.
df = df.drop(columns=['% occupazione', 'GUID', 'coordinate']).rename(columns={'Parcheggio': 'parcheggio', 'Data': 'data', 'Posti liberi': 'posti_liberi', 'Posti occupati': 'posti_occupati', 'Posti totali': 'posti_totali'})
df

Unnamed: 0,parcheggio,data,posti_liberi,posti_occupati,posti_totali,lat,lon
0,VIII Agosto,2024-06-07T01:59:00+00:00,484.0,141.0,625,44.500297,11.345368
1,Riva Reno,2024-06-07T02:09:00+00:00,369.0,101.0,470,44.501153,11.336062
2,Riva Reno,2024-06-07T02:19:00+00:00,369.0,101.0,470,44.501153,11.336062
3,VIII Agosto,2024-06-07T02:29:00+00:00,487.0,138.0,625,44.500297,11.345368
4,Riva Reno,2024-06-07T02:29:00+00:00,369.0,101.0,470,44.501153,11.336062
...,...,...,...,...,...,...,...
46154,VIII Agosto,2024-09-30T10:49:00+00:00,197.0,428.0,625,44.500297,11.345368
46155,VIII Agosto,2024-09-30T10:59:00+00:00,195.0,430.0,625,44.500297,11.345368
46156,Riva Reno,2024-09-30T10:59:00+00:00,257.0,213.0,470,44.501153,11.336062
46157,VIII Agosto,2024-09-30T11:09:00+00:00,186.0,439.0,625,44.500297,11.345368


### 1.2. Extract parkings
Extract distinct parkings from the dataframe.

In [3]:
# Columns that describe a parking: its name and its position.
KEYS = ['parcheggio', 'lat', 'lon']
# One row per parking, taken from the first entry of each group.
first_row_per_parking = df.groupby(['parcheggio']).first()
df_parcheggi = first_row_per_parking.reset_index().loc[:, KEYS]
df_parcheggi

Unnamed: 0,parcheggio,lat,lon
0,Autostazione,44.504422,11.346514
1,Riva Reno,44.501153,11.336062
2,VIII Agosto,44.500297,11.345368


### 1.3 Aggregate Parking Data
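Aggregate parking data by parking and by date and hour, averaging the readings that fall within each hour (the platform function in section 2.2 additionally derives the hour of day and the day of week, `dow`).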

In [4]:
# Work on a copy so the raw frame `df` stays untouched and this cell can be re-run.
rdf = df.copy()
# Bucket every reading into its hour. floor('h') is vectorised and, unlike
# replace(minute=0, second=0), also clears any sub-second component.
rdf['day'] = pd.to_datetime(rdf['data'], utc=True).dt.floor('h')
# Coordinates may still be strings at this point; make them numeric so they can be averaged.
rdf['lat'] = pd.to_numeric(rdf['lat'])
rdf['lon'] = pd.to_numeric(rdf['lon'])
rdf = rdf.drop(columns=['data'])
# Mean of every remaining (numeric) column per parking and hour bucket.
grouped = rdf.groupby(['parcheggio', 'day']).mean()
df_aggregated = grouped.reset_index()
df_aggregated

Unnamed: 0,parcheggio,day,posti_liberi,posti_occupati,posti_totali,lat,lon
0,Autostazione,2024-06-07 01:00:00+00:00,244.000000,21.000000,265.0,44.504422,11.346514
1,Autostazione,2024-06-07 02:00:00+00:00,244.000000,21.000000,265.0,44.504422,11.346514
2,Autostazione,2024-06-07 03:00:00+00:00,244.000000,21.000000,265.0,44.504422,11.346514
3,Autostazione,2024-06-07 04:00:00+00:00,244.333333,20.666667,265.0,44.504422,11.346514
4,Autostazione,2024-06-07 05:00:00+00:00,242.666667,22.333333,265.0,44.504422,11.346514
...,...,...,...,...,...,...,...
7790,VIII Agosto,2024-09-30 07:00:00+00:00,499.166667,125.833333,625.0,44.500297,11.345368
7791,VIII Agosto,2024-09-30 08:00:00+00:00,455.666667,169.333333,625.0,44.500297,11.345368
7792,VIII Agosto,2024-09-30 09:00:00+00:00,371.666667,253.333333,625.0,44.500297,11.345368
7793,VIII Agosto,2024-09-30 10:00:00+00:00,241.000000,384.000000,625.0,44.500297,11.345368


## 2. Platform Support - Data Ops

We use the platform support to load the data into the platform, version it, and automate the execution of the data management operations.


### 2.1. Initialization
Create the working context: a data management project for the parking data processing. The project is a container for the code, data, and management of the parking data operations. To keep it reproducible, we use the `git` source type to store the definition and code.

In [5]:
import digitalhub as dh
import getpass as gt

# The project name is a fixed prefix followed by the current OS user name.
PROJECT_PREFIX = "parcheggi-nk-scheduler-"
PROJECT_NAME = PROJECT_PREFIX + gt.getuser()
# get_or_create_project: judging by its name, this reuses the project when it
# already exists and creates it otherwise.
proj = dh.get_or_create_project(PROJECT_NAME)
print("created project {}".format(PROJECT_NAME))
PROJECT_NAME

created project parcheggi-nk-scheduler-digitalhubdev


'parcheggi-nk-scheduler-digitalhubdev'

### 2.2. Data management functions
We convert the data management ETL operations into functions - single executable operations that can be executed in the platform.

Create a directory named 'src' to hold all the Python source files.

In [11]:
import os
# Folder that receives the Python sources written by the %%writefile cells below.
directory = "src"
# exist_ok=True makes the call idempotent and removes the check-then-create race
# of a separate os.path.exists() test.
os.makedirs(directory, exist_ok=True)

In [12]:
%%writefile "src/download_all.py"
from digitalhub_runtime_python import handler
import pandas as pd

@handler(outputs=["dataset"])
def downloader(project, url):
    """Read the raw parking CSV and return it as a tidy dataframe.

    `url` is a data item whose content is loaded as a ';'-separated CSV.
    The 'coordinate' column is split on ', ' into 'lat' and 'lon', which are
    converted to numbers; '% occupazione', 'GUID' and 'coordinate' are dropped
    and the remaining labelled columns are renamed to snake_case.
    `project` is not used by this function.
    """
    snake_case_names = {
        'Parcheggio': 'parcheggio',
        'Data': 'data',
        'Posti liberi': 'posti_liberi',
        'Posti occupati': 'posti_occupati',
        'Posti totali': 'posti_totali',
    }
    unused_columns = ['% occupazione', 'GUID', 'coordinate']

    raw = url.as_df(file_format='csv', sep=";")
    raw[['lat', 'lon']] = raw['coordinate'].str.split(', ', expand=True)
    tidy = raw.drop(columns=unused_columns).rename(columns=snake_case_names)
    for axis in ("lat", "lon"):
        tidy[axis] = pd.to_numeric(tidy[axis])
    return tidy

Overwriting src/download_all.py


In [13]:
# Register src/download_all.py as a Python function of the project;
# `downloader` is the entry point inside that file.
downloader_source = {"source": "src/download_all.py", "handler": "downloader"}
func = proj.new_function(
    name="downloader-funct",
    kind="python",
    python_version="PYTHON3_9",
    source=downloader_source,
)

In [14]:
# Data item of kind "table" whose path is the remote CSV export;
# its key is passed to the download job as the `url` input.
di = proj.new_dataitem(
    name="url_data_item",
    kind="table",
    path=URL,
)


In [15]:
# Run the downloader as a "job": the data item key feeds the handler's `url`
# parameter and the handler's "dataset" output is stored as "dataset".
download_inputs = {"url": di.key}
download_outputs = {"dataset": "dataset"}
run_download = func.run(
    action="job",
    inputs=download_inputs,
    outputs=download_outputs,
)

Wait for the run to finish. Monitor the execution status of the run using the console or with the run's ``refresh`` function.

In [16]:
%%writefile "src/extract_parkings.py"
from digitalhub_runtime_python import handler
import pandas as pd

@handler(outputs=["parkings"])
def extract_parkings(project, di):
    """Return one row per parking with its name, coordinates and 'posti_totali'.

    The data item `di` is loaded as a dataframe, grouped by 'parcheggio', and
    the first entry of each group is kept. `project` is not used.
    """
    wanted_columns = ['parcheggio', 'lat', 'lon', 'posti_totali']
    readings = di.as_df()
    first_rows = readings.groupby(['parcheggio']).first()
    return first_rows.reset_index()[wanted_columns]

Writing src/extract_parkings.py


In [17]:
# Register src/extract_parkings.py as a Python function of the project;
# `extract_parkings` is the entry point inside that file.
extract_source = {"source": "src/extract_parkings.py", "handler": "extract_parkings"}
func = proj.new_function(
    name="extract-parkings",
    kind="python",
    python_version="PYTHON3_9",
    source=extract_source,
)

In [18]:
# Key of the "dataset" data item; the download run must have completed for it to exist.
data_item_download = proj.get_dataitem("dataset").key
# Run the parking extraction on that dataset and store the result as "parkings".
run_parkings = func.run(
    action="job",
    inputs={"di": data_item_download},
    outputs={"parkings": "parkings"},
)

Wait for the run to finish. Monitor the execution status of the run using the console or with the run's ``refresh`` function.

In [19]:
%%writefile "src/aggregations_parkings.py"
from datetime import datetime
from digitalhub_runtime_python import handler
import pandas as pd

@handler(outputs=["parking_data_aggregated"])
def aggregate_parkings(project, di):
    """Average the parking readings per parking and per hour.

    The data item `di` is loaded as a dataframe. Each reading's 'data'
    timestamp is floored to the hour; the result is stored in 'day' as epoch
    seconds (float), together with the derived 'hour' (hour of day) and 'dow'
    (day of week) columns. All remaining numeric columns are averaged per
    ('parcheggio', 'day') group. `project` is not used.

    Returns:
        A dataframe with one row per parking and hour bucket.
    """
    rdf = di.as_df()

    # Parse as UTC and floor to the hour with vectorised operations; floor()
    # also clears sub-second parts that replace(minute=0, second=0) would keep.
    hour_bucket = pd.to_datetime(rdf['data'], utc=True).dt.floor('h')
    rdf['hour'] = hour_bucket.dt.hour
    rdf['dow'] = hour_bucket.dt.dayofweek

    # Store the bucket as epoch seconds: the original author notes that
    # timestamp objects were rejected as not JSON serializable.
    epoch = pd.Timestamp("1970-01-01", tz="UTC")
    rdf['day'] = (hour_bucket - epoch) / pd.Timedelta(seconds=1)

    rdf = rdf.drop(columns=['data'])
    rdf['lat'] = pd.to_numeric(rdf['lat'])
    rdf['lon'] = pd.to_numeric(rdf['lon'])

    # 'hour' and 'dow' are constant within a group, so their mean is the value itself.
    grouped = rdf.groupby(['parcheggio', 'day']).mean()
    return grouped.reset_index()

Overwriting src/aggregations_parkings.py


In [20]:
# Register src/aggregations_parkings.py as a Python function of the project;
# `aggregate_parkings` is the entry point inside that file.
aggregate_source = {"source": "src/aggregations_parkings.py", "handler": "aggregate_parkings"}
func = proj.new_function(
    name="aggregate-parkings",
    kind="python",
    python_version="PYTHON3_9",
    source=aggregate_source,
)

In [21]:
# Run the hourly aggregation on the downloaded dataset and store the result
# as "parking_data_aggregated".
aggregate_inputs = {"di": data_item_download}
aggregate_outputs = {"parking_data_aggregated": "parking_data_aggregated"}
run_aggregate = func.run(
    action="job",
    inputs=aggregate_inputs,
    outputs=aggregate_outputs,
)

In [22]:
%%writefile "src/parkings_to_db.py"
from digitalhub_runtime_python import handler
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
import datetime as dtt
import os

@handler()
def to_db(project, agg_di , parkings_di ):
    """Write the aggregated readings and the parking list to PostgreSQL.

    Credentials are read from the POSTGRES_USER and POSTGRES_PASSWORD
    environment variables. `agg_di` is loaded as parquet, its 'day' column
    (epoch seconds) is converted back to datetimes, and only rows from the
    last 730 days are kept before replacing the "parking_data_aggregated"
    table. `parkings_di` replaces the "parkings" table. `project` is not used.

    Raises:
        RuntimeError: if one of the credential environment variables is not set.
    """
    from urllib.parse import quote_plus

    USERNAME = os.getenv("POSTGRES_USER")
    PASSWORD = os.getenv("POSTGRES_PASSWORD")
    if USERNAME is None or PASSWORD is None:
        # Fail with a clear message instead of a TypeError from str + None.
        raise RuntimeError("POSTGRES_USER and POSTGRES_PASSWORD must be set")

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot corrupt the connection URL.
    engine = create_engine(
        'postgresql+psycopg2://' + quote_plus(USERNAME) + ':' + quote_plus(PASSWORD)
        + '@database-postgres-cluster/digitalhub'
    )
    try:
        agg_df = agg_di.as_df(file_format="parquet")

        # Keep only the last two years (730 days) of data.
        cutoff = dtt.date.today() - dtt.timedelta(days=365 * 2)
        # 'day' was stored as epoch seconds by the aggregation step; convert it
        # back as UTC so the result does not depend on the host's local timezone.
        agg_df['day'] = pd.to_datetime(agg_df['day'], unit='s')
        agg_df = agg_df[agg_df['day'].dt.date >= cutoff]

        agg_df.to_sql("parking_data_aggregated", engine, if_exists="replace")
        parkings_di.as_df().to_sql('parkings', engine, if_exists="replace")
    finally:
        # Release pooled connections even when a write fails.
        engine.dispose()
    return

Overwriting src/parkings_to_db.py


In [23]:
# Register src/parkings_to_db.py as a Python function of the project;
# sqlalchemy is declared as an extra requirement and `to_db` is the entry point.
to_db_source = {"source": "src/parkings_to_db.py", "handler": "to_db"}
func = proj.new_function(
    name="to-db",
    kind="python",
    requirements=["sqlalchemy"],
    python_version="PYTHON3_9",
    source=to_db_source,
)

In [24]:
# Keys of the data items produced by the two previous runs.
data_item_parkings = proj.get_dataitem("parkings").key
data_item_aggregate = proj.get_dataitem("parking_data_aggregated").key

# Store both data items in the database; this job declares no outputs.
to_db_inputs = {
    "agg_di": data_item_aggregate,
    "parkings_di": data_item_parkings,
}
run_to_db = func.run(action="job", inputs=to_db_inputs, outputs={})

### 2.3 Data Management Pipeline
We create a data management pipeline that executes the data management functions in the platform.

In [25]:
%%writefile "src/data_management_parkings_pipeline.py"

from digitalhub_runtime_kfp.dsl import pipeline_context

def myhandler(url):
    """Define the parking data-management pipeline.

    Every step runs, as a "job", a function registered in the project:

    1. download: reads `url` and produces "dataset".
    2. extract_parking: turns "dataset" into "parkings".
    3. aggregate: turns "dataset" into "parking_data_aggregated".
    4. to_db: stores the aggregated data and the parkings in the database.

    Args:
        url: pipeline parameter forwarded as the `url` input of the download step.
    """
    with pipeline_context() as pc:
        s1_dataset = pc.step(
            name="download",
            function="downloader-funct",
            action="job",
            inputs={"url": url},
            outputs={"dataset": "dataset"},
        )

        s2_parking = pc.step(
            name="extract_parking",
            function="extract-parkings",
            action="job",
            inputs={"di": s1_dataset.outputs['dataset']},
            outputs={"parkings": "parkings"},
        )

        s3_aggregate = pc.step(
            name="aggregate",
            function="aggregate-parkings",
            action="job",
            inputs={"di": s1_dataset.outputs['dataset']},
            outputs={"parking_data_aggregated": "parking_data_aggregated"},
        )

        # The "parkings" table must be fed with the output of step 2, not with
        # the raw dataset, mirroring the manual to-db run of the notebook.
        s4_to_db = pc.step(
            name="to_db",
            function="to-db",
            action="job",
            inputs={
                "agg_di": s3_aggregate.outputs['parking_data_aggregated'],
                "parkings_di": s2_parking.outputs['parkings'],
            },
            outputs={},
        )

Writing src/data_management_parkings_pipeline.py


In [26]:
# Register the pipeline source file as a "kfp" workflow of the project;
# `myhandler` is the function that defines the steps.
workflow = proj.new_workflow(
    name="pipeline_parcheggi",
    kind="kfp",
    code_src="src/data_management_parkings_pipeline.py",
    handler="myhandler",
)

In [27]:
# Data item pointing at the remote CSV export; its key is the pipeline's `url` parameter.
di = proj.new_dataitem(
    name="url_data_item",
    kind="table",
    path=URL,
)
# Start one execution of the whole pipeline.
pipeline_parameters = {"url": di.key}
workflow_run = workflow.run(parameters=pipeline_parameters)

The execution will take place on the platform using Kubernetes for the execution of single tasks and Kubeflow Pipeline for the orchestration of the data management pipeline.


# Schedule

The pipeline can be scheduled to run periodically using a cron expression.

In [28]:
# Same pipeline run, now with a schedule: "@hourly" asks for one run per hour.
# A cron expression can be passed instead, e.g. schedule="*/5 * * * *".
workflow.run(
    parameters={"url": di.key},
    schedule="@hourly",
)

{'kind': 'kfp+run', 'metadata': {'project': 'parcheggi-nk-scheduler-digitalhubdev', 'name': '40564256-a0b8-4204-a6b1-2d6c2d33992a', 'created': '2024-09-30T09:26:10.141Z', 'updated': '2024-09-30T09:26:10.159Z', 'created_by': 'tenant1userid', 'updated_by': 'tenant1userid'}, 'spec': {'task': 'kfp+pipeline://parcheggi-nk-scheduler-digitalhubdev/pipeline_parcheggi:48b89346-6a3a-4dd6-9ce8-6ec5581f9798', 'local_execution': False, 'source': {'source': 'src/data_management_parkings_pipeline.py', 'handler': 'myhandler', 'base64': 'CmZyb20gZGlnaXRhbGh1Yl9ydW50aW1lX2tmcC5kc2wgaW1wb3J0IHBpcGVsaW5lX2NvbnRleHQKCmRlZiBteWhhbmRsZXIodXJsKToKICAgIHdpdGggcGlwZWxpbmVfY29udGV4dCgpIGFzIHBjOgogICAgICAgIHMxX2RhdGFzZXQgPSBwYy5zdGVwKG5hbWU9ImRvd25sb2FkIiwgZnVuY3Rpb249ImRvd25sb2FkZXItZnVuY3QiLCBhY3Rpb249ImpvYiIsaW5wdXRzPXsidXJsIjp1cmx9LG91dHB1dHM9eyJkYXRhc2V0IjoiZGF0YXNldCJ9KQogICAgICAgIAogICAgICAgIHMyX3BhcmtpbmcgPSBwYy5zdGVwKG5hbWU9ImV4dHJhY3RfcGFya2luZyIsIGZ1bmN0aW9uPSJleHRyYWN0LXBhcmtpbmdzIiwgYWN0aW9uPSJqb2IiL