# Homework Starter — Stage 05: Data Storage
Name: 
Date: 

Objectives:
- Env-driven paths to `data/raw/` and `data/processed/`
- Save CSV and Parquet; reload and validate
- Abstract IO with utility functions; document choices

In [1]:
import os, pathlib, datetime as dt
import pandas as pd
from dotenv import load_dotenv

# Pull DATA_DIR_* settings from a .env file (if one is found) into os.environ.
load_dotenv()

# Resolve the raw/processed folders from the environment, falling back to the
# conventional project-relative defaults.
RAW = pathlib.Path(os.getenv('DATA_DIR_RAW', 'data/raw'))
PROC = pathlib.Path(os.getenv('DATA_DIR_PROCESSED', 'data/processed'))

# Create both folders up front so later writes never hit a missing directory.
for _folder in (RAW, PROC):
    _folder.mkdir(parents=True, exist_ok=True)

for _label, _folder in (('RAW', RAW), ('PROC', PROC)):
    print(f'{_label} ->', _folder.resolve())

RAW -> /Users/wenshan/Downloads/data/raw
PROC -> /Users/wenshan/Downloads/data/processed


In [None]:
import os
from pathlib import Path
from dotenv import load_dotenv, find_dotenv

# Search for a .env starting at the current working directory. If none exists,
# bootstrap one with the default folder layout so the notebook is self-contained.
dotenv_path = find_dotenv(usecwd=True)
if not dotenv_path:
    Path(".env").write_text(
        "DATA_DIR_RAW=data/raw\nDATA_DIR_PROCESSED=data/processed\n",
        encoding="utf-8",
    )
    dotenv_path = ".env"

# override=True: values from the .env replace variables already in os.environ.
load_dotenv(dotenv_path=dotenv_path, override=True)

# Fallback defaults in case the .env omits either key.
for _key, _default in (("DATA_DIR_RAW", "data/raw"), ("DATA_DIR_PROCESSED", "data/processed")):
    os.environ.setdefault(_key, _default)

print("Loaded .env from:", Path(dotenv_path).resolve())
for _key in ("DATA_DIR_RAW", "DATA_DIR_PROCESSED"):
    print(f"{_key}:", os.getenv(_key))


Loaded .env from: /Users/wenshan/Downloads/.env
DATA_DIR_RAW: data/raw
DATA_DIR_PROCESSED: data/processed


## 1) Create or Load a Sample DataFrame
You may reuse data from prior stages or create a small synthetic dataset.

In [None]:
from pathlib import Path

# Resolve the storage roots from the environment. The defaults match the other
# cells, so running this cell in a fresh kernel (env vars unset) does not fail
# with Path(None) -> TypeError.
DATA_DIR_RAW = Path(os.getenv("DATA_DIR_RAW", "data/raw"))
DATA_DIR_PROCESSED = Path(os.getenv("DATA_DIR_PROCESSED", "data/processed"))

DATA_DIR_RAW.mkdir(parents=True, exist_ok=True)
DATA_DIR_PROCESSED.mkdir(parents=True, exist_ok=True)

print("RAW dir:", DATA_DIR_RAW.resolve())
print("PROCESSED dir:", DATA_DIR_PROCESSED.resolve())


RAW dir: /Users/wenshan/Downloads/data/raw
PROCESSED dir: /Users/wenshan/Downloads/data/processed


In [3]:
import numpy as np
# Use a fixed seed so the synthetic random-walk prices (and every round-trip
# check built on them) are reproducible across kernel restarts.
rng = np.random.default_rng(42)
dates = pd.date_range('2024-01-01', periods=20, freq='D')
df = pd.DataFrame({
    'date': dates,
    'ticker': ['AAPL'] * len(dates),
    'price': 150 + rng.standard_normal(len(dates)).cumsum(),
})
df.head()

Unnamed: 0,date,ticker,price
0,2024-01-01,AAPL,152.719671
1,2024-01-02,AAPL,152.669191
2,2024-01-03,AAPL,152.625522
3,2024-01-04,AAPL,152.681287
4,2024-01-05,AAPL,153.16875


## 2) Save CSV to data/raw/ and Parquet to data/processed/ (TODO)
- Use timestamped filenames.
- Handle missing Parquet engine gracefully.

In [None]:

from pathlib import Path
import pandas as pd

RAW_DIR = Path(os.getenv("DATA_DIR_RAW", "data/raw"))


def _newest_first(prefix):
    """Return RAW_DIR/<prefix>*.csv paths ordered from most to least recently modified."""
    return sorted(RAW_DIR.glob(prefix + "*.csv"), key=lambda f: f.stat().st_mtime, reverse=True)


# Prefer API pulls; only look at the other source when no preferred file exists.
preferred_prefix = "api_"
candidates = _newest_first(preferred_prefix) or _newest_first("web_" if preferred_prefix == "api_" else "api_")
if not candidates:
    raise FileNotFoundError(f"No raw CSVs found under {RAW_DIR}. Expected files like api_*.csv or web_*.csv")

csv_path = candidates[0]
print("Loading:", csv_path.name)
DF_TO_SAVE = pd.read_csv(csv_path)

# Parse the date column when present; unparseable values become NaT.
if "date" in DF_TO_SAVE.columns:
    DF_TO_SAVE["date"] = pd.to_datetime(DF_TO_SAVE["date"], errors="coerce")

print("DF_TO_SAVE shape:", DF_TO_SAVE.shape)
DF_TO_SAVE.head()


Loading: api_source-alpha_symbol-AAPL_20250818-011602.csv
DF_TO_SAVE shape: (63, 2)


Unnamed: 0,date,adj_close
0,2025-05-16,211.020508
1,2025-05-19,208.54332
2,2025-05-20,206.625504
3,2025-05-21,201.860901
4,2025-05-22,201.131729


In [13]:
def ts():
    """Return the current local time as a filename-safe 'YYYYmmdd-HHMMSS' stamp."""
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')


# One stamp for both files so the CSV/Parquet pair share a filename stem
# (two separate ts() calls can straddle a second boundary).
_stamp = ts()

# Raw copy as CSV, without the index column.
csv_path = RAW / f"sample_{_stamp}.csv"
df.to_csv(csv_path, index=False)
# Only a cell's last expression is auto-displayed, so report the CSV path explicitly.
print('CSV saved ->', csv_path)

# Processed copy as Parquet; degrade gracefully when no engine is installed.
pq_path = PROC / f"sample_{_stamp}.parquet"
try:
    df.to_parquet(pq_path)
except ImportError:
    # pandas raises ImportError when neither pyarrow nor fastparquet is importable.
    print('Parquet engine not available. Install pyarrow or fastparquet to complete this step.')
    pq_path = None
except Exception as e:
    # Any other failure is a real write problem, not a missing engine.
    print('Parquet write failed:', repr(e))
    pq_path = None
pq_path

Parquet engine not available. Install pyarrow or fastparquet to complete this step.


## 3) Reload and Validate (TODO)
- Compare shapes and key dtypes.

In [11]:
def validate_loaded(original, reloaded):
    """Sanity-check a reloaded DataFrame against the one that was saved.

    Returns a dict of booleans:
      shape_equal      -- reloaded has the same (rows, cols) as original
      date_is_datetime -- reloaded has a 'date' column with a datetime64 dtype
      price_is_numeric -- reloaded has a 'price' column with a numeric dtype
    A missing column yields False for its check.
    """
    cols = reloaded.columns
    date_ok = 'date' in cols and pd.api.types.is_datetime64_any_dtype(reloaded['date'])
    price_ok = 'price' in cols and pd.api.types.is_numeric_dtype(reloaded['price'])
    return {
        'shape_equal': original.shape == reloaded.shape,
        'date_is_datetime': date_ok,
        'price_is_numeric': price_ok,
    }

# Reload the CSV; CSV stores dates as text, so parse 'date' back to datetime.
df_csv = pd.read_csv(
    csv_path,
    parse_dates=['date'],
)
validate_loaded(df, df_csv)

{'shape_equal': False, 'date_is_datetime': True, 'price_is_numeric': False}

In [14]:
# Validate the Parquet copy only if the save step actually produced a file.
if pq_path:
    try:
        df_pq = pd.read_parquet(pq_path)
    except Exception as e:
        print('Parquet read failed:', e)
    else:
        # An expression nested in if/try is not auto-displayed by Jupyter,
        # so print the validation result explicitly.
        print('Parquet validation:', validate_loaded(df, df_pq))

## 4) Utilities (TODO)
- Implement `detect_format`, `write_df`, `read_df`.
- Use suffix to route; create parent dirs if needed; friendly errors for Parquet.

In [None]:
from __future__ import annotations
import os
from pathlib import Path
from datetime import datetime
import pandas as pd

# Env-driven storage roots (defaulting to the project layout); ensure both exist.
DATA_DIR_RAW, DATA_DIR_PROCESSED = (
    Path(os.getenv(var_name, fallback))
    for var_name, fallback in (("DATA_DIR_RAW", "data/raw"), ("DATA_DIR_PROCESSED", "data/processed"))
)
for _dir in (DATA_DIR_RAW, DATA_DIR_PROCESSED):
    _dir.mkdir(parents=True, exist_ok=True)

def _ts() -> str:
    return datetime.now().strftime("%Y%m%d-%H%M%S")


In [None]:

def _choose_parquet_engine() -> str:
    """
    Prefer pyarrow; fall back to fastparquet. Raise with helpful msg if neither.
    """
    try:
        import pyarrow  # noqa: F401
        return "pyarrow"
    except Exception:
        try:
            import fastparquet  # noqa: F401
            return "fastparquet"
        except Exception:
            raise RuntimeError(
                "No Parquet engine found. Install one:\n"
                "  pip install pyarrow   # recommended\n"
                "  # or\n"
                "  pip install fastparquet"
            )

def _build_filename(prefix: str, fmt: str, **meta) -> str:
    """
    Make a filename like: {prefix}_{k1-v1}_{k2-v2}_{timestamp}.{fmt}

    Meta entries appear in sorted-key order for stability. An entry is skipped
    when its value is None or blank after stripping whitespace. '/' and '\\'
    inside a value are replaced with '-'. Otherwise a value such as 'a/b'
    would turn the name into a path inside a (likely missing) subfolder.
    """
    parts = [prefix]
    for key in sorted(meta):
        raw_value = meta[key]
        if raw_value is None:
            # str(None) would otherwise leak a literal 'None' into the name.
            continue
        value = str(raw_value).strip().replace("/", "-").replace("\\", "-")
        if value:
            parts.append(f"{key}-{value}")
    parts.append(_ts())
    return "_".join(parts) + f".{fmt}"

def write_df(
    df: pd.DataFrame,
    area: str = "raw",    # 'raw' or 'processed' (selects env dir)
    prefix: str = "sample",
    fmt: str = "csv",     # 'csv' or 'parquet'
    index: bool = False,
    **meta,
) -> Path:
    """
    Save ``df`` as CSV or Parquet inside the env-configured raw/processed folder.

    The filename comes from _build_filename(prefix, fmt, **meta) and is
    timestamped. Raises TypeError for a non-DataFrame and ValueError for an
    unknown ``fmt`` or ``area``. For Parquet, the engine comes from
    _choose_parquet_engine, which may raise RuntimeError. Prints and returns
    the absolute path that was written.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("write_df: df must be a pandas DataFrame")
    if fmt not in {"csv", "parquet"}:
        raise ValueError("write_df: fmt must be 'csv' or 'parquet'")
    if area not in {"raw", "processed"}:
        raise ValueError("write_df: area must be 'raw' or 'processed'")

    if area == "raw":
        target_dir = DATA_DIR_RAW
    else:
        target_dir = DATA_DIR_PROCESSED
    target_dir.mkdir(parents=True, exist_ok=True)

    out_path = target_dir.joinpath(_build_filename(prefix=prefix, fmt=fmt, **meta)).resolve()

    if fmt == "parquet":
        df.to_parquet(out_path, index=index, engine=_choose_parquet_engine())
    else:
        df.to_csv(out_path, index=index)

    print(f"Saved ({fmt}) → {out_path}")
    return out_path

def read_df(path: str | Path) -> pd.DataFrame:
    """
    Read a DataFrame back from CSV/Parquet by file extension.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"read_df: path not found: {p}")
    ext = p.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(p)
    elif ext == ".parquet":
        # Try both engines transparently
        try:
            return pd.read_parquet(p, engine="pyarrow")
        except Exception:
            return pd.read_parquet(p, engine="fastparquet")
    else:
        raise ValueError(f"read_df: unsupported extension '{ext}' (use .csv or .parquet)")


In [None]:

# Pick the first candidate DataFrame already defined in this kernel.
which = next((name for name in ("DF_TO_SAVE", "df_api", "df_web") if name in globals()), None)
DF_CHECK = None if which is None else globals()[which]

if DF_CHECK is None:
    raise RuntimeError("No DataFrame found in memory. Load one CSV first (your Stage 05 prep cell).")

print("Using DataFrame in memory:", which, "shape:", DF_CHECK.shape)


def _save_util_copy(area, fmt):
    """Write DF_CHECK via write_df with the shared 'utiltest' naming."""
    return write_df(DF_CHECK, area=area, prefix="utiltest", fmt=fmt, source="utilities", note=fmt)


csv_path = _save_util_copy("raw", "csv")

try:
    pq_path = _save_util_copy("processed", "parquet")
except RuntimeError as e:
    print("Parquet engine missing →", e)
    pq_path = None
parquet_ok = pq_path is not None

df_csv_back = read_df(csv_path)
print("Read-back CSV shape:", df_csv_back.shape)

if parquet_ok:
    df_pq_back = read_df(pq_path)
    print("Read-back Parquet shape:", df_pq_back.shape)


Using DataFrame in memory: DF_TO_SAVE shape: (63, 2)
Saved (csv) → /Users/wenshan/Downloads/data/raw/utiltest_note-csv_source-utilities_20250818-013433.csv
Parquet engine missing → No Parquet engine found. Install one:
  pip install pyarrow   # recommended
  # or
  pip install fastparquet
Read-back CSV shape: (63, 2)


In [20]:
import sys, subprocess
# Install into the interpreter that backs this kernel (not whatever pip is on PATH).
print("Python:", sys.executable)
_pip_cmd = [sys.executable, "-m", "pip", "install", "-U", "pyarrow"]
subprocess.check_call(_pip_cmd)


Python: /opt/miniconda3/bin/python
Collecting pyarrow
  Downloading pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl.metadata (3.3 kB)
Downloading pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl (31.2 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 31.2/31.2 MB 2.1 MB/s eta 0:00:00
Installing collected packages: pyarrow
Successfully installed pyarrow-21.0.0


0

In [22]:
import sys, subprocess
# Install into the interpreter that backs this kernel (not whatever pip is on PATH).
print("Python:", sys.executable)
_pip_cmd = [sys.executable, "-m", "pip", "install", "-U", "fastparquet"]
subprocess.check_call(_pip_cmd)


Python: /opt/miniconda3/bin/python
Collecting fastparquet
  Downloading fastparquet-2024.11.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (4.2 kB)
Collecting cramjam>=2.3 (from fastparquet)
  Downloading cramjam-2.11.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (5.6 kB)
Collecting fsspec (from fastparquet)
  Downloading fsspec-2025.7.0-py3-none-any.whl.metadata (12 kB)
Downloading fastparquet-2024.11.0-cp313-cp313-macosx_11_0_arm64.whl (683 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 683.8/683.8 kB 19.2 MB/s eta 0:00:00
Downloading cramjam-2.11.0-cp313-cp313-macosx_11_0_arm64.whl (1.7 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.7/1.7 MB 4.6 MB/s eta 0:00:00
Downloading fsspec-2025.7.0-py3-none-any.whl (199 kB)
Installing collected packages: fsspec, cramjam, fastparquet
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 [fastparquet]
Successfully installed … (output truncated)

0

In [None]:
import typing as t, pathlib, pandas as pd, numpy as np

def _choose_parquet_engine() -> str:
    """Prefer pyarrow; fall back to fastparquet; raise helpful message if neither."""
    try:
        import pyarrow  # noqa: F401
        return "pyarrow"
    except Exception:
        try:
            import fastparquet  # noqa: F401
            return "fastparquet"
        except Exception:
            raise RuntimeError(
                "Parquet engine not available. Install one of:\n"
                "  pip install pyarrow   # recommended\n"
                "  # or\n"
                "  pip install fastparquet"
            )

def _parquet_sanitize_for_write(df: pd.DataFrame) -> pd.DataFrame:
    """
    Make df easy for parquet writers:
      - plain string column names
      - tz-naive datetimes
      - convert list/dict objects to JSON strings
      - cast mixed-type object columns to string
    """
    x = df.copy()

    # 1) column names to strings
    x.columns = [str(c) for c in x.columns]

    # 2) tz-naive datetimes
    for c in x.columns:
        if pd.api.types.is_datetime64_any_dtype(x[c]):
            try:
                # If tz-aware, normalize to UTC then drop tz
                if getattr(x[c].dt, "tz", None) is not None:
                    x[c] = x[c].dt.tz_convert("UTC").dt.tz_localize(None)
            except Exception:
                # If parsing fails, coerce and drop tz anyway
                x[c] = pd.to_datetime(x[c], errors="coerce")
            # ensure dtype is datetime64[ns]
            x[c] = pd.to_datetime(x[c], errors="coerce")

    # 3) object columns: lists/dicts -> JSON, mixed -> str
    import json
    for c in x.columns:
        if x[c].dtype == "object":
            sample = x[c].dropna().head(10).tolist()
            if any(isinstance(v, (list, dict)) for v in sample):
                x[c] = x[c].apply(lambda v: json.dumps(v) if isinstance(v, (list, dict)) else v)
            # detect mixed types (cheap heuristic)
            types = {type(v) for v in x[c].dropna().head(25)}
            if len(types) > 1:
                x[c] = x[c].astype(str)

    return x

def _csv_detect_date_cols(header_only_df: pd.DataFrame) -> t.List[str]:
    """
    Given a header-only DataFrame (nrows=0), decide which columns to parse as dates.
    If a 'date' column exists, we parse just that to avoid surprises.
    """
    cols = list(header_only_df.columns)
    return ['date'] if 'date' in cols else []

def detect_format(path: t.Union[str, pathlib.Path]):
    """Map a path's extension (case-insensitive) to a storage format name.

    '.csv' -> 'csv'; '.parquet', '.pq', '.parq' -> 'parquet'.
    Raises ValueError (with the lower-cased path) for anything else.
    """
    lowered = str(path).lower()
    if lowered.endswith('.csv'):
        return 'csv'
    if lowered.endswith(('.parquet', '.pq', '.parq')):
        return 'parquet'
    raise ValueError('Unsupported format: ' + lowered)

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]):
    """
    Write ``df`` to ``path`` as CSV or Parquet, chosen from the extension.

    The parent directory is created if needed and the index is never written.
    For Parquet:
      - the engine is auto-picked by _choose_parquet_engine (pyarrow -> fastparquet)
      - a failed write is retried once with _parquet_sanitize_for_write(df)
      - with pyarrow, a final attempt goes through pyarrow.Table.from_pandas(safe=False)
      - if every attempt fails, RuntimeError is raised reporting each underlying
        error (original, sanitized and, when attempted, the low-level pyarrow one)

    Returns the written path as a ``pathlib.Path``.
    """
    p = pathlib.Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    fmt = detect_format(p)

    if fmt == 'csv':
        df.to_csv(p, index=False)
        return p

    # Parquet branch (RuntimeError up front if no engine is importable).
    engine = _choose_parquet_engine()
    try:
        df.to_parquet(p, index=False, engine=engine)
        return p
    except Exception as e:
        original_err = e

    # Retry once with a writer-friendly copy; keep it for the last resort below.
    clean = None
    try:
        clean = _parquet_sanitize_for_write(df)
        clean.to_parquet(p, index=False, engine=engine)
        return p
    except Exception as e:
        sanitized_err = e

    # Last resort (pyarrow only): safe=False turns off pyarrow's overflow /
    # unsafe-cast checks during the pandas -> Arrow conversion.
    lowlevel_err = None
    if engine == "pyarrow":
        try:
            import pyarrow as pa
            import pyarrow.parquet as pq
            if clean is None:
                clean = _parquet_sanitize_for_write(df)
            table = pa.Table.from_pandas(clean, preserve_index=False, safe=False)
            pq.write_table(table, p)
            return p
        except Exception as e:
            # Reported in the final message instead of being discarded.
            lowlevel_err = e

    message = (
        f"Parquet write failed with engine '{engine}'. "
        "Tried with sanitized data too. Common fixes:\n"
        "  * Ensure all column names are unique strings\n"
        "  * Convert mixed-type object columns to string\n"
        "  * Make datetimes tz-naive\n"
        "If still failing, try the other engine:\n"
        "  pip install fastparquet   # or pyarrow\n"
        f"Original error: {original_err!r}\n"
        f"Sanitized error: {sanitized_err!r}"
    )
    if lowlevel_err is not None:
        message += f"\nLow-level pyarrow error: {lowlevel_err!r}"
    raise RuntimeError(message) from original_err

def read_df(path: t.Union[str, pathlib.Path]):
    """
    Load a DataFrame from CSV or Parquet; detect_format picks the reader from the extension.

    CSV: a header-only pass (nrows=0) feeds _csv_detect_date_cols, and the
    chosen columns are parsed as dates in the full read.
    Parquet: pyarrow is tried first, then fastparquet; if both fail a
    RuntimeError is raised, chained to the fastparquet error.
    """
    p = pathlib.Path(path)

    if detect_format(p) == 'csv':
        date_cols = _csv_detect_date_cols(pd.read_csv(p, nrows=0))
        # Only pass parse_dates when there is something to parse.
        extra_kwargs = {'parse_dates': date_cols} if date_cols else {}
        return pd.read_csv(p, **extra_kwargs)

    try:
        return pd.read_parquet(p, engine="pyarrow")
    except Exception:
        # pyarrow missing or unable to read this file: try fastparquet instead.
        try:
            return pd.read_parquet(p, engine="fastparquet")
        except Exception as err:
            raise RuntimeError('Parquet engine not available or file unreadable with available engines. '
                               'Install pyarrow or fastparquet.') from err

# Round-trip demo: write the synthetic df through the utilities above and
# read each file straight back, reporting the reloaded shape.
p_csv, p_pq = RAW / f"util_{ts()}.csv", PROC / f"util_{ts()}.parquet"

written_csv = write_df(df, p_csv)
print("CSV round-trip:", read_df(written_csv).shape)

try:
    written_pq = write_df(df, p_pq)
    print("Parquet round-trip:", read_df(written_pq).shape)
except RuntimeError as e:
    print('Skipping Parquet util demo:', e)


CSV round-trip: (20, 3)
Parquet round-trip: (20, 3)


In [25]:
import typing as t, pathlib

def detect_format(path: t.Union[str, pathlib.Path]):
    """Return 'csv' or 'parquet' based on the (case-insensitive) file extension.

    Raises ValueError with the lower-cased path for unsupported extensions.
    """
    name = str(path).lower()
    for fmt, suffixes in (('csv', ('.csv',)), ('parquet', ('.parquet', '.pq', '.parq'))):
        if name.endswith(suffixes):
            return fmt
    raise ValueError('Unsupported format: ' + name)

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]):
    """
    Write ``df`` to ``path`` as CSV or Parquet, chosen from the extension.

    The parent directory is created if needed. The index is never written, for
    either format, so CSV and Parquet copies carry the same columns.
    Parquet failures raise RuntimeError. The message blames a missing engine
    only when pandas actually reports one (ImportError). Any other failure
    includes the underlying error.
    Returns the path as a ``pathlib.Path``.
    """
    p = pathlib.Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)

    if detect_format(p) == 'csv':
        df.to_csv(p, index=False)
        return p

    try:
        df.to_parquet(p, index=False)
    except ImportError as e:
        # pandas raises ImportError when neither pyarrow nor fastparquet is importable.
        raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
    except Exception as e:
        raise RuntimeError(f'Parquet write failed for {p}: {e!r}') from e
    return p

def read_df(path: t.Union[str, pathlib.Path]):
    """
    Read a DataFrame from CSV or Parquet, chosen from the extension.

    CSV: if the header has a 'date' column, it is parsed as datetime.
    Parquet: read with pandas' default engine. Every failure raises
    RuntimeError. The message blames a missing engine only when pandas reports
    one (ImportError). Any other failure, such as a missing or corrupt file,
    includes the underlying error.
    """
    p = pathlib.Path(path)

    if detect_format(p) == 'csv':
        has_date = 'date' in pd.read_csv(p, nrows=0).columns
        return pd.read_csv(p, parse_dates=['date']) if has_date else pd.read_csv(p)

    try:
        return pd.read_parquet(p)
    except ImportError as e:
        raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
    except Exception as e:
        raise RuntimeError(f'Parquet read failed for {p}: {e!r}') from e

# Demo
# Demo: round-trip the sample frame through the utilities and report results.
# One timestamp for both files so the pair shares a filename stem.
_stamp = ts()
p_csv = RAW / f"util_{_stamp}.csv"
p_pq = PROC / f"util_{_stamp}.parquet"

write_df(df, p_csv)
# Only a cell's final expression is auto-displayed; mid-cell or nested
# expressions are discarded, so print the round-trip results explicitly.
print("CSV round-trip:", read_df(p_csv).shape)
try:
    write_df(df, p_pq)
    print("Parquet round-trip:", read_df(p_pq).shape)
except RuntimeError as e:
    print('Skipping Parquet util demo:', e)

Skipping Parquet util demo: Parquet engine not available. Install pyarrow or fastparquet.


## 5) Documentation (TODO)
- Update README with a **Data Storage** section (folders, formats, env usage).
- Summarize validation checks and any assumptions.

In [None]:
from pathlib import Path
import pandas as pd
import os

# Re-resolve the storage roots so this cell also works in a fresh kernel.
RAW, PROC = (
    Path(os.getenv(env_key, fallback))
    for env_key, fallback in (("DATA_DIR_RAW", "data/raw"), ("DATA_DIR_PROCESSED", "data/processed"))
)

def newest(glob_iter):
    """Return the most recently modified path from ``glob_iter``, or None if it is empty.

    On equal mtimes, the path yielded first wins.
    """
    return max(glob_iter, key=lambda f: f.stat().st_mtime, default=None)

csv_path, pq_path = newest(RAW.glob("*.csv")), newest(PROC.glob("*.parquet"))

for _label, _found in (("Newest RAW CSV:", csv_path), ("Newest PROCESSED Parquet:", pq_path)):
    print(_label, _found.name if _found else "None found")

if 'read_df' not in globals():
    raise RuntimeError("read_df() not found — run your Utilities cell first.")

# Reload whichever files were found (CSV first, then Parquet).
df_csv, df_pq = (read_df(found) if found else None for found in (csv_path, pq_path))

def _fallback_validate(df: pd.DataFrame, required_cols):
    missing = [c for c in required_cols if c not in df.columns]
    out = {"missing": missing, "shape": df.shape, "na_total": int(df.isna().sum().sum())}
    return out

def _smart_required(df: pd.DataFrame):
    cols = set(df.columns.str.lower())
    if {"date","adj_close"}.issubset(cols):
        return ["date","adj_close"]
    elif {"symbol","security"}.intersection(cols):
        need = []
        for c in ["symbol","security"]:
            if c in cols: need.append(c)
        return need
    else:
        return []

def _validate(df: pd.DataFrame):
    """Validate ``df`` against the columns _smart_required picks for it.

    If a notebook-level ``validate(df, required)`` helper is defined, its result
    is returned. When the helper is absent, or raises any exception, the
    result of _fallback_validate is returned instead.
    """
    required = _smart_required(df)
    if 'validate' not in globals():
        return _fallback_validate(df, required)
    try:
        return validate(df, required)  # your notebook's helper
    except Exception:
        return _fallback_validate(df, required)

# Validate each reloaded frame that exists and report its shape plus the checks.
if df_csv is not None:
    v_csv = _validate(df_csv)
    print("\nCSV reload → shape:", df_csv.shape, "| Validation:", v_csv)
if df_pq is not None:
    v_pq = _validate(df_pq)
    print("Parquet reload → shape:", df_pq.shape, "| Validation:", v_pq)

# Peek at the first rows of each (display() is supplied by IPython/Jupyter).
for _frame in (df_csv, df_pq):
    if _frame is not None:
        display(_frame.head(3))


Newest RAW CSV: util_20250818-014145.csv
Newest PROCESSED Parquet: util_20250818-014145.parquet

CSV reload → shape: (20, 3) | Validation: {'missing': [], 'shape': (20, 3), 'na_total': 0}
Parquet reload → shape: (20, 3) | Validation: {'missing': [], 'shape': (20, 3), 'na_total': 0}


Unnamed: 0,date,ticker,price
0,2024-01-01,AAPL,152.719671
1,2024-01-02,AAPL,152.669191
2,2024-01-03,AAPL,152.625522


Unnamed: 0,date,ticker,price
0,2024-01-01,AAPL,152.719671
1,2024-01-02,AAPL,152.669191
2,2024-01-03,AAPL,152.625522
