In [8]:
import os, pathlib, datetime as dt
import pandas as pd
from dotenv import load_dotenv

load_dotenv()
RAW_DIR = pathlib.Path(os.getenv("DATA_DIR_RAW", "data/raw"))
PROC_DIR = pathlib.Path(os.getenv("DATA_DIR_PROCESSED", "data/processed"))
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)
print("RAW_DIR:", RAW_DIR.resolve())
print("PROC_DIR:", PROC_DIR.resolve())

RAW_DIR: C:\Users\Bobli\bootcamp_Calvin_Li\homework\homework05\data\raw
PROC_DIR: C:\Users\Bobli\bootcamp_Calvin_Li\homework\homework05\data\processed


In [9]:
import numpy as np
dates = pd.date_range("2024-01-01", periods=10, freq="D")
df = pd.DataFrame({
    'date': dates,
    'ticker': ['AAPL']*10,
    'price': 150 + np.random.randn(10).cumsum()
})
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    10 non-null     datetime64[ns]
 1   ticker  10 non-null     object        
 2   price   10 non-null     float64       
dtypes: datetime64[ns](1), float64(1), object(1)
memory usage: 368.0+ bytes


In [23]:
def ts():
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')
csv_path=RAW_DIR/f"prices_{ts()}.csv"
df.to_csv(csv_path,index=True)
parq_path = PROC_DIR / f"prices_{ts()}.parquet"
try:
    df.to_parquet(parq_path, engine="fastparquet")
  # uses installed engine if available
    print("Saved Parquet →", parq_path)
except Exception as e:
    print("Parquet save failed (engine missing?). Skipping Parquet demo.")
    print("Error:", e)

Saved Parquet → data\processed\prices_20250818-102004.parquet


In [15]:
import sys
!{sys.executable} -m pip install pyarrow



Collecting pyarrow
  Downloading pyarrow-21.0.0-cp310-cp310-win_amd64.whl.metadata (3.4 kB)
Downloading pyarrow-21.0.0-cp310-cp310-win_amd64.whl (26.2 MB)
   ---------------------------------------- 0.0/26.2 MB ? eta -:--:--
    --------------------------------------- 0.5/26.2 MB 16.4 MB/s eta 0:00:02
   ----------- ---------------------------- 7.3/26.2 MB 18.9 MB/s eta 0:00:02
   --------------------- ------------------ 14.4/26.2 MB 25.1 MB/s eta 0:00:01
   --------------------------------- ------ 22.3/26.2 MB 28.7 MB/s eta 0:00:01
   ---------------------------------------- 26.2/26.2 MB 26.8 MB/s eta 0:00:00
Installing collected packages: pyarrow
Successfully installed pyarrow-21.0.0


In [17]:
import pandas as pd, pyarrow as pa
print("pandas:", pd.__version__)
print("pyarrow:", pa.__version__)


pandas: 2.3.1
pyarrow: 21.0.0


In [18]:
!{sys.executable} -m pip install --upgrade pandas pyarrow fastparquet

Collecting fastparquet
  Downloading fastparquet-2024.11.0-cp310-cp310-win_amd64.whl.metadata (4.3 kB)
Collecting cramjam>=2.3 (from fastparquet)
  Downloading cramjam-2.11.0-cp310-cp310-win_amd64.whl.metadata (681 bytes)
Collecting fsspec (from fastparquet)
  Downloading fsspec-2025.7.0-py3-none-any.whl.metadata (12 kB)
Downloading fastparquet-2024.11.0-cp310-cp310-win_amd64.whl (670 kB)
   ---------------------------------------- 0.0/670.7 kB ? eta -:--:--
   ---------------------------------------- 670.7/670.7 kB 8.7 MB/s eta 0:00:00
Downloading cramjam-2.11.0-cp310-cp310-win_amd64.whl (1.7 MB)
   ---------------------------------------- 0.0/1.7 MB ? eta -:--:--
   ---------------------------------------- 1.7/1.7 MB 23.0 MB/s eta 0:00:00
Downloading fsspec-2025.7.0-py3-none-any.whl (199 kB)
Installing collected packages: fsspec, cramjam, fastparquet

   -------------------------- ------------- 2/3 [fastparquet]
   ---------------------------------------- 3/3 [fastparquet]

Successfu

In [21]:
def validate_loaded(original,reloaded,cols=('date','ticker','price')):
    checks={'shape_equal':original.shape==reloaded.shape,
            'cols_present': all(c in reloaded.columns for c in cols)}
    if 'price' in reloaded.columns:
        checks['price_is_numeric'] = pd.api.types.is_numeric_dtype(reloaded['price'])
    if 'date' in reloaded.columns:
        checks['date_is_datetime'] = pd.api.types.is_datetime64_any_dtype(reloaded['date'])
    return checks
df_csv=pd.read_csv(csv_path,parse_dates=['date'])
print('csv validation:',validate_loaded(df,df_csv))

csv validation: {'shape_equal': False, 'cols_present': True, 'price_is_numeric': True, 'date_is_datetime': True}


In [25]:
if parq_path.exists():
    try:
        df_parq = pd.read_parquet(parq_path,engine="fastparquet")
        print('Parquet validation:', validate_loaded(df, df_parq))
    except Exception as e:
        print('Parquet read failed:', e)
else:
    print('Parquet file not present (skipped earlier).')

Parquet validation: {'shape_equal': True, 'cols_present': True, 'price_is_numeric': True, 'date_is_datetime': True}


In [27]:
from typing import Union

def ensure_dir(path:pathlib.Path):
    path.parent.mkdir(parents=True,exist_ok=True)
def detect_format(path:Union[str,pathlib.Path]):
    suf=str(path).lower()
    if suf.endswith('.csv'): return 'csv'
    if suf.endswith('.parquet') or suf.endswith('.pq') or suf.endswith('.parq'): return 'parquet'
    raise ValueError('Unsupported format for: ' + str(path))
def write_df(df,path: Union[str, pathlib.Path]):
    path=pathlib.Path(path)
    ensure_dir(path)
    fmt=detect_format(path)
    if fmt=='csv':
        df.to_csv(path,index=False)
    elif fmt=='parquet':
        try:
            df.to_parquet(path,engine="fastparquet")
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
    return path    
def read_df(path: Union[str, pathlib.Path]):
    path = pathlib.Path(path)
    fmt = detect_format(path)
    if fmt == 'csv':
        return pd.read_csv(path, parse_dates=['date']) if 'date' in pd.read_csv(path, nrows=0).columns else pd.read_csv(path)
    elif fmt == 'parquet':
        try:
            return pd.read_parquet(path,engine="fastparquet")
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
csv2 = RAW_DIR / f"prices_util_{ts()}.csv"
pq2  = PROC_DIR / f"prices_util_{ts()}.parquet"
write_df(df, csv2)
df2 = read_df(csv2)
print('Reloaded CSV via util, shape:', df2.shape)

try:
    write_df(df, pq2)
    df3 = read_df(pq2)
    print('Reloaded Parquet via util, shape:', df3.shape)
except RuntimeError as e:
    print('Parquet util demo skipped:', e)

Reloaded CSV via util, shape: (10, 3)
Reloaded Parquet via util, shape: (10, 3)
