In [4]:
from pathlib import Path
import os
from textwrap import dedent

# This notebook is expected to run from inside the homework folder itself.
# Both spellings of the folder name (`homework5` and `homework05`) are accepted,
# because the comments and README refer to "homework05" while the folder on disk
# is named "homework5".
HW_DIR = Path.cwd()
assert HW_DIR.name.lower().startswith(("homework5", "homework05")), f"请在 homework5 目录下运行，当前：{HW_DIR}"

# Standard project layout
DATA_RAW = HW_DIR / "data" / "raw"
DATA_PROCESSED = HW_DIR / "data" / "processed"
SRC_DIR = HW_DIR / "src"
STORAGE_DIR = SRC_DIR / "storage"
UTILS_DIR = SRC_DIR / "utils"

# Create the directories (idempotent: existing ones are left alone)
for p in [DATA_RAW, DATA_PROCESSED, STORAGE_DIR, UTILS_DIR]:
    p.mkdir(parents=True, exist_ok=True)


def _write_if_missing(path: Path, text: str) -> bool:
    """Create ``path`` with ``text`` (UTF-8) unless the file already exists.

    Existing files are never overwritten, so settings edited by hand survive
    a re-run of this cell.

    Returns:
        True when the file was created, False when it already existed.
    """
    if path.exists():
        return False
    path.write_text(text, encoding="utf-8")
    return True


ENV_TEMPLATE = "DATA_DIR_RAW=data/raw\nDATA_DIR_PROCESSED=data/processed\n"

README_TEMPLATE = """
# Homework 05 — Data Storage

## Folder Structure
- `data/raw/` – first-touch CSV
- `data/processed/` – Parquet (columnar, compressed)
- `notebooks/` – this notebook
- `src/` – utilities for I/O and validation

## Environment
A local `.env` in *this* folder controls where data is written:
```
DATA_DIR_RAW=data/raw
DATA_DIR_PROCESSED=data/processed
```

## How to Run
Execute cells in the notebook. It will:
1) Create folders and `.env`
2) Save the sample DataFrame to CSV & Parquet
3) Reload both and validate shapes/dtypes
4) Use suffix-based utilities `write_df` / `read_df`
""".strip() + "\n"

GITIGNORE_TEMPLATE = """
.ipynb_checkpoints/
.env
data/raw/**
data/processed/**
""".strip() + "\n"

# .env: created only when absent, so existing settings are preserved
env_path = HW_DIR / ".env"
_write_if_missing(env_path, ENV_TEMPLATE)

# README.md: a basic template, generated only when absent
readme_path = HW_DIR / "README.md"
_write_if_missing(readme_path, README_TEMPLATE)

# Optional homework-local .gitignore (guards against committing data by accident)
gitignore_path = HW_DIR / ".gitignore"
_write_if_missing(gitignore_path, GITIGNORE_TEMPLATE)

print("✅ Skeleton ready:")
print(f"- {DATA_RAW}")
print(f"- {DATA_PROCESSED}")
print(f"- {env_path}")
print(f"- {readme_path}")
print(f"- {gitignore_path}")

✅ Skeleton ready:
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/raw
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/processed
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/.env
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/README.md
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/.gitignore


In [5]:
from textwrap import dedent

# Source text of src/storage/io_utils.py (written to disk later in this cell).
# Inner docstring quotes and "\n" escapes are backslash-escaped because the whole
# module lives inside a regular (non-raw) triple-quoted string.
io_utils = dedent("""
from __future__ import annotations
import os
from pathlib import Path
from typing import Optional, Sequence, Union
import pandas as pd

PathLike = Union[str, os.PathLike]

def _ensure_parent(path: Path) -> None:
    \"\"\"Create the parent directory of `path`, including missing ancestors.\"\"\"
    path.parent.mkdir(parents=True, exist_ok=True)

def write_df(df: pd.DataFrame, path: PathLike, index: bool = False) -> None:
    \"\"\"Write `df` to `path`, picking the format from the suffix (.csv / .parquet).

    Parent directories are created first. `path` may be a `str` or any
    path-like object.

    Raises:
        ValueError: the suffix is neither .csv nor .parquet.
        RuntimeError: the Parquet write failed. The install hint is included
            only when the cause is a missing engine (ImportError); any other
            failure is reported as-is so the real cause stays visible.
    \"\"\"
    path = Path(path)
    _ensure_parent(path)
    suf = path.suffix.lower()
    if suf == ".csv":
        df.to_csv(path, index=index)
    elif suf == ".parquet":
        try:
            df.to_parquet(path, index=index)  # auto-detect engine; prefers pyarrow
        except ImportError as e:
            raise RuntimeError(
                "Failed to write Parquet. Install a Parquet engine, e.g. `pip install pyarrow`\\n"
                f"Original error: {e}"
            ) from e
        except Exception as e:
            raise RuntimeError(
                f"Failed to write Parquet file {path}.\\n"
                f"Original error: {e}"
            ) from e
    else:
        raise ValueError(f"Unsupported suffix: {suf}. Use .csv or .parquet.")

def read_df(path: PathLike, parse_dates: Optional[Sequence[str]] = None) -> pd.DataFrame:
    \"\"\"Read a DataFrame from `path`, picking the format from the suffix (.csv / .parquet).

    `parse_dates` is forwarded to `pandas.read_csv` only; it is not used for
    Parquet files. `path` may be a `str` or any path-like object.

    Raises:
        ValueError: the suffix is neither .csv nor .parquet.
        RuntimeError: the Parquet read failed. The install hint is included
            only when the cause is a missing engine (ImportError).
    \"\"\"
    path = Path(path)
    suf = path.suffix.lower()
    if suf == ".csv":
        return pd.read_csv(path, parse_dates=list(parse_dates) if parse_dates else None)
    elif suf == ".parquet":
        try:
            return pd.read_parquet(path)
        except ImportError as e:
            raise RuntimeError(
                "Failed to read Parquet. Install `pyarrow`.\\n"
                f"Original error: {e}"
            ) from e
        except Exception as e:
            raise RuntimeError(
                f"Failed to read Parquet file {path}.\\n"
                f"Original error: {e}"
            ) from e
    else:
        raise ValueError(f"Unsupported suffix: {suf}. Use .csv or .parquet.")
""").strip()+"\n"

# Source text of src/utils/validate.py (written to disk later in this cell).
validate_utils = dedent("""
from typing import Dict, Optional, Tuple
import pandas as pd

# dtype names a 'category' column is allowed to come back as after a CSV
# round-trip. "str" is the name of the default string dtype in pandas 3.
_CATEGORY_EQUIVALENTS = ("category", "object", "string", "str")

def validate_df(
    original: pd.DataFrame,
    reloaded: pd.DataFrame,
    expected_dtypes: Optional[Dict[str, str]] = None,
) -> Dict[str, Tuple[bool, str]]:
    \"\"\"Compare a reloaded frame against the original one.

    Checks performed:
      * shape_match: both frames have the same shape.
      * columns_match_order: same column labels in the same order.
      * dtypes_expected: only when `expected_dtypes` is non-empty; each listed
        column must exist in `reloaded` and have the expected dtype name.
        An expected 'category' also accepts plain text dtypes, because CSV
        does not preserve categoricals.

    Returns:
        dict mapping check_name -> (ok: bool, message: str).
    \"\"\"
    results = {}
    # shape
    same_shape = original.shape == reloaded.shape
    results["shape_match"] = (same_shape, f"original={original.shape}, reloaded={reloaded.shape}")
    # columns (order-sensitive for this HW)
    same_cols = list(original.columns) == list(reloaded.columns)
    results["columns_match_order"] = (same_cols, f"original={list(original.columns)}, reloaded={list(reloaded.columns)}")
    # dtype checks (allow CSV to degrade 'category' -> text dtype)
    if expected_dtypes:
        dtype_ok = True
        msgs = []
        for col, exp in expected_dtypes.items():
            if col not in reloaded.columns:
                dtype_ok = False
                msgs.append(f"{col}: MISSING")
                continue
            got = str(reloaded[col].dtype)
            if exp == "category" and got in _CATEGORY_EQUIVALENTS:
                msgs.append(f"{col}: OK (got {got}, expected {exp} acceptable)")
            elif got != exp:
                dtype_ok = False
                msgs.append(f"{col}: got {got}, expected {exp}")
            else:
                msgs.append(f"{col}: OK ({got})")
        results["dtypes_expected"] = (dtype_ok, "; ".join(msgs))
    return results
""").strip()+"\n"

# Persist the two generated modules, keyed by their destination path.
generated_modules = {
    STORAGE_DIR / "io_utils.py": io_utils,
    UTILS_DIR / "validate.py": validate_utils,
}
for module_path, module_source in generated_modules.items():
    module_path.write_text(module_source, encoding="utf-8")

# Turn src/ and its sub-folders into packages by adding an empty __init__.py
# wherever one does not exist yet (existing files are left untouched).
for package_dir in (SRC_DIR, STORAGE_DIR, UTILS_DIR):
    package_marker = package_dir / "__init__.py"
    if package_marker.exists():
        continue
    package_marker.write_text("", encoding="utf-8")

print("✅ Utilities written to:")
for module_path in generated_modules:
    print("-", module_path)

✅ Utilities written to:
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/src/storage/io_utils.py
- /Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/src/utils/validate.py


In [6]:
import os
from dotenv import load_dotenv

# Load the homework-local .env file
load_dotenv(HW_DIR / ".env")

# Read both settings, falling back to the default layout when a variable is unset
raw_setting = os.getenv("DATA_DIR_RAW", "data/raw")
processed_setting = os.getenv("DATA_DIR_PROCESSED", "data/processed")

# Anchor the configured locations at the homework folder and normalise them
DATA_DIR_RAW = HW_DIR.joinpath(raw_setting).resolve()
DATA_DIR_PROCESSED = HW_DIR.joinpath(processed_setting).resolve()

# Make sure both targets exist before anything is written to them
for data_dir in (DATA_DIR_RAW, DATA_DIR_PROCESSED):
    data_dir.mkdir(parents=True, exist_ok=True)

DATA_DIR_RAW, DATA_DIR_PROCESSED

(PosixPath('/Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/raw'),
 PosixPath('/Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/processed'))

In [7]:
import pandas as pd
import numpy as np

# Seeded generator so the sample volumes are reproducible across runs
rng = np.random.default_rng(42)

# Ten consecutive calendar days of one ticker; each column is built on its own
# and then assembled into the frame in a fixed column order.
dates = pd.date_range("2024-01-01", periods=10, freq="D")
ticker_col = pd.Series(["SPY"] * 10, dtype="category")
close_col = np.round(np.linspace(450.0, 470.0, 10), 2)
volume_col = rng.integers(1_000_000, 2_000_000, size=10, dtype=np.int64)

df = pd.DataFrame(
    {
        "date": dates,
        "ticker": ticker_col,
        "close": close_col,
        "volume": volume_col,
    }
)
df.dtypes, df.head()

(date      datetime64[ns]
 ticker          category
 close            float64
 volume             int64
 dtype: object,
         date ticker   close   volume
 0 2024-01-01    SPY  450.00  1089250
 1 2024-01-02    SPY  452.22  1773956
 2 2024-01-03    SPY  454.44  1654571
 3 2024-01-04    SPY  456.67  1438878
 4 2024-01-05    SPY  458.89  1433015)

In [8]:
from src.storage.io_utils import write_df
# Output locations: CSV goes to the raw folder, Parquet to the processed folder
csv_path = DATA_DIR_RAW.joinpath("sample_prices.csv")
parq_path = DATA_DIR_PROCESSED.joinpath("sample_prices.parquet")

# CSV write
write_df(df, csv_path, index=False)

# Parquet write. Without a Parquet engine, write_df raises an error carrying an
# install hint; this homework tolerates that, so the failure is only reported
# and recorded in `parquet_saved`.
try:
    write_df(df, parq_path, index=False)
except Exception as write_error:
    parquet_saved = False
    print("⚠️ Parquet not saved:", write_error)
else:
    parquet_saved = True

csv_path, parq_path, parquet_saved

⚠️ Parquet not saved: Failed to write Parquet. Install a Parquet engine, e.g. `pip install pyarrow`
Original error: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.


(PosixPath('/Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/raw/sample_prices.csv'),
 PosixPath('/Users/mengmeng/bootcamp_Shuchen_Meng/homework/homework5/data/processed/sample_prices.parquet'),
 False)

In [9]:
from src.storage.io_utils import read_df
from src.utils.validate import validate_df

def _print_validation(heading, checks):
    """Print `heading`, then one marked line per (check name -> (ok, message)) entry."""
    print(heading)
    for check_name, (passed, detail) in checks.items():
        mark = '✅' if passed else '❌'
        print(f" - {check_name}: {mark}  {detail}")


# Reload the CSV, parsing the date column
df_csv = read_df(csv_path, parse_dates=["date"])

# Reload the Parquet file only when the earlier write succeeded
df_parq = None
if parquet_saved:
    df_parq = read_df(parq_path)

# Expected dtype names per column. CSV may bring 'ticker' back as object/string,
# which validate_df accepts for an expected 'category'.
expected = dict(
    date="datetime64[ns]",
    ticker="category",
    close="float64",
    volume="int64",
)

_print_validation("CSV validation:", validate_df(df, df_csv, expected))

if df_parq is None:
    print("\nParquet validation: skipped (not saved)")
else:
    _print_validation("\nParquet validation:", validate_df(df, df_parq, expected))

CSV validation:
 - shape_match: ✅  original=(10, 4), reloaded=(10, 4)
 - columns_match_order: ✅  original=['date', 'ticker', 'close', 'volume'], reloaded=['date', 'ticker', 'close', 'volume']
 - dtypes_expected: ✅  date: OK (datetime64[ns]); ticker: OK (got object, expected category acceptable); close: OK (float64); volume: OK (int64)

Parquet validation: skipped (not saved)


In [11]:
# Heading of the generated section; also used as the marker that tells whether
# the section has already been appended by a previous run.
README_SECTION_HEADING = "## Data Storage (Auto-Generated)"


def _path_label(path: Path) -> str:
    """Return `path` relative to the homework folder, or unchanged if it lies outside it.

    `.env` may point the data folders anywhere, in which case `relative_to`
    raises ValueError; the full path is shown instead of aborting the cell.
    """
    try:
        return str(path.relative_to(HW_DIR.resolve()))
    except ValueError:
        return str(path)


# Code fences are written as plain backticks: a backslash before a backtick is
# not a valid escape in Python, so it would end up literally in the README and
# break the Markdown fences.
append_md = f"""
{README_SECTION_HEADING}

**Folders**
- `{_path_label(DATA_DIR_RAW)}/` – first-touch CSV
- `{_path_label(DATA_DIR_PROCESSED)}/` – Parquet for analytics

**Formats & Why**
- **CSV**: simple, universal; larger on disk, slower for analytics.
- **Parquet**: columnar + compressed (via `pyarrow`); smaller & faster for analytics.  
  If `pyarrow` is missing, code shows a clear install hint.

**Env-Driven Paths**
Values come from `.env` in this folder:
```
DATA_DIR_RAW=data/raw
DATA_DIR_PROCESSED=data/processed
```

**Utilities**
Suffix-routed I/O:
```python
from src.storage.io_utils import write_df, read_df

write_df(df, DATA_DIR_RAW / "table.csv")
write_df(df, DATA_DIR_PROCESSED / "table.parquet")

df_csv = read_df(DATA_DIR_RAW / "table.csv", parse_dates=["date"])
df_parq = read_df(DATA_DIR_PROCESSED / "table.parquet")
```

**Validation**
The notebook prints checks for shape/columns/dtypes using `validate_df`.
"""

readme_file = HW_DIR / "README.md"
existing_readme = readme_file.read_text(encoding="utf-8") if readme_file.exists() else ""

# Append at most once so that re-running the cell (or Restart & Run All) does
# not pile up duplicate sections.
if README_SECTION_HEADING in existing_readme:
    print(f"ℹ️ Data Storage section already present in {HW_DIR.name}/README.md (nothing appended)")
else:
    with open(readme_file, "a", encoding="utf-8") as f:
        f.write("\n" + append_md.strip() + "\n")
    print(f"✅ Data Storage section appended to {HW_DIR.name}/README.md")

In [None]:

**Utilities**
Suffix-routed I/O:
```python
from src.storage.io_utils import write_df, read_df

write_df(df, Path("{(DATA_DIR_RAW/ 'sample_prices.csv').relative_to(HW_DIR)}"))
write_df(df, Path("{(DATA_DIR_PROCESSED/ 'sample_prices.parquet').relative_to(HW_DIR)}"))

df_csv = read_df(Path("{(DATA_DIR_RAW/ 'sample_prices.csv').relative_to(HW_DIR)}"), parse_dates=["date"])
df_parq = read_df(Path("{(DATA_DIR_PROCESSED/ 'sample_prices.parquet').relative_to(HW_DIR)}"))
