# **01_extract**

## Objectives

* Fetch the credit card customer dataset directly from Kaggle using the prepared ingestion script.  
* Store the dataset in the project's `data/raw/` folder so it is available for later stages of the ETL process.  
* Capture the accompanying data dictionary (if provided) to help guide future cleaning and transformation.

## Inputs

* Access to the Kaggle platform with an API key configured (`kaggle.json` placed in `~/.kaggle/`).  
* The dataset name defined inside the ingestion script (`sakshigoyal7/credit-card-customers`).  
* Project directory structure with a `data/` folder to hold raw and interim data.

## Outputs

* A local copy of the Kaggle dataset saved under `data/raw/`.  
* A JSON version of the dataset's data dictionary (if available), saved alongside the raw data.  
* A Parquet file containing the initial snapshot of the dataset, saved under `data/interim/`.  
* This interim Parquet file is committed to the GitHub repository, so it is **not necessary** to re-run the ingestion step if you prefer to begin directly with the transformation stage.  
* A short console report showing the dataset location and initial structure (rows, columns, dtypes).

## Additional Comments

* This notebook does not analyse or transform the data; its role is only to fetch and stage it.  
* The use of the data dictionary is intended to give structure to the dataset early, simplifying later validation and transformation.  
* All subsequent notebooks in the ETL pipeline assume that this step has completed successfully and that the dataset is present in `data/raw/` and mirrored as an interim Parquet file.

---

# Data Ingestion from Kaggle

This project uses a Python script (`ingest.py`) in the project root to download the dataset directly from Kaggle.  
The script relies on the official Kaggle API and performs the following steps:

- Downloads the dataset specified in `DATASET_NAME`.
- Copies the raw files (CSV, data dictionary, etc.) into `<project-root>/data/raw/`.
- If a data dictionary is provided (e.g. Excel), parses it into a JSON schema for later use.
- Prints a summary report of rows, columns, and dtypes.
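
The download step above could look roughly like the sketch below. This is not the project's actual `ingest.py`: the function names `dataset_folder` and `download_dataset`, and the choice to name the target folder after the dataset slug, are assumptions made for illustration. Only `KaggleApi.authenticate()` and `KaggleApi.dataset_download_files()` come from the official `kaggle` package.

```python
from pathlib import Path

DATASET_NAME = "sakshigoyal7/credit-card-customers"


def dataset_folder(dataset: str, raw_dir: Path) -> Path:
    """Hypothetical layout: data/raw/<dataset-slug>/ (an assumption)."""
    return raw_dir / dataset.split("/")[-1]


def download_dataset(dataset: str, raw_dir: Path) -> list[Path]:
    """Download and unzip a Kaggle dataset, returning the CSV files found."""
    # Imported lazily because the kaggle package may try to authenticate
    # on import, which fails if no API token has been configured yet.
    from kaggle.api.kaggle_api_extended import KaggleApi

    api = KaggleApi()
    api.authenticate()  # reads the API token, e.g. from ~/.kaggle/kaggle.json
    target = dataset_folder(dataset, raw_dir)
    target.mkdir(parents=True, exist_ok=True)
    api.dataset_download_files(dataset, path=str(target), unzip=True)
    return sorted(target.rglob("*.csv"))
```

Calling `download_dataset(DATASET_NAME, Path("data/raw"))` requires network access and a valid token, so it is best run once and followed by the summary report.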

⚠️ **Kaggle API key requirement:**  
To use the Kaggle API, you must have a Kaggle account and create an API token.  
1. Go to [https://www.kaggle.com/](https://www.kaggle.com/) → *Account* → *Create New API Token*.  
2. This downloads a file called `kaggle.json`.  
3. Place the `kaggle.json` file in the hidden directory in your home folder: `~/.kaggle/kaggle.json` (create the `.kaggle` folder if it doesn’t exist).  
4. Ensure it has correct permissions (on Unix/Mac: `chmod 600 ~/.kaggle/kaggle.json`).
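
Before running the ingest step, it can help to confirm that the token is in place. The helper below is a minimal sketch using only the standard library; `check_kaggle_token` is a hypothetical name, and the check assumes the usual token layout with `username` and `key` fields.

```python
import json
import os
import stat
from pathlib import Path


def check_kaggle_token(token_path: Path) -> list[str]:
    """Return a list of problems; an empty list means the token looks usable."""
    if not token_path.is_file():
        return [f"missing file: {token_path}"]
    try:
        token = json.loads(token_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return [f"not valid JSON: {token_path}"]
    problems = []
    for field in ("username", "key"):
        if not isinstance(token, dict) or field not in token:
            problems.append(f"missing field: {field}")
    # Permission bits are only meaningful on Unix-like systems.
    if os.name == "posix":
        mode = stat.S_IMODE(token_path.stat().st_mode)
        if mode & 0o077:  # any access for group or others
            problems.append(f"permissions too open: {oct(mode)} (expected 0o600)")
    return problems
```

For example, `check_kaggle_token(Path.home() / ".kaggle" / "kaggle.json")` returns an empty list when the file exists, parses, and is readable only by its owner.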

Once configured, running the notebook cell that calls `ingest.py` will automatically fetch the dataset from Kaggle and populate the `data/raw/` directory for subsequent ETL steps.

## Imports & Configuration

All imports are kept together at the top of the notebook. This is a deliberate choice:

* **Consistency with Python standards:** Ruff (our linter) enforces [PEP 8](https://peps.python.org/pep-0008/) which says all module-level imports should be at the start of a file. In notebooks, each `.ipynb` is effectively a Python module, so the same rule applies.
* **Clarity for readers:** Having imports scattered across cells makes it hard to know which libraries are required to run the notebook. Consolidating them up-front provides a single, predictable place to look.
* **Reproducibility:** When imports are at the top, you can quickly check your environment and install what’s missing. If imports are sprinkled throughout, you might only discover a missing package halfway through execution.
* **Less clutter:** Re-importing a library in several cells costs nothing at runtime (Python caches modules), but it clutters the notebook and adds cognitive overhead.
* **Tooling compatibility:** Linters and formatters (Ruff, nbQA) assume imports are at the top. Following this convention avoids noisy errors (e.g. E402) and makes automated checks pass smoothly.

While many data science notebooks mix imports and code ad-hoc, we are treating this notebook like a real Python module: all imports first, then configuration, then data loading and analysis. This keeps our workflow clean and professional.
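
With every import in one place, the environment check mentioned above becomes a few lines. The sketch below is one way to do it; `REQUIRED` and `missing_packages` are hypothetical names, and listing `pyarrow` is an assumption (pandas needs either `pyarrow` or `fastparquet` to write Parquet).

```python
import importlib.util

# Third-party packages this notebook relies on (top-level names only).
# "pyarrow" is an assumption: pandas needs a Parquet engine for to_parquet.
REQUIRED = ["pandas", "pyarrow"]


def missing_packages(names: list[str]) -> list[str]:
    """Return the package names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages(REQUIRED)
if missing:
    print("Install before running:", ", ".join(missing))
```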

In [None]:
# Imports & config (keep imports here to satisfy Ruff E402)
# Standard library
import glob
import hashlib
import json
import os
import pathlib
import subprocess
import sys
import time
from pathlib import Path

# Third-party
import pandas as pd

## Run ingest.py from the project root

In [None]:
# Figure out the project root: one directory up from this notebook
project_root = Path.cwd().parent

ingest_script = project_root / "ingest.py"

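# Run the ingest script using the same Python interpreter as the notebook
result = subprocess.run(
    [sys.executable, str(ingest_script)], capture_output=True, text=True
)

print("STDOUT:\n", result.stdout)
print("STDERR:\n", result.stderr)

# Fail loudly if the script exited with an error instead of continuing silently
if result.returncode != 0:
    raise RuntimeError(f"ingest.py failed with exit code {result.returncode}")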

## Load raw data + apply schema (from data dictionary)

This cell loads the ingested Kaggle credit card customer dataset (CSV) and applies the derived schema JSON to build a structured pandas DataFrame with correct types, categories, and parsed dates.

In [None]:
RAW_DIR = pathlib.Path("../data/raw")


def _find_first_csv(raw_dir: pathlib.Path) -> str | None:
    # Sort so the choice is deterministic when several CSVs are present
    csvs = sorted(glob.glob(str(raw_dir / "**" / "*.csv"), recursive=True))
    return csvs[0] if csvs else None


def _schema_path_for_csv(csv_path: str) -> str | None:
    """We saved schema as: data/raw/<dataset_folder>_schema.json"""
    folder = os.path.basename(os.path.dirname(csv_path))
    candidate = RAW_DIR / f"{folder}_schema.json"
    if candidate.exists():
        return str(candidate)
    # Fallback: first *_schema.json in raw (sorted for a deterministic choice)
    any_schema = sorted(glob.glob(str(RAW_DIR / "*_schema.json")))
    return any_schema[0] if any_schema else None


def _apply_schema(df: pd.DataFrame, schema: dict) -> pd.DataFrame:
    """
    Enforce schema-defined types, categories, and dates on the DataFrame.
    The schema comes from the Kaggle data dictionary (parsed earlier and saved as JSON).
    Using this structure up-front simplifies downstream cleaning, validation, and visuals.
    """
    df = df.copy()
    dtypes = schema.get("dtypes", {})
    cats = schema.get("categories", {})
    dates = schema.get("parse_dates", [])
    descs = schema.get("descriptions", {})

    # 1) Set base dtypes (string/boolean/Int64/float64). Categories handled after.
    for col, dtype in dtypes.items():
        if col not in df.columns:
            continue
        try:
            if dtype in ("string", "boolean", "Int64", "float64"):
                df[col] = df[col].astype(dtype)
        except Exception:
            # Leave as-is if conversion fails; we’ll still try dates/categories below
            pass

    # 2) Parse dates
    for col in dates:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # 3) Apply categoricals. Every category list in the schema is treated as
    #    ordered, in the order listed; values not in the list become NaN.
    for col, categories in cats.items():
        if col in df.columns and categories:
            try:
                ctyp = pd.api.types.CategoricalDtype(
                    categories=categories, ordered=True
                )
                df[col] = df[col].astype(ctyp)
            except Exception:
                pass

    # 4) Attach column descriptions for reference (handy in later EDA cells)
    df.attrs["dictionary"] = descs
    return df


# ---- Load CSV + Schema and build the DataFrame ----
csv_path = _find_first_csv(RAW_DIR)
assert csv_path is not None, "No CSV found in data/raw. Run the ingest step first."

df = pd.read_csv(csv_path)
schema_path = _schema_path_for_csv(csv_path)

if schema_path:
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    df = _apply_schema(df, schema)
    print(f"Loaded CSV:\n  {csv_path}\nApplied schema:\n  {schema_path}")
else:
    print(f"Loaded CSV (no schema JSON found):\n  {csv_path}")

print("\nShape:", df.shape)
print("\nDtypes (first 12 columns):")
print(df.dtypes.head(12))
df
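
For reference, the loader above reads four top-level keys from the schema JSON: `dtypes`, `categories`, `parse_dates`, and `descriptions`. The sketch below shows the expected shape. The column names and values are illustrative placeholders, not taken from the actual data dictionary.

```python
import json

# Illustrative schema only: column names and values are hypothetical.
example_schema = {
    "dtypes": {
        "customer_id": "Int64",
        "credit_limit": "float64",
        "card_tier": "string",
    },
    "categories": {"card_tier": ["Basic", "Premium"]},
    "parse_dates": [],
    "descriptions": {"customer_id": "Unique identifier for the customer"},
}

# _apply_schema only casts these dtype names; anything else is left untouched.
SUPPORTED_DTYPES = {"string", "boolean", "Int64", "float64"}
unsupported = {
    col: dtype
    for col, dtype in example_schema["dtypes"].items()
    if dtype not in SUPPORTED_DTYPES
}

print(json.dumps(example_schema, indent=2))
print("Unsupported dtype entries:", unsupported)
```

Checking for unsupported dtype names up front is useful because `_apply_schema` skips them silently.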

---

## Validation and Interim Data Snapshot

At this stage we pause between **Extract** and **Transform** to check the integrity of the dataset and create a stable copy for future use.

**What happens here:**
- A **validation report** is generated in JSON format.  
  - Confirms the dataset shape (rows, columns).  
  - Records detected column types and checks which columns named in the data dictionary schema are present, missing, or unexpected.  
  - Flags potential issues such as missing values, duplicate IDs, negative values, or unparsed dates.  
  - Captures a SHA-1 hash of the first 1,000 rows so we can track provenance.

- A **Parquet file** is created under `data/interim/`.  
  - This acts as a snapshot of the dataset immediately after ingestion and validation.  
  - Parquet is chosen because it is efficient, preserves data types, and is well-suited for analytical workflows.
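
The notebook hashes a CSV rendering of the first 1,000 rows. A stricter alternative, sketched here and not what the notebook does, is to hash the raw file itself so that any change anywhere in the download is detected. `file_sha1` is a hypothetical helper name.

```python
import hashlib
from pathlib import Path


def file_sha1(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-1 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha1()
    with path.open("rb") as handle:
        # Read fixed-size chunks until read() returns an empty bytes object
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Reading in chunks keeps memory use flat regardless of file size. The sample-based hash is cheaper but only covers the rows it sees.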

**Why we do this:**
- Ensures we have a trusted baseline of the data before making any changes.  
- Provides a record we can revisit if transformations introduce errors.  
- Allows collaborators (or future runs of this project) to skip the ingestion process and begin directly from a consistent, version-controlled dataset included in the GitHub repository.
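
A later notebook that starts from the committed snapshot could locate it as in the sketch below. `resolve_snapshot` is a hypothetical helper. The file name matches the one this notebook writes.

```python
from pathlib import Path

SNAPSHOT_NAME = "pre_transform_snapshot.parquet"


def resolve_snapshot(project_root: Path) -> Path:
    """Return the interim snapshot path, failing clearly if it is absent."""
    path = project_root / "data" / "interim" / SNAPSHOT_NAME
    if not path.is_file():
        raise FileNotFoundError(f"{path} not found; run 01_extract first.")
    return path
```

The returned path can then be passed to `pd.read_parquet` to begin the transformation stage without touching Kaggle.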

In [None]:
# Assumes df is already loaded in memory (from the previous cell).
# The notebook lives in jupyter_notebooks/, so the project root is its parent.
project_root = pathlib.Path.cwd().parent
interim_dir = project_root / "data" / "interim"
interim_dir.mkdir(parents=True, exist_ok=True)

report = {}

# 1) Basics
report["shape"] = {"rows": int(df.shape[0]), "cols": int(df.shape[1])}
report["columns"] = df.columns.tolist()

# 2) Schema alignment (if schema JSON exists)
schema = {}
raw_dir = project_root / "data" / "raw"
schema_jsons = sorted(raw_dir.glob("*_schema.json"))  # sorted for determinism
if schema_jsons:
    schema_path = schema_jsons[0]
    schema = json.loads(schema_path.read_text(encoding="utf-8"))
    expected = (
        set(schema.get("dtypes", {}))
        | set(schema.get("parse_dates", []))
        | set(schema.get("categories", {}))
    )
    report["schema_alignment"] = {
        "expected_present": sorted(set(df.columns) & expected),
        "expected_missing": sorted(expected - set(df.columns)),
        "unexpected_columns": sorted(set(df.columns) - expected),
    }
else:
    report["schema_alignment"] = "no_schema_json_found"

# 3) Dtypes summary
report["dtypes"] = {c: str(t) for c, t in df.dtypes.items()}

# 4) Primary key check (simple heuristic)
PRIMARY_KEY = next(
    (c for c in df.columns if c.lower() in {"clientnum", "customer_id", "id"}), None
)
if PRIMARY_KEY:
    report["primary_key"] = {
        "column": PRIMARY_KEY,
        "duplicates": int(df[PRIMARY_KEY].duplicated().sum()),
        "nulls": int(df[PRIMARY_KEY].isna().sum()),
    }
else:
    report["primary_key"] = "no obvious key guessed"

# 5) Missingness map (%)
null_pct = (df.isna().mean() * 100).round(2).sort_values(ascending=False)
report["missingness_top10_pct"] = null_pct.head(10).to_dict()

# 6) Range checks
num_cols = df.select_dtypes("number").columns
report["negative_value_columns"] = [c for c in num_cols if (df[c] < 0).any()]

# 7) Date sanity
date_cols = [c for c, t in df.dtypes.items() if "datetime64" in str(t)]
report["date_columns_na_counts"] = {c: int(df[c].isna().sum()) for c in date_cols}

# 8) Provenance hash
sample_bytes = df.head(1000).to_csv(index=False).encode("utf-8")
report["provenance"] = {
    "sample_sha1": hashlib.sha1(sample_bytes).hexdigest(),
    "timestamp": int(time.time()),
}

# Save the validation report JSON
snapshot_path = interim_dir / "pre_transform_validation.json"
snapshot_path.write_text(json.dumps(report, indent=2))

# Save the interim DataFrame as Parquet
parquet_path = interim_dir / "pre_transform_snapshot.parquet"
df.to_parquet(parquet_path, index=False)

print("Validation report →", snapshot_path)
print("Interim Parquet saved →", parquet_path)

pd.Series(report["dtypes"]).head(12)
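
The report is only useful if something reads it. The sketch below is one possible way for a later step to turn the report written above into a flat list of warnings. `collect_issues` is a hypothetical helper, and it relies only on keys that the cell above writes.

```python
def collect_issues(report: dict) -> list[str]:
    """Turn the validation report into a flat list of human-readable warnings."""
    issues = []
    # schema_alignment is a dict when a schema was found, otherwise a string
    alignment = report.get("schema_alignment")
    if isinstance(alignment, dict) and alignment.get("expected_missing"):
        issues.append(f"schema columns missing: {alignment['expected_missing']}")
    # primary_key is a dict when a key column was guessed, otherwise a string
    key = report.get("primary_key")
    if isinstance(key, dict):
        if key.get("duplicates"):
            issues.append(f"{key['duplicates']} duplicate values in {key['column']}")
        if key.get("nulls"):
            issues.append(f"{key['nulls']} null values in {key['column']}")
    if report.get("negative_value_columns"):
        issues.append(f"negative values in: {report['negative_value_columns']}")
    for col, count in report.get("date_columns_na_counts", {}).items():
        if count:
            issues.append(f"{count} unparsed or missing dates in {col}")
    return issues
```

In the notebook this could be called as `collect_issues(report)`, or on the dictionary obtained by loading `pre_transform_validation.json` with `json.loads`.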

---