# Homework Starter — Stage 05: Data Storage
Name: Yuqing Yan
Date: 08/17/2025

Objectives:
- Env-driven paths to `data/raw/` and `data/processed/`
- Save CSV and Parquet; reload and validate
- Abstract IO with utility functions; document choices

In [1]:
import os, pathlib, datetime as dt
import pandas as pd
import numpy as np
from dotenv import load_dotenv

# 1. Initialize environment paths
# load_dotenv() comes from python-dotenv; it is called first so that the
# os.getenv lookups below can see DATA_DIR_* values it loads.
load_dotenv()
RAW = pathlib.Path(os.getenv('DATA_DIR_RAW', 'data/raw'))
PROC = pathlib.Path(os.getenv('DATA_DIR_PROCESSED', 'data/processed'))

# Create both folders first (no-op when they already exist), then report the
# absolute locations that were resolved.
for _folder in (RAW, PROC):
    _folder.mkdir(parents=True, exist_ok=True)
for _label, _folder in (('RAW ->', RAW), ('PROC ->', PROC)):
    print(_label, _folder.resolve())

RAW -> /Users/yuqingyan/Desktop/bootcamp_Yuqing_Yan/homework/homework5/data/raw
PROC -> /Users/yuqingyan/Desktop/bootcamp_Yuqing_Yan/homework/homework5/data/processed


## 1) Create or Load a Sample DataFrame
You may reuse data from prior stages or create a small synthetic dataset.

In [4]:
# 2. Create sample DataFrame
def create_sample_data():
    """Build a small, reproducible synthetic price/volume table.

    NumPy's global random generator is seeded with 42 on every call, so the
    returned frame is always the same: ten consecutive daily dates starting
    2024-01-01, five 'AAPL' rows followed by five 'MSFT' rows, a random-walk
    price starting near 150 (rounded to 2 decimals), and integer volumes
    drawn from [10000, 50000).

    Returns:
        DataFrame with columns ``date``, ``ticker``, ``price``, ``volume``.
    """
    n_rows = 10
    np.random.seed(42)

    # Draw order matters for reproducibility: price steps first, then volumes.
    price_steps = np.random.randn(n_rows)
    volumes = np.random.randint(10000, 50000, n_rows)

    tickers = [symbol for symbol in ('AAPL', 'MSFT') for _ in range(n_rows // 2)]
    columns = {
        'date': pd.date_range('2024-01-01', periods=n_rows, freq='D'),
        'ticker': tickers,
        'price': np.round(150 + price_steps.cumsum(), 2),
        'volume': volumes,
    }
    return pd.DataFrame(columns)

# Materialise the sample frame once; later cells reuse `df`.
df = create_sample_data()
print("\nSample DataFrame:", df.head(), sep="\n")


Sample DataFrame:
        date ticker   price  volume
0 2024-01-01   AAPL  150.50   15311
1 2024-01-02   AAPL  150.36   47819
2 2024-01-03   AAPL  151.01   49188
3 2024-01-04   AAPL  152.53   27568
4 2024-01-05   AAPL  152.30   29769


## 2) Save CSV to `data/raw/` and Parquet to `data/processed/`
- Use timestamped filenames.
- Handle missing Parquet engine gracefully.

In [5]:
# 3. Save in both formats
def get_timestamp():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    now = dt.datetime.now()
    return f"{now:%Y%m%d_%H%M%S}"

# Take ONE timestamp for this run so the CSV and Parquet files always share
# the same stem, even if the clock ticks over between the two writes.
run_stamp = get_timestamp()

# Save CSV to raw
csv_filename = f"stock_data_{run_stamp}.csv"
csv_path = RAW / csv_filename
df.to_csv(csv_path, index=False)
print(f"\nSaved CSV to: {csv_path}")

# Save Parquet to processed
pq_filename = f"stock_data_{run_stamp}.parquet"
pq_path = PROC / pq_filename
try:
    df.to_parquet(pq_path, engine='pyarrow')
    print(f"Saved Parquet to: {pq_path}")
except Exception as e:
    # Best effort: the notebook should keep running without a Parquet engine.
    # pq_path is reset to None so later cells can skip Parquet validation.
    print(f"\nParquet save failed (install pyarrow/fastparquet): {e}")
    pq_path = None
pq_path


Saved CSV to: data/raw/stock_data_20250817_135352.csv
Saved Parquet to: data/processed/stock_data_20250817_135352.parquet


PosixPath('data/processed/stock_data_20250817_135352.parquet')

## 3) Reload and Validate
- Compare shapes and key dtypes.

In [6]:
# 4. Reload and validate
def validate_reloaded(original, reloaded):
    """Check that a reloaded DataFrame matches the frame that was saved.

    Args:
        original: The DataFrame that was written to disk.
        reloaded: The DataFrame read back from disk.

    Returns:
        A boolean ``pd.Series`` with one entry per check:
        ``shapes_match``, ``columns_match`` (order-insensitive),
        ``date_dtype_ok`` (datetime64), ``price_dtype_ok`` (float),
        ``volume_dtype_ok`` (integer) and ``data_integrity``
        (``original.equals(reloaded)``).

    A column that is missing from ``reloaded`` makes its dtype check ``False``
    instead of raising ``KeyError``, so the full report is always produced.
    """
    dtype_api = pd.api.types

    def _column_passes(name, dtype_check):
        # A missing column is a failed check, not a crash.
        return name in reloaded.columns and bool(dtype_check(reloaded[name]))

    validation = {
        'shapes_match': original.shape == reloaded.shape,
        'columns_match': set(original.columns) == set(reloaded.columns),
        'date_dtype_ok': _column_passes('date', dtype_api.is_datetime64_any_dtype),
        'price_dtype_ok': _column_passes('price', dtype_api.is_float_dtype),
        'volume_dtype_ok': _column_passes('volume', dtype_api.is_integer_dtype),
        'data_integrity': original.equals(reloaded),
    }
    return pd.Series(validation)

# Validate CSV
# CSV stores dates as text, so 'date' must be parsed explicitly on reload for
# the dtype and equality checks to have a chance of passing.
df_csv = pd.read_csv(csv_path, parse_dates=['date'])
print("\nCSV Validation Results:")
csv_report = validate_reloaded(df, df_csv)
print(csv_report)


CSV Validation Results:
shapes_match       True
columns_match      True
date_dtype_ok      True
price_dtype_ok     True
volume_dtype_ok    True
data_integrity     True
dtype: bool


In [7]:
# Validate Parquet if saved
# pq_path is None when the earlier Parquet save failed; test for that
# explicitly rather than relying on truthiness.
if pq_path is not None:
    try:
        df_pq = pd.read_parquet(pq_path)
    except Exception as e:
        print(f"\nParquet read failed: {e}")
    else:
        # Validation runs outside the try so that a validation error is not
        # misreported as a Parquet read failure.
        print("\nParquet Validation Results:")
        print(validate_reloaded(df, df_pq))


Parquet Validation Results:
shapes_match       True
columns_match      True
date_dtype_ok      True
price_dtype_ok     True
volume_dtype_ok    True
data_integrity     True
dtype: bool


## 4) Utilities
- Implement `detect_format`, `write_df`, `read_df`.
- Use suffix to route; create parent dirs if needed; friendly errors for Parquet.

In [8]:
import typing as t, pathlib

def detect_format(path: t.Union[str, pathlib.Path]):
    """Map a file path to a storage format name based on its ending.

    The comparison is case-insensitive. Returns ``'csv'`` for ``.csv`` and
    ``'parquet'`` for ``.parquet``, ``.pq`` or ``.parq``.

    Raises:
        ValueError: If the path ends in none of the recognised extensions.
    """
    lowered = str(path).lower()
    endings_by_format = (
        ('csv', ('.csv',)),
        ('parquet', ('.parquet', '.pq', '.parq')),
    )
    for fmt, endings in endings_by_format:
        if lowered.endswith(endings):
            return fmt
    raise ValueError(f'Unsupported format: {lowered}')

# 5. Implement utility functions
def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]) -> pathlib.Path:
    """Save a DataFrame as CSV or Parquet, chosen by the file extension.

    Args:
        df: The frame to write.
        path: Destination file, as ``str`` or ``pathlib.Path``. ``.csv`` is
            written as CSV without the index; ``.parquet``, ``.pq`` and
            ``.parq`` (the same extensions ``detect_format`` recognises) are
            written as Parquet with the pyarrow engine.

    Returns:
        The destination as a ``pathlib.Path``.

    Raises:
        ValueError: If the extension is not supported. Raised before any
            directory is created.
        RuntimeError: If the Parquet write fails; the original exception is
            chained as ``__cause__``.
    """
    path = pathlib.Path(path)  # accept plain strings as well as Path objects
    suffix = path.suffix.lower()

    # Reject unknown formats before touching the filesystem.
    if suffix not in ('.csv', '.parquet', '.pq', '.parq'):
        raise ValueError(f"Unsupported file format: {suffix}")

    path.parent.mkdir(parents=True, exist_ok=True)

    if suffix == '.csv':
        df.to_csv(path, index=False)
        return path

    try:
        df.to_parquet(path, engine='pyarrow')
    except Exception as e:
        raise RuntimeError(
            "Parquet support requires pyarrow or fastparquet. "
            f"Error: {str(e)}"
        ) from e
    return path

def read_df(path: t.Union[str, pathlib.Path]) -> pd.DataFrame:
    """Load a DataFrame from CSV or Parquet, chosen by the file extension.

    Args:
        path: Source file, as ``str`` or ``pathlib.Path``. ``.csv`` is read
            as CSV; ``.parquet``, ``.pq`` and ``.parq`` (the same extensions
            ``detect_format`` recognises) are read as Parquet.

    Returns:
        The loaded DataFrame. For CSV, every column whose name contains
        ``date`` (case-insensitive) is passed to ``parse_dates``.

    Raises:
        ValueError: If the extension is not supported.
        RuntimeError: If the Parquet read fails; the original exception is
            chained as ``__cause__``.
    """
    path = pathlib.Path(path)  # accept plain strings as well as Path objects
    suffix = path.suffix.lower()

    if suffix == '.csv':
        # Read only the header row first to find the columns to parse as dates.
        header = pd.read_csv(path, nrows=0).columns
        date_cols = [col for col in header if 'date' in col.lower()]
        return pd.read_csv(path, parse_dates=date_cols)

    if suffix in ('.parquet', '.pq', '.parq'):
        try:
            return pd.read_parquet(path)
        except Exception as e:
            raise RuntimeError(
                "Parquet support requires pyarrow or fastparquet. "
                f"Error: {str(e)}"
            ) from e

    raise ValueError(f"Unsupported file format: {suffix}")
    
# Test utilities
print("\nTesting utility functions...")
# One shared stamp keeps the CSV/Parquet test files paired by name even if
# the clock ticks over between building the two paths.
test_stamp = get_timestamp()
test_csv = RAW / f"test_util_{test_stamp}.csv"
test_pq = PROC / f"test_util_{test_stamp}.parquet"

# Test CSV
write_df(df, test_csv)
df_test_csv = read_df(test_csv)
print(f"CSV utility test passed: {df.equals(df_test_csv)}")

# Test Parquet if available
# write_df/read_df raise RuntimeError on Parquet failures, which is reported
# here as a skip rather than a hard failure.
try:
    write_df(df, test_pq)
    df_test_pq = read_df(test_pq)
    print(f"Parquet utility test passed: {df.equals(df_test_pq)}")
except RuntimeError as e:
    print(f"Parquet utility test skipped: {e}")


Testing utility functions...
CSV utility test passed: True
Parquet utility test passed: True


## 5) Documentation: See README.md
- Update README with a **Data Storage** section (folders, formats, env usage).
- Summarize validation checks and any assumptions.
