In [1]:
import mlrun
import os
import pandas as pd

In [2]:
# Load the "daticomuni" MLRun project rooted at the current directory,
# creating it first if it does not exist yet.
project = mlrun.get_or_create_project(name="daticomuni", context="./")

> 2024-04-16 13:47:29,011 [info] Loading project from path: {'project_name': 'daticomuni', 'path': './'}
> 2024-04-16 13:47:29,504 [info] Project loaded successfully: {'project_name': 'daticomuni', 'path': './', 'stored_in_db': True}


In [3]:
# Directory that receives the job source file written by the %%writefile cell below.
new_folder = 'src'
# exist_ok=True makes the cell idempotent without a separate existence check,
# which would race with anything else creating the directory in between.
os.makedirs(new_folder, exist_ok=True)


In [4]:
%%writefile "src/convert-all.py"

import mlrun
import pandas as pd

# Layout of every date/time column parsed by this job.
_DATETIME_FORMAT = "%d/%m/%Y %H:%M:%S"


def _read_source_table(source_url_base, ds_name):
    """Read ``<source_url_base><ds_name>.txt`` as a ';'-delimited, windows-1251 CSV frame."""
    data_item = mlrun.get_dataitem(source_url_base + ds_name + ".txt")
    return data_item.as_df(format="csv", encoding="windows-1251", delimiter=";")


def _parse_datetime_column(context, df, column):
    """Convert ``df[column]`` to datetimes; on a parse failure keep the raw values and warn."""
    try:
        df[column] = pd.to_datetime(df[column], format=_DATETIME_FORMAT)
    except (ValueError, TypeError) as exc:
        # Same best-effort outcome as the deprecated errors="ignore" (the column is
        # left untouched), but the failure is now visible in the run log.
        context.logger.warning(
            f"Column '{column}' could not be parsed as '{_DATETIME_FORMAT}', leaving it unchanged: {exc}"
        )


def _log_table(context, ds_name, df):
    """Log ``df`` as the dataset artifact ``ds_name`` with a fresh positional index."""
    context.log_dataset(ds_name, df=df.reset_index(drop=True), index=False)


@mlrun.handler()
def convert_all(context, source_url_base: str):
    """Read the raw ``<name>.txt`` files and log each one as a dataset artifact.

    Five tables ("azioni", "campi", "macroambiti", "piani", "tassonomia") are
    logged as read. "comuni" additionally gets a derived ``comune`` column and
    has two date columns parsed; "valutazioni" has its ``data_pub`` column
    parsed. A date column that does not match ``_DATETIME_FORMAT`` is left
    unchanged and a warning is written to the run log.

    :param context:         run context; used for ``log_dataset`` and ``logger``.
    :param source_url_base: URL prefix to which ``<name>.txt`` is appended, so it
                            must already end with the path separator.
    """
    # direct processing, no actions required
    for ds_name in ["azioni", "campi", "macroambiti", "piani", "tassonomia"]:
        _log_table(context, ds_name, _read_source_table(source_url_base, ds_name))

    # comuni: process name and dates
    comuni = _read_source_table(source_url_base, "comuni")
    # regex=False pins literal matching regardless of the pandas version's default.
    # NOTE(review): the prefix is removed before upper-casing, so only an already
    # upper-case "COMUNE DI " is stripped - confirm that is intended.
    comuni["comune"] = (
        comuni["NomeOrganizzazione"].str.replace("COMUNE DI ", "", regex=False).str.upper()
    )
    _parse_datetime_column(context, comuni, "Data_det_assegnazione")
    # Missing revocation dates become empty strings before parsing.
    comuni["Data_det_revoca"] = comuni["Data_det_revoca"].fillna("")
    _parse_datetime_column(context, comuni, "Data_det_revoca")
    _log_table(context, "comuni", comuni)

    # valutazioni: process dates
    valutazioni = _read_source_table(source_url_base, "valutazioni")
    _parse_datetime_column(context, valutazioni, "data_pub")
    _log_table(context, "valutazioni", valutazioni)

Overwriting src/convert-all.py


In [5]:
# Register the generated script as a job-kind function of the project, then
# persist the project definition.
project.set_function(
    func="src/convert-all.py",
    name="convert-all",
    kind="job",
    image="mlrun/mlrun",
    handler="convert_all",
)
project.save()



<mlrun.projects.project.MlrunProject at 0x7f434e22dee0>

In [6]:
# Prefix of the raw "<dataset>.txt" files; the job appends each dataset name to it.
source_url_base = "s3://datalake/projects/daticomuni/base/"
project.run_function(
    "convert-all",
    params=dict(source_url_base=source_url_base),
)

> 2024-04-16 13:47:29,600 [info] Storing function: {'name': 'convert-all-convert-all', 'uid': 'e56c08fb544f49ae98095de729217665', 'db': 'http://mlrun-api:8080'}
> 2024-04-16 13:47:29,740 [info] Job is running in the background, pod: convert-all-convert-all-prb2v
> 2024-04-16 13:47:34,953 [info] Run execution finished: {'status': 'completed', 'name': 'convert-all-convert-all'}
final state: completed


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
daticomuni,...217665,0,Apr 16 13:47:33,completed,convert-all-convert-all,v3io_user=testkind=jobowner=testmlrun/client_version=1.4.0mlrun/client_python_version=3.9.13host=convert-all-convert-all-prb2v,,source_url_base=s3://datalake/projects/daticomuni/base/,,azionicampimacroambitipianitassonomiacomunivalutazioni





> 2024-04-16 13:47:36,161 [info] Run execution finished: {'status': 'completed', 'name': 'convert-all-convert-all'}


<mlrun.model.RunObject at 0x7f434dc7d130>