In [2]:
import mlrun
import pandas as pd
import requests
import os

In [3]:
ENV_FILE = ".mlrun.env"
if os.path.exists(ENV_FILE):
    mlrun.set_env_from_file(ENV_FILE)

# Project

In [2]:
# initialize project and sync to local/db
PROJECT = "demo-etl"
project = mlrun.get_or_create_project(PROJECT, "./")

> 2023-06-23 09:12:44,621 [info] loaded project demo-etl from MLRun DB


In [3]:
print(project)

{'kind': 'project', 'metadata': {'name': 'demo-etl', 'created': '2023-06-23T09:05:12.334530'}, 'spec': {'functions': [], 'workflows': [], 'artifacts': [], 'conda': '', 'source': '', 'desired_state': 'online'}, 'status': {'state': 'online'}}


# Explore

In [4]:
URL = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"
filename = "rilevazione-flusso-veicoli-tramite-spire-anno-2023.csv"

In [7]:
with requests.get(URL) as r:
    with open(filename, "wb") as f:
        f.write(r.content)


In [8]:
df = pd.read_csv(filename, sep=";")

In [9]:
df.head()

Unnamed: 0,data,codice spira,00:00-01:00,01:00-02:00,02:00-03:00,03:00-04:00,04:00-05:00,05:00-06:00,06:00-07:00,07:00-08:00,...,Nodo a,ordinanza,stato,codimpsem,direzione,angolo,longitudine,latitudine,geopoint,giorno settimana
0,2023-03-25,0.127 3.88 10 1,70,42,21,21,18,33,39,114,...,15108,4000/343434,A,125,,150.0,11.37022,44.509755,"44.5097545007963, 11.3702196342924",Sabato
1,2023-03-25,0.127 3.90 2 1,79,51,20,28,40,89,173,350,...,16138,4000/343434,A,232,SO,127.0,11.410765,44.524268,"44.52426826666227, 11.410765190552794",Sabato
2,2023-03-25,0.127 3.90 4 1,16,10,2,2,2,5,17,33,...,11010,4000/343434,A,232,NO,57.0,11.410579,44.524109,"44.52410940013124, 11.410578972327496",Sabato
3,2023-03-25,0.127 3.91 4 1,139,69,44,46,27,59,122,419,...,16625,4000/343434,A,264,NO,38.0,11.364449,44.507129,"44.5071286928842, 11.3644494581845",Sabato
4,2023-03-25,0.127 3.93 6 1,234,136,95,85,55,103,141,287,...,15789,4000/343434,A,106,NE,321.0,11.381681,44.517642,"44.5176422952597, 11.3816808772175",Sabato


In [10]:
df.dtypes

data                 object
codice spira         object
00:00-01:00           int64
01:00-02:00           int64
02:00-03:00           int64
03:00-04:00           int64
04:00-05:00           int64
05:00-06:00           int64
06:00-07:00           int64
07:00-08:00           int64
08:00-09:00           int64
09:00-10:00           int64
10:00-11:00           int64
11:00-12:00           int64
12:00-13:00           int64
13:00-14:00           int64
14:00-15:00           int64
15:00-16:00           int64
16:00-17:00           int64
17:00-18:00           int64
18:00-19:00           int64
19:00-20:00           int64
20:00-21:00           int64
21:00-22:00           int64
22:00-23:00           int64
23:00-24:00           int64
id_uni                int64
Livello               int64
tipologia            object
codice              float64
codice arco           int64
codice via            int64
Nome via             object
Nodo da               int64
Nodo a                int64
ordinanza           

In [11]:
df.size

5683920

## 1. Collect the data

Create a new folder to store the functions in:

In [None]:
new_folder = 'src'
if not os.path.exists(new_folder):
    os.makedirs(new_folder)

Define and export a function for downloading data and persisting into repository

In [5]:
%%writefile "src/download-data.py"

import mlrun
import pandas as pd
import requests

@mlrun.handler(outputs=["dataset"])
def downloader(context, url: mlrun.DataItem):  
    # read and rewrite to normalize and export as data
    df = url.as_df(format='csv',sep=";")
    return df

Overwriting src/download-data.py


register the function

In [6]:
project.set_function("src/download-data.py", name="download-data", kind="job", image="mlrun/mlrun", handler="downloader")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f1a184fdbe0>

and execute (locally) to test

In [7]:
project.run_function("download-data", inputs={'url':URL}, local=True)

> 2023-06-23 09:13:01,793 [info] Storing function: {'name': 'download-data-downloader', 'uid': 'a23451fc728a4c17aa7cb3d83f7130ac', 'db': 'http://mlrun-api:8080'}


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
demo-etl,...7130ac,0,Jun 23 09:13:01,completed,download-data-downloader,v3io_user=digitalhub-devkind=owner=digitalhub-devhost=coder-digitalhub-dev-vscode,url,,,dataset





> 2023-06-23 09:13:58,537 [info] run executed, status=completed: {'name': 'download-data-downloader'}


<mlrun.model.RunObject at 0x7f19e837a5e0>

The result of the execution will be saved as an artifact in the data store, with a unique key.

By default the key is defined as `<function-name>-<handler>-<output>`)

In [8]:
DF_KEY = 'store://datasets/demo-etl/download-data-downloader_dataset'

In [9]:
di = mlrun.get_dataitem(DF_KEY)

In [10]:
df = di.as_df()

In [11]:
df.head()

Unnamed: 0,data,codice spira,00:00-01:00,01:00-02:00,02:00-03:00,03:00-04:00,04:00-05:00,05:00-06:00,06:00-07:00,07:00-08:00,...,Nodo a,ordinanza,stato,codimpsem,direzione,angolo,longitudine,latitudine,geopoint,giorno settimana
0,2023-03-25,0.127 3.88 10 1,70,42,21,21,18,33,39,114,...,15108,4000/343434,A,125,,150.0,11.37022,44.509755,"44.5097545007963, 11.3702196342924",Sabato
1,2023-03-25,0.127 3.90 2 1,79,51,20,28,40,89,173,350,...,16138,4000/343434,A,232,SO,127.0,11.410765,44.524268,"44.52426826666227, 11.410765190552794",Sabato
2,2023-03-25,0.127 3.90 4 1,16,10,2,2,2,5,17,33,...,11010,4000/343434,A,232,NO,57.0,11.410579,44.524109,"44.52410940013124, 11.410578972327496",Sabato
3,2023-03-25,0.127 3.91 4 1,139,69,44,46,27,59,122,419,...,16625,4000/343434,A,264,NO,38.0,11.364449,44.507129,"44.5071286928842, 11.3644494581845",Sabato
4,2023-03-25,0.127 3.93 6 1,234,136,95,85,55,103,141,287,...,15789,4000/343434,A,106,NE,321.0,11.381681,44.517642,"44.5176422952597, 11.3816808772175",Sabato


# 2. Process the data

Raw data (as ingested from remote API) is usually not suitable for consumption. We'll define a set of functions to derive data as required by the scenario.

In [None]:
df = mlrun.get_dataitem(DF_KEY).as_df()

### Extract *spire* information
extract information about the _spire_ (for example `id`,`geolocation`,`address`, `name`...)

In [None]:
sdf= df.groupby(['codice spira']).first().reset_index()[['codice spira','longitudine','latitudine','Livello','tipologia','codice','codice arco','codice via','Nome via', 'stato','direzione','angolo','geopoint']]

In [None]:
sdf.head()

Unnamed: 0,codice spira,longitudine,latitudine,Livello,tipologia,codice,codice arco,codice via,Nome via,stato,direzione,angolo,geopoint
0,0.127 1.1 6 1,11.354166,44.498535,1,spira,498.0,3312,19900,VIA G.BATTISTA DE ROLANDIS,A,N,342.0,"44.4985349106485, 11.3541657967424"
1,0.127 1.12 8 1,11.33897,44.495251,1,spira,1045.0,1016,5900,VIA CESARE BATTISTI,A,N,350.0,"44.4952505129043, 11.338970003537"
2,0.127 1.13 6 1,11.34642,44.491648,1,spira,130.0,1169,14700,VIA CASTIGLIONE,A,S,198.0,"44.4916483847646, 11.3464200565732"
3,0.127 1.14 4 1,11.339836,44.490116,1,spira,521.0,1050,59900,VIA URBANA,A,E,264.0,"44.4901162203284, 11.3398356513878"
4,0.127 1.15 2 1,11.343358,44.489507,1,spira,132.0,1064,25800,VIA GARIBALDI,A,N,347.0,"44.4895074220971, 11.3433581064329"


In [None]:
sdf['tipologia'].unique()

array(['spira', 'telecamera'], dtype=object)

define a function to derive the dataset and save in the store

In [12]:
%%writefile "src/process-spire.py"

import mlrun
import pandas as pd

KEYS=['codice spira','longitudine','latitudine','Livello','tipologia','codice','codice arco','codice via','Nome via', 'stato','direzione','angolo','geopoint']

@mlrun.handler(outputs=["dataset-spire"])
def process(context, di: mlrun.DataItem):
    df = di.as_df()
    sdf= df.groupby(['codice spira']).first().reset_index()[KEYS]
    
    return sdf

Writing src/process-spire.py


register the function

In [13]:
project.set_function("src/process-spire.py", name="process-spire", kind="job", image="mlrun/mlrun", handler="process")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f19f8a2ffa0>

and execute (locally)

In [14]:
project.run_function("process-spire", inputs={'di': DF_KEY}, local=True)

> 2023-06-23 09:14:15,518 [info] Storing function: {'name': 'process-spire-process', 'uid': '7acd4040a74b422ea12a08bf8d5fa71e', 'db': 'http://mlrun-api:8080'}


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
demo-etl,...5fa71e,0,Jun 23 09:14:15,completed,process-spire-process,v3io_user=digitalhub-devkind=owner=digitalhub-devhost=coder-digitalhub-dev-vscode,di,,,dataset-spire





> 2023-06-23 09:14:16,021 [info] run executed, status=completed: {'name': 'process-spire-process'}


<mlrun.model.RunObject at 0x7f19e83185e0>

The result of the execution will be saved as an artifact in the data store, with a unique key.

In [None]:
SDF_KEY = 'store://datasets/demo-etl/process-spire-process_dataset-spire'

In [None]:
sdf = mlrun.get_dataitem(SDF_KEY).as_df()

In [None]:
sdf.head()

Unnamed: 0,codice spira,longitudine,latitudine,Livello,tipologia,codice,codice arco,codice via,Nome via,stato,direzione,angolo,geopoint
0,0.127 1.1 6 1,11.354166,44.498535,1,spira,498.0,3312,19900,VIA G.BATTISTA DE ROLANDIS,A,N,342.0,"44.4985349106485, 11.3541657967424"
1,0.127 1.12 8 1,11.33897,44.495251,1,spira,1045.0,1016,5900,VIA CESARE BATTISTI,A,N,350.0,"44.4952505129043, 11.338970003537"
2,0.127 1.13 6 1,11.34642,44.491648,1,spira,130.0,1169,14700,VIA CASTIGLIONE,A,S,198.0,"44.4916483847646, 11.3464200565732"
3,0.127 1.14 4 1,11.339836,44.490116,1,spira,521.0,1050,59900,VIA URBANA,A,E,264.0,"44.4901162203284, 11.3398356513878"
4,0.127 1.15 2 1,11.343358,44.489507,1,spira,132.0,1064,25800,VIA GARIBALDI,A,N,347.0,"44.4895074220971, 11.3433581064329"


### Extract measures
extract measures for traffic as recorded by _spire_ (e.g. `time`,`value`)

In [None]:
df = mlrun.get_dataitem(DF_KEY).as_df()

In [None]:
keys = ['00:00-01:00',
         '01:00-02:00',
         '02:00-03:00',
         '03:00-04:00',
         '04:00-05:00',
         '05:00-06:00',
         '06:00-07:00',
         '07:00-08:00',
         '08:00-09:00',
         '09:00-10:00',
         '10:00-11:00',
         '11:00-12:00',
         '12:00-13:00',
         '13:00-14:00',
         '14:00-15:00',
         '15:00-16:00',
         '16:00-17:00',
         '17:00-18:00',
         '18:00-19:00',
         '19:00-20:00',
         '20:00-21:00',
         '21:00-22:00',
         '22:00-23:00',
         '23:00-24:00']
columns=['data','codice spira'] + keys

In [None]:
rdf = df[columns]

In [None]:
rdf.head()

Unnamed: 0,data,codice spira,00:00-01:00,01:00-02:00,02:00-03:00,03:00-04:00,04:00-05:00,05:00-06:00,06:00-07:00,07:00-08:00,...,14:00-15:00,15:00-16:00,16:00-17:00,17:00-18:00,18:00-19:00,19:00-20:00,20:00-21:00,21:00-22:00,22:00-23:00,23:00-24:00
0,2023-03-25,0.127 3.88 4 1,90,58,37,23,22,35,79,235,...,287,257,274,353,322,362,208,163,144,113
1,2023-03-25,0.127 3.89 2 1,81,52,37,20,14,38,69,147,...,252,253,270,269,235,249,174,123,108,117
2,2023-03-25,0.127 3.89 4 1,56,32,18,10,15,24,33,97,...,185,166,180,239,199,201,136,97,85,61
3,2023-03-25,0.127 3.92 2 1,132,56,43,37,46,78,160,357,...,631,673,708,829,959,823,543,327,279,363
4,2023-03-25,0.127 3.92 8 1,21,12,8,9,6,18,25,131,...,125,177,155,141,157,141,79,34,47,41


In [None]:
tdf = rdf.head()

In [None]:
key = '00:00-01:00'

In [None]:
k = key.split("-")[0]

In [None]:
xdf = tdf[['data','codice spira',key]]

In [None]:
xdf['time'] = xdf.data.apply(lambda x: x+' ' +k)
xdf['value'] = xdf[key]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [None]:
xdf

Unnamed: 0,data,codice spira,00:00-01:00,time,value
0,2023-03-25,0.127 3.88 4 1,90,2023-03-25 00:00,90
1,2023-03-25,0.127 3.89 2 1,81,2023-03-25 00:00,81
2,2023-03-25,0.127 3.89 4 1,56,2023-03-25 00:00,56
3,2023-03-25,0.127 3.92 2 1,132,2023-03-25 00:00,132
4,2023-03-25,0.127 3.92 8 1,21,2023-03-25 00:00,21


In [None]:
vdf = xdf[['time','codice spira','value']]

In [None]:
vdf

Unnamed: 0,time,codice spira,value
0,2023-03-25 00:00,0.127 3.88 4 1,90
1,2023-03-25 00:00,0.127 3.89 2 1,81
2,2023-03-25 00:00,0.127 3.89 4 1,56
3,2023-03-25 00:00,0.127 3.92 2 1,132
4,2023-03-25 00:00,0.127 3.92 8 1,21


In [None]:
ls = []
for key in keys:
    k = key.split("-")[0]
    xdf = rdf[['data','codice spira',key]]
    xdf['time'] = xdf.data.apply(lambda x: x+' ' +k)
    xdf['value'] = xdf[key]
    vdf = xdf[['time','codice spira','value']]
    ls.append(vdf)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/

In [None]:
edf = pd.concat(ls)

In [None]:
edf

Unnamed: 0,time,codice spira,value
0,2023-03-25 00:00,0.127 3.88 4 1,90
1,2023-03-25 00:00,0.127 3.89 2 1,81
2,2023-03-25 00:00,0.127 3.89 4 1,56
3,2023-03-25 00:00,0.127 3.92 2 1,132
4,2023-03-25 00:00,0.127 3.92 8 1,21
...,...,...,...
129175,2023-05-07 23:00,0.127 4.86 6 1,8
129176,2023-05-07 23:00,0.127 4.90 4 1,390
129177,2023-05-07 23:00,0.127 4.92 2 1,50
129178,2023-05-07 23:00,0.127 4.95 4 1,34


write function to process and save in store

In [15]:
%%writefile "src/process-measures.py"

import mlrun
import pandas as pd


KEYS = ['00:00-01:00',
         '01:00-02:00',
         '02:00-03:00',
         '03:00-04:00',
         '04:00-05:00',
         '05:00-06:00',
         '06:00-07:00',
         '07:00-08:00',
         '08:00-09:00',
         '09:00-10:00',
         '10:00-11:00',
         '11:00-12:00',
         '12:00-13:00',
         '13:00-14:00',
         '14:00-15:00',
         '15:00-16:00',
         '16:00-17:00',
         '17:00-18:00',
         '18:00-19:00',
         '19:00-20:00',
         '20:00-21:00',
         '21:00-22:00',
         '22:00-23:00',
         '23:00-24:00']
COLUMNS=['data','codice spira']

@mlrun.handler(outputs=["dataset-measures"])
def process(context, di: mlrun.DataItem):
    df = di.as_df()
    rdf = df[COLUMNS+KEYS]
    ls = []
    for key in KEYS:
        k = key.split("-")[0]
        xdf = rdf[COLUMNS + [key]]
        xdf['time'] = xdf.data.apply(lambda x: x+' ' +k)
        xdf['value'] = xdf[key]
        ls.append(xdf[['time','codice spira','value']])
    edf = pd.concat(ls)
    return edf

Writing src/process-measures.py


register the function

In [16]:
project.set_function("src/process-measures.py", name="process-measures", kind="job", image="mlrun/mlrun", handler="process")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f19efbe0a90>

and execute (locally)

In [17]:
project.run_function("process-measures", inputs={'di': DF_KEY}, local=True)

> 2023-06-23 09:14:25,428 [info] Storing function: {'name': 'process-measures-process', 'uid': '9055eaf1274340a4a215ad0ead39648d', 'db': 'http://mlrun-api:8080'}



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/

project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
demo-etl,...39648d,0,Jun 23 09:14:25,completed,process-measures-process,v3io_user=digitalhub-devkind=owner=digitalhub-devhost=coder-digitalhub-dev-vscode,di,,,dataset-measures





> 2023-06-23 09:14:27,068 [info] run executed, status=completed: {'name': 'process-measures-process'}


<mlrun.model.RunObject at 0x7f19e823d190>

inspect the resulting data artifact 

In [None]:
MDF_KEY = 'store://datasets/demo-etl/process-measures-process_dataset-measures'

In [None]:
mdf = mlrun.get_dataitem(MDF_KEY).as_df()

In [None]:
mdf.head()

Unnamed: 0,time,codice spira,value
0,2023-03-25 00:00,0.127 3.88 4 1,90
1,2023-03-25 00:00,0.127 3.89 2 1,81
2,2023-03-25 00:00,0.127 3.89 4 1,56
3,2023-03-25 00:00,0.127 3.92 2 1,132
4,2023-03-25 00:00,0.127 3.92 8 1,21


# Workflow
Define a simple workflow which will execute all the ETL steps by composing functions

In [18]:
%%writefile "pipeline.py"

from kfp import dsl
import mlrun

URL = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/rilevazione-flusso-veicoli-tramite-spire-anno-2023/exports/csv?lang=it&timezone=Europe%2FRome&use_labels=true&delimiter=%3B"


@dsl.pipeline(name="Demo ETL pipeline")
def pipeline():
    project = mlrun.get_current_project()

    # run downloader
    downloader = project.run_function("download-data",inputs={'url':URL},outputs=["dataset"])

    # process spire
    process_spire = project.run_function("process-spire",inputs={'di': downloader.outputs["dataset"]})

    # process measures
    process_measures = project.run_function("process-measures",inputs={'di': downloader.outputs["dataset"]})
  

Writing pipeline.py


register the workflow 

In [19]:
project.set_workflow("pipeline","./pipeline.py", handler="pipeline")

and run (remote)

In [20]:
project.run("pipeline")



> 2023-06-23 09:14:34,316 [info] submitted pipeline demo-etl-pipeline 2023-06-23 09-14-34 id=0c6c798a-53c7-4f43-b495-2039b462bfe8
> 2023-06-23 09:14:34,317 [info] Pipeline run id=0c6c798a-53c7-4f43-b495-2039b462bfe8, check UI for progress


> 2023-06-23 09:14:34,347 [info] started run workflow demo-etl-pipeline with run id = '0c6c798a-53c7-4f43-b495-2039b462bfe8' by kfp engine


0c6c798a-53c7-4f43-b495-2039b462bfe8

# 3. Expose datasets as API
Define a simple api to expose data as REST.


In [29]:
%%writefile 'src/api.py'

import mlrun
import pandas as pd
import os

DF_URL = os.environ["DF_URL"]
df = None


def init_context(context):
    global df
    context.logger.info("retrieve data from {}".format(DF_URL))
    di = mlrun.run.get_dataitem(DF_URL)
    df = di.as_df()


def handler(context, event):
    global df
    if df is None:
        return context.Response(
            body="", headers={}, content_type="application/json", status_code=500
        )

    # mock REST api
    method = event.method
    path = event.path
    fields = event.fields

    id = False

    # pagination
    page = 0
    pageSize = 50

    if "page" in fields:
        page = int(fields['page'])

    if "size" in fields:
        pageSize = int(fields['size'])

    if page < 0:
        page = 0

    if pageSize < 1:
        pageSize = 1

    if pageSize > 100:
        pageSize = 100

    start = page * pageSize
    end = start + pageSize
    total = len(df)

    if end > total:
        end = total

    ds = df.iloc[start:end]
    json = ds.to_json(orient="records")

    res = {"data": json, "page": page, "size": pageSize, "total": total}

    return context.Response(
        body=res, headers={}, content_type="application/json", status_code=200
    )


Overwriting src/api.py


register the function

In [30]:
api_fn = project.set_function("src/api.py", name="api", kind="nuclio", image="mlrun/mlrun", handler='handler')

configure the function for deploy

In [31]:
DF_KEY = 'store://datasets/demo-etl/download-data-downloader_dataset'

In [32]:
api_fn.set_env(name='DF_URL', value=DF_KEY)
api_fn.with_requests(mem='64M',cpu="250m")
api_fn.spec.replicas = 1

In [33]:
project.save()

<mlrun.projects.project.MlrunProject at 0x7f1a18429790>

deploy and make a request

In [34]:
api_fn.deploy()

> 2023-06-23 09:19:08,966 [info] Starting remote function deploy
2023-06-23 09:19:09  (info) Deploying function
2023-06-23 09:19:09  (info) Building
2023-06-23 09:19:09  (info) Staging files and preparing base images
2023-06-23 09:19:09  (info) Building processor image
2023-06-23 09:20:04  (info) Build complete
2023-06-23 09:20:12  (info) Function deploy complete
> 2023-06-23 09:20:19,407 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-demo-etl-api.mlrun.svc.cluster.local:8080'], 'external_invocation_urls': [':31630']}


'http://:31630'

In [37]:
res = api_fn.invoke("/?page=5&size=10")

> 2023-06-23 09:21:42,728 [info] invoking function: {'method': 'GET', 'path': 'http://nuclio-demo-etl-api.mlrun.svc.cluster.local:8080/?page=5&size=10'}


In [38]:
print(res)

{'data': '[{"data":"2023-03-25","codice spira":"1.10 1.9 4 2","00:00-01:00":146,"01:00-02:00":120,"02:00-03:00":58,"03:00-04:00":37,"04:00-05:00":36,"05:00-06:00":33,"06:00-07:00":50,"07:00-08:00":97,"08:00-09:00":178,"09:00-10:00":200,"10:00-11:00":264,"11:00-12:00":261,"12:00-13:00":318,"13:00-14:00":273,"14:00-15:00":204,"15:00-16:00":243,"16:00-17:00":234,"17:00-18:00":289,"18:00-19:00":293,"19:00-20:00":347,"20:00-21:00":354,"21:00-22:00":263,"22:00-23:00":268,"23:00-24:00":232,"id_uni":266,"Livello":1,"tipologia":"spira","codice":918.0,"codice arco":961,"codice via":50550,"Nome via":"VIA RIVA DI RENO","Nodo da":13570,"Nodo a":13540,"ordinanza":"4000\\/343434","stato":"A","codimpsem":294,"direzione":"SO","angolo":125.0,"longitudine":11.3313901059,"latitudine":44.4981528821,"geopoint":"44.4981528821138, 11.3313901059351","giorno settimana":"Sabato"},{"data":"2023-03-25","codice spira":"1.10 1.18 8 1","00:00-01:00":94,"01:00-02:00":50,"02:00-03:00":32,"03:00-04:00":27,"04:00-05:00":

In [41]:
rdf = pd.read_json(res['data'], orient='records')

In [42]:
rdf.head()

Unnamed: 0,data,codice spira,00:00-01:00,01:00-02:00,02:00-03:00,03:00-04:00,04:00-05:00,05:00-06:00,06:00-07:00,07:00-08:00,...,Nodo a,ordinanza,stato,codimpsem,direzione,angolo,longitudine,latitudine,geopoint,giorno settimana
0,2023-03-25,1.10 1.9 4 2,146,120,58,37,36,33,50,97,...,13540,4000/343434,A,294,SO,125,11.33139,44.498153,"44.4981528821138, 11.3313901059351",Sabato
1,2023-03-25,1.10 1.18 8 1,94,50,32,27,24,32,45,85,...,195,4000/343434,A,321,NE,286,11.336102,44.498543,"44.4985433372994, 11.3361015838849",Sabato
2,2023-03-25,1.11 0.127 6 1,250,194,99,73,31,33,53,171,...,18,4000/343434,A,322,SO,155,11.33575,44.493073,"44.4930734341144, 11.3357500770563",Sabato
3,2023-03-25,1.12 1.80 2 1,92,53,63,41,42,23,40,58,...,166,4000/343434,A,347,E,260,11.340702,44.495192,"44.4951923248476, 11.3407015529974",Sabato
4,2023-03-25,1.14 1.15 4 1,90,79,44,28,17,10,23,74,...,6047,4000/343434,A,92,E,272,11.341165,44.49001,"44.4900102626705, 11.3411645442514",Sabato
