In [1]:
# UOC - Visualització de dades - PRA2 
# Preparació de dades: importa TXT i exporta un CSV
# @author: José V. Grimalt
# 2022-01-15

# import packages
import re
import collections
import pandas as pd
from datetime import datetime, timedelta

# noms dels fitxers
file_municipis = "municipios_pob_2022-01-01.csv"  # dades de població dels municipis
file_fieldnames = "dades-camps.csv"               # dades dels camps del fitxer de matriculacions


In [2]:
# definició de funció per llevar accents

def remove_accents(word):
    "Removes common accent characters, upper form. Uses: regex."
    new_word = word.upper()
    new_word = re.sub(r'[àáÀÁ]', 'A', new_word)
    new_word = re.sub(r'[èéÈÉ]', 'E', new_word)
    new_word = re.sub(r'[ìíïÌÍÏ]', 'I', new_word)
    new_word = re.sub(r'[òóÒÓ]', 'O', new_word)
    new_word = re.sub(r'[ùúüÙÚÜ]', 'U', new_word)
    new_word = re.sub(r'[\-\']', ' ', new_word)
    return new_word

def last_day_of_month(input_date):
    "Return last day of month from given date"
    next_month = input_date.replace(day=28) + timedelta(days=4)
    return next_month - timedelta(days=next_month.day)

# càrrega dades sobre els camps del fitxer de matriculacions
fields = pd.read_csv(file_fieldnames, index_col=0).reset_index()

widths = list(fields.Longitud)

# càrrega de dades de municipis
municipis = pd.read_csv(file_municipis, index_col=0).reset_index()

# Dades dels 151 municipis amb major població
dict_municipis = collections.defaultdict(lambda: 'Otros')
dict_municipis.update({
    remove_accents(x[0]): x[1] 
        for x in municipis.loc[0:151, ["Municipio", "Nombre"]].values
})

# Selecció de camps
fields_list = [
    "MARCA_ITV", "MODELO_ITV", "COD_TIPO", "COD_PROPULSION_ITV", 
    "LOCALIDAD_VEHICULO", "COD_PROVINCIA_VEH", "MUNICIPIO", 
    "CLAVE_TRAMITE", "IND_NUEVO_USADO", 
    "PERSONA_FISICA_JURIDICA", "RENTING", 
    "CO2_ITV", "CATEGORÍA_VEHÍCULO_ELÉCTRICO", "AUTONOMÍA_VEHÍCULO_ELÉCTRICO", 
    "FEC_PROCESO", "FEC_MATRICULA", "FEC_TRAMITACION"
]

# paràmetres i fitxers
file_municipis = "municipios_pob_2022-01-01.csv"  # dades de població dels municipis
file_fieldnames = "dades-camps.csv"               # informació dels camps del fitxer 

file_input_txt = [                                # fitxer de matriculacions
    f"txt/export_mensual_mat_{year}{month:02d}.zip" 
        for year in range(2018, 2023) for month in range(1, 13) 
]
file_output_csv = [                               # fitxer de dades agrupades
    f"csv/matriculacion_{year}{month:02d}.csv" 
        for year in range(2018, 2023) for month in range(1, 13)
]

# TTIPO_VEHICULO
dict_tipo = {
    "0": "CAMION",
    "1": "CAMION ARTICULADO",
    "2": "FURG./TODOTERR.",
    "3": "AUTOBUS",
    "4": "TURISMO",
    "5": "MOTOCICLETA/QUAD",
    "6": "COCHE DIVERS.FUNC.",
    "7": "MAQUINARIA",
    "8": "TRACTOR",
    "9": "CICLOMOTOR",
    "E": "EXTRANJERO",
    "R": "REMOLQ./MAQ. AGRÍCOLA",
    "S": "SEMIREMOLQUE",
}

# COD_PROPULSION_ITV
dict_propulsion = {
    "0": "Gasolina",
    "1": "Diesel",
    "2": "Eléctrico",
    "3": "Otros",
    "4": "Butano",
    "5": "Solar",
    "6": "GLP", # "Gas Licuado de Petróleo",
    "7": "GNC", # Gas Natural Comprimido",
    "8": "GNL", # "Gas Natural Licuado",
    "9": "Hidrógeno",
    "A": "Biometano",
    "B": "Etanol",
    "C": "Biodiesel",
    "99": "[N/A]",
}

# camp COD_PROVINCIA_VEH
dict_provincia = {
    "A"  : "Alacant",
    "AB" : "Albacete",
    "AL" : "Almería",
    "AV" : "Ávila",
    "B"  : "Barcelona",
    "BA" : "Badajoz",
    "BI" : "Bizkaia",
    "BU" : "Burgos",
    "C"  : "Coruña (A)",
    "CA" : "Cádiz",
    "CC" : "Cáceres",
    "CE" : "Ceuta",
    "CO" : "Córdoba",
    "CR" : "Ciudad Real",
    "CS" : "Castelló",
    "CU" : "Cuenca",
    "DS" : "Desconocido",
    "EX" : "Extranjero",
    "GC" : "Palmas (Las)",
    "GI" : "Girona",
    "GR" : "Granada",
    "GU" : "Guadalajara",
    "H"  : "Huelva",
    "HU" : "Huesca",
    "J"  : "Jaén",
    "L"  : "Lleida",
    "LE" : "León",
    "LO" : "Rioja (La)",
    "LU" : "Lugo",
    "M"  : "Madrid",
    "MA" : "Málaga",
    "ML" : "Melilla",
    "MU" : "Murcia",
    "NA" : "Navarra",
    "O"  : "Asturias",
    "OU" : "Ourense",
    "P"  : "Palencia",
    "IB" : "Balears (Illes)",
    "PO" : "Pontevedra",
    "S"  : "Cantabria",
    "SA" : "Salamanca",
    "SE" : "Sevilla",
    "SG" : "Segovia",
    "SO" : "Soria",
    "SS" : "Gipuzkoa",
    "T"  : "Tarragona",
    "TE" : "Teruel",
    "TF" : "Santa Cruz de Tenerife",
    "TO" : "Toledo",
    "V"  : "València",
    "VA" : "Valladolid",
    "VI" : "Araba",
    "Z"  : "Zaragoza",
    "ZA" : "Zamora",
}

# CLAVE_TRAMITE
dict_tramite = {
    "1": "Matriculación ordinaria", 
    "2": "Transferencia", 
    "3": "Baja definitiva", 
    "4": "Baja definitiva Plan Renove", 
    "5": "Rematriculación", 
    "6": "Baja temporal", 
    "7": "Baja definitiva Exportación/Tránsito comunitario", 
    "8": "Matriculación vehículo especial", 
    "9": "Matriculación temporal", 
    "A": "Prorroga matricula temporal", 
    "B": "Matrícula temporal a definitiva", 
}

# IND_NUEVO_USADO
dict_nuevousado = {"N": "Nuevo", "U": "Usado"}

# PERSONA_FISICA_JURIDICA
dict_persona = {"D": "Particular", "X": "Empresa"}

# RENTING
dict_renting = {"N": "NO", "S": "SI"}

# PROPULSIÓN / ELÉCTRICO --> MOTOR / COMBUSTIBLE / TECNOLOGÍA
# dict_motor_combustible = dict()
dict_motor_combustible = collections.defaultdict(lambda: ("Combustión", "Combustión G/D","Combustión"))
dict_motor_combustible.update({
    ("[N/A]", "[N/A]"):    ("Combustión", "Combustión G/D", "Combustión"),
    ("Diesel", "HEV"):     ("Combustión", "Diesel",    "HEV"),
    ("Diesel", "PHEV"):    ("Eléctrico",  "Diesel",    "PHEV"),
    ("Diesel", "REEV"):    ("Eléctrico",  "Diesel",    "REEV"),
    ("Diesel", "[N/D]"):   ("Combustión", "Diesel",    "Combustión"),
    ("Gasolina", "HEV"):   ("Combustión", "Gasolina",  "HEV"),
    ("Gasolina", "PHEV"):  ("Eléctrico",  "Gasolina",  "PHEV"),
    ("Gasolina", "REEV"):  ("Eléctrico",  "Gasolina",  "REEV"),
    ("Gasolina", "[N/D]"): ("Combustión", "Gasolina",  "Combustión"),
    ("GLP", "HEV"):        ("Combustión", "GLP",       "HEV"),
    ("GLP", "PHEV"):       ("Eléctrico",  "GLP",       "PHEV"),
    ("GLP", "REEV"):       ("Eléctrico",  "GLP",       "REEV"),
    ("GLP", "[N/D]"):      ("Combustión", "GLP",       "GLP"),
    ("GNC", "HEV"):        ("Combustión", "GNC",       "HEV"),
    ("GNC", "PHEV"):       ("Eléctrico",  "GNC",       "PHEV"),
    ("GNC", "REEV"):       ("Eléctrico",  "GNC",       "REEV"),
    ("GNC", "[N/D]"):      ("Combustión", "GNC",       "GNC"),
    ("GNL", "HEV"):        ("Combustión", "GNL",       "HEV"),
    ("GNL", "PHEV"):       ("Eléctrico",  "GNL",       "PHEV"),
    ("GNL", "REEV"):       ("Eléctrico",  "GNL",       "REEV"),
    ("GNL", "[N/D]"):      ("Combustión", "GNL",       "GNL"),
    ("Eléctrico", "BEV"):  ("Eléctrico",  "Eléctrico", "BEV"),
    ("Eléctrico", "HEV"):  ("Combustión", "Gasolina",  "HEV"),
    ("Eléctrico", "PHEV"): ("Eléctrico",  "Gasolina",  "PHEV"),
    ("Eléctrico", "REEV"): ("Eléctrico",  "Gasolina",  "REEV"),
    ("Eléctrico", "[N/D]"):("Eléctrico",  "Eléctrico", "BEV"),
    ("Hidrógeno", "HEV"):  ("Eléctrico",  "Hidrógeno", "HEV"),
    ("Hidrógeno", "PHEV"): ("Eléctrico",  "Hidrógeno", "PHEV"),
    ("Hidrógeno", "REEV"): ("Eléctrico",  "Hidrógeno", "REEV"),
    ("Hidrógeno", "[N/D]"):("Eléctrico",  "Hidrógeno", "Hidrógeno"),
    ("Otros", "BEV"):      ("Eléctrico",  "Otros",     "BEV"),
    ("Otros", "HEV"):      ("Combustión", "Otros",     "HEV"),
    ("Otros", "PHEV"):     ("Eléctrico",  "Otros",     "PHEV"),
    ("Otros", "REEV"):     ("Eléctrico",  "Otros",     "REEV"),
    ("Otros", "[N/D]"):    ("Combustión", "Otros",     "Combustión"),
})

df_total = pd.DataFrame()
rows = collections.defaultdict(lambda: 0)


In [3]:
# Bucle principal per als fitxers mensuals

# for file_txt, file_csv in zip(file_input_txt, file_output_csv):
first = 0
last = len(file_input_txt) - 1
files = zip(file_input_txt[first:last+1], file_output_csv[first:last+1])
for file_txt, file_csv in files:
    print(f"- Reading file: '{file_txt}'")

    raw_data = pd.read_fwf(
        file_txt, compression='zip', skiprows=1, 
        names=fields.Campo, widths=widths, encoding='latin-1', 
    )
    print(f"  - rows in file: {len(raw_data)}")
    rows["TOTAL"] += len(raw_data)

    df = raw_data[fields_list].copy(deep=True)

    # transformar camp de data: FEC_PROCESO 
    date_fields = ["FEC_PROCESO", "FEC_MATRICULA", ]
    for field in date_fields:
        df[field] = pd.to_datetime(df.loc[:, field], format="%d%m%Y", errors='coerce')

    # comprovar valors nuls en camps de data
    # for field in date_fields:
    #     print(field, df.loc[:, [field]].isna().sum())

    # camp FEC_PROCESO -> FECHA, PERIODO
    # df["PERIODO"] = df["FEC_PROCESO"].dt.strftime('%Y-%m')
    # df["FECHA"] = df["FEC_PROCESO"].map(last_day_of_month)
    last_day = df["FEC_PROCESO"].max()
    df["PERIODO"] = last_day.strftime('%Y-%m')
    df["FECHA"] = last_day_of_month(last_day)

    # print( df.groupby(["FEC_PROCESO", "PERIODO", "FECHA"])[["FECHA"]].count(), )
    # print(df[df["PERIODO"]=="2022-01"][["FEC_PROCESO", "FEC_MATRICULA", "CLAVE_TRAMITE", ]])

    # camp COD_TIPO -> TIPO_VEHICULO
    field_old = "COD_TIPO"
    field_new = "TIPO_VEHÍCULO"
    df[field_new] = df[field_old].str[0]
    df = df[(df[field_new] < '5')]
    # fem la substitució:
    df = df.replace({field_new: dict_tipo})
    # print(df.groupby([field_new,])[field_new].count())
    rows["TIPO"] += len(df)
    
    # camp COD_PROPULSION_ITV
    field_old = "COD_PROPULSION_ITV"
    field_new = "PROPULSIÓN"
    # print(df.groupby(field_old)[field_old].count())
    
    # tractament de nulls
    if (df[field_old].dtypes == int) or (df[field_old].dtypes == float):
        df[field_new] = df[field_old].fillna(99).astype(int).astype(str)
    else:
        df[field_new] = df[field_old].fillna("99")
    # transformació
    df[field_new] = df[field_new].map(dict_propulsion)
    
    ######################################################################################
    # print(df.groupby(field_new)[field_new].count())

    # camp COD_PROVINCIA_VEH
    field_old = "COD_PROVINCIA_VEH"
    field_new = "PROVINCIA"
    # print("NA:", df[field_old].isna().sum())
    # print(df.groupby([field_old,])[field_old].count())
    df[field_new] = df[field_old].map(dict_provincia)
    # print(df[field_new])
    # print("PROVINCIA N/A:", df[field_new].isna().sum())
    # print(df.groupby(field_new)[field_new].count())

    # Nou camp: POBLACIÓN
    field_old = "MUNICIPIO"
    field_new = "POBLACIÓN"
    df[field_new] = df[field_old].map(dict_municipis)
    # print(df.groupby(field_new)[field_new].count())

    # camp CLAVE_TRAMITE
    # 1 Matriculación ordinaria y de ciclomotores
    # 5 Rematriculación
    # 9 Matriculación temporal
    # B Paso de matrícula temporal a definitiva
    field_old = "CLAVE_TRAMITE"
    field_new = "TRAMITE"
    # print(df.groupby([field_old,])[field_old].count())
    # Filtre: es quedem sols amb "Matriculación ordinaria" i "Paso de matrícula temporal a definitiva"
    df = df[(df[field_old]=='1') | (df[field_old]=='B')]
    # Valor per al nou camp:
    df[field_new] = df[field_old].map(dict_tramite)
    # print(df[field_new])
    # print(df.groupby(field_new)[field_new].count())
    rows["TRAMITE"] += len(df)

    # camp IND_NUEVO_USADO
    # N nuevo
    # U usado
    field_old = "IND_NUEVO_USADO"
    field_new = "NUEVO_USADO"
    # print(df.groupby([field_old,])[field_old].count())
    df[field_new] = df[field_old].map(dict_nuevousado)
    # print(df.groupby(field_new)[field_new].count())

    # camp PERSONA_FISICA_JURIDICA
    # D persona física
    # X persona jurídica
    field_old = "PERSONA_FISICA_JURIDICA"
    field_new = "PROPIETARIO"
    # print(df.groupby([field_old,])[field_old].count())
    df[field_new] = df[field_old].map(dict_persona)
    # print(df.groupby(field_new)[field_new].count())

    # nou camp EMISIONES : sí (tenim registre CO2), no (no tenim)
    # després podem posar a 0 el valors CO2 per als que no tenim emissions
    df["REG_EMISIONES"] = df["CO2_ITV"].isna()
    df["REG_EMISIONES"] = df["REG_EMISIONES"].map({True:"NO", False:"SÍ"})
    # print(df.groupby("REG_EMISIONES")["REG_EMISIONES"].count())
    # print(df.groupby("REG_EMISIONES")["CO2_ITV"].sum())
    # Convertim valors nulls a 0
    df["CO2_ITV"] = df["CO2_ITV"].fillna(0)

    # camp RENTING (N: no, S: sí)
    field = "RENTING"
    df[field] = df[field].map(dict_renting)
    # print("\nRenting:\n", df.groupby(field)[field].count())

    # camp CATEGORÍA_VEHÍCULO_ELÉCTRICO
    field_old = "CATEGORÍA_VEHÍCULO_ELÉCTRICO"
    field_new = "TIPO_ELECTRICO"
    df = df.rename(columns={field_old: field_new})
    # df.groupby([field_new,])[field_new].count()

    # camp AUTONOMÍA_VEHÍCULO_ELÉCTRICO  -> per a convertir a km. -> x / 100
    field = "AUTONOMÍA_VEHÍCULO_ELÉCTRICO"
    # df[field] = df[field].astype(int) / 100
    df[field] = df[field].fillna("000000")
    df[df[field].str[-2:] != "00"][field]
    if (df[field].dtypes == object):
        df[field] = df[field].str[:-2]
        df[field] = pd.to_numeric(df[field], errors='coerce').fillna(0)
    else: 
        df[field] = df[field].fillna(0) / 100
        df[field] = df[field] / 100
    # df.loc[:, "AUTONOMÍA_VEHÍCULO_ELÉCTRICO"].describe()

    # Elimminem outlayers (autonomía > 2000km)
    # print(df[df["AUTONOMÍA_VEHÍCULO_ELÉCTRICO"] > 2000].loc[:, ["MARCA_ITV", "MODELO_ITV", "AUTONOMÍA_VEHÍCULO_ELÉCTRICO"]])
    df = df[df["AUTONOMÍA_VEHÍCULO_ELÉCTRICO"] < 2000]

    # reanomenem les columnes
    df = df.rename(columns = {
        "TIPO_VEHÍCULO": "VEHÍCULO",
        "MARCA_ITV": "MARCA",
        "MODELO_ITV": "MODELO",
        "CO2_ITV": "CO2",
        "TIPO_ELECTRICO": "ELÉCTRICO",
        "AUTONOMÍA_VEHÍCULO_ELÉCTRICO": "AUTONOMÍA",
    })

    print(f"  - Files: {len(df)}")

    # Escollim els camps finals
    camps = [
        'FECHA', 'PERIODO', 'PROVINCIA', 'POBLACIÓN', 
        'VEHÍCULO', 'MARCA', 'MODELO', 'PROPULSIÓN', 
        'REG_EMISIONES', 'CO2', 'ELÉCTRICO', 'AUTONOMÍA', 
        'PROPIETARIO', 'NUEVO_USADO', 'RENTING', 
    ]
    df = df.loc[:, camps]

    # Comprovació de valors nuls
    print("  - Valors nuls:")
    v_nuls = df.isna().sum() 
    print(v_nuls[v_nuls>0])
    rows["N/A"] += len(v_nuls)

    # tractament de valors nuls:
    for field in ["MARCA", "MODELO", "ELÉCTRICO", "PROVINCIA", "RENTING", ]:
        df[field] = df[field].fillna("[N/D]")
    
    # Camps transformació: PROPULSIÓN / ELÉCTRICO --> MOTOR / COMBUSTIBLE / TECNOLOGÍA
    df["input"] = list(zip(df["PROPULSIÓN"], df["ELÉCTRICO"]))
    print(
        df.groupby(["PROPULSIÓN", "ELÉCTRICO"])["PROPULSIÓN"].count()
    )
    df["output"] = df["input"].map(dict_motor_combustible)
    df["MOTOR"] = df["output"].str[0]
    df["COMBUSTIBLE"] = df["output"].str[1]
    df["TECNOLOGÍA"] = df["output"].str[2]
    df = df.drop(columns=["input", "output"])
    print(
        df.groupby(["MOTOR", "COMBUSTIBLE", "TECNOLOGÍA"])["MOTOR"].count()
    )

    # df = df.drop(columns=["CANTIDAD"])
    df["CANTIDAD"] = 1

    # agrupar dades
    fields_group = [
        'FECHA', 'PERIODO', 'PROVINCIA', 'POBLACIÓN', 
        'VEHÍCULO', 'MOTOR', 'COMBUSTIBLE', 'TECNOLOGÍA',
        'REG_EMISIONES',  
    ]
    dades = df.groupby(fields_group)[["CO2", "CANTIDAD"]].sum().reset_index()
    total =  dades["CANTIDAD"].sum()
    dades["%"] = dades["CANTIDAD"] / total * 100
    print(f"  - Total rows: {len(dades)}")
    print(f"  - Total vehicles: {total}")
    dades.to_csv(file_csv, sep=",", index=False)
    rows["GROUP"] += len(dades)

for (k, v) in rows.items():
    print(f"{k}: {v}")


- Reading file: 'txt/export_mensual_mat_201801.zip'
  - rows in file: 148478
  - Files: 126259
  - Valors nuls:
PROVINCIA      1519
ELÉCTRICO    119362
dtype: int64
PROPULSIÓN  ELÉCTRICO
Butano      HEV              1
Diesel      HEV            104
            PHEV             1
            [N/D]        63902
Eléctrico   BEV            522
            HEV              6
            PHEV           309
            REEV            21
            [N/D]           35
GLP         HEV              3
            [N/D]          716
GNC         [N/D]          386
GNL         [N/D]            8
Gasolina    HEV           5864
            PHEV            64
            [N/D]        54311
Otros       HEV              2
[N/A]       [N/D]            4
Name: PROPULSIÓN, dtype: int64
MOTOR       COMBUSTIBLE     TECNOLOGÍA
Combustión  Combustión G/D  Combustión        5
            Diesel          Combustión    63902
                            HEV             104
            GLP             GLP          

In [4]:
# last_day = df["FEC_PROCESO"].max()
# print(
#     last_day, 
#     last_day.strftime('%Y-%m'),
#     last_day_of_month(last_day)
# )


In [5]:
# fields_resum = ['PERIODO',  'COMBUSTIBLE', 'TECNOLOGÍA', 'REG_EMISIONES', ]
# dades_resum = dades.groupby(fields_resum)[["CO2", "CANTIDAD"]].sum().reset_index()
# dades_resum


In [6]:
# dades_resum = dades.groupby(fields_resum)[["CO2", "CANTIDAD"]].agg(['count', 'mean', 'sum']).reset_index()
# dades_resum


In [7]:
df.groupby(field_old)[field_old].count()
df.isna().sum()

if df[field_old].dtypes == int:
    df[field_new] = df[field_old].fillna(99).astype(int).astype(str)
else:
    df[field_new] = df[field_old].fillna("99")
df.isna().sum()

df.groupby(field_new)[field_new].count()


KeyError: 'CATEGORÍA_VEHÍCULO_ELÉCTRICO'

In [None]:
if df[field_old].dtypes == int:
    df[field_new] = df[field_old].fillna(99).astype(int).astype(str)
elif df[field_old].dtypes == float:
    df[field_new] = df[field_old].fillna(99).astype(int).astype(str)
else:
    df[field_new] = df[field_old].fillna("99")

df[field_old]
