# 01 - Data Quality

## Objetivo
- Ingestar los 3 archivos base del proyecto (microdato, barrios JS, trazabilidad).
- Validar columnas reales, nulos, duplicados y outliers.
- Normalizar unidades de precio y superficie y registrar reglas en `artifacts/data_dictionary.csv`.
- Definir holdout final y guardar evidencias de split.
- Guardar un dataset limpio reproducible para EDA y modelado.

## Inputs 
- `/madrid_rent_with_geolocation.csv`
- `/alquiler_barrios_madrid_oct2025.js`
- `/Como_se_ha_generado_el_dataset_madrid_rent_with_geolocation.csv.txt`

## Outputs
- `artifacts/data_dictionary.csv`
- `artifacts/splits/holdout_ids.csv`
- `reports/target_selection.md`
- `reports/parse_numeric_report.md`
- `reports/environment.txt`


In [None]:
from __future__ import annotations
from pathlib import Path
import sys
import json
import hashlib
import re

import numpy as np
import pandas as pd

# --- Base setup y reproducibilidad ---
SEED = 42
np.random.seed(SEED)


def get_repo_root() -> Path:
    """Return repo root by walking parents and checking .git/pyproject.toml."""
    current = Path.cwd().resolve()
    for parent in [current] + list(current.parents):
        if (parent / ".git").exists() or (parent / "pyproject.toml").exists():
            return parent
    return current

# Resolver ROOT y exponerlo al path de imports
ROOT = get_repo_root()
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))


In [None]:
from src.utils import haversine, parse_js_barrios, make_time_splits

# --- Crear estructura de carpetas base ---
for folder in [
    "artifacts",
    "artifacts/splits",
    "models",
    "reports",
    "data/external",
    "data/external/admin",
    "data/external/socioeco",
    "data/external/mobility",
    "data/external/env",
    "data/external/urban",
    "data/external/timeseries",
    "data/external/docs",
]:
    (ROOT / folder).mkdir(parents=True, exist_ok=True)

# --- Rutas de input ---
raw_path = ROOT / "data" / "raw" / "madrid_rent_with_geolocation.csv"
barrios_js_path = ROOT / "data" / "raw" / "alquiler_barrios_madrid_oct2025.js"
trace_path = ROOT / "data" / "raw" / "Como_se_ha_generado_el_dataset_madrid_rent_with_geolocation.csv.txt"

# --- Ingesta de datos ---
rent_df = pd.read_csv(raw_path)
barrios_js_text = barrios_js_path.read_text(encoding="utf-8", errors="ignore")
trace_text = trace_path.read_text(encoding="utf-8", errors="ignore")

# Vista rapida y extracto de trazabilidad
print(rent_df.head(3))
print(trace_text[:800])


**Outputs esperados (tablas)**
- Vista rapida de `rent_df` y extracto de trazabilidad.
- Tabla con tipos, porcentaje de nulos y ejemplos por columna.
- Reportes de parseo numerico y seleccion de target.


In [None]:
# Resumen de trazabilidad
import unicodedata

# Tokens a excluir del texto de trazabilidad (ruido de instalaciones, URLs, etc.)
skip_tokens = (
    "!pip",
    "pip ",
    "collecting",
    "downloading",
    "requirement",
    "installing",
    "whl",
    "metadata",
    "http",
    "kb",
    "mb",
    "==",
    "attempting uninstall",
    "found existing installation",
    "successfully uninstalled",
    "successfully installed",
)
clean_lines = []
raw_lines = trace_text.splitlines()

# Limpieza basica de caracteres invisibles
null_char = chr(0)
cr_char = chr(13)
bom_char = chr(0xFEFF)
para_char = chr(0x00B6)
newline = chr(10)
for line in raw_lines:
    line = line.replace(bom_char, "").replace(null_char, "").replace(cr_char, "").replace(para_char, "")
    low = line.strip().lower()
    if any(token in low for token in skip_tokens):
        continue
    if not low:
        continue
    if low.startswith("import ") or low.startswith("#"):
        if clean_lines:
            break
        continue
    # Normalizar a ASCII para evitar mojibake en reportes
    line = unicodedata.normalize("NFKD", line).encode("ascii", "ignore").decode("ascii")
    clean_lines.append(line)

trace_summary = newline.join(clean_lines[:12])
text_low = newline.join(raw_lines).lower()
context_lines = []
if "kaggle" in text_low:
    context_lines.append("- Fuente base referenciada en Kaggle (dataset de alquileres Madrid).")
if "google maps" in text_low or "geolocaliz" in text_low:
    context_lines.append("- Geolocalizacion completada con script externo y Google Maps API (segun trazabilidad).")
if "madrid_rent_geolocation.py" in text_low:
    context_lines.append("- Script de geolocalizacion mencionado: madrid_rent_geolocation.py.")

# Bloques narrativos para el reporte de trazabilidad
extra = [
    "",
    "Contexto del origen:",
]
if context_lines:
    extra.extend(context_lines)
else:
    extra.append("- No se detectaron referencias adicionales en el texto.")
extra += [
    "",
    "Supuestos y riesgos:",
    "- Posibles duplicados entre fuentes; se deduplica por id/fingerprint.",
    "- Temporalidad sin anio; se asume 2025 para ordenar y se documenta la limitacion.",
    "- Coordenadas/superficie pueden contener errores; se aplican filtros y parseo robusto.",
    "",
    "Implicaciones para limpieza/validacion:",
    "- Filtrado geografico por bbox/radio y percentiles.",
    "- Preprocesado solo con train via Pipeline/ColumnTransformer.",
]
trace_report = ROOT / "reports" / "traceability_summary.md"
trace_report.write_text(
    "## Resumen de trazabilidad" + newline + newline + trace_summary + newline + newline.join(extra) + newline,
    encoding="utf-8",
)


In [None]:
# Tabla de tipos, nulos y ejemplos
summary = []
for col in rent_df.columns:
    series = rent_df[col]
    summary.append({
        "column": col,
        "dtype": str(series.dtype),
        "null_pct": round(series.isna().mean() * 100, 2),
        "examples": series.dropna().astype(str).head(5).tolist(),
    })
summary_df = pd.DataFrame(summary)
summary_df.head()


In [None]:
# Normalizacion de columnas clave (precio, superficie, precio m2)

euro = chr(8364)
sup2 = chr(178)

# Parser robusto para valores numericos con separadores mixtos

def parse_numeric(value):
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return np.nan
    if isinstance(value, (int, float, np.integer, np.floating)):
        return float(value)
    text = str(value).strip().lower()
    if text in {"", "nan", "none", "null"}:
        return np.nan
    text = text.replace(euro, "").replace("eur", "")
    text = text.replace(f"m{sup2}", "m2").replace("m^2", "m2")
    text = re.sub(r"[^0-9,\.\-]", "", text)
    # Normalizar separadores: 1.234,56 -> 1234.56
    if text.count(",") and text.count("."):
        text = text.replace(".", "")
        text = text.replace(",", ".")
    elif text.count(",") and not text.count("."):
        text = text.replace(",", ".")
    try:
        return float(text)
    except ValueError:
        return np.nan


def pick_first(candidates, preferred=None):
    # Selecciona una columna candidata priorizando un nombre conocido
    if preferred and preferred in candidates:
        return preferred
    return candidates[0] if candidates else None


def _parse_with_report(series: pd.Series, col_name: str, standard: str):
    # Aplica parseo y mide perdida de informacion
    parsed = series.apply(parse_numeric)
    raw_non_null = series.notna().sum()
    parsed_non_null = parsed.notna().sum()
    loss_pct = 0.0 if raw_non_null == 0 else round((raw_non_null - parsed_non_null) / raw_non_null * 100, 2)
    fallback_used = False
    # Fallback si el parseo pierde demasiado (>30%)
    if raw_non_null > 0 and loss_pct > 30:
        extracted = series.astype(str).str.extract(r"(-?[0-9]+(?:[\.,][0-9]+)?)")[0]
        extracted = extracted.str.replace(",", ".", regex=False)
        fallback = pd.to_numeric(extracted, errors="coerce")
        fallback_non_null = fallback.notna().sum()
        fallback_loss_pct = 0.0 if raw_non_null == 0 else round((raw_non_null - fallback_non_null) / raw_non_null * 100, 2)
        if fallback_loss_pct < loss_pct:
            parsed = fallback
            loss_pct = fallback_loss_pct
            fallback_used = True
    parse_report.append({
        "column": col_name,
        "standard": standard,
        "loss_pct": loss_pct,
        "fallback_used": fallback_used,
        "raw_non_null": int(raw_non_null),
    })
    return parsed

# Detectar columnas candidatas por nombre
lower_cols = {col: col.lower() for col in rent_df.columns}
price_candidates = [col for col, low in lower_cols.items() if re.search(r"price|precio", low) and not re.search(r"m2|m\^2", low)]
price_m2_candidates = [col for col, low in lower_cols.items() if re.search(r"price|precio", low) and re.search(r"m2|m\^2", low)]
surface_candidates = [
    col for col, low in lower_cols.items()
    if re.search(r"surface|superficie|area|m2|metros", low)
    and not re.search(r"price|precio", low)
]

price_col = pick_first(price_candidates, preferred="price")
price_m2_col = pick_first(price_m2_candidates)
surface_col = pick_first(surface_candidates, preferred="floor_area")

rules = []
parse_report = []
if price_col:
    rent_df["price"] = _parse_with_report(rent_df[price_col], price_col, "price")
    rules.append({"column": price_col, "standard": "price", "rule": "parsed_numeric_eur_month"})
if surface_col:
    rent_df["surface_m2"] = _parse_with_report(rent_df[surface_col], surface_col, "surface_m2")
    rules.append({"column": surface_col, "standard": "surface_m2", "rule": "parsed_numeric_m2"})
if price_m2_col:
    rent_df["price_m2"] = _parse_with_report(rent_df[price_m2_col], price_m2_col, "price_m2")
    rules.append({"column": price_m2_col, "standard": "price_m2", "rule": "parsed_numeric_eur_m2"})

# Derivar price_m2 si no existe pero hay price y surface
if "price_m2" not in rent_df.columns and "price" in rent_df.columns and "surface_m2" in rent_df.columns:
    rent_df["price_m2"] = rent_df["price"] / rent_df["surface_m2"].replace({0: np.nan})
    rules.append({"column": "derived", "standard": "price_m2", "rule": "price / surface_m2"})

# Guardar diccionario de datos y reporte de parseo
pd.DataFrame(rules).to_csv(ROOT / "artifacts" / "data_dictionary.csv", index=False)

report_lines = ["## Perdida de informacion tras parseo", ""]
for row in parse_report:
    report_lines.append(f"- {row['column']}: {row['loss_pct']}% (fallback={row['fallback_used']})")
report_lines.append("")
report_lines.append("## Fallback aplicado")
for row in parse_report:
    if row["fallback_used"]:
        report_lines.append(f"- {row['column']}: fallback mejoro la perdida")
if not any(row["fallback_used"] for row in parse_report):
    report_lines.append("- Ninguno")

(ROOT / "reports" / "parse_numeric_report.md").write_text("".join(report_lines), encoding="utf-8")


In [None]:
# Seleccion de target (price / price_m2) y evidencia

derived_price_m2 = price_m2_col is None and "price" in rent_df.columns and "surface_m2" in rent_df.columns
price_m2_source = price_m2_col or ("derived_from_price_surface" if derived_price_m2 else None)

euro = chr(8364)
sup2 = chr(178)

selection_report = ROOT / "reports" / "target_selection.md"
with selection_report.open("w", encoding="utf-8") as handle:
    handle.write("# Seleccion de target\n\n")
    handle.write(f"- Columna precio mensual candidata: {price_col}\n")
    handle.write(f"- Columna superficie candidata: {surface_col}\n")
    handle.write(f"- Columna precio m2 candidata: {price_m2_source}\n\n")
    if price_col is None:
        handle.write("- No se encontro una columna clara de precio mensual.\n")
    if surface_col is None:
        handle.write("- No se encontro una columna clara de superficie.\n")
    if price_m2_col is None:
        if derived_price_m2:
            handle.write("- Se derivo price_m2 desde precio mensual y superficie.\n")
        else:
            handle.write("- No se encontro una columna clara de precio por m2.\n")

    if price_col:
        raw_price = rent_df[price_col].astype(str).str.lower()
        raw_norm = raw_price.str.replace(euro, "", regex=False).str.replace(sup2, "2", regex=False)
        mixed_m2 = raw_norm.str.contains("m2") | raw_norm.str.contains("m^2")
        mixed_count = int(mixed_m2.sum())
        if mixed_count > 0:
            handle.write(f"- Aviso: {mixed_count} filas en {price_col} contienen indicios de m2.\n")
        else:
            handle.write("- No se detectaron strings tipo m2 en la columna de precio.\n")

        stats = rent_df["price"].describe(percentiles=[0.1, 0.5, 0.9]).to_dict()
        handle.write("\n## Estadisticas rapidas price\n")
        for key, value in stats.items():
            handle.write(f"- {key}: {value}\n")


In [None]:
# Duplicados defendibles
if "id" in rent_df.columns:
    dedup_key = "id"
elif "url" in rent_df.columns:
    dedup_key = "url"
else:
    # Construir fingerprint con columnas numericas y categoricas cortas
    lat_col = next((c for c in rent_df.columns if "lat" in c.lower()), None)
    lon_col = next((c for c in rent_df.columns if "lon" in c.lower() or "lng" in c.lower()), None)
    numeric_cols = [
        c for c in rent_df.columns
        if c not in ["price", "price_m2"] and pd.api.types.is_numeric_dtype(rent_df[c])
    ]
    obj_cols = [c for c in rent_df.columns if rent_df[c].dtype == "object"]
    short_obj_cols = []
    for col in obj_cols:
        sample = rent_df[col].dropna().astype(str)
        if sample.empty:
            continue
        avg_len = sample.str.len().mean()
        unique_ratio = sample.nunique() / len(sample)
        if avg_len <= 50 and unique_ratio <= 0.8:
            short_obj_cols.append(col)
    core_cols = numeric_cols + short_obj_cols
    temp_df = rent_df[core_cols].copy() if core_cols else rent_df.copy()
    if lat_col and lon_col:
        temp_df[lat_col] = rent_df[lat_col].round(5)
        temp_df[lon_col] = rent_df[lon_col].round(5)
    hash_source = temp_df.fillna("NA").astype(str).agg("|".join, axis=1)
    rent_df["listing_fingerprint"] = hash_source.apply(lambda x: hashlib.md5(x.encode()).hexdigest())
    dedup_key = "listing_fingerprint"

before = len(rent_df)
rent_df = rent_df.drop_duplicates(subset=[dedup_key])
after = len(rent_df)
print(f"Duplicados removidos: {before - after}")


In [None]:
# Filtrado geografico: bbox + distancia al centro
center_lat, center_lon = 40.4168, -3.7038
lat_col = next((c for c in rent_df.columns if "lat" in c.lower()), None)
lon_col = next((c for c in rent_df.columns if "lon" in c.lower() or "lng" in c.lower()), None)

if lat_col and lon_col:
    # Distancia al centro y recorte por percentil alto
    rent_df["distance_center_km"] = rent_df.apply(
        lambda row: haversine(center_lat, center_lon, row[lat_col], row[lon_col]), axis=1
    )
    max_km = min(30, rent_df["distance_center_km"].quantile(0.995))
    rent_df = rent_df[rent_df["distance_center_km"] <= max_km]

    # Filtro bbox robusto (0.5%-99.5%)
    lat_low, lat_high = rent_df[lat_col].quantile([0.005, 0.995])
    lon_low, lon_high = rent_df[lon_col].quantile([0.005, 0.995])
    rent_df = rent_df[(rent_df[lat_col].between(lat_low, lat_high)) & (rent_df[lon_col].between(lon_low, lon_high))]

# Reindex tras filtros para alinear indices con el holdout
rent_df = rent_df.reset_index(drop=True)


In [None]:
# Outliers basicos (IQR) para price y surface_m2
outlier_report = {}
for col in ["price", "surface_m2"]:
    if col in rent_df.columns:
        q1, q3 = rent_df[col].quantile([0.25, 0.75])
        iqr = q3 - q1
        lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        outlier_pct = ((rent_df[col] < lower) | (rent_df[col] > upper)).mean() * 100
        outlier_report[col] = round(outlier_pct, 2)

outlier_report


In [None]:
# Holdout final fijo (temporal si hay fecha fiable)
from sklearn.model_selection import train_test_split
import json

HOLDOUT_TEST_SIZE = 0.2
GAP_DAYS = 7

# Detectar columna temporal

date_keys = ("date", "fecha", "update", "scrape", "publica")
date_cols = [c for c in rent_df.columns if any(k in c.lower() for k in date_keys)]
temporal_ok = False
date_col = None
assumed_year = None
if date_cols:
    date_col = date_cols[0]
    date_series = rent_df[date_col].dropna().astype(str).str.strip()
    if not date_series.empty:
        has_year = date_series.str.contains(r"\d{4}")
        day_month_only = date_series.str.match(r"^\d{1,2}\s+[A-Za-z]+$")
        if day_month_only.all() and not has_year.any():
            assumed_year = 2025
            parsed = pd.to_datetime(
                date_series + f" {assumed_year}",
                errors="coerce",
                format="%d %B %Y",
            )
            if parsed.notna().sum() == 0:
                temporal_ok = False
                (ROOT / "reports" / "temporal_limitations.md").write_text(
                    f"La columna {date_col} no incluye anio y no se pudo parsear con anio asumido.",
                    encoding="utf-8",
                )
            else:
                temporal_ok = True
                rent_df.loc[parsed.index, date_col] = parsed
                (ROOT / "reports" / "temporal_limitations.md").write_text(
                    f"La columna {date_col} solo trae dia/mes; se asume anio {assumed_year} para ordenar temporalmente.",
                    encoding="utf-8",
                )
        else:
            temporal_ok = True
            rent_df[date_col] = pd.to_datetime(rent_df[date_col], errors="coerce", format="mixed")
else:
    (ROOT / "reports" / "temporal_limitations.md").write_text(
        "No se detectaron columnas temporales fiables para holdout temporal.",
        encoding="utf-8",
    )

# Split holdout (temporal o aleatorio)
if temporal_ok:
    train_idx, test_idx = make_time_splits(
        rent_df,
        date_col,
        test_size=HOLDOUT_TEST_SIZE,
        seed=SEED,
        gap_days=GAP_DAYS,
    )
else:
    train_idx, test_idx = train_test_split(rent_df.index, test_size=HOLDOUT_TEST_SIZE, random_state=SEED)

# Guardar indices de split
np.savez(ROOT / "artifacts" / "splits" / "holdout_indices.npz", train_idx=train_idx, test_idx=test_idx)

split_config = {
    "date_col": date_col,
    "temporal_ok": bool(temporal_ok),
    "test_size": HOLDOUT_TEST_SIZE,
    "gap_days": GAP_DAYS if temporal_ok else 0,
    "assumed_year": assumed_year,
}
(ROOT / "artifacts" / "splits" / "split_config.json").write_text(
    json.dumps(split_config, ensure_ascii=False, indent=2),
    encoding="utf-8",
)

# Guardar ids/fingerprint por split
id_col = None
if "id" in rent_df.columns:
    id_col = "id"
elif "url" in rent_df.columns:
    id_col = "url"
elif "listing_fingerprint" in rent_df.columns:
    id_col = "listing_fingerprint"

if id_col:
    split_labels = pd.Series("train", index=rent_df.index)
    split_labels.loc[test_idx] = "holdout"
    holdout_ids = rent_df.loc[:, [id_col]].copy()
    holdout_ids["split"] = split_labels.values
    holdout_ids.to_csv(ROOT / "artifacts" / "splits" / "holdout_ids.csv", index=False)


In [None]:
# Parsear barriosMadrid y guardar artefacto
barrios = parse_js_barrios(barrios_js_text)
if barrios:
    with (ROOT / "artifacts" / "barriosMadrid.json").open("w", encoding="utf-8") as handle:
        json.dump(barrios, handle, ensure_ascii=False, indent=2)


In [None]:
# Guardar artefacto procesado
out_path = ROOT / "artifacts" / "processed_rent.parquet"
try:
    rent_df.to_parquet(out_path, index=False)
except Exception:
    out_path = ROOT / "artifacts" / "processed_rent.csv.gz"
    rent_df.to_csv(out_path, index=False, compression="gzip")

# Registrar entorno (python -V y pip freeze)
import subprocess
py_version = subprocess.check_output(["python", "-V"]).decode().strip()
reqs = subprocess.check_output(["python", "-m", "pip", "freeze"]).decode().strip()
with (ROOT / "reports" / "environment.txt").open("w", encoding="utf-8") as handle:
    handle.write(py_version + "" + reqs)


**Outputs esperados (figuras)**
- Histogramas de price, price_m2, surface_m2 y distancia al centro.
- Barras de missingness por columna.
- Dispersion geoespacial (lon/lat) para detectar outliers.
- Scatter price vs surface_m2 para validar coherencia.


### Visualizaciones de calidad
- Revisar colas extremas (precios/superficies) y clusters fuera de Madrid.
- Confirmar que la distribucion de lat/lon cae dentro de la bbox esperada.
- Validar la relacion price vs surface_m2 (tendencia positiva).


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Distribuciones basicas de numericas clave
numeric_cols = [c for c in ["price", "price_m2", "surface_m2", "distance_center_km"] if c in rent_df.columns]
if numeric_cols:
    plot_df = rent_df[numeric_cols].replace([np.inf, -np.inf], np.nan)
    plot_df.hist(figsize=(12, 6), bins=40)
    plt.suptitle("Distribuciones principales")
    plt.show()

# Top columnas con mayor missingness
missing = rent_df.isna().mean().sort_values(ascending=False).head(20)
plt.figure(figsize=(6, 4))
missing.plot(kind="barh")
plt.title("Top 20 columnas con nulos")
plt.xlabel("% nulos")
plt.show()

# Dispersion geografica
if lat_col and lon_col:
    plt.figure(figsize=(6, 6))
    plt.scatter(rent_df[lon_col], rent_df[lat_col], s=4, alpha=0.4)
    plt.title("Distribucion geografica de anuncios")
    plt.xlabel("Lon")
    plt.ylabel("Lat")
    plt.show()

# Relacion precio vs superficie
if "price" in rent_df.columns and "surface_m2" in rent_df.columns and len(rent_df) > 0:
    sample = rent_df.sample(min(2000, len(rent_df)), random_state=SEED)
    plt.figure(figsize=(6, 4))
    sns.scatterplot(data=sample, x="surface_m2", y="price", alpha=0.4)
    plt.title("Precio vs superficie")
    plt.show()


## Conclusiones
**Hallazgos**
- Completar con resultados de esta ejecucion.

**Decisiones**
- Completar con reglas aplicadas (parseo, deduplicacion, filtros).

**Siguientes pasos**
- Continuar con `02_eda.ipynb` usando `artifacts/processed_rent.*`.
