In [1]:
# data_validation.py
from __future__ import annotations

import os
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

import sys
from pathlib import Path

# Resolve the current working directory and append its grandparent to sys.path.
# NOTE(review): presumably this makes the `src` package importable when the
# notebook runs from a folder two levels below the repo root — confirm layout.
notebook_path = Path().resolve()
sys.path.append(str(notebook_path.parent.parent))

from src.conf import config
from src.utils.utils import safe_bool_to_int, add_week_start


# ==========================================================
# Infraestructura ligera para issues y reporte
# ==========================================================

@dataclass
class DQIssue:
    """One data-quality finding emitted by a validation check."""

    dataset: str                         # dataset label the finding belongs to
    check: str                           # identifier of the check that produced it
    severity: str                        # "ERROR" | "WARNING" | "INFO"
    n_affected: int                      # number of affected items
    pct_affected: float                  # affected share as a fraction (not a percentage)
    description: str                     # human-readable explanation
    suggestion: str                      # proposed remediation
    sample_index: Optional[List] = None  # sample of affected indices/keys
    extra: Optional[Dict] = None         # optional metadata

    def to_dict(self) -> Dict:
        """Return the issue as a plain dict, with ``pct_affected`` rounded to 6 decimals."""
        rounded_pct = float(np.round(self.pct_affected, 6))
        # Overriding an existing key keeps the field order produced by asdict().
        return {**asdict(self), "pct_affected": rounded_pct}


class DQReport:
    """
    Accumulates data-quality issues plus per-dataset row counts and renders
    them as a DataFrame or as a Markdown document.
    """

    # Explicit rendering order for the Markdown report. A plain alphabetical
    # sort would place "INFO" ahead of "WARNING".
    _SEVERITY_RANK = {"ERROR": 0, "WARNING": 1, "INFO": 2}

    # Column layout used when there are no issues to tabulate.
    _COLUMNS = [
        "dataset", "check", "severity", "n_affected", "pct_affected",
        "description", "suggestion", "sample_index", "extra",
    ]

    def __init__(self):
        self.issues: List[DQIssue] = []
        self.summary: Dict[str, Dict[str, int]] = {}  # row counts per dataset

    def add_issue(self, issue: DQIssue):
        """Append one finding to the report."""
        self.issues.append(issue)

    def set_dataset_size(self, dataset: str, n_rows: int):
        """Record the number of rows of ``dataset`` under the ``"rows"`` key."""
        self.summary.setdefault(dataset, {})["rows"] = int(n_rows)

    def to_dataframe(self) -> pd.DataFrame:
        """Return one row per issue (insertion order); empty frame with the standard columns if none."""
        if not self.issues:
            return pd.DataFrame(columns=list(self._COLUMNS))
        return pd.DataFrame([i.to_dict() for i in self.issues])

    def to_markdown(self) -> str:
        """
        Render the report as Markdown.

        Findings are ordered by severity (ERROR, WARNING, INFO, then anything
        else), and within a severity by dataset and check name.
        """
        lines = ["# Reporte de Calidad de Datos\n"]
        # Dataset size summary
        lines.append("## Resumen de filas por dataset\n")
        for ds, meta in self.summary.items():
            lines.append(f"- **{ds}**: {meta.get('rows', 0):,} filas")
        lines.append("\n---\n")
        # Issues
        if not self.issues:
            lines.append("✅ No se encontraron problemas.")
            return "\n".join(lines)

        lines.append("## Hallazgos\n")
        unknown_rank = len(self._SEVERITY_RANK)  # unrecognised severities go last
        rows = sorted(
            (issue.to_dict() for issue in self.issues),
            key=lambda r: (
                self._SEVERITY_RANK.get(r["severity"], unknown_rank),
                str(r["severity"]),
                str(r["dataset"]),
                str(r["check"]),
            ),
        )
        for row in rows:
            lines.append(f"### [{row['severity']}] {row['dataset']} → {row['check']}")
            lines.append(f"- **Impacto**: {row['n_affected']:,} filas ({row['pct_affected']*100:.4f}%)")
            lines.append(f"- **Descripción**: {row['description']}")
            if row.get("suggestion"):
                lines.append(f"- **Sugerencia**: `{row['suggestion']}`")
            sample = row.get("sample_index")
            if isinstance(sample, list) and sample:
                lines.append(f"- **Muestra**: {sample[:10]}")
            lines.append("")
        return "\n".join(lines)


# ==========================================================
# Utilidades de chequeo
# ==========================================================

# Lower-case string tokens accepted as boolean true / false.
# is_boolean_like_series() lower-cases values before testing membership in the union of both sets.
BOOLEAN_STRINGS_TRUE = {"true","t","1","yes","y"}
BOOLEAN_STRINGS_FALSE = {"false","f","0","no","n"}

def _pct(numer: int, denom: int) -> float:
    if denom <= 0:
        return 0.0
    return numer / denom

def check_required_columns(df: pd.DataFrame, required_cols: List[str]) -> Tuple[List[str], List[str]]:
    """
    Split ``required_cols`` into those found in ``df.columns`` and those that are not.

    Returns ``(present, missing)``; each list keeps the order of ``required_cols``.
    """
    present: List[str] = []
    missing: List[str] = []
    available = df.columns
    for col in required_cols:
        bucket = present if col in available else missing
        bucket.append(col)
    return present, missing

def check_duplicates(df: pd.DataFrame, key_cols: List[str]) -> pd.Series:
    """
    Flag every row whose ``key_cols`` combination occurs more than once.

    All members of a duplicated group are marked (``keep=False``). If any key
    column is absent from ``df``, an empty boolean Series is returned instead.
    """
    all_keys_available = set(key_cols) <= set(df.columns)
    if all_keys_available:
        return df.duplicated(subset=key_cols, keep=False)
    return pd.Series(dtype=bool)

def is_boolean_like_series(s: pd.Series) -> pd.Series:
    """
    Return a boolean mask marking the values that can be read as booleans.

    - bool dtype: every position is True.
    - numeric dtype: missing values, 0 and 1 are accepted.
    - anything else: missing values are accepted, as are values whose
      lower-cased string form is one of the known true/false tokens.
    """
    dtype_api = pd.api.types
    if dtype_api.is_bool_dtype(s):
        return pd.Series(True, index=s.index)

    is_missing = s.isna()
    if dtype_api.is_numeric_dtype(s):
        return is_missing | s.isin([0,1])

    # object / string values
    accepted_tokens = BOOLEAN_STRINGS_TRUE | BOOLEAN_STRINGS_FALSE
    normalized = s.astype(str).str.lower()
    return is_missing | normalized.isin(accepted_tokens)

def coercible_numeric_mask(s: pd.Series) -> pd.Series:
    """
    Return True where the value converts to a number or was already missing,
    and False where numeric conversion fails.
    """
    # errors="coerce" turns every non-convertible value into NaN.
    converted = pd.to_numeric(s, errors="coerce")
    originally_missing = s.isna()
    return converted.notna() | originally_missing

def outlier_mask_iqr(x: pd.Series, k: float = 3.0) -> pd.Series:
    """
    Flag high-side outliers: values above ``Q3 + k * IQR``.

    Only the upper tail is tested. When the IQR is zero or undefined, the
    99.9th percentile is used as the cutoff instead. Non-numeric values are
    coerced to NaN and therefore never flagged.
    """
    values = pd.to_numeric(x, errors="coerce")
    lower_q, upper_q = values.quantile(0.25), values.quantile(0.75)
    spread = upper_q - lower_q
    degenerate = pd.isna(spread) or spread == 0
    # Fallback to a high percentile when the IQR carries no information.
    cutoff = values.quantile(0.999) if degenerate else upper_q + k * spread
    return values > cutoff


# ==========================================================
# Chequeos por dataset
# ==========================================================

def validate_ventas(ventas: pd.DataFrame, report: DQReport) -> None:
    """
    Run the data-quality checks for the sales table and append findings to ``report``.

    Checks, in order: required columns, numeric coercibility of ``unit_sales``,
    boolean-likeness of ``onpromotion``, duplicated keys, observed date range
    versus the configured window, weekly sums below -1, and high-side outliers
    in ``unit_sales``.

    Parameters
    ----------
    ventas : pd.DataFrame
        Sales data; it is not modified.
    report : DQReport
        Report that receives the dataset size and any issues.

    Raises
    ------
    Whatever ``pd.to_datetime`` raises when the ``date`` column cannot be parsed
    (the date-range check is not guarded).
    """
    ds = "ventas"
    n_rows = len(ventas)
    report.set_dataset_size(ds, n_rows)

    # 1) Schema and keys
    present, missing = check_required_columns(ventas, config.SCHEMA_EXPECTED[ds]["required_cols"])
    if missing:
        report.add_issue(DQIssue(
            ds, "schema_required_cols", "ERROR",
            n_affected=len(missing), pct_affected=_pct(len(missing), len(present)+len(missing)),
            description=f"Faltan columnas requeridas: {missing}",
            suggestion="Revisar extracción/lectura del CSV (usecols/encoding).",
            extra={"present": present}
        ))

    # 2) Types / numeric coercibility
    for col in ["unit_sales"]:
        if col not in ventas.columns:
            continue
        bad = int((~coercible_numeric_mask(ventas[col])).sum())
        if bad:
            report.add_issue(DQIssue(
                ds, f"non_numeric_{col}", "ERROR",
                n_affected=bad, pct_affected=_pct(bad, n_rows),
                description=f"{col} contiene valores no numéricos o corruptos.",
                suggestion=f"Coercer con pd.to_numeric({col}, errors='coerce') y revisar NaN resultantes."
            ))

    # 3) onpromotion must be interpretable as boolean
    if "onpromotion" in ventas.columns:
        ok_bool = is_boolean_like_series(ventas["onpromotion"])
        if (~ok_bool).any():
            nbad = int((~ok_bool).sum())
            report.add_issue(DQIssue(
                ds, "onpromotion_non_boolean_like", "WARNING",
                n_affected=nbad, pct_affected=_pct(nbad, n_rows),
                description="Valores de 'onpromotion' no interpretables como booleanos.",
                suggestion="Normalizar con utils.safe_bool_to_int(ventas['onpromotion'])."
            ))

    # 4) Duplicates on the configured key
    key = config.SCHEMA_EXPECTED[ds]["key"]
    dups_mask = check_duplicates(ventas, key)
    if not dups_mask.empty and dups_mask.any():
        n_dups = int(dups_mask.sum())
        dup_idx = ventas.loc[dups_mask, key].head(10).to_dict("records")
        report.add_issue(DQIssue(
            ds, "duplicate_rows", "ERROR",
            n_affected=n_dups, pct_affected=_pct(n_dups, n_rows),
            description=f"Duplicados por clave {key}. Puede sesgar agregaciones.",
            suggestion="Resolver sumando devoluciones/ventas o consolidando registros por clave.",
            sample_index=dup_idx
        ))

    # 5) Date range. The column is parsed once here and reused by check 6.
    dates = None
    if "date" in ventas.columns:
        dates = pd.to_datetime(ventas["date"])
        expected_min = pd.to_datetime(config.EXPECTED_DATE_MIN)
        expected_max = pd.to_datetime(config.EXPECTED_DATE_MAX)
        dmin, dmax = dates.min(), dates.max()
        if dmin < expected_min or dmax > expected_max:
            # Count the rows that actually fall outside the window so the
            # reported impact is meaningful.
            n_out = int(((dates < expected_min) | (dates > expected_max)).sum())
            report.add_issue(DQIssue(
                ds, "date_range_outside_expected", "WARNING",
                n_affected=n_out, pct_affected=_pct(n_out, n_rows),
                description=f"Rango observado {dmin.date()}–{dmax.date()} fuera de [{config.EXPECTED_DATE_MIN.date()}–{config.EXPECTED_DATE_MAX.date()}].",
                suggestion="Verificar fuente/mezcla de datasets; filtrar al rango esperado si aplica."
            ))

    # 6) Weekly sums that would break log1p (log1p is NaN when Y_isw < -1)
    try:
        # Raises KeyError (reported below as INFO) when any needed column is missing.
        temp = ventas[["date","store_nbr","item_nbr","unit_sales"]].copy()
        temp["date"] = dates  # already parsed in check 5; same index as `temp`
        temp["unit_sales"] = pd.to_numeric(temp["unit_sales"], errors="coerce")
        temp = add_week_start(temp, "date", "week_start")
        agg = (temp
               .groupby(["store_nbr","item_nbr","week_start"], as_index=False)
               .agg(Y_isw=("unit_sales","sum")))
        bad_weekly = agg[agg["Y_isw"] < -1]
        if not bad_weekly.empty:
            samp = bad_weekly.head(10)[["store_nbr","item_nbr","week_start","Y_isw"]].to_dict("records")
            report.add_issue(DQIssue(
                ds, "weekly_sum_lt_minus1", "ERROR",
                n_affected=int(len(bad_weekly)), pct_affected=_pct(len(bad_weekly), len(agg)),
                description="Suma semanal de ventas menor a -1 (log1p generará NaN). Suele indicar devoluciones no compensadas.",
                suggestion="Separar devoluciones o truncar: Y_isw = max(Y_isw, 0) antes de log1p.",
                sample_index=samp
            ))
    except Exception as e:
        report.add_issue(DQIssue(
            ds, "weekly_sum_check_failed", "INFO",
            n_affected=0, pct_affected=0.0,
            description=f"No se pudo evaluar sumas semanales (detalle: {type(e).__name__}).",
            suggestion="Revisar tipos de columnas 'date'/'unit_sales'."
        ))

    # 7) Very high outliers in unit_sales (potential load errors)
    if "unit_sales" in ventas.columns:
        mask_out = outlier_mask_iqr(ventas["unit_sales"], k=6.0)
        if mask_out.any():
            n_out = int(mask_out.sum())
            # Only sample the columns that exist; the key columns may be
            # missing even when unit_sales is present.
            sample_cols = [c for c in ["date","store_nbr","item_nbr","unit_sales"] if c in ventas.columns]
            report.add_issue(DQIssue(
                ds, "unit_sales_high_outliers", "WARNING",
                n_affected=n_out, pct_affected=_pct(n_out, n_rows),
                description="Valores atípicos muy altos en 'unit_sales'.",
                suggestion="Winsorizar o revisar manualmente top casos.",
                sample_index=ventas.loc[mask_out, sample_cols].head(10).to_dict("records")
            ))


def validate_items(items: pd.DataFrame, report: DQReport) -> None:
    """
    Validate the items table: required columns, duplicated keys and the
    minimum number of distinct families. Findings are appended to ``report``.
    """
    ds = "items"
    total_rows = len(items)
    schema = config.SCHEMA_EXPECTED[ds]
    report.set_dataset_size(ds, total_rows)

    # Required columns
    found_cols, absent_cols = check_required_columns(items, schema["required_cols"])
    if absent_cols:
        n_absent = len(absent_cols)
        report.add_issue(DQIssue(
            dataset=ds, check="schema_required_cols", severity="ERROR",
            n_affected=n_absent,
            pct_affected=_pct(n_absent, len(found_cols) + n_absent),
            description=f"Faltan columnas requeridas: {absent_cols}",
            suggestion="Revisar extracción/lectura del CSV.",
        ))

    # Duplicated keys
    dup_mask = check_duplicates(items, schema["key"])
    if not dup_mask.empty and dup_mask.any():
        n_dup = int(dup_mask.sum())
        report.add_issue(DQIssue(
            dataset=ds, check="duplicate_keys", severity="ERROR",
            n_affected=n_dup,
            pct_affected=_pct(n_dup, total_rows),
            description="item_nbr duplicado.",
            suggestion="Eliminar duplicados conservando el registro de mayor confianza.",
        ))

    # Minimum number of families
    if "family" not in items.columns:
        return
    family_count = int(items["family"].nunique(dropna=True))
    if family_count < config.EXPECTED_MIN_FAMILIES:
        report.add_issue(DQIssue(
            dataset=ds, check="families_below_expected", severity="WARNING",
            n_affected=config.EXPECTED_MIN_FAMILIES - family_count,
            pct_affected=0.0,
            description=f"Se esperaban ≥{config.EXPECTED_MIN_FAMILIES} familias; hay {family_count}.",
            suggestion="Verificar mapeo de items a familias.",
        ))


def validate_stores(stores: pd.DataFrame, report: DQReport) -> None:
    """
    Validate the stores table: required columns, duplicated keys and the
    minimum number of distinct stores. Findings are appended to ``report``.
    """
    ds = "stores"
    total_rows = len(stores)
    schema = config.SCHEMA_EXPECTED[ds]
    report.set_dataset_size(ds, total_rows)

    # Required columns
    found_cols, absent_cols = check_required_columns(stores, schema["required_cols"])
    if absent_cols:
        n_absent = len(absent_cols)
        report.add_issue(DQIssue(
            dataset=ds, check="schema_required_cols", severity="ERROR",
            n_affected=n_absent,
            pct_affected=_pct(n_absent, len(found_cols) + n_absent),
            description=f"Faltan columnas requeridas: {absent_cols}",
            suggestion="Revisar extracción/lectura del CSV.",
        ))

    # Duplicated keys
    dup_mask = check_duplicates(stores, schema["key"])
    if not dup_mask.empty and dup_mask.any():
        n_dup = int(dup_mask.sum())
        report.add_issue(DQIssue(
            dataset=ds, check="duplicate_keys", severity="ERROR",
            n_affected=n_dup,
            pct_affected=_pct(n_dup, total_rows),
            description="store_nbr duplicado.",
            suggestion="Eliminar o consolidar duplicados.",
        ))

    # Minimum number of stores; a missing store_nbr column counts as zero stores.
    if "store_nbr" in stores.columns:
        store_count = int(stores["store_nbr"].nunique())
    else:
        store_count = 0
    if store_count < config.EXPECTED_MIN_STORES:
        report.add_issue(DQIssue(
            dataset=ds, check="stores_below_expected", severity="WARNING",
            n_affected=config.EXPECTED_MIN_STORES - store_count,
            pct_affected=0.0,
            description=f"Se esperaban ≥{config.EXPECTED_MIN_STORES} tiendas; hay {store_count}.",
            suggestion="Verificar archivo 'stores.csv'.",
        ))


def validate_transactions(trans: pd.DataFrame, report: DQReport) -> None:
    """
    Validate the transactions table and append findings to ``report``.

    Checks: required columns, duplicated keys, non-numeric and negative
    ``transactions`` values, and per-store daily coverage against the
    configured date window (stores below 90% are reported).

    ``trans`` is not modified: dates are parsed into a local Series rather
    than being written back to the caller's frame. If the coverage check
    cannot run (e.g. a needed column is missing or dates cannot be parsed),
    an INFO issue is recorded rather than failing silently.
    """
    ds = "trans"
    n_rows = len(trans)
    report.set_dataset_size(ds, n_rows)

    present, missing = check_required_columns(trans, config.SCHEMA_EXPECTED[ds]["required_cols"])
    if missing:
        report.add_issue(DQIssue(
            ds, "schema_required_cols", "ERROR",
            n_affected=len(missing), pct_affected=_pct(len(missing), len(present)+len(missing)),
            description=f"Faltan columnas requeridas: {missing}",
            suggestion="Revisar extracción/lectura del CSV."
        ))

    dups = check_duplicates(trans, config.SCHEMA_EXPECTED[ds]["key"])
    if not dups.empty and dups.any():
        n_dups = int(dups.sum())
        report.add_issue(DQIssue(
            ds, "duplicate_keys", "ERROR",
            n_affected=n_dups, pct_affected=_pct(n_dups, n_rows),
            description="Duplicados por (date, store_nbr) en transacciones.",
            suggestion="Consolidar por suma de 'transactions'."
        ))

    # Negative / non-numeric transactions
    if "transactions" in trans.columns:
        mask_num = coercible_numeric_mask(trans["transactions"])
        if (~mask_num).any():
            n_bad = int((~mask_num).sum())
            report.add_issue(DQIssue(
                ds, "non_numeric_transactions", "ERROR",
                n_affected=n_bad, pct_affected=_pct(n_bad, n_rows),
                description="Valores no numéricos en 'transactions'.",
                suggestion="Coercer con pd.to_numeric(..., errors='coerce') y revisar NaN."
            ))
        neg = pd.to_numeric(trans["transactions"], errors="coerce") < 0
        if neg.any():
            n_neg = int(neg.sum())
            report.add_issue(DQIssue(
                ds, "negative_transactions", "WARNING",
                n_affected=n_neg, pct_affected=_pct(n_neg, n_rows),
                description="Transacciones negativas detectadas.",
                suggestion="Revisar fuentes o truncar a cero si se justifica."
            ))

    # Per-store coverage (missing dates)
    try:
        # Parse into a local Series; the caller's DataFrame stays untouched.
        dates = pd.to_datetime(trans["date"])
        expected_days = pd.date_range(config.EXPECTED_DATE_MIN, config.EXPECTED_DATE_MAX, freq="D")
        n_expected = expected_days.size
        if n_expected > 0:
            # Distinct observed days per store. nunique() ignores NaT, so a
            # missing date is never counted as a covered day; rows with a
            # missing store_nbr are dropped by groupby's default dropna=True.
            days_per_store = dates.groupby(trans["store_nbr"]).nunique()
            coverage = days_per_store / n_expected
            low = coverage[coverage < 0.9]  # NOTE(review): threshold is hard-coded; could move to config
            gaps = [(int(store), float(cov)) for store, cov in low.items()]
            if gaps:
                report.add_issue(DQIssue(
                    ds, "store_date_coverage_low", "INFO",
                    n_affected=len(gaps), pct_affected=_pct(len(gaps), trans["store_nbr"].nunique()),
                    description="Tiendas con cobertura diaria < 90% en 'transactions'.",
                    suggestion="Completar días faltantes con 0 o imputación si aplica.",
                    sample_index=gaps[:10]
                ))
    except Exception as e:
        # Best-effort check: surface the failure in the report, don't hide it.
        report.add_issue(DQIssue(
            ds, "store_date_coverage_check_failed", "INFO",
            n_affected=0, pct_affected=0.0,
            description=f"No se pudo evaluar la cobertura por tienda (detalle: {type(e).__name__}).",
            suggestion="Revisar columnas 'date'/'store_nbr'."
        ))


def validate_oil(oil: pd.DataFrame, report: DQReport) -> None:
    """
    Validate the oil-price table: required columns and missing (or
    non-numeric) values in ``dcoilwtico``. Findings are appended to ``report``.
    """
    ds = "oil"
    total_rows = len(oil)
    report.set_dataset_size(ds, total_rows)

    # Required columns
    found_cols, absent_cols = check_required_columns(oil, config.SCHEMA_EXPECTED[ds]["required_cols"])
    if absent_cols:
        n_absent = len(absent_cols)
        report.add_issue(DQIssue(
            dataset=ds, check="schema_required_cols", severity="ERROR",
            n_affected=n_absent,
            pct_affected=_pct(n_absent, len(found_cols) + n_absent),
            description=f"Faltan columnas requeridas: {absent_cols}",
            suggestion="Revisar extracción/lectura del CSV.",
        ))

    if "dcoilwtico" not in oil.columns:
        return

    # Values that fail numeric coercion become NaN and are counted as missing too.
    n_missing = int(pd.to_numeric(oil["dcoilwtico"], errors="coerce").isna().sum())
    if n_missing > 0:
        report.add_issue(DQIssue(
            dataset=ds, check="missing_oil_values", severity="WARNING",
            n_affected=n_missing,
            pct_affected=_pct(n_missing, total_rows),
            description="Valores faltantes en 'dcoilwtico' (fines de semana/feriados son comunes).",
            suggestion="Aplicar forward-fill antes de promediar semanalmente.",
        ))


def validate_holidays(hol: pd.DataFrame, stores: pd.DataFrame, report: DQReport) -> None:
    """
    Validate the holidays/events table and append findings to ``report``.

    Checks: required columns, allowed ``locale`` values, presence of
    transferred events, and whether ``locale_name`` of Regional / Local
    events matches ``stores.state`` / ``stores.city`` respectively.

    The store-matching check is skipped when the columns it needs are not
    present. Any unexpected failure inside it is recorded as an INFO issue
    rather than being silently discarded.
    """
    ds = "hol"
    n_rows = len(hol)
    report.set_dataset_size(ds, n_rows)

    present, missing = check_required_columns(hol, config.SCHEMA_EXPECTED[ds]["required_cols"])
    if missing:
        report.add_issue(DQIssue(
            ds, "schema_required_cols", "ERROR",
            n_affected=len(missing), pct_affected=_pct(len(missing), len(present)+len(missing)),
            description=f"Faltan columnas requeridas: {missing}",
            suggestion="Revisar extracción/lectura del CSV."
        ))

    # Allowed values in 'locale'
    if "locale" in hol.columns:
        allowed = {"National","Regional","Local"}
        invalid = ~hol["locale"].isin(list(allowed))
        if invalid.any():
            n_invalid = int(invalid.sum())
            report.add_issue(DQIssue(
                ds, "invalid_locale_values", "ERROR",
                n_affected=n_invalid, pct_affected=_pct(n_invalid, n_rows),
                description="Valores no válidos en 'locale'.",
                suggestion="Restringir a {'National','Regional','Local'}."
            ))

    # Transferred events present (they should be excluded downstream)
    if "transferred" in hol.columns:
        ntrans = int((hol["transferred"] == True).sum())
        if ntrans > 0:
            report.add_issue(DQIssue(
                ds, "contains_transferred", "INFO",
                n_affected=ntrans, pct_affected=_pct(ntrans, n_rows),
                description="Hay eventos con transferred=True (se recomienda excluirlos).",
                suggestion="hol = hol[hol['transferred']==False].copy()"
            ))

    # Integrity against stores for Regional / Local events
    try:
        # Both columns must exist in `hol` for the matching to make sense.
        if {"locale", "locale_name"}.issubset(hol.columns):
            # (locale value, stores column, check id, description, suggestion)
            integrity_specs = (
                ("Regional", "state", "regional_unmatched_state",
                 "Algunos 'locale_name' Regional no hacen match con stores.state.",
                 "Normalizar nombres de estado o diccionario de equivalencias."),
                ("Local", "city", "local_unmatched_city",
                 "Algunos 'locale_name' Local no hacen match con stores.city.",
                 "Normalizar nombres de ciudad o diccionario de equivalencias."),
            )
            for locale_value, store_col, check_name, description, suggestion in integrity_specs:
                subset = hol[hol["locale"] == locale_value]
                if subset.empty or store_col not in stores.columns:
                    continue
                valid_names = set(stores[store_col].dropna().unique())
                unmatched = ~subset["locale_name"].isin(valid_names)
                if not unmatched.any():
                    continue
                n = int(unmatched.sum())
                report.add_issue(DQIssue(
                    ds, check_name, "WARNING",
                    n_affected=n, pct_affected=_pct(n, len(subset)),
                    description=description,
                    suggestion=suggestion,
                    sample_index=subset.loc[unmatched, "locale_name"].dropna().unique().tolist()[:10]
                ))
    except Exception as e:
        # Best-effort check: surface the failure in the report, don't hide it.
        report.add_issue(DQIssue(
            ds, "store_integrity_check_failed", "INFO",
            n_affected=0, pct_affected=0.0,
            description=f"No se pudo evaluar la integridad con stores (detalle: {type(e).__name__}).",
            suggestion="Revisar columnas 'locale'/'locale_name' y 'state'/'city' en stores."
        ))


# ==========================================================
# Chequeos cruzados (integridad referencial)
# ==========================================================

def _report_orphan_keys(
    report: DQReport,
    child: pd.DataFrame,
    parent: pd.DataFrame,
    key: str,
    check: str,
    child_name: str,
    parent_name: str,
) -> None:
    """Add an ERROR issue when ``child[key]`` holds values absent from ``parent[key]``."""
    if key not in child.columns or key not in parent.columns:
        return
    # Drop missing keys first: NaN never equals NaN inside a set, so every NaN
    # would be reported as an orphan, while nunique() (the denominator)
    # already ignores missing values.
    child_keys = set(child[key].dropna().unique())
    parent_keys = set(parent[key].dropna().unique())
    orphans = child_keys - parent_keys
    if not orphans:
        return
    try:
        ordered = sorted(orphans)
    except TypeError:
        # Mixed-type keys (e.g. str and int) are not mutually comparable.
        ordered = sorted(orphans, key=str)
    report.add_issue(DQIssue(
        "cross", check, "ERROR",
        n_affected=len(orphans), pct_affected=_pct(len(orphans), child[key].nunique()),
        description=f"Existen {key} en {child_name} sin correspondencia en {parent_name}.",
        suggestion=f"Revisar '{parent_name}.csv' o depurar registros huérfanos.",
        sample_index=ordered[:10]
    ))


def validate_cross_integrity(raw: Dict[str, pd.DataFrame], report: DQReport) -> None:
    """
    Referential-integrity checks across datasets.

    Verifies that every key used by a child table exists in its parent table:
    ``ventas.item_nbr`` in ``items``, ``ventas.store_nbr`` in ``stores`` and
    ``trans.store_nbr`` in ``stores``. A check is skipped when the key column
    is missing on either side. Missing key values are ignored.

    Parameters
    ----------
    raw : Dict[str, pd.DataFrame]
        Must contain the keys "ventas", "items", "stores" and "trans".
    report : DQReport
        Report that receives any issues (dataset label "cross").
    """
    ventas = raw["ventas"]; items = raw["items"]; stores = raw["stores"]
    trans = raw["trans"]

    # ventas.item_nbr ⊆ items.item_nbr
    _report_orphan_keys(report, ventas, items, "item_nbr", "ventas_items_fk", "ventas", "items")
    # ventas.store_nbr ⊆ stores.store_nbr
    _report_orphan_keys(report, ventas, stores, "store_nbr", "ventas_stores_fk", "ventas", "stores")
    # trans.store_nbr ⊆ stores.store_nbr
    _report_orphan_keys(report, trans, stores, "store_nbr", "trans_stores_fk", "trans", "stores")


# ==========================================================
# Orquestador
# ==========================================================

def run_all_validations(raw: Dict[str, pd.DataFrame]) -> DQReport:
    """
    Run every per-dataset validator and then the cross-dataset integrity
    checks, returning a single DQReport with all issues and the size summary.
    """
    report = DQReport()

    # Validators that take exactly one dataset, in execution order.
    single_dataset_checks = (
        (validate_ventas, "ventas"),
        (validate_items, "items"),
        (validate_stores, "stores"),
        (validate_transactions, "trans"),
        (validate_oil, "oil"),
    )
    for validator, dataset_key in single_dataset_checks:
        validator(raw[dataset_key], report)

    # Holidays additionally need the stores table.
    validate_holidays(raw["hol"], raw["stores"], report)

    # Cross-dataset integrity
    validate_cross_integrity(raw, report)

    return report


# ==========================================================
# Persistencia de reportes
# ==========================================================

def save_report(report: DQReport, out_dir: Optional[str] = None) -> Dict[str, str]:
    """
    Write the report as CSV and Markdown into ``out_dir`` and return both paths.

    ``out_dir`` defaults to ``config.REPORT_DIR`` and is created if needed.
    The result is ``{"csv": <path>, "md": <path>}``.
    """
    target_dir = config.REPORT_DIR if out_dir is None else out_dir
    os.makedirs(target_dir, exist_ok=True)

    paths = {
        "csv": os.path.join(target_dir, "data_quality_report.csv"),
        "md": os.path.join(target_dir, "data_quality_report.md"),
    }

    report.to_dataframe().to_csv(paths["csv"], index=False)
    with open(paths["md"], "w", encoding="utf-8") as handle:
        handle.write(report.to_markdown())
    return paths

In [2]:
# Raw CSV file name for each dataset key expected by run_all_validations().
RAW_FILES = {
    "ventas": "train.csv",
    "items": "items.csv",
    "stores": "stores.csv",
    "trans": "transactions.csv",
    "oil": "oil.csv",
    "hol": "holidays_events.csv",
}

# Build paths with os.path.join; a hard-coded "\\" separator only works on Windows.
raw = {
    name: pd.read_csv(os.path.join(config.DATA_DIR, filename))
    for name, filename in RAW_FILES.items()
}

report = run_all_validations(raw)
save_report(report)

  "ventas": pd.read_csv(config.DATA_DIR + "\\train.csv"),


{'csv': 'D:\\repos\\cannibalization_reatail\\.data\\raw_data\\data_quality_report.csv',
 'md': 'D:\\repos\\cannibalization_reatail\\.data\\raw_data\\data_quality_report.md'}