# Stage 05 — Data Storage (Starter)

This notebook demonstrates:
- Saving a DataFrame to **CSV** (`data/raw/`) and **Parquet** (`data/processed/`) using environment-driven paths.
- Reloading and validating shape/dtypes.
- Utility functions `write_df` / `read_df` that route by **file suffix** and raise a clear error when the Parquet engine is missing.


In [None]:
# --- Setup: env paths & imports (English-only comments) ---
from pathlib import Path
from datetime import datetime
import os
import pandas as pd
from dotenv import load_dotenv

# Pull settings from .env, then derive the raw/processed data folders
# (falling back to the repo-relative defaults when a variable is unset).
load_dotenv()
ROOT = Path.cwd()
DATA_DIR_RAW = Path(os.environ.get("DATA_DIR_RAW", "data/raw"))
DATA_DIR_PROCESSED = Path(os.environ.get("DATA_DIR_PROCESSED", "data/processed"))

# Create both folders up front; folders that already exist are left as they are.
DATA_DIR_RAW.mkdir(exist_ok=True, parents=True)
DATA_DIR_PROCESSED.mkdir(exist_ok=True, parents=True)

# Minute-resolution tag (YYYYMMDD-HHMM) used when naming output files.
STAMP = f"{datetime.now():%Y%m%d-%H%M}"
print("RAW ->", DATA_DIR_RAW.resolve())
print("PROC->", DATA_DIR_PROCESSED.resolve())


In [None]:
# --- Create a small sample DataFrame (stable, no network) ---
import numpy as np

# Five daily rows ending at today's midnight, one ticker per row.
df = pd.DataFrame(
    dict(
        date=pd.date_range(end=pd.Timestamp.today().normalize(), periods=5, freq="D"),
        ticker=["AAPL", "MSFT", "GOOGL", "AMZN", "META"],
        close=[189.2, 414.1, 168.3, 176.4, 512.7],
        volume=[1_234_000, 2_345_000, 1_111_000, 1_765_000, 2_222_000],
    )
)
df


In [None]:
# --- Task 1: Save in two formats (CSV in raw, Parquet in processed) ---
csv_path = DATA_DIR_RAW / f"sample_{STAMP}.csv"
pq_path = DATA_DIR_PROCESSED / f"sample_{STAMP}.parquet"

# Save CSV
df.to_csv(csv_path, index=False)

# Save Parquet (prefer pyarrow if available).
# Only a failed import means "pyarrow is not installed"; any other error while
# importing it is a real problem and is allowed to surface.
try:
    import pyarrow  # noqa: F401
except ImportError:
    engine = None
else:
    engine = "pyarrow"

if engine is None:
    # No pyarrow: let pandas pick whatever engine it can find.
    try:
        df.to_parquet(pq_path, index=False)
    except ImportError as e:
        # pandas signals "no usable Parquet engine" with ImportError. Only that
        # case is reported as a missing engine; unrelated write failures
        # (permissions, unsupported dtypes, ...) propagate with their own message.
        raise RuntimeError(
            "Parquet engine is missing. Please install pyarrow: `pip install pyarrow`."
        ) from e
else:
    df.to_parquet(pq_path, index=False, engine=engine)

print("Saved:", csv_path)
print("Saved:", pq_path)


In [None]:
# --- Task 2: Reload and validate ---
df_csv = pd.read_csv(csv_path)
df_pq = pd.read_parquet(pq_path)

print("Shapes:", df_csv.shape, df_pq.shape)

# Both reloads must describe the same table. This is compared once here rather
# than inside the per-frame helper, which only knows the frame it is handed.
ok_shape = (df_csv.shape == df_pq.shape)
print("Same shape:", ok_shape)

# Critical dtype expectations
expected_dtypes = {
    "date": "datetime64[ns]",
    "ticker": "object",
    "close": "float64",
    "volume": "int64"
}

def coerce_and_check(d, expectations):
    """Coerce the critical columns of ``d`` and report dtype mismatches.

    ``date`` is parsed with ``pd.to_datetime`` and ``close`` / ``volume`` with
    ``pd.to_numeric``, both with ``errors="coerce"``. The coerced columns are
    written back into ``d``, so pass a copy when the caller's frame must stay
    untouched.

    Args:
        d: DataFrame to coerce in place and then inspect.
        expectations: Mapping of column name to expected dtype string. A column
            passes when that string is contained in ``str(d[col].dtype)``.

    Returns:
        A list of problem descriptions (missing column or unexpected dtype);
        empty when every expected column is present with a matching dtype.
    """
    # Coerce
    if "date" in d.columns:
        d["date"] = pd.to_datetime(d["date"], errors="coerce")
    for c in ["close", "volume"]:
        if c in d.columns:
            d[c] = pd.to_numeric(d[c], errors="coerce")
    # Report
    problems = []
    for col, exp in expectations.items():
        if col not in d.columns:
            problems.append(f"{col}: missing")
            continue
        actual = str(d[col].dtype)
        # Substring match, so an expectation may be a dtype prefix.
        if exp not in actual:
            problems.append(f"{col}: expected {exp}, got {actual}")
    return problems

# Check copies so the reloaded frames shown below stay exactly as read from disk.
problems_csv = coerce_and_check(df_csv.copy(), expected_dtypes)
problems_pq  = coerce_and_check(df_pq.copy(),  expected_dtypes)

print("CSV dtype issues:", problems_csv if problems_csv else "None")
print("Parquet dtype issues:", problems_pq if problems_pq else "None")

df_csv.head(), df_pq.head()


In [None]:
# --- Task 3: Utilities write_df / read_df ---
from typing import Optional

def write_df(df: pd.DataFrame, path: Path) -> None:
    """Write ``df`` to ``path``, choosing the format from the file suffix.

    ``.csv`` is written with ``DataFrame.to_csv`` and ``.parquet`` with
    ``DataFrame.to_parquet`` using the pyarrow engine; the index is not
    written in either case. Missing parent directories are created.

    Args:
        df: DataFrame to persist.
        path: Destination file; anything accepted by ``Path``. The suffix is
            matched case-insensitively.

    Raises:
        ValueError: If the suffix is neither ``.csv`` nor ``.parquet``.
        RuntimeError: If a Parquet file is requested but pyarrow cannot be
            imported. Other write failures propagate unchanged.
    """
    path = Path(path)
    suffix = path.suffix.lower()
    # Reject unsupported targets before touching the filesystem.
    if suffix not in (".csv", ".parquet"):
        raise ValueError(f"Unsupported suffix: {suffix}")

    if suffix == ".parquet":
        # Guard only the import: a failing write (bad dtype, permissions, ...)
        # must not be misreported as a missing engine.
        try:
            import pyarrow  # noqa: F401
        except ImportError as e:
            raise RuntimeError("Parquet engine missing. Install pyarrow: `pip install pyarrow`.") from e

    path.parent.mkdir(parents=True, exist_ok=True)
    if suffix == ".csv":
        df.to_csv(path, index=False)
    else:
        df.to_parquet(path, index=False, engine="pyarrow")

def read_df(path: Path) -> pd.DataFrame:
    """Load a DataFrame from ``path``, choosing the reader from the file suffix.

    ``.csv`` is read with ``pd.read_csv`` and ``.parquet`` with
    ``pd.read_parquet`` using the pyarrow engine.

    Args:
        path: File to read; anything accepted by ``Path``. The suffix is
            matched case-insensitively.

    Returns:
        The DataFrame loaded from the file.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the suffix is neither ``.csv`` nor ``.parquet``.
        RuntimeError: If a Parquet file is requested but pyarrow cannot be
            imported. Other read failures propagate unchanged.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path)
    if suffix != ".parquet":
        raise ValueError(f"Unsupported suffix: {suffix}")

    # Guard only the import: a corrupt or unreadable file must not be
    # misreported as a missing engine.
    try:
        import pyarrow  # noqa: F401
    except ImportError as e:
        raise RuntimeError("Parquet engine missing. Install pyarrow: `pip install pyarrow`.") from e
    return pd.read_parquet(path, engine="pyarrow")

# Round-trip the sample frame through both helpers: write each format,
# then read it back and compare the resulting shapes.
csv2 = DATA_DIR_RAW.joinpath(f"sample_util_{STAMP}.csv")
pq2 = DATA_DIR_PROCESSED.joinpath(f"sample_util_{STAMP}.parquet")
write_df(df, csv2)
write_df(df, pq2)
r_csv = read_df(csv2)
r_pq = read_df(pq2)

print("Utility read shapes:", r_csv.shape, r_pq.shape)
r_csv.head()


## Documentation (to include in README)

- **Folder structure**
  - `data/raw/`: immutable raw drops (CSV)
  - `data/processed/`: typed, efficient Parquet used downstream
- **Why CSV and Parquet**
  - CSV is universal & human-readable; Parquet preserves dtypes and is smaller/faster for analytics.
- **Env-driven IO**
  - `.env` defines `DATA_DIR_RAW` and `DATA_DIR_PROCESSED`; code reads these and writes accordingly.
- **Validation**
  - Shapes must match between the CSV and Parquet reloads, and after coercion the critical columns must have the expected dtypes: `date` → `datetime64[ns]`, `ticker` → `object`, `close` → `float64`, `volume` → `int64`.

You can copy this section into your project `README.md`.
