In [2]:
import os
import shutil

import requests

import time
import numpy as np

import pandas as pd
import polars as pl
import polars.selectors as cs


import duckdb

import pyarrow.parquet as pq

In [3]:
os.environ["POLARS_TEMP_DIR"] = "D:\\TEMP_DIR"
os.environ["TMPDIR"] = "D:\\TEMP_DIR"
os.environ["TEMP"] = "D:\\TEMP_DIR"
os.environ["TMP"] = "D:\\TEMP_DIR"

duckdb.sql("""
    SET temp_directory = "D:\\TEMP_DIR"
""")

In [3]:
import os

# Verificación de la variable de entorno
temp_path = os.environ.get("POLARS_TEMP_DIR")

if temp_path:
    print(f"✅ Polars usará: {temp_path}")
    if not os.path.exists(temp_path):
        print("⚠️ Advertencia: La carpeta no existe, Polars podría fallar o usar el default.")
else:
    print("❌ La variable no está seteada. Polars usará el disco C (Temp del sistema).")

✅ Polars usará: D:\TEMP_DIR


## <p style='text-align: center; text-decoration: underline; color: #10A0B4;'> Datos oficiales de la **Union Europea** </p>
[Origen de los datos](https://www.eea.europa.eu/en/datahub/datahubitem-view/fa8b1229-3db6-495d-b18e-9c9b3267c02b)

---

## Autos registrados desde **2010** hasta **2023**

---

#### El dataset contiene mas de **16 Gigas** de informacion (80.378.486 filas).

El gran tamano del conjunto de datos trae problemas:
- No puedo subirlo a GitHub (+100MB). 
- Lento procesamiento.

Vamos a resolver ambas cuentiones achicando el dataset de manera tal que el impacto por usar menos informacion sea minimo

---

El archivo pesa +16GB y tiene más de 80 millones de filas. Tarda +3 min en ejecutar. 

In [6]:
url = 'ignore_data.csv'
file_weight = os.path.getsize(url)
print('Database size: ', round((file_weight/1024)/1024/1024, 2), 'GB')

count = duckdb.sql("SELECT count(*) FROM ignore_data.csv").fetchone()[0]
print(f"El archivo .csv tiene {count} registros.")

duckdb.sql("SELECT * FROM 'ignore_data.csv' LIMIT 0").show() # 10 seg aprox. 
duckdb.sql("SUMMARIZE ignore_data.csv").show(max_rows=100) # 10 seg aprox. 
# pl.scan_parquet("ignore_data_complete_clean.parquet").describe().head(45) # 45 seg aprox.

Database size:  16.38 GB


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

El archivo .csv tiene 80378486 registros.
┌───────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬───────┬────────┬───────┬──────────────┬──────────────┬────────┬──────────┬──────────┬─────────┬─────────┬──────────┬─────────┬───────────┬─────────┬───────────────┬───────────────┬────────┬───────┬─────────┬───────┬──────────────────────┬──────────────────┬─────────┬─────────┬─────────────────────┐
│  ID   │ Country │   VFN   │   Mp    │   Mh    │   Man   │   MMS   │   Tan   │    T    │   Va    │   Ve    │   Mk    │   Cn    │   Ct    │   Cr    │   r   │ m (kg) │  Mt   │ Enedc (g/km) │ Ewltp (g/km) │ W (mm) │ At1 (mm) │ At2 (mm) │   Ft    │   Fm    │ ec (cm3) │ ep (KW) │ z (Wh/km) │   IT    │ Ernedc (g/km) │ Erwltp (g/km) │   De   │  Vf   │ Status  │ year  │ Date of registration │ Fuel consumption │   ech   │  RLFI   │ Electric range (km) │
│ int64 │ varchar │ varchar │ varchar │ varchar │ varchar 

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────────────────┬─────────────┬────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────┬───────────────┬──────────────────────┬──────────────────────┬────────────────────────┬──────────────────────┬──────────────────────┬──────────┬─────────────────┐
│     column_name      │ column_type │                        min                         │                                max                                 │ approx_unique │         avg          │         std          │          q25           │         q50          │         q75          │  count   │ null_percentage │
│       varchar        │   varchar   │                      varchar                       │                              varchar                               │     int64     │       varchar        │       varchar        │        varchar         │       varchar        │       varchar        │  int64   │  decimal(9,2)   │
├──────────────────────┼───────

Tabla informacional de códigos y datatypes

<table>
<tbody>
<tr>
    <td>Name</td>
    <td>Definition</td>
    <td>Datatype </td>

</tr>
<tr>
    <td>ID</td>
    <td>Identification number.</td>
    <td>integer </td>
</tr>
<tr>
    <td>MS</td>
    <td>Member state.</td>
    <td>varchar(2) </td>
</tr>
<tr>
    <td>Mp</td>
    <td>Manufacturer pooling.</td>
    <td>varchar(50) </td>
</tr>
<tr>
    <td>VFN</td>
    <td>Vehicle family identification number.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Mh</td>
    <td>Manufacturer name EU standard denomination .</td>
    <td>varchar(50) </td>
</tr>
<tr>
    <td>Man</td>
    <td>Manufacturer name OEM declaration.</td>
    <td>varchar(50) </td>
</tr>
<tr>
    <td>MMS</td>
    <td>Manufacturer name MS registry denomination .</td>
    <td>varchar(125) </td>
</tr>
<tr>
    <td>TAN</td>
    <td>Type approval number.</td>
    <td>varchar(50) </td>
</tr>
<tr>
    <td>T</td>
    <td>Type.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Va</td>
    <td>Variant.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Ve</td>
    <td>Version.</td>
    <td>varchar(35) </td>
</tr>
<tr>
    <td>Mk</td>
    <td>Make.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Cn</td>
    <td>Commercial name.</td>
    <td>varchar(50) </td>
</tr>
<tr>
    <td>Ct</td>
    <td>Category of the vehicle type approved.</td>
    <td>varchar(5)  </td>
</tr>
<tr>
    <td>Cr</td>
    <td>Category of the vehicle registered.</td>
    <td>varchar(5)  </td>
</tr>
<tr>
    <td>M (kg)</td>
    <td>Mass in running order Completed/complete vehicle .</td>
    <td>integer </td>
</tr>
<tr>
    <td>Mt</td>
    <td>WLTP test mass.</td>
    <td>integer</td>
</tr>
<tr>
    <td>Enedc (g/km)</td>
    <td>Specific CO2 Emissions (NEDC).</td>
    <td>integer </td>
</tr>
<tr>
    <td>Ewltp (g/km)</td>
    <td>Specific CO2 Emissions (WLTP).</td>
    <td>integer </td>
</tr>
<tr>
    <td>W (mm)</td>
    <td>Wheel Base.</td>
    <td>integer </td>
</tr>
<tr>
    <td>At1 (mm)</td>
    <td>Axle width steering axle.</td>
    <td>integer </td>
</tr>
<tr>
    <td>At2 (mm)</td>
    <td>Axle width other axle.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Ft</td>
    <td>Fuel type.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Fm</td>
    <td>Fuel mode.</td>
    <td>varchar(1)  </td>
</tr>
<tr>
    <td>Ec (cm3)</td>
    <td>Engine capacity.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Ep (KW)</td>
    <td>Engine power.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Z (Wh/km)</td>
    <td>Electric energy consumption.</td>
    <td>integer </td>
</tr>
<tr>
    <td>IT</td>
    <td>Innovative technology or group of innovative technologies.</td>
    <td>varchar(25) </td>
</tr>
<tr>
    <td>Ernedc (g/km)</td>
    <td>Emissions reduction through innovative technologies.</td>
    <td>float </td>
</tr>
<tr>
    <td>Erwltp (g/km)</td>
    <td>Emissions reduction through innovative technologies (WLTP).</td>
    <td>float </td>
</tr>
<tr>
    <td>De</td>
    <td>Deviation factor.</td>
    <td>float </td>
</tr>
<tr>
    <td>Vf</td>
    <td>Verification factor.</td>
    <td>integer </td>
</tr>
<tr>
    <td>R</td>
    <td>Total new registrations.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Year</td>
    <td>Reporting year.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Status</td>
    <td>P = Provisional data, F = Final data.</td>
    <td>varchar(1)  </td>
</tr>
<tr>
    <td>Version_file</td>
    <td>Internal versioning of deliverables.</td>
    <td>varchar(10) </td>
</tr>
<tr>
    <td>E (g/km)</td>
    <td>Specific CO2 Emission. Deprecated value, only relevant for data until 2016.</td>
    <td>float </td>
</tr>
<tr>
    <td>Er (g/km)</td>
    <td>Emissions reduction through innovative technologies. Deprecated value, only relevant for data until 2016.</td>
    <td>float </td>
</tr>
<tr>
    <td>Zr</td>
    <td>Electric range.</td>
    <td>integer </td>
</tr>
<tr>
    <td>Dr</td>
    <td>Registration date.</td>
    <td>date </td>
</tr>
<tr>
    <td>Fc</td>
    <td>Fuel consumption.</td>
    <td>float </td>
</tr>
</tbody>
</table>

---

<p style="font-size: 35px; text-align: center; color: yellow;"> Limpiado de datos </p>

<p style="font-size: 25px;">Crearé el DataFrame con la información cruda. Tal como la descargamos.</p>
<p style="font-size: 20px">Guardaré el DataFrame en archivo .parquet ya que internamente divide la información en bloques para que a la hora de procesarla vaya de a bloques de 1M registros por ejemplo.</p>

<p style="font-size: 25px">Para poder procesar los 80 millones de registros, usar <u>pandas</u> es inviable para mi computadora de 16GB de RAM. Provoca error 'MemoryError'</p>

- Pandas trabaja con el DF en memoria y a demás el DF debe de ocupar como máximo entre el 10% y 30% de la memoria RAM. 


<p>Se puede guardar el DataFrame en 80 archivos .parquet con 1M de registros cada uno o <u>en uno único</u> de 80M de registros. Usar <u>polars</u> o <u>duckdb</u> para procesar la información es mejor ya que ambos procesan archivos .parquet de a bloques (cargando y descargando en RAM los bloques .parquet) y a demás utilizan todos los núcleos de la CPU. </p>

- <b><u>Polars:</u></b> 
    - <b>Procesamiento:</b> Utiliza un motor escrito en Rust que procesa datos de forma paralela
    - <b>Sintaxis:</b> Utiliza expresiones parecidas a las de pandas pero más optimizadas. 
    - <b>Memoria:</b> Trabaja con "streaming" ("Lazy"), cargando y transformando sólolos trozos necesarios en cada momento. 

- <b><u>duckDB:</u></b>
    - <b>Procesamiento:</b> Es un motor SQL analítico que puede ejecutarse directamente en el script de Python.
    - <b>Flexibilidad:</b> Permite realizar transformaciones complejas mediante consultas SQL o su propia API de Python, siendo extremadamente eficiente para cruzar (join) múltiples archivos grandes.
    - <b>Out-of-Core:</b> Si los datos superan la RAM, DuckDB puede usar el disco duro como apoyo para que el proceso no falle.

<p style="font-size: 35px; text-align: center; color: orange;">Usaré las 2 librerías. POLARS para modificaciones del DF y DUCKDB para consultar.</p>

##### Si el registro no tiene información en: Marca, modelo, tipo de combustible y ano de fabricación entonces no me sirve.  

In [7]:
columns_to_load = ['ID', 'year', 'Country', 'Mh', 'z (Wh/km)', 'ep (KW)', 'Mk', 'Cn', 
                   'Electric range (km)', 'm (kg)', 'Ewltp (g/km)', 'Ft', 'ec (cm3)', 
                   'Fuel consumption ', 'MMS']

rename_columns = {
    'm (kg)': 'mass_kg', 'Ewltp (g/km)': 'co2_emission_g/km', 'Ft': 'fuel_type',
    'ec (cm3)': 'engine_size_cm3', 'Fuel consumption ': 'fuel_consumption_l/100km',
    'year': 'year_of_fabrication', 'Mh': 'manufacturer', 'z (Wh/km)': 'energy_consumption_Wh/km',
    'ep (KW)': 'electric_power_KW', 'Mk': 'model', 'Cn': 'commercial_name',
    'Electric range (km)': 'electric_range_km', 'MMS': 'manufacturar_name', "Country": "country"
}

# scan_csv (Lazy) - Esto NO carga el archivo en RAM
df_lazy = (
    pl.scan_csv('ignore_data.csv', infer_schema_length=10000) # En base a las primeras 10k filas, Polars infiere el tipo de cada columna.
    .select(columns_to_load)
    .rename({k: v for k, v in rename_columns.items() if k in columns_to_load})
    .drop_nulls(subset=['model', 'commercial_name', 'fuel_type', 'year_of_fabrication'])
    .with_columns(cs.string().str.strip_chars().str.to_lowercase().replace("", None))
)

print("Iniciando procesamiento en streaming...")
df_lazy.sink_parquet(f"ignore_data_complete_clean.parquet") # 2 min aprox. para procesar. 
print("Done!")

file_weight = os.path.getsize("ignore_data_complete_clean.parquet")
print('.parquet size: ', round((file_weight/1024)/1024/1024, 2), 'GB')

Iniciando procesamiento en streaming...
Done!
.parquet size:  0.4 GB


#### Unificamos datos. 

In [11]:
duckdb.sql("""
    SELECT fuel_type, count(*) as total_fueltype
    FROM 'ignore_data_complete_clean.parquet'
    GROUP BY fuel_type
    ORDER BY total_fueltype DESC
""")

┌───────────────┬────────────────┐
│   fuel_type   │ total_fueltype │
│    varchar    │     int64      │
├───────────────┼────────────────┤
│ petrol        │       46162947 │
│ diesel        │       23342285 │
│ electric      │        5191226 │
│ hybrid_petrol │        3465215 │
│ natural_gas   │        1536467 │
│ hybrid_diesel │         190853 │
│ biofuel       │         159867 │
│ hydrogen      │           4257 │
│               │           2809 │
│ NULL          │           2673 │
│ petrol-gas    │              1 │
├───────────────┴────────────────┤
│ 11 rows              2 columns │
└────────────────────────────────┘

In [None]:
mapeo_manufacturers = {
    "": ["aa-iva", "duplicate", "out of scope", "cng technik", "unknown", "aa-nss", "zhaoqing"], 
    "bmw": ["bmw ag", "bmw gmbh", "bayerische motoren werke ag", "bmw m gmbh"], 
    "volkswagen": [], 
    "toyota": ["toyota motor corporation", "toyota motor europe"], 
    "ford": ["ford werke gmbh", "ford india", "ford motor company", "ford-werke gmbh"], 
    "audi": ["audi ag", "audi sport", "audi hungaria", "quattro"],
    "peugeot": ["automobiles peugeot"],
    "citroen": ["automobiles citroen"],
    "fiat": ["fiat group", "fiat group automobiles spa"], 
    "mercedes-benz": ["mercedes-benz ag", "daimler ag", "mercedes amg", "mercedes-amg"],
    "opel": ["opel automobile"],
    "stellantis": ["psa", "stellantis europe"], 
    "hyundai": ["hyundai czech", "hyundai assan", "hyundai europe"],
    "kia": ["kia slovakia"],
    "jaguar": ["jaguar land rover limited", "jaguar cars ltd"],
    "suzuki": ["suzuki motor corporation", "magyar suzuki", "suzuki thailand", "maruti suzuki", "magyar suzuki corporation ltd"],
    "nissan": ["nissan automotive europe", "nissan international sa"],
    "honda": ["honda motor co", "honda uk", "honda turkiye", "honda china", "honda of the uk manufacturing", "honda automobile china co", "honda turkiye as", "honda automobile thailand co", "honda thailand"], 
    "mitsubishi": ["mitsubishi motors corporation", "mitsubishi motors thailand", "mitsubishi motors europe", "mitsubishi motors corporation mmc"], 
    "saic": ["saic motor corporation", "saic motor"], 
    "dr": ["dr automobiles", "dr motor"],
    "mazda": ["mazda europe", "mazda motor corporation"], 
    "kgm": ["ssangyong", "kg mobility"], 
    "mg": ["mg motor"], 
    "gm": ["gm korea", "gm korea company", "gm daewoo auto u tech comp"], 
    "subaru": ["fuji heavy industries", "fuji heavy industries ltd"],
    "general motors": ["general motors holdings", "general motors company", "gm italia"], 
    "mahindra & mahindra": ["mahindra"], 
    "chrysler": ["chrysler group llc"],
    "jmc": ["jiangling motor", "jiangxi jiangling", "jiangling motors"], 
    "faw": ["china faw"], 
    "saab": ["saab automobile ab"],
    "rolls-royce": ["rolls royce", "rolls-royce motor cars ltd"], 
    "aston Martin": ["aston martin lagonda ltd"], 
    "bluecar": ["bluecar italy", "vehicules electriques pininfarina-bollore s.a.s.", "vepb"],
    "bugatti": ["bugatti rimac"], 
    "chevrolet": ["chevrolet italia"],
    "daihatsu": ["daihatsu motor co"], 
    "dongfeng": ["dongfeng motor", "dongfeng liuzhou", "dongfeng motor corporation"], 
    "e-go": ["next ego mobile"], 
    "iveco": ["iveco spa"], 
    "lanzhou": ["lanzhou zhidou"], 
    "levc": ["lti carbodies"], 
    "lotus": ["wuhan lotus", "lotus group plc"], 
    "micro-vett": ["micro vett"],
    "osv": ["osv opel special vehicles"], 
    "radical motorsport": ["radical motorsport ltd", "radical motosport"], 
    "renault": ["renault trucks", "sovab"], 
    "zotye": ["zotye holding ng group"]
}
dic_aux_manufacturers = {}
for correcto, lista_errores in mapeo_manufacturers.items():
    for error in lista_errores: 
        dic_aux_manufacturers[error] = correcto
print(dic_aux_manufacturers)
print(len(dic_aux_manufacturers))

mapeo_fueltype = { 
    "hybrid_petrol": ["petrol-electric", "petrol/electric", "hybrid/petrol/e", "petrol phev"], 
    "hybrid_diesel": ["diesel/electric", "diesel-electric"], 
    "biofuel": ["e85", "biodiesel", "ng-biomethane", "ng_biomethane"], 
    "natural_gas": ["ng", "cng", "gnl", "lpg", "petrol-gas"], 
    "": ["unknown", "other", "NULL"]

}
dic_aux_fueltype = {}
for correcto, lista_errores in mapeo_fueltype.items():
    for error in lista_errores: 
        dic_aux_fueltype[error] = correcto
print(dic_aux_fueltype)
print(len(dic_aux_fueltype))


input_path = "ignore_data_complete_clean.parquet"
temp_path = "temporal_ignore_data_complete_clean.parquet"

query = (
    pl.scan_parquet(input_path)
    .with_columns([
        (pl.col("engine_size_cm3") / 1000).round(2).alias("engine_size_L"),
        
        pl.col("manufacturer")
        .str.strip_chars() # Elimina espacios en blanco al inicio y al final de la cadena.
        .str.to_lowercase() # Convierte la cadena a minúsculas para estandarizarla.
        .replace_strict(dic_aux_manufacturers, default=pl.col("manufacturer")), # Reemplaza los valores según el diccionario, si no encuentra una coincidencia, deja el valor original.

        pl.col("fuel_type")
        .str.strip_chars()
        .str.to_lowercase()
        .replace_strict(dic_aux_fueltype, default=pl.col("fuel_type"))
    ]).drop("engine_size_cm3") # Eliminamos la columna original para que no esté duplicada
    
    .with_columns(
        pl.col(["fuel_consumption_l/100km", "engine_size_L"]).cast(pl.Float32), 
        pl.col("mass_kg").cast(pl.Int16)
    )
)
query.sink_parquet(temp_path, compression="zstd", compression_level=15)
shutil.copy2(temp_path, "ignore_backup_1.parquet")
os.remove(input_path)
os.rename(temp_path, input_path)

file_weight = os.path.getsize(input_path)
print('.parquet size: ', round((file_weight/1024)/1024/1024, 2), 'GB')

{'aa-iva': '', 'duplicate': '', 'out of scope': '', 'cng technik': '', 'unknown': '', 'aa-nss': '', 'zhaoqing': '', 'bmw ag': 'bmw', 'bmw gmbh': 'bmw', 'bayerische motoren werke ag': 'bmw', 'bmw m gmbh': 'bmw', 'toyota motor corporation': 'toyota', 'toyota motor europe': 'toyota', 'ford werke gmbh': 'ford', 'ford india': 'ford', 'ford motor company': 'ford', 'ford-werke gmbh': 'ford', 'audi ag': 'audi', 'audi sport': 'audi', 'audi hungaria': 'audi', 'quattro': 'audi', 'automobiles peugeot': 'peugeot', 'automobiles citroen': 'citroen', 'fiat group': 'fiat', 'fiat group automobiles spa': 'fiat', 'mercedes-benz ag': 'mercedes-benz', 'daimler ag': 'mercedes-benz', 'mercedes amg': 'mercedes-benz', 'mercedes-amg': 'mercedes-benz', 'opel automobile': 'opel', 'psa': 'stellantis', 'stellantis europe': 'stellantis', 'hyundai czech': 'hyundai', 'hyundai assan': 'hyundai', 'hyundai europe': 'hyundai', 'kia slovakia': 'kia', 'jaguar land rover limited': 'jaguar', 'jaguar cars ltd': 'jaguar', 'suzuk

#### El auto que menos consume en el mundo: 
    - Diesel es el Renault Clio Blue dCi 100 con un consumo de 3,6l/100km. 
    - Petrol, es el Suzuki Swift 1.2 con 4,4l/100km.  
#### El auto más pesado y liviano del mundo: 
    - Más pesado es el Rolls-Royce Specrte con 2975kg
    - Más liviano es el suzuki alto (versíon Japonesa) con 810kg

In [9]:
count = duckdb.sql("SELECT COUNT(*) FROM 'ignore_data_complete_clean.parquet'").fetchone()
coun_delete = duckdb.sql("""
    SELECT COUNT(CASE WHEN ("fuel_consumption_l/100km" < 3.6 AND "fuel_type" IN ('petrol', 'diesel')) OR ("mass_kg" <= 800 OR "mass_kg" >= 3000) THEN 1 END) AS eliminar
    FROM 'ignore_data_complete_clean.parquet'
""").fetchone()
print(f"Cantidad de registros antes de la limpieza: {count}")
print(f"Cantidad de registros eliminados: {coun_delete}")

######## Si tengo más de 32GB de RAM, puedo cargar el .parquet en memoria y hacer las transformaciones sin necesidad de escribir en disco. 
# Eliminamos los registros que no tienen sentido. 
# df = pl.read_parquet("ignore_data_complete_clean.parquet")
# df = df.filter(
#     (
#         # Condition to remove. 
#         (pl.col("fuel_consumption_l/100km") < 3.6) & (pl.col("fuel_type").is_in(["petrol", "diesel"])) 
#         |
#         (pl.col("mass_kg") <= 800) | (pl.col("mass_kg") >= 3000)
#     ).not_().fill_null(True)
# )   
# duckdb.sql("SELECT COUNT(*) FROM df ").show()
# df.write_parquet("ignore_data_complete_clean.parquet")
# duckdb.sql("SELECT COUNT(*) FROM 'ignore_data_complete_clean.parquet'").show()

input_path = "ignore_data_complete_clean.parquet"
temp_path = "temporal_ignore_data_complete_clean.parquet"

query = (
    pl.scan_parquet(input_path)
    .filter(
        # Condition to remove. 
        (
            ((pl.col("fuel_consumption_l/100km") < 3.6) & (pl.col("fuel_type").is_in(["petrol", "diesel"]))) 
            | 
            ((pl.col("mass_kg") <= 800) | (pl.col("mass_kg") >= 3000))
        ).eq(False).fill_null(True)
    )
)

df_result_count = query.select(pl.len()).collect()
print("Cantidad de registros después de la limpieza: ", df_result_count[0, 0])
query.sink_parquet(temp_path)

os.remove(input_path) 
os.rename(temp_path, input_path)

#duckdb.sql("SELECT COUNT(*) FROM query").show()
count2 = duckdb.sql(f"SELECT COUNT(*) FROM '{input_path}'").fetchone()
print(f"Cantidad de registros después de la limpieza: {count2}")

file_weight = os.path.getsize(input_path)
print('.parquet size: ', round((file_weight/1024)/1024/1024, 2), 'GB')

Cantidad de registros antes de la limpieza: (80082340,)
Cantidad de registros eliminados: (23740,)
Cantidad de registros después de la limpieza:  80058600
Cantidad de registros después de la limpieza: (80058600,)
.parquet size:  0.41 GB


#### Edit datatype for reduce file size and some others changes. 

In [10]:
# read_parquet() vs sink_parquet() - La primera carga el archivo en memoria, mientras que la segunda lo hace en streaming (carga de a pedzos en memoria y luego pasa a disco). Si el archivo me entra en memoria, es más rápido cargarlo con read_parquet() y hacer las transformaciones en memoria ya que escribir en disco es más lento. 

# df = pl.read_parquet("ignore_data_complete_clean.parquet")

# df = df.with_columns((pl.col("engine_size_cm3")/1000)
#     .round(2)
#     .alias("engine_size_L")
# ).drop("engine_size_cm3")

# df = df.with_columns([
#     pl.col(["country", "manufacturer", "model", "commercial_name", "fuel_type", "manufacturar_name"]).cast(pl.Categorical), # Categorical: Es un tipo de dato que almacena los valores únicos de una columna y luego asigna un código numérico a cada valor. Esto reduce el espacio de almacenamiento y mejora el rendimiento en operaciones de filtrado y agrupamiento.
#     pl.col("ID").cast(pl.Int32), 
#     pl.col(["year_of_fabrication", "energy_consumption_Wh/km", "electric_power_KW", "electric_range_km", "mass_kg", "co2_emission_g/km"]).cast(pl.Int16), 
#     pl.col(["fuel_consumption_l/100km", "engine_size_L"]).cast(pl.Float32)
# ])

#df_optimized.write_parquet("ignore_data_optimized_clean.parquet", compression="zstd", compression_level=15)


input_path = "ignore_data_complete_clean.parquet"
temp_path = "temporal_ignore_data_complete_clean.parquet"

query = (
    pl.scan_parquet(input_path)
    .with_columns([
        pl.col("ID").cast(pl.Int32), 
        pl.col("year_of_fabrication").cast(pl.String),
        pl.col(["energy_consumption_Wh/km", "electric_power_KW", "electric_range_km", "mass_kg", "co2_emission_g/km"]).cast(pl.Int16), 
        pl.col(["fuel_consumption_l/100km", "engine_size_L"]).cast(pl.Float32), 
        pl.col(["country", "manufacturer", "model", "commercial_name", "fuel_type", "manufacturar_name"]).cast(pl.Categorical) # Categorical: Es un tipo de dato que almacena los valores únicos de una columna y luego asigna un código numérico a cada valor. Esto reduce el espacio de almacenamiento y mejora el rendimiento en operaciones de filtrado y agrupamiento.
    ])
)

# Sirve para que no me cargue en memoria el DataFrame. Pero ya que el .parquet es mas liviano, lo puedo cargar en memoria y hacer las transformaciones. En memoria es más rápido que escribir en disco. 
query.sink_parquet(temp_path, compression="zstd", compression_level=15) 
shutil.copy2(temp_path, "ignore_backup_1.parquet")

os.remove(input_path) 
os.rename(temp_path, input_path)

file_weight = os.path.getsize(input_path)
print('.parquet size: ', round((file_weight/1024)/1024, 2), 'MB')
print("Done!")

.parquet size:  426.37 MB
Done!


#### Reescribimos bien los datos necesarios

In [9]:
duckdb.sql("SELECT * FROM 'ignore_data_complete_clean.parquet' LIMIT 0").show()

┌───────┬─────────────────────┬─────────┬──────────────┬──────────────────────────┬───────────────────┬─────────┬─────────────────┬───────────────────┬─────────┬───────────────────┬───────────┬──────────────────────────┬───────────────────┬───────────────┐
│  ID   │ year_of_fabrication │ country │ manufacturer │ energy_consumption_Wh/km │ electric_power_KW │  model  │ commercial_name │ electric_range_km │ mass_kg │ co2_emission_g/km │ fuel_type │ fuel_consumption_l/100km │ manufacturar_name │ engine_size_L │
│ int32 │       varchar       │ varchar │   varchar    │          int16           │       int16       │ varchar │     varchar     │       int16       │  int16  │       int16       │  varchar  │          float           │      varchar      │     float     │
├───────┴─────────────────────┴─────────┴──────────────┴──────────────────────────┴───────────────────┴─────────┴─────────────────┴───────────────────┴─────────┴───────────────────┴───────────┴──────────────────────────┴─────────

---

In [10]:
duckdb.sql("SUMMARIZE SELECT * FROM 'ignore_data_complete_clean.parquet'").show(max_rows=25)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────────────────────┬─────────────┬────────────────────────┬───────────────┬───────────────┬────────────────────┬────────────────────┬────────────────────┬────────────────────┬────────────────────┬──────────┬─────────────────┐
│       column_name        │ column_type │          min           │      max      │ approx_unique │        avg         │        std         │        q25         │        q50         │        q75         │  count   │ null_percentage │
│         varchar          │   varchar   │        varchar         │    varchar    │     int64     │      varchar       │      varchar       │      varchar       │      varchar       │      varchar       │  int64   │  decimal(9,2)   │
├──────────────────────────┼─────────────┼────────────────────────┼───────────────┼───────────────┼────────────────────┼────────────────────┼────────────────────┼────────────────────┼────────────────────┼──────────┼─────────────────┤
│ ID                       │ INTEGER     │ 1                    

#### Convertir el archivo en html
<code>jupyter nbconvert --to html clean_data.ipynb</code>