In [1]:
# Ni ravno najlepš, ampak dela
# Vsi seti morajo biti pravilno razdeljeni po mapah. 
# Vsi csv-ji evidence reg. vozil za posamezno leto grejo v svojo mapo, ki ima za ime leto. 
# Set občin gre v mapo regije_in_obcine
# Csv-ji tehničnih pregledov grejo v mapo teh_pregledi_sample in vsak csv v mapo za svoje leto

import polars as pl # Pandas je bil prepočasen
from glob import glob
from tqdm import tqdm
from thefuzz import process, fuzz

In [2]:
def read_files_in_evidenca_registriranih_vozil(year):
    df = pl.DataFrame()
    files = glob(f"data/{year}/*.csv")
    if len(files) == 0:
        files = glob(f"data/{year}/*.txt")
    for file in files:
        partial_df = pl.read_csv(file, separator=";", encoding="mbcs", infer_schema_length=0)
        df = df.vstack(partial_df)   
    return df


def read_teh_pregledi_files_in(year):
    df = pl.DataFrame()
    if year == 2022:
        encoding = "mbcs"
    else:
        encoding = "utf-8-sig"
    
    files = glob(f"data/teh_pregledi_sample/{year}/*.csv")
    if len(files) == 0:
        files = glob(f"data/teh_pregledi_sample/{year}/*.txt")
    
    for file in files:
        partial_df = pl.read_csv(file, separator=";", encoding=encoding, infer_schema_length=0)
        df = df.vstack(partial_df)
    return df


def select_features(df, features):
    return df.select(features)


def remove_na_vins(df):
    df = df.drop_nulls(subset=["vin"])
    return df


def filter_by_vehicle_category(df):
    return df.filter(
        (pl.col("kategorija") == "M1") |
        (pl.col("kategorija") == "M1G"))


def filter_by_registered(df):
    return df.filter(pl.col("status_vozila") == "1")


def format_dates(df, features):
    for feature in features:
        df = df.with_columns(pl.col(feature).str.to_date("%d.%m.%Y", strict=False))
    return df
        

def add_geo_region(df):
    regije = pl.read_csv("data/regije_in_obcine/obcine_regije_map.csv", separator=";", infer_schema_length=0)
    df = df.with_columns(pl.col("uporabnik_obcina_oznaka").cast(pl.Int32))
    regije = regije.select(["Šifra", "Regija"])
    regije = regije.with_columns(pl.col("Šifra").cast(pl.Int32))
    regije = regije.rename({"Šifra": "uporabnik_obcina_oznaka"})
    
    regije = df.join(regije, on="uporabnik_obcina_oznaka", how="left")["Regija"]
    df = df.with_columns(
        pl.lit(regije).alias("uporabnik_regija")
    )
    df = df.drop("uporabnik_obcina_oznaka")
    return df
    

def extract_date_features(df):
    df = df.with_columns([
        (pl.col("prva_registracija").dt.year()).cast(pl.Int32, strict=False).alias("prva_registracija_leto"),
        (pl.col("prva_registracija_slo").dt.year()).cast(pl.Int32, strict=False).alias("prva_registracija_slo_leto"),
        (pl.col("prva_registracija").dt.month()).cast(pl.Int32, strict=False).alias("prva_registracija_mesec"),
        (pl.col("prva_registracija_slo").dt.month()).cast(pl.Int32, strict=False).alias("prva_registracija_slo_mesec")
    ])
    return df
    

def rename_features(df, mapper):
    df = df.rename(mapper)
    return df


def make_all_strings_lowercase(df):
    df = df.with_columns([
        pl.col(i).str.to_lowercase() for i in df.columns if df[i].dtype == pl.Utf8
    ])
    return df


def remove_duplicated_vins(df):
    df = df.unique(subset=["vin"], keep="first")
    return df


def remove_vins_with_lt_17chars(df):
    df = df.filter(
        pl.col("vin").str.n_chars() == 17
    )
    return df


def extras(df):
    goriva_rename_map = {
        "p": "bencin",
        "d": "dizel",
        "lpg": "utekočinjen naftni plin",
        "m": "mešanica",
        "cng": "komprimiran zemeljski plin",
        "h": "vodik",
        "lng": "utekočinjen zemeljski plin",
        "p/et": "bencin/etanol",
        "p/lpg": "bencin/utekočinjen naftni plin",
        "p/cng": "bencin/komprimiran zemeljski plin",
        "p/h": "bencin/vodik",
        "d/bd": "dizel/biodizel",
        "d/lpg": "dizel/utekočinjen naftni plin",
        "d/cng": "dizel/komprimiran zemeljski plin",
        "lng/d": "dvojno gorivo",
        "-": "ni goriva",
        "o": "ostalo",
        "ni": None,
        "xxx": None
    }
    
    drzave_rename_map = {
        "x": None,
        "koreja, republika": "južna koreja",
        "koreja, demokraticna ljudska republika": "severna koreja",
        "koreja, demokratična ljudska republika": "severna koreja",
        "iran (islamska republika)": "iran",
        "ceška republika": "češka republika",
        "kitajska ": "kitajska"
        }
    
    df = df.with_columns([
        pl.col("uporabnik_je_lastnik").map_dict({"da": "1", "ne": "0"}, default=pl.first()).cast(pl.Int32, strict=False),
        pl.col("uporabnik_obcina").map_dict({"neznana": None}, default=pl.first()),
        pl.col("oblika_nadgradnje").map_dict({"-1": None, "-": None}, default=pl.first()),
        pl.col("gorivo").map_dict(goriva_rename_map, default=pl.first()),
        pl.col("drzava_izvora").map_dict(drzave_rename_map, default=pl.first()),
        pl.col("prva_registracija_obmocje").map_dict({"neznano": None}, default=pl.first()),
        pl.col("masa").cast(pl.Float32, strict=False),
        pl.col("motor_v").cast(pl.Float32, strict=False),
        pl.col("motor_moc").cast(pl.Float32, strict=False),
        pl.col("km").cast(pl.Float32, strict=False),
        pl.col("uporabnik_starost").cast(pl.Int32, strict=False),
        pl.col("lastnik_starost").cast(pl.Int32, strict=False),
        (pl.col("barva_detail").str.split(" - ").list.get(1)).alias("barva_simple"),
        (pl.col("model_detail").str.split(" / ").list.get(0).str.replace("/", "").str.strip_chars()).alias("model_simple")
        
        
    ])
    
    df = df.with_columns(pl.when(pl.col("uporabnik_starost") < 18).then(pl.lit(None).alias("uporabnik_starost")).otherwise(pl.col("uporabnik_starost")))
    df = df.with_columns(pl.when(pl.col("lastnik_starost") < 18).then(pl.lit(None).alias("lastnik_starost")).otherwise(pl.col("lastnik_starost")))
    df = df.with_columns(pl.when((pl.col("masa") < 20) | (pl.col("masa") > 70000)).then(pl.lit(None).alias("masa")).otherwise(pl.col("masa")))
    df = df.with_columns(pl.when((pl.col("motor_v") <= 0) | (pl.col("motor_v") > 50000)).then(pl.lit(None).alias("motor_v")).otherwise(pl.col("motor_v")))
    df = df.with_columns(pl.when(pl.col("motor_moc") <= 0).then(pl.lit(None).alias("motor_moc")).otherwise(pl.col("motor_moc")))
    df = df.with_columns(pl.when(pl.col("km") > 3000000).then(pl.lit(None).alias("km")).otherwise(pl.col("km")))
    return df
    

def remove_text_errors(df, col, t, _scorer="normal"):
    if _scorer == "normal":
        scorer = fuzz.ratio
    if _scorer == "partial":
        scorer = fuzz.partial_ratio
        
    r_vred = df[col].value_counts(sort=True)[col]
    uporabljene = []
    map_dict = {}
    
    for i in tqdm(r_vred):
        if i not in uporabljene:
            res = process.extract(query=i, choices=r_vred, scorer=scorer)
            for r in res[1:]:
                if r[1] > t:
                    map_dict.update({r[0]: i})
                    uporabljene.append(r[0])
                    
    df = df.with_columns(pl.col(col).map_dict(map_dict, default=pl.first()))
    return df


def add_km_categories(df):
    df = df.with_columns(
        pl.when(
            pl.col("km") >= 300000)
        .then(pl.lit("+300000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 200000) & (pl.col("km") < 300000)
        ).then(pl.lit("+200000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 150000) & (pl.col("km") < 200000)
        ).then(pl.lit("+150000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 100000) & (pl.col("km") < 150000)
        ).then(pl.lit("+100000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 50000) & (pl.col("km") < 100000)
        ).then(pl.lit("+50000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 10000) & (pl.col("km") < 50000)
        ).then(pl.lit("+10000").alias("razpon_km") )
        .when(
            (pl.col("km") >= 0) & (pl.col("km") < 10000)
        ).then(pl.lit("+0").alias("razpon_km") )
        .otherwise(pl.lit(None).alias("razpon_km")) 
    )
    return df


def add_prevrteni_km(df):
    _by_km = df.group_by("vin").agg([
        pl.col("km")
    ])
    _by_km = _by_km.with_columns(
    pl.when(
        pl.col("km") != pl.col("km").list.sort(descending=True))
    .then(pl.lit(1))
    .otherwise(pl.lit(0)).alias("prevrteni_km"))
    
    df = df.join(_by_km.select(["vin", "prevrteni_km"]), on="vin", how="left")


def replace_special_chars(df):
    df = df.with_columns([
        pl.col(i).str.replace("č", "c") for i in df.columns if df[i].dtype == pl.Utf8 
    ])
    df = df.with_columns([
        pl.col(i).str.replace("ž", "z") for i in df.columns if df[i].dtype == pl.Utf8 
    ])
    df = df.with_columns([
        pl.col(i).str.replace("š", "s") for i in df.columns if df[i].dtype == pl.Utf8 
    ])
    return df


def add_id(df, year):
    return df.with_columns((pl.col("vin") + str(year)).alias("id"))


def add_uvozeno(df):
    return df.with_columns((pl.when(pl.col("prva_registracija") < pl.col("prva_registracija_slo")).then(1).otherwise(0)).alias("uvozeno"))  

In [4]:
def pipeline():
    years = [2022, 2021, 2020, 2019, 2018, 2017, 2016]
    
    features_evidenca_registracij = [
        "vin",
        "status_vozila",
        "znamka",
        "model_detail",
        "kategorija",
        "oblika_nadgradnje",
        "drzava_izvora",
        "masa",
        "gorivo",
        "barva_detail",
        "motor_v",
        "motor_moc",
        "km",
        "prva_registracija",
        "prva_registracija_slo",
        "prva_registracija_obmocje",
        "uporabnik_p_f",
        "uporabnik_obcina_oznaka",
        "uporabnik_obcina",
        "uporabnik_starost",
        "lastnik_starost",
        "uporabnik_spol",
        "uporabnik_je_lastnik",
        "lastnik_p_f",
    ]
        
    features_teh_pregledi = [
        "vin",
        "teh_pregled_datum",
        "teh_pregled_status",
        "teh_pregled_izv_enota"
    ]
    
    # Pred letom 2018 so nekateri stolpci drugače poimenovani
    rename_2018_evidenca_registracij = {
        "D.1-Znamka": "D1-Znamka",
        "D.3-Komerc. oznaka": "D3-Komerc oznaka",
        "D.4.2-Drzava (opis)": "D42-Drzava (opis)",
        "P.1.3-Vrsta goriva (oznaka)": "P13-Vrsta goriva (oznaka)",
        "P.1.1-Delovna prostornina": "P11-Delovna prostornina",
        "C1.3-Obcina uporabnika vozila (oznaka)": "C13-Obcina uporabnika vozila (oznaka)",
        "C1.3-Obcina uporabnika vozila (opis)": "C13-Obcina uporabnika vozila (opis)",
        "P.1.2-Nazivna moc": "P12-Nazivna moc"
    }
    
    rename_evidenca_registracij = {
        "E-VIN": "vin",
        "Status vozila (id)": "status_vozila",
        "D1-Znamka": "znamka",
        "D3-Komerc oznaka": "model_detail",
        "J-Kategorija in vrsta vozila (oznaka)": "kategorija",
        "X-Oblika nadgradnje (opis)": "oblika_nadgradnje",
        "D42-Drzava (opis)": "drzava_izvora",
        "G-Masa vozila": "masa",
        "P13-Vrsta goriva (oznaka)": "gorivo",
        "R-Barva vozila (opis)": "barva_detail",
        "P11-Delovna prostornina": "motor_v",
        "P12-Nazivna moc":"motor_moc",
        "Prevozeni kilometri milje": "km",
        "B-Datum prve registracije vozila": "prva_registracija",
        "2A-Datum prve registracije vozila v SLO": "prva_registracija_slo",
        "4A-Registrsko obmocje tablice prve registracije": "prva_registracija_obmocje",
        "C-Ali je uporabnik pravna ali fizicna oseba": "uporabnik_p_f",
        "C13-Obcina uporabnika vozila (opis)": "uporabnik_obcina",
        "C13-Obcina uporabnika vozila (oznaka)": "uporabnik_obcina_oznaka",
        "C-Starost uporabnika vozila": "uporabnik_starost",
        "C-Spol uporabnika (ce gre za fizicno osebo)": "uporabnik_spol",
        "C-Ali je uporabnik tudi lastnik vozila": "uporabnik_je_lastnik",
        "C2-Ali je lastnik pravna ali fizicna oseba": "lastnik_p_f",
        "C2-Starost lastnika vozila": "lastnik_starost",
    }
    
    rename_teh_pregledi = {
        "VIN": "vin",
        "DATUM_PREGLEDA": "teh_pregled_datum", 
        "TEHNICNI_PREGLED_STATUS": "teh_pregled_status", 
        "IZVAJALNA_ENOTA_OPIS": "teh_pregled_izv_enota"
    }
    
    
    df_main = pl.DataFrame()
    # Za vsako leto
    for year in tqdm(years):
            
        # Naloži in uredi set registriranih vozil
        df = read_files_in_evidenca_registriranih_vozil(year)
        try:
            df = rename_features(df, rename_evidenca_registracij)
        except (pl.exceptions.ColumnNotFoundError, pl.exceptions.SchemaFieldNotFoundError):
            df = rename_features(df, rename_2018_evidenca_registracij)
            df = rename_features(df, rename_evidenca_registracij)
        
        df = select_features(df, features_evidenca_registracij)
        df = add_geo_region(df)
        df = format_dates(df, ["prva_registracija", "prva_registracija_slo"])
        df = remove_na_vins(df)
        df = filter_by_vehicle_category(df)
        df = filter_by_registered(df)
        df = remove_duplicated_vins(df)
        df = make_all_strings_lowercase(df)
        df = extras(df)

        # Naloži in uredi set tehničnih pregledov 
        df_tp = read_teh_pregledi_files_in(year)
        df_tp = rename_features(df_tp, rename_teh_pregledi)
        df_tp = select_features(df_tp, features_teh_pregledi)
        df_tp = format_dates(df_tp, ["teh_pregled_datum"])
        df_tp = make_all_strings_lowercase(df_tp)
        df_tp = df_tp.sort(by="teh_pregled_datum")
        
        # Obdrži samo zadnji pregled v letu - če je imel nekdo več tehničnih pregledov
        df_tp = df_tp.unique(subset=["vin"], keep="last")
        
        # Obdrži vse teh preglede vozila v določenem letu
        #df_tp = df_tp.group_by(by="vin").agg([
        #    pl.col("teh_pregled_datum"),
        #    pl.col("teh_pregled_status"),
        #    pl.col("teh_pregled_izv_enota")
        #])


        # Združi evidenco registriranih vozil s tehničnimi pregledi
        df = df.join(df_tp, on="vin", how="left")
        df = replace_special_chars(df)
        df = add_uvozeno(df)
        df = df.with_columns(pl.lit(year).alias("leto_zapisa"))
        
        df_main = df_main.vstack(df)
        
    # Primerjanje podobnosti in tole odpravljanje napak je počasno
    df_main = remove_text_errors(df_main, col="model_simple", t=90)
    df_main = remove_text_errors(df_main, col="znamka", t=90)
    
    return df_main

In [5]:
df = pipeline()

100%|████████████████████████████████████████████████████████████████████████████████████| 7/7 [01:50<00:00, 15.79s/it]
100%|████████████████████████████████████████████████████████████████████████████| 10528/10528 [05:04<00:00, 34.60it/s]
100%|███████████████████████████████████████████████████████████████████████████████| 361/361 [00:00<00:00, 623.11it/s]


In [6]:
df.head()

vin,status_vozila,znamka,model_detail,kategorija,oblika_nadgradnje,drzava_izvora,masa,gorivo,barva_detail,motor_v,motor_moc,km,prva_registracija,prva_registracija_slo,prva_registracija_obmocje,uporabnik_p_f,uporabnik_obcina,uporabnik_starost,lastnik_starost,uporabnik_spol,uporabnik_je_lastnik,lastnik_p_f,uporabnik_regija,barva_simple,model_simple,teh_pregled_datum,teh_pregled_status,teh_pregled_izv_enota,uvozeno,leto_zapisa
str,str,str,str,str,str,str,f32,str,str,f32,f32,f32,date,date,str,str,str,i32,i32,str,i32,str,str,str,str,date,str,str,i32,i32
"""xmclndg3a4f017…","""1""","""mitsubishi""","""mitsubishi / s…","""m1""","""vozilo z dvizn…","""nizozemska""",1160.0,"""bencin""","""kovinski - zel…",1584.0,72.0,156000.0,2005-01-04,2005-01-04,"""murska sobota""","""f""","""lendava""",58,58.0,"""m""",1,"""f""","""pomurska""","""zelena""","""mitsubishi""",2022-12-19,"""pogojno brezhi…","""upravna enota …",0,2022
"""wvwzzz3cz6e235…","""1""","""volkswagen""","""passat / varia…","""m1""","""karavan""","""nemcija""",1687.0,"""dizel""","""kovinski - crn…",1968.0,103.0,425771.0,2006-05-26,2009-11-09,"""ljubljana""","""f""","""ivancna gorica…",29,29.0,"""m""",1,"""f""","""osrednjesloven…","""crna""","""passat""",2022-08-17,"""brezhiben""","""pan jan d.o.o.…",1,2022
"""wbapg31010vj40…","""1""","""bmw""","""320 / / i""","""m1""","""limuzina""","""nemcija""",1445.0,"""bencin""","""kovinski - mod…",1995.0,125.0,124400.0,2009-10-19,2009-10-19,"""celje""","""f""","""velenje""",55,55.0,"""z""",1,"""f""","""savinjska""","""modra""","""320""",2022-10-08,"""brezhiben""","""avto velenje t…",0,2022
"""nlhdn51aanz112…","""1""","""hyundai""","""i10 / 1.0 /""","""m1""","""vozilo z dvizn…","""turcija""",1044.0,"""bencin""","""navaden - bela…",998.0,49.299999,,2021-11-30,2021-11-30,"""ljubljana""","""f""","""menges""",39,,"""z""",0,"""p""","""osrednjesloven…","""bela""","""i10""",,,,0,2022
"""wvgzzz1tzew064…","""1""","""volkswagen""","""touran / 1.6 /…","""m1""","""karavan""","""nemcija""",1544.0,"""dizel""","""kovinski - rja…",1598.0,77.0,134538.0,2014-03-24,2014-03-24,"""kranj""","""f""","""gorenja vas-po…",42,42.0,"""m""",1,"""f""","""gorenjska""","""rjava""","""touran""",2022-03-22,"""brezhiben""","""loski center d…",0,2022


In [35]:
df.write_parquet("dataset_v1.parquet")