In [1]:
import mlflow

In [2]:
# ====================================================================================
# ===============================================                                   ==
# Libraries for dataset verification with DVC. ==                                   ==
# ===============================================                                   ==
from pathlib import Path  # Cross-platform path handling                            ==
from typing import Dict, Tuple, Optional  # Optional type hints for better clarity  ==
import os  # File system and environment variable handling                          ==
import yaml  # Read .dvc (YAML) pointer files                                       ==
import hashlib  # Compute MD5 hashes to verify data integrity                       ==
import pandas as pd  # Read and manage tabular data                                 ==
import subprocess    # Execute SO commands                                          ==
# ====================================================================================

In [3]:
# ===============================================================
# DVC dataset verification helpers (MD5 check + pull fallback) ==
# ===============================================================

def ensure_repo_ready(repo_root: str = "/work") -> None:
    """
    Verifies that `repo_root` is a usable project folder with Git and DVC.

    Checks, in order:
    - Directory `repo_root` exists.
    - It contains a `.git` entry. Both a directory (regular clone) and a plain
      file (Git worktree / submodule, where `.git` is a "gitdir: ..." pointer
      file) are accepted.
    - It contains a `.dvc` subdirectory (it's a DVC repo).

    Parameters:
    - repo_root: path of the repository root (default "/work").

    Raises:
    - FileNotFoundError if `repo_root` does not exist or is not a directory.
    - RuntimeError if `.git` or `.dvc` is missing.
    """
    if not os.path.isdir(repo_root):
        raise FileNotFoundError(f"Repo root does not exist: {repo_root}")
    # `.git` is a regular file in worktrees and submodules, so check for
    # existence instead of requiring a directory.
    if not os.path.exists(os.path.join(repo_root, ".git")):
        raise RuntimeError(f"Not a Git repo: {repo_root}")
    if not os.path.isdir(os.path.join(repo_root, ".dvc")):
        raise RuntimeError(f"Not a DVC repo: {repo_root} (.dvc not found)")


def _md5_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """
    Computes the MD5 hash of a file by streaming it from disk to verify integrity
    against the value stored by DVC in the `.dvc` pointer (default md5-based cache).

    Parameters:
    - path: absolute file path.
    - chunk_size: read block size in bytes (default 1 MB).

    Returns:
    - Hex MD5 string of the file content.
    """
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


def _read_expected_md5_from_dvc(pointer_path: str) -> Optional[str]:
    """
    Reads the expected MD5 from a single-file `.dvc` pointer.

    `.dvc` format:
      - md5: <hash>
      - hash: md5
      - path: <file_name>

    Parameters:
    - pointer_path: absolute path to the `.dvc` file.

    Returns:
    - The MD5 string if present, or None if the pointer does not exist / lacks md5.

    Use:
    - Compare the expected MD5 from `.dvc` with the actual local file MD5.
    """
    if not os.path.exists(pointer_path):
        return None
    with open(pointer_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    outs = data.get("outs") or []
    if not outs:
        return None
    out = outs[0]
    return out.get("md5") or out.get("checksum") or None


def _dvc_pull_target(path_repo_rel: str, repo_root: str = "/work") -> None:
    """
    Runs `dvc pull <path>` or `<path>.dvc` to materialize the correct version from the remote (S3)
    into the local workspace/cache. Raises if it fails (credentials, permissions, etc.).

    Parameters:
    - path_repo_rel: repo-relative path to fetch (e.g., "data/raw/file.csv").
    - repo_root: repo root (e.g., "/work").
    """
# Build possible targets
    dvc_pointer = os.path.join(repo_root, path_repo_rel + ".dvc")
    if os.path.exists(dvc_pointer):
        target = path_repo_rel + ".dvc"
    else:
        target = path_repo_rel

    # Try pulling
    result = subprocess.run(
        ["dvc", "pull", "--quiet", target],
        cwd=repo_root,
        capture_output=True,
        text=True,
    )

    # Raise if failed
    if result.returncode != 0:
        raise RuntimeError(
            f"Failed to run 'dvc pull {target}':\n"
            f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        )



def dvc_read_csv_verified(
    path_repo_rel: str,
    repo_root: str = "/work",
    prefer_dvc: bool = False,
    verify_local_md5: bool = True,
    pandas_read_csv_kwargs: Optional[Dict] = None,
) -> Tuple[pd.DataFrame, str]:
    """
    Read a DVC-versioned CSV ensuring integrity when reading locally.

    Strategy:
    - If `prefer_dvc=True`: force fetching the official version with `dvc pull`
      and then read locally. Returns ("pulled").
    - If `prefer_dvc=False`:
        1) If the local file exists, `verify_local_md5=True` and the `.dvc`
           pointer provides an MD5, compare local MD5 with the expected one.
           * If equal -> read local (fast). Returns ("local").
           * If NOT equal, or the local hash cannot be computed -> run
             `dvc pull` and read the official version. Returns ("pulled").
        2) If the local file exists but verification is disabled or the
           pointer has no MD5 -> read local as-is. Returns ("local").
        3) If the file does NOT exist -> run `dvc pull` and read the official
           version. Returns ("pulled").

    Parameters:
    - path_repo_rel: repo-relative CSV path (e.g., "data/raw/file.csv").
    - repo_root: repo root (e.g., "/work").
    - prefer_dvc: if True, ignore local state and fetch official version with `dvc pull`.
    - verify_local_md5: if True, validate local MD5 before trusting local read.
    - pandas_read_csv_kwargs: kwargs for `pandas.read_csv()` (sep, encoding, etc.).

    Returns:
    - (df, source) where source ∈ {"local", "pulled"} describing the read source.

    Exceptions:
    - Raises if the repo is not ready (see `ensure_repo_ready`).
    - Raises if the file cannot be materialized from the remote (credentials,
      permissions, or missing blob). `dvc pull` is attempted once; a failure
      is propagated instead of being retried.
    - Errors from `pandas.read_csv` propagate unchanged.
    """
    ensure_repo_ready(repo_root)
    read_kwargs = dict(pandas_read_csv_kwargs or {})

    local_path = os.path.join(repo_root, path_repo_rel)
    # The pointer sits next to the data file, e.g. data/raw/file.csv.dvc
    expected_md5 = _read_expected_md5_from_dvc(local_path + ".dvc")

    def _pull_and_read() -> Tuple[pd.DataFrame, str]:
        # Materialize the tracked version, then read it from the workspace.
        _dvc_pull_target(path_repo_rel, repo_root)
        return pd.read_csv(local_path, **read_kwargs), "pulled"

    # Forced official read, or nothing local to trust: go to the remote.
    if prefer_dvc or not os.path.exists(local_path):
        return _pull_and_read()

    if verify_local_md5 and expected_md5:
        # Only the hash computation is guarded. Pull and CSV-parsing errors
        # must reach the caller instead of triggering a second pull.
        try:
            local_is_official = _md5_file(local_path) == expected_md5
        except (OSError, ValueError):
            # Unreadable file, or MD5 unavailable in this Python build:
            # treat as unverified and fall back to the official version.
            local_is_official = False
        if not local_is_official:
            return _pull_and_read()

    # Either the MD5 matched the pointer, or verification was not requested / not possible.
    return pd.read_csv(local_path, **read_kwargs), "local"


In [4]:
# =======================================
# Dataset read configuration (tunable) ==
# =======================================

# The Docker setup mounts the project at /work; update REPO_ROOT if the compose file changes.
REPO_ROOT = "/work"  # Mount point of the repository.
PATH = "data/raw/work_absenteeism_modified.csv"  # DVC-versioned dataset, relative to the repo root.

# Keyword arguments handed straight to pandas.read_csv (delimiter, encoding, ...).
READ_KW: Dict = {}  # e.g.: {"sep": ",", "encoding": "utf-8"}

# Read mode switches:
# - PREFER_DVC=True  -> always `dvc pull` the official version and read that.
# - PREFER_DVC=False -> trust the local file only when its MD5 equals the one in the .dvc pointer.
PREFER_DVC = False
VERIFY_LOCAL_MD5 = True

# ================================================
# Environment report + demo read with MD5 check ==
# ================================================
print("Repo root:", REPO_ROOT, "| exists:", os.path.exists(REPO_ROOT))
print("Expected CSV:", PATH)

# Report the MD5 recorded in the pointer file, when there is one.
pointer_path = os.path.join(REPO_ROOT, PATH + ".dvc")
expected = _read_expected_md5_from_dvc(pointer_path)
if expected:
    print("Expected MD5 (.dvc):", expected)
else:
    print("Expected MD5 (.dvc):", "<no md5 in pointer>")

# With both a pointer MD5 and a local file available, hash the local file and compare.
local_abs = os.path.join(REPO_ROOT, PATH)
if expected and os.path.exists(local_abs):
    try:
        local_md5 = _md5_file(local_abs)
        print("Local MD5:", local_md5)
        if local_md5 == expected:
            print("MD5 matches .dvc?", "YES ✅")
        else:
            print("MD5 matches .dvc?", "NO ❌")
    except Exception as err:
        # This is only a report; a hashing problem must not stop the read below.
        print("Could not compute local MD5:", type(err).__name__, str(err)[:120])

# --- Integrity-checked read ---
# dvc_read_csv_verified either reads the local file (source "local") or first
# runs `dvc pull` and then reads it (source "pulled"), depending on PREFER_DVC,
# VERIFY_LOCAL_MD5, whether the local file exists and whether its MD5 matches
# the one in the .dvc pointer.
df, source = dvc_read_csv_verified(
    PATH,
    repo_root=REPO_ROOT,
    prefer_dvc=PREFER_DVC,
    verify_local_md5=VERIFY_LOCAL_MD5,
    pandas_read_csv_kwargs=READ_KW,
)

print(f"Read from: {source} | rows={len(df)} | cols={len(df.columns)}")


Repo root: /work | exists: True
Expected CSV: data/raw/work_absenteeism_modified.csv
Expected MD5 (.dvc): 96c318341d1846f567be7127f52d03e1
Read from: pulled | rows=754 | cols=22


## Nota:

* Si alguna vez Git en el contenedor advierte “dubious ownership”, ejecuta: `git config --global --add safe.directory /work`

* Para evitar sorpresas, mantén data/raw/*.csv en .gitignore y versiona solo los *.dvc.

### Para no corromper kernels ni perder el entorno.

1. Guarda el notebook (**Ctrl+S**).
2. Cierra el kernel limpio:
   * Menú → Kernel → Shut Down Kernel
   * Luego: File → Close and Shutdown Notebook
3. Cierra la pestaña del navegador.
4. Ve a la terminal donde lo ejecutaste.
5. Presiona Ctrl + C (detiene el servidor).
6. Si te pregunta “Shutdown this notebook server (y/[n])?”, escribe y.

### Salir del docker y sincronizar notebook
1. exit
2. docker compose down
3. git add notebooks/EDA/eda_V1.ipynb
4. git commit -m "feat: análisis inicial EDA"
5. git push

In [5]:
# Preview the first rows of the DataFrame loaded above (pandas `head()` shows 5 rows by default).
df.head()

Unnamed: 0,ID,Reason for absence,Month of absence,Day of the week,Seasons,Transportation expense,Distance from Residence to Work,Service time,Age,Work load Average/day,...,Education,Son,Social drinker,Social smoker,Pet,Weight,Height,Body mass index,Absenteeism time in hours,mixed_type_col
0,11.0,26.0,7.0,3.0,1.0,289.0,36.0,13.0,33.0,239.554,...,1.0,2.0,1.0,0.0,1.0,90.0,172.0,30.0,4.0,535
1,36.0,0.0,7.0,3.0,1.0,118.0,13.0,18.0,50.0,239.554,...,1.0,1.0,1.0,0.0,0.0,98.0,178.0,31.0,0.0,584
2,3.0,23.0,7.0,4.0,1.0,179.0,51.0,18.0,38.0,239.554,...,1.0,0.0,1.0,0.0,0.0,89.0,170.0,31.0,2.0,249
3,7.0,7.0,7.0,5.0,1.0,279.0,5.0,14.0,39.0,239.554,...,1.0,2.0,1.0,1.0,0.0,68.0,168.0,24.0,4.0,538
4,11.0,23.0,7.0,65.0,1.0,289.0,36.0,13.0,33.0,239.554,...,1.0,2.0,1.0,0.0,1.0,90.0,172.0,30.0,2.0,85


In [5]:
import os
# Debugging aid: show where the kernel is running and what it can see there.
print("cwd:", os.getcwd())  # working directory of the kernel process
print("list cwd:", os.listdir("."))  # entries of that working directory
# globals().get(...) yields None instead of raising NameError when REPO_ROOT was never defined.
print("repo_root var (if defined):", globals().get("REPO_ROOT"))

cwd: /work/notebooks
list cwd: ['1.0-ELT-initial-data-exploration.ipynb', '.gitkeep', '07-aa-phase2-pipeline-experiments.ipynb', '04-aa-model-experiments.ipynb', '1.0-ERL-Utilidad-DVC-lectura-datasets.ipynb', '1_EDA', '02-aa-eda-transformations.ipynb', '01-aa-ml-canvas.ipynb', '03-aa-feature-engineering.ipynb', '.ipynb_checkpoints', '05-dl-model-experiments.ipynb']
repo_root var (if defined): /work


In [6]:
import os
import subprocess
import sys

# Environment diagnostics: interpreter, OS user, shell working directory,
# DVC location, AWS credential presence and the files the DVC helpers rely on.
print("Python executable:", sys.executable)
print("whoami:", subprocess.getoutput("whoami"))
print("pwd (shell):", subprocess.getoutput("pwd"))
print("which dvc:", subprocess.getoutput("which dvc"))
# Never echo credential values: cell outputs are saved with the notebook and
# end up in version control. Report only whether the variable is set.
print("AWS_ACCESS_KEY_ID:", "<set>" if os.environ.get("AWS_ACCESS_KEY_ID") else "<not set>")
print("Repo root exists?", os.path.isdir("/work"))
print("CSV exists?", os.path.exists("/work/data/raw/work_absenteeism_modified.csv"))
print("DVC file exists?", os.path.exists("/work/data/raw/work_absenteeism_modified.csv.dvc"))

Python executable: /usr/local/bin/python3.13
whoami: root
pwd (shell): /work/notebooks
which dvc: /usr/local/bin/dvc
AWS_ACCESS_KEY_ID: <set>
Repo root exists? True
CSV exists? True
DVC file exists? True
