In [None]:
# from google.colab import drive
# drive.flush_and_unmount()           # ignore errors if already unmounted

#If cannot remount, simply delete the mounted drive and then remount
# rm -rf /content/drive


Drive not mounted, so nothing to flush and unmount.


In [None]:
# Colab cell: mount Google Drive at /content/drive (BASE_DIR in the next cell lives under it)
from google.colab import drive

# force_remount=True is passed so the mount is redone even when one already exists
# (presumably to recover from a stale mount — confirm against the Colab docs)
drive.mount('/content/drive', force_remount=True)



Mounted at /content/drive


In [None]:
# Adjust these two for YOUR repo
REPO_OWNER = "ywanglab"
REPO_NAME  = "STAT4160"   # e.g., unified-stocks-team1
# Parent folder on the mounted Drive; the repo is cloned into BASE_DIR/REPO_NAME
BASE_DIR   = "/content/drive/MyDrive/dspt25"
CLONE_DIR  = f"{BASE_DIR}/{REPO_NAME}"
REPO_URL   = f"https://github.com/{REPO_OWNER}/{REPO_NAME}.git"

# Alternative settings for a local Windows machine (instructor's office computer).
# Left commented out; uncomment and adapt only when NOT running on Colab.

# REPO_NAME  = "lectureNotes"   # e.g., on my office computer
# BASE_DIR = r"E:\OneDrive - Auburn University Montgomery\teaching\AUM\STAT 4160 Productivity Tools" # on my office computer
# CLONE_DIR  = f"{BASE_DIR}\{REPO_NAME}"

import os, pathlib
# Create BASE_DIR (and any missing parents); exist_ok=True makes re-running this cell harmless
pathlib.Path(BASE_DIR).mkdir(parents=True, exist_ok=True)


In [None]:
import os, subprocess, shutil, pathlib

# Clone the repo into CLONE_DIR on the first run; on later runs reuse the existing folder.
if not pathlib.Path(CLONE_DIR).exists():
    # IPython shell escape: {REPO_URL} and {CLONE_DIR} are filled in from the Python variables
    !git clone {REPO_URL} {CLONE_DIR}
else:
    # If the folder exists, just ensure it's a git repo and pull latest
    # NOTE: the status/pull commands below are commented out, so nothing is
    # fetched automatically — uncomment them to update an existing checkout.
    os.chdir(CLONE_DIR)
    # !git status
    # !git pull --rebase # !git pull --ff-only
# Make the repo folder the working directory so the relative paths used in
# later cells (e.g. data/processed/...) resolve inside the repo.
os.chdir(CLONE_DIR)
print("Working dir:", os.getcwd())

Working dir: /content/drive/MyDrive/dspt25/STAT4160


---
title: "Rolling Windows, Resampling, and Leakage‑Safe Features"
---

**Educational use only — not trading advice.** Python‑only. Colab + Drive assumed. If you don’t already have the repo and folders used below, the first cells create them.

------------------------------------------------------------------------

# **Session 10 — Rolling Windows, Resampling, and Leakage‑Safe Features**

### Learning goals

By the end of class students can:

1.  Use `groupby('ticker')` with `shift`, `rolling`, `expanding`, and `ewm` to engineer features **without leakage**.
2.  Resample safely (daily → weekly/monthly) and understand how to aggregate OHLC + volume.
3.  Produce a tidy `features_v1.parquet` with sensible dtypes.

------------------------------------------------------------------------

## Agenda

-    leakage‑free features; lags vs rolling; resampling patterns
-    common pitfalls (min_periods, alignment, mixed frequencies)
-   **In‑class lab**: load returns → build features → (optional) weekly aggregates → write `features_v1.parquet`
-    Wrap‑up + homework brief
-    Buffer

------------------------------------------------------------------------



**Feature timing = everything**

-   Predict $r_{t+1}$ using info up to and including **time t**.
-   **Rule:** compute any rolling stat at $t$ from data $\le t$, then **shift by 1** if that stat includes the current target variable.

**Core pandas patterns**

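-   **Lags:** `s.shift(k)` with $k \ge 1$ (past values). Never use a negative shift to build a *feature*; `shift(-1)` is reserved for constructing the target `r_1d`.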
-   **Rolling:** `s.rolling(W, min_periods=W).agg(...)` and then **no extra shift** if the rolling window ends at $t$.
-   **Expanding:** long‑memory features (e.g., expanding mean).
-   **EWM:** `s.ewm(span=W, adjust=False).mean()` for decayed memory.

**Resampling safely**

-   Use `groupby('ticker').resample('W-FRI', on='date')` then aggregate:

    -   OHLC: open → `first`, high → `max`, low → `min`, close/`adj_close` → `last`
    -   Volume: `sum`
    -   Returns: sum the daily log returns, or equivalently compound the simple returns $r$ via `np.log(np.prod(1 + r))`.

**Dtypes**

-   `ticker` = `category`; calendar ints `int8`; features `float32` (fine for class).

## In‑class lab (Colab‑friendly)

> Run each block as its own cell. Adjust `REPO_NAME` as needed.

### 1) Load inputs or build small fallbacks


In [None]:
import os, pathlib, numpy as np, pandas as pd
from pathlib import Path
rng = np.random.default_rng(0)

# Fallback synthetic if missing
def make_synth_prices(out_path="data/processed/prices.parquet"):
    """Build a small synthetic price panel and cache it as Parquet.

    Generates 300 business days starting 2022-01-03 for five tickers. Each
    ticker's ``adj_close`` is 100 plus the cumulative sum of standard-normal
    steps, floored at 1.0; ``volume`` is a random integer in
    [1_000_000, 5_000_000). Random draws come from the notebook-level ``rng``,
    so the exact values depend on how many draws were taken from it before.

    Parameters
    ----------
    out_path : str or pathlib.Path, default "data/processed/prices.parquet"
        Where the panel is written. The parent directory is created if it
        does not exist (the fallback runs precisely when the data folder may
        be missing).

    Returns
    -------
    pandas.DataFrame
        Long-format frame with columns ``date``, ``ticker`` (category),
        ``adj_close`` (float32) and ``volume`` (int64).
    """
    dates = pd.bdate_range("2022-01-03", periods=300)
    frames = []
    for tkr in ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]:
        # Random walk around 100; the floor keeps prices strictly positive so
        # log returns computed later are always defined.
        base = 100 + rng.normal(0, 1, size=len(dates)).cumsum()
        frames.append(pd.DataFrame({
            "date": dates, "ticker": tkr,
            "adj_close": np.maximum(base, 1.0).astype("float32"),
            "volume": rng.integers(1_000_000, 5_000_000, size=len(dates)).astype("int64"),
        }))
    prices = pd.concat(frames, ignore_index=True)
    prices["ticker"] = prices["ticker"].astype("category")

    # Create the target folder first: to_parquet does not create missing directories.
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    prices.to_parquet(out_path, index=False)
    return prices

ppath = Path("data/processed/prices.parquet")
rpath = Path("data/processed/returns.parquet")

# Use cached prices when present; otherwise fall back to the synthetic panel.
if ppath.exists():
    prices = pd.read_parquet(ppath)
else:
    prices = make_synth_prices()

# Build returns if missing (from Session 9 logic)
if rpath.exists():
    returns = pd.read_parquet(rpath)
else:
    df = prices.sort_values(["ticker","date"]).copy()
    # Per-ticker log return. groupby(...).shift keeps the original row index,
    # so the result aligns with `df` directly — no apply()/reset_index() step
    # whose index layout depends on the pandas version.
    prev_close = df.groupby("ticker", observed=True)["adj_close"].shift(1)
    df["log_return"] = np.log(df["adj_close"] / prev_close)
    # TARGET: next day's log return. This is the one place a negative shift is
    # legitimate — it builds the label, never a feature.
    df["r_1d"] = df.groupby("ticker", observed=True)["log_return"].shift(-1)
    df["weekday"] = df["date"].dt.weekday.astype("int8")
    df["month"]   = df["date"].dt.month.astype("int8")
    returns = df[["date","ticker","log_return","r_1d","weekday","month"]].copy()
    returns["ticker"] = returns["ticker"].astype("category")
    # Make sure the output folder exists before caching the returns.
    rpath.parent.mkdir(parents=True, exist_ok=True)
    returns.to_parquet(rpath, index=False)

prices.head(3), returns.head(3)

(        date ticker   adj_close   volume  name   sector
 0 2020-01-01   AAPL  100.001228  4457901  AAPL  Unknown
 1 2020-01-02   AAPL  100.300423  2664190  AAPL  Unknown
 2 2020-01-03   AAPL  100.025841  4100245  AAPL  Unknown,
         date ticker  log_return      r_1d  weekday  month
 0 2020-01-01   AAPL         NaN  0.002987        2      1
 1 2020-01-02   AAPL    0.002987 -0.002741        3      1
 2 2020-01-03   AAPL   -0.002741 -0.008906        4      1)


```python
ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)
```

###  `.groupby("ticker", group_keys=False)`

* Groups the (now sorted) DataFrame by the `"ticker"` column.

* Each group corresponds to one stock ticker.

* **`group_keys` option:**

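  * Default `True`: when you call `.apply(...)`, the group labels are added as an extra level of the result's index.
  * With `False`: the group labels are **not** inserted into the result index of `.apply(...)`.
  * This makes the output "flatter" and often easier to merge back into the original DataFrame. Note that `.rolling(...)` and `.expanding(...)` on a groupby still put the ticker in the index, which is why the lab code calls `.reset_index(level=0, drop=True)` on those results.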

---

### 3. Example

```python
import pandas as pd

ret = pd.DataFrame({
    "ticker": ["MSFT","AAPL","MSFT","AAPL"],
    "date": pd.to_datetime(["2020-01-02","2020-01-01","2020-01-01","2020-01-02"]),
    "value": [200, 300, 180, 310]
})

g = ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)
```

This is a very typical pattern in financial time-series analysis. Let’s unpack the line:

```python
rm = g["log_return"].rolling(W, min_periods=W).mean()
```

where earlier you defined

```python
g = ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)
```



* **`.rolling(W, min_periods=W)`**

  * Creates a rolling window of size `W` (e.g. 20 days).
  * `min_periods=W` means you only get a non-NaN result once the full window size is available.

    * First `W-1` rows in each ticker group will be `NaN`.
  * This rolling is **applied separately within each group** because of the prior `groupby("ticker")`.


### 3. Example

```python
import pandas as pd, numpy as np

ret = pd.DataFrame({
    "ticker": ["AAPL"]*5 + ["MSFT"]*5,
    "date": pd.date_range("2020-01-01", periods=5).tolist()*2,
    "log_return": np.random.randn(10)/100
})

g = ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)
W = 3

rm = g["log_return"].rolling(W, min_periods=W).mean()
ret["roll_mean"] = rm.reset_index(level=0, drop=True)

print(ret)
```

Output (sample):

```
  ticker       date  log_return  roll_mean
0   AAPL 2020-01-01   -0.001692        NaN
1   AAPL 2020-01-02    0.005013        NaN
2   AAPL 2020-01-03    0.007641   0.003654
3   AAPL 2020-01-04   -0.002223   0.003477
4   AAPL 2020-01-05    0.004300   0.003239
5   MSFT 2020-01-01    0.001405        NaN
...
```
###  `.expanding(min_periods=20)`

* Expanding window = cumulative calculation that grows over time.
* Unlike rolling (fixed-size window), expanding includes **all prior rows** from the start.
* `min_periods=20` → requires at least 20 rows before computing a value; the first 19 rows per ticker will be `NaN`.

So, at row *i* for a given ticker:

$$
\text{expanding mean}(i) = \frac{1}{i}\sum_{t=1}^{i} \text{log\_return}_t \quad \text{(for } i \ge 20\text{)}
$$




### 5. Example

```python
import pandas as pd, numpy as np

ret = pd.DataFrame({
    "ticker": ["AAPL"]*25 + ["MSFT"]*25,
    "date": pd.date_range("2020-01-01", periods=25).tolist()*2,
    "log_return": np.random.randn(50)/100
})

g = ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)

ret["expanding_mean"] = (
    g["log_return"].expanding(min_periods=20)
      .mean()
      .reset_index(level=0, drop=True)
)

print(ret.head(22))
```

Sample output (AAPL section):

```
   ticker       date  log_return  expanding_mean
0    AAPL 2020-01-01    0.013009             NaN
1    AAPL 2020-01-02   -0.013291             NaN
...
18   AAPL 2020-01-19   -0.000385             NaN
19   AAPL 2020-01-20    0.007406       0.003153
20   AAPL 2020-01-21   -0.004474       0.002693
...
```

* First 19 rows = NaN.
* From row 20 onwards, you see the cumulative mean.

Next, **exponentially weighted moving averages (EWMA)** applied within each ticker group. Let’s break it down:



```python
g["log_return"].apply(lambda s: s.ewm(span=W, adjust=False).mean())
```

* **`.apply(lambda s: ...)`**
  Runs the function on each ticker’s Series `s`.

* **`.ewm(span=W, adjust=False)`**

  * `ewm` = exponentially weighted window.
  * `span=W` sets the *decay rate*: a larger `W` makes weights decay more slowly.
  * `adjust=False` → uses the “recursive” formula, equivalent to how finance often defines exponential moving averages (EMA).

    * `adjust=True` (default) → weights are normalized so they sum to 1 at each step (less common for trading).

* **`.mean()`**
  Computes the exponentially weighted moving average for each ticker’s log returns.


Unlike a simple moving average (equal weights over the last *W* days), EWMA gives **more weight to recent values** and decays exponentially into the past.

Formula (recursive form, with `adjust=False`):

$$
\text{EMA}_t = \alpha \cdot x_t + (1-\alpha)\cdot \text{EMA}_{t-1}
$$

where

$$
\alpha = \frac{2}{W+1}.
$$

So if $W=20$, then $\alpha \approx 0.095$: about 9.5% weight on the newest return, \~90.5% on the past.

---

### 4. Example

```python
import pandas as pd, numpy as np

ret = pd.DataFrame({
    "ticker": ["AAPL"]*5 + ["MSFT"]*5,
    "date": pd.date_range("2020-01-01", periods=5).tolist()*2,
    "log_return": np.random.randn(10)/100
})

g = ret.sort_values(["ticker","date"]).groupby("ticker", group_keys=False)
W = 3

ema = g["log_return"].apply(lambda s: s.ewm(span=W, adjust=False).mean())
ret["ema"] = ema.reset_index(level=0, drop=True)

print(ret)
```

Output (sample):

```
  ticker       date  log_return       ema
0   AAPL 2020-01-01   -0.003268 -0.003268
1   AAPL 2020-01-02   -0.006113 -0.004422
2   AAPL 2020-01-03    0.013886  0.003693
3   AAPL 2020-01-04    0.006176  0.004930
4   AAPL 2020-01-05   -0.004273 -0.000405
...
```


### How EMA₀ is chosen (common conventions)

1. **Pandas (`ewm(..., adjust=False).mean()`)**
   Pandas seeds the recursion with the **first observation**:

   $$
   \text{EMA}_0 = x_0.
   $$

   Then it applies the recursive update for $t\ge1$. (With `adjust=True`, pandas uses the equivalent normalized weighted-mean form; the first value is still $x_0$.)

2. **Some technical-analysis packages (finance TA lore)**
   They sometimes initialize with a **Simple Moving Average (SMA)** of the first $W$ points:

   $$
   \text{EMA}_0=\text{SMA}_W=\frac{1}{W}\sum_{i=0}^{W-1}x_i,
   $$

   and then start the EMA recursion from $t=W$.
   **Pandas does not do this by default.**

RSI = **Relative Strength Index** — a momentum oscillator that measures the **speed and magnitude of recent price changes**. It’s computed from up/down moves and maps to a value between **0 and 100**.

## Definition (Wilder’s 14-period RSI, the standard)

1. Compute price changes: $\Delta_t = P_t - P_{t-1}$.
2. Split into gains/losses:

   * $G_t=\max(\Delta_t,0)$, $L_t=\max(-\Delta_t,0)$.
3. Smooth with Wilder’s EMA (an exponential smoothing with $\alpha=1/n$; usually $n=14$):

   * $\text{avgGain}_t = \text{EMA}_{\alpha=1/n}(G_t)$
   * $\text{avgLoss}_t = \text{EMA}_{\alpha=1/n}(L_t)$
4. Relative Strength: $\text{RS}_t=\frac{\text{avgGain}_t}{\text{avgLoss}_t}$ (treat 0/0 as NaN; if avgLoss=0 and avgGain>0, RS→∞).
5. RSI:

$$
\text{RSI}_t = 100 - \frac{100}{1+\text{RS}_t}.
$$

**Range:** 0–100.
**Common interpretations:** >70 “overbought”, <30 “oversold” (heuristics, not rules).
**Typical window:** $n=14$ periods (can be days, minutes, etc.).



### Notes & gotchas

* Use **closing prices** (not returns).
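* With the `ewm`-based version used in the lab, only the very first value is NaN (there is no previous price to difference against). Still treat the first \~$n$ values as warm-up: they are based on very few observations (the lab output starts at RSI = 100). Pass `min_periods=n` to `ewm(...)` if you want them masked as NaN.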
* If you prefer **simple averages** (not Wilder), replace the `ewm(...)` with `rolling(n, min_periods=n).mean()`.
* RSI is often combined with trend filters (e.g., only take signals in trend direction) and confirmed with **divergences** or **RSI(50)** crossovers.






### 2) Rolling, lag, expanding, ewm features (no leakage)

In [None]:
def build_features(ret: pd.DataFrame, windows=(5,10,20), add_rsi=True,
                   ewm_spans=(10,20), rsi_period=14):
    """Engineer leakage-safe, per-ticker features for predicting the next-day return.

    Every feature at row ``t`` uses only that ticker's data up to and including
    ``t``; nothing looks ahead, so the features can be paired with a target
    such as ``r_1d`` (the return at ``t+1``).

    Parameters
    ----------
    ret : pandas.DataFrame
        Long-format frame with at least ``ticker``, ``date`` and
        ``log_return``. If ``adj_close`` is present it is used for the RSI.
        Row order and index are arbitrary (the index may contain duplicates);
        both are preserved in the output.
    windows : iterable of int, default (5, 10, 20)
        Rolling window lengths for ``roll_mean_W``, ``roll_std_W`` and
        ``zscore_W``. The first ``W-1`` rows per ticker are NaN.
    add_rsi : bool, default True
        Whether to add the ``rsi_<rsi_period>`` column.
    ewm_spans : iterable of int, default (10, 20)
        Spans for ``ewm_mean_S`` / ``ewm_std_S`` (``adjust=False``).
    rsi_period : int, default 14
        Smoothing period ``n`` of the RSI (``alpha = 1/n``).

    Returns
    -------
    pandas.DataFrame
        Copy of ``ret`` with the feature columns appended. Float columns
        (except date/ticker/weekday/month) are cast to float32 and ``ticker``
        to category.

    Raises
    ------
    KeyError
        If ``ticker``, ``date`` or ``log_return`` is missing.
    """
    missing = {"ticker", "date", "log_return"}.difference(ret.columns)
    if missing:
        raise KeyError(f"build_features: missing required column(s): {sorted(missing)}")

    out = ret.copy()
    # Work on a unique positional index so per-group results align back to the
    # right rows even when the caller's index has duplicate labels (e.g. frames
    # concatenated without ignore_index). The original index is restored below.
    original_index = out.index
    out.index = pd.RangeIndex(len(out))

    # Sorted view used for all group-wise computations; results carry the
    # positional index, so assigning into `out` aligns them row by row.
    g = out.sort_values(["ticker","date"]).groupby("ticker", group_keys=False, observed=True)

    # Lags of log_return (past info)
    for k in [1,2,3]:
        out[f"lag{k}"] = g["log_return"].shift(k)

    # Rolling mean/std and z-score over the last W days **including today**,
    # which is fine because the target is r_{t+1}. No extra shift needed.
    for W in windows:
        roll = g["log_return"].rolling(W, min_periods=W)
        # Level 0 of the rolling result is the ticker; dropping it leaves the row index.
        out[f"roll_mean_{W}"] = roll.mean().reset_index(level=0, drop=True)
        out[f"roll_std_{W}"]  = roll.std().reset_index(level=0, drop=True)
        # The small epsilon avoids division by zero for flat windows.
        out[f"zscore_{W}"]    = (out["log_return"] - out[f"roll_mean_{W}"]) / (out[f"roll_std_{W}"] + 1e-8)

    # Expanding stats (from start to t): long-memory
    expanding = g["log_return"].expanding(min_periods=20)
    out["exp_mean"] = expanding.mean().reset_index(level=0, drop=True)
    out["exp_std"]  = expanding.std().reset_index(level=0, drop=True)

    # Exponentially weighted (decayed memory). transform() returns a result
    # indexed like its input, so it aligns with `out` directly.
    for span in ewm_spans:
        out[f"ewm_mean_{span}"] = g["log_return"].transform(
            lambda s, span=span: s.ewm(span=span, adjust=False).mean())
        out[f"ewm_std_{span}"] = g["log_return"].transform(
            lambda s, span=span: s.ewm(span=span, adjust=False).std())

    # Optional RSI: computed from adj_close when available, otherwise from
    # log_return as a toy proxy.
    if add_rsi:
        alpha = 1 / rsi_period

        def _rsi(s):
            delta = s.diff()
            up = delta.clip(lower=0).ewm(alpha=alpha, adjust=False).mean()
            dn = (-delta.clip(upper=0)).ewm(alpha=alpha, adjust=False).mean()
            rs = up / (dn + 1e-12)   # epsilon: no division by zero when there are no losses
            return 100 - (100 / (1 + rs))

        rsi_source = "adj_close" if "adj_close" in out else "log_return"
        out[f"rsi_{rsi_period}"] = g[rsi_source].transform(_rsi)

    # Cast dtypes
    for c in out.columns:
        if c not in ["date","ticker","weekday","month"] and pd.api.types.is_float_dtype(out[c]):
            out[c] = out[c].astype("float32")
    out["ticker"] = out["ticker"].astype("category")

    # Hand back the caller's original index labels (row order was never changed).
    out.index = original_index
    return out

# Merge adj_close and volume into returns (if not already)
# Left join: every row of `returns` is kept; the price columns are NaN where
# no matching (ticker, date) row exists in `prices`.
ret2 = returns.merge(prices[["ticker","date","adj_close","volume"]], on=["ticker","date"], how="left")
features = build_features(ret2, windows=(5,10,20), add_rsi=True)
# Early rows per ticker show NaN in the rolling/expanding columns (window warm-up).
features.head(5)

Unnamed: 0,date,ticker,log_return,r_1d,weekday,month,adj_close,volume,lag1,lag2,...,roll_mean_20,roll_std_20,zscore_20,exp_mean,exp_std,ewm_mean_10,ewm_std_10,ewm_mean_20,ewm_std_20,rsi_14
0,2020-01-01,AAPL,,0.002987,2,1,100.001228,4457901,,,...,,,,,,,,,,
1,2020-01-02,AAPL,0.002987,-0.002741,3,1,100.300423,2664190,,,...,,,,,,0.002987,,0.002987,,100.0
2,2020-01-03,AAPL,-0.002741,-0.008906,4,1,100.025841,4100245,0.002987,,...,,,,,,0.001946,0.004051,0.002442,0.004051,93.405983
3,2020-01-06,AAPL,-0.008906,-0.004547,0,1,99.138977,4586613,-0.002741,0.002987,...,,,,,,-2.7e-05,0.006581,0.001361,0.006601,75.979301
4,2020-01-07,AAPL,-0.004547,-0.009916,1,1,98.68924,1556062,-0.008906,-0.002741,...,,,,,,-0.000849,0.005724,0.000798,0.005985,68.953682



```python
g.resample("W-FRI", on="date")
```
### By default

`DataFrame.resample()` expects the **DataFrame’s index** to be a `DatetimeIndex`.

Example:

```python
g = g.set_index("date")
g.resample("W-FRI")
```

would work, because now the index is dates.

---

### With `on="date"`

If your `DataFrame` just has a **normal column** called `"date"`, and the index is something else (like integers, or not set), then you can tell pandas:

```python
g.resample("W-FRI", on="date")
```

* `on="date"` says: *“don’t look at the index — instead, use the `date` column as the time key for resampling.”*


```python
wk = (
    g.resample("W-FRI", on="date")        # 1. resample by calendar week, anchored on Friday
      .agg({"adj_close": "last",          # 2. for each weekly group:
            "volume": "sum"})             #    - take last adj_close
                                          #    - sum all volumes
      .dropna()                           # 3. drop weeks where adj_close or volume is NaN
      .reset_index()                      # 4. turn the DateTimeIndex back into a column
)
```



### Step 4. `.reset_index()`

* After resampling, the **index is the week-ending Friday date**.
* Resetting makes `"date"` a normal column again so you can sort/merge easily.

---

So `wk` is a **weekly time series DataFrame for a single ticker**, with columns:

```
date        adj_close   volume
2025-09-05  103.2       1250000
2025-09-12  104.8       1320000
2025-09-19  106.0       1180000
...
```



### 3) (Optional) Weekly resampling demo (OHLCV + returns)

In [None]:
# Safe weekly resample per ticker, aggregating OHLCV and log returns
def weekly_ohlcv(df):
    """Resample daily prices to weekly (week ending Friday) bars per ticker.

    For each ticker the weekly ``adj_close`` is the last daily value of the
    week and the weekly ``volume`` is the sum of daily volumes. Weeks with no
    observations are dropped. ``wk_log_return`` is the log ratio of
    consecutive weekly closes (NaN for each ticker's first week). Only close
    and volume are aggregated here; open/high/low are not handled.

    Parameters
    ----------
    df : pandas.DataFrame
        Daily data with columns ``ticker``, ``date``, ``adj_close`` and
        ``volume``. ``date`` is converted with ``pd.to_datetime``.

    Returns
    -------
    pandas.DataFrame
        Columns ``date`` (week-ending Friday), ``adj_close``, ``volume``,
        ``ticker``, ``wk_log_return``. An empty frame with these columns is
        returned when ``df`` has no rows.
    """
    out_columns = ["date", "adj_close", "volume", "ticker", "wk_log_return"]

    df = df.sort_values(["ticker","date"]).copy()
    df["date"] = pd.to_datetime(df["date"])

    res = []
    # observed=True: skip categories of a categorical `ticker` that have no rows.
    for tkr, g in df.groupby("ticker", observed=True):
        wk = (g.resample("W-FRI", on="date")
                .agg({"adj_close": "last", "volume": "sum"})
                .dropna()          # weeks without trading have NaN adj_close
                .reset_index())
        wk["ticker"] = tkr
        # Weekly log return = log(adj_close_t / adj_close_{t-1})
        wk = wk.sort_values("date")
        wk["wk_log_return"] = np.log(wk["adj_close"] / wk["adj_close"].shift(1))
        res.append(wk)

    # pd.concat raises on an empty list, so handle "no data" explicitly.
    if not res:
        return pd.DataFrame(columns=out_columns)
    return pd.concat(res, ignore_index=True)

# Pass only the columns weekly_ohlcv needs; the result has one row per (ticker, week ending Friday)
weekly = weekly_ohlcv(prices[["ticker","date","adj_close","volume"]])
# The first week of each ticker has NaN wk_log_return (no previous weekly close)
weekly.head(5)

Unnamed: 0,date,adj_close,volume,ticker,wk_log_return
0,2020-01-03,100.025841,11222336,AAPL,
1,2020-01-10,99.093422,7653693,AAPL,-0.009366
2,2020-01-17,98.934464,8698674,AAPL,-0.001605
3,2020-01-24,96.911209,14831266,AAPL,-0.020662
4,2020-01-31,90.780533,6812497,AAPL,-0.06535


### 4) Save `features_v1.parquet` (+ optional partition by ticker)

In [None]:
import os, shutil

# Select a compact set to start with
keep = ["date","ticker","log_return","r_1d","weekday","month",
        "lag1","lag2","lag3",
        "roll_mean_5","roll_std_5","zscore_5",
        "roll_mean_10","roll_std_10","zscore_10",
        "roll_mean_20","roll_std_20","zscore_20",
        "ewm_mean_10","ewm_std_10","ewm_mean_20","ewm_std_20",
        "exp_mean","exp_std","rsi_14","adj_close","volume"]

# Keep only the columns that were actually built (e.g. rsi_14 is absent when add_rsi=False).
keep = [c for c in keep if c in features.columns]
# dropna() removes the warm-up rows at the start of each ticker (rolling/expanding
# windows not yet full) and the last row per ticker (no next-day target r_1d).
fv1 = features.loc[:, keep].dropna().sort_values(["ticker","date"]).reset_index(drop=True)
fv1["weekday"] = fv1["weekday"].astype("int8")
fv1["month"]   = fv1["month"].astype("int8")
fv1["ticker"]  = fv1["ticker"].astype("category")

fv1_path = "data/processed/features_v1.parquet"
# to_parquet does not create missing folders.
os.makedirs(os.path.dirname(fv1_path), exist_ok=True)
fv1.to_parquet(fv1_path, compression="zstd", index=False)
print("Wrote:", fv1_path, "| rows:", len(fv1), "| cols:", len(fv1.columns))

# Optional partition
part_dir = "data/processed/features_v1_by_ticker"
try:
    # A missing engine surfaces as ImportError, so probe for it before touching the disk.
    import pyarrow  # noqa: F401
    # Start from a clean directory: a partitioned write adds new files next to
    # whatever is already there, so re-running this cell would otherwise leave
    # duplicate rows in the dataset.
    shutil.rmtree(part_dir, ignore_errors=True)
    fv1.to_parquet(part_dir, compression="zstd", index=False, engine="pyarrow", partition_cols=["ticker"])
    print("Wrote partitioned:", part_dir)
except (ImportError, TypeError):
    # TypeError is kept for pandas/pyarrow versions that reject these keyword arguments.
    print("Partition writing skipped (engine missing).")

Wrote: data/processed/features_v1.parquet | rows: 3975 | cols: 27
Wrote partitioned: data/processed/features_v1_by_ticker


## Wrap‑up (what to emphasize)

-   For **next‑day** targets $r_{t+1}$, rolling stats up to **t** are fine; never use future rows.
-   Be explicit about **`min_periods`** to avoid unstable early rows.
-   Keep features small and typed; document your cookbook in the repo.


## Homework (due before Session 11)

**Goal:** Add an **automated leakage check** and re‑run feature build.

### A. Script: `scripts/build_features_v1.py`

In [None]:
#!/usr/bin/env python
import numpy as np, pandas as pd, pathlib
def build():
    """Rebuild ``data/processed/features_v1.parquet`` from the cached returns and prices.

    Exits with a message if ``returns.parquet`` is missing. Otherwise merges
    ``adj_close``/``volume`` onto the returns, runs ``build_features`` (which
    must be pasted into / defined in the same script), keeps a compact column
    set, drops rows containing NaN, and writes the result with zstd compression.
    """
    returns_path = pathlib.Path("data/processed/returns.parquet")
    if not returns_path.exists():
        raise SystemExit("Missing returns.parquet — finish Session 9.")

    prices = pd.read_parquet("data/processed/prices.parquet")
    ret = pd.read_parquet(returns_path)

    # Left join so every returns row is kept even without a matching price row.
    price_cols = ["ticker","date","adj_close","volume"]
    ret2 = ret.merge(prices[price_cols], on=["ticker","date"], how="left")

    # (Paste the build_features() from class)
    # ...
    fv1 = build_features(ret2)

    wanted = ["date","ticker","log_return","r_1d","weekday","month",
              "lag1","lag2","lag3","roll_mean_20","roll_std_20","zscore_20",
              "ewm_mean_20","ewm_std_20","exp_mean","exp_std","adj_close","volume"]
    # Only request columns that build_features actually produced.
    keep = [col for col in wanted if col in fv1.columns]

    fv1 = fv1[keep].dropna().sort_values(["ticker","date"])
    fv1.to_parquet("data/processed/features_v1.parquet", compression="zstd", index=False)
    print("Wrote data/processed/features_v1.parquet", fv1.shape)


if __name__ == "__main__":
    build()

Wrote data/processed/features_v1.parquet (3975, 18)


Copy the previous code (the `build_features()` function from class plus the `build()` script above) into a single file, `build_features_v1.py`, and save it in `scripts/`.

In [None]:
!pwd

/content/drive/MyDrive/dspt25/STAT4160


In [None]:
%%bash
# Mark the script as executable (it starts with a #! line)
chmod +x scripts/build_features_v1.py

# Run it with the Python interpreter; paths inside the script are relative to the current directory
python scripts/build_features_v1.py

Wrote data/processed/features_v1.parquet (3975, 18)



By default, `np.allclose(a, b)` treats `NaN != NaN`. That is, if both arrays have a `NaN` in the same place, the comparison **fails**.

The keyword **`equal_nan=True`** changes that:

```python
import numpy as np

x = np.array([1.0, np.nan, 3.0])
y = np.array([1.0, np.nan, 3.0])

np.allclose(x, y)                   # False (NaN != NaN)
np.allclose(x, y, equal_nan=True)   # True  (NaN == NaN)
```



### B. Test: `tests/test_no_lookahead.py`

In [None]:
import pandas as pd, numpy as np

def test_features_no_lookahead():
    df = pd.read_parquet("data/processed/features_v1.parquet").sort_values(["ticker","date"])
    # For each ticker, recompute roll_mean_20 with an independent method and compare
    for tkr, g in df.groupby("ticker"):
        s = g["log_return"]
        rm = s.rolling(20, min_periods=20).mean()
        # Our feature should equal this rolling mean (within tol)
        if "roll_mean_20" in g:
            assert np.allclose(g["roll_mean_20"].values, rm.values, equal_nan=True, atol=1e-7)
        # r_1d must be the **lead** of log_return
        assert g["r_1d"].shift(1).iloc[21:].equals(g["log_return"].iloc[21:])

Save the above file into tests/test_no_lookahead.py



By default, pytest discovers files named **`test_*.py`** or **`*_test.py`**.


```
tests/test_features.py
```


* Functions must be named `test_*` (e.g., `def test_features_no_lookahead():`).
* Test classes (if any) must be named `Test*` and contain methods `test_*`.


Put Python test files in a `tests/` directory. `.ipynb` won’t be collected unless you use a plugin.



In [None]:
#See what pytest thinks it can collect:
!pytest -q --collect-only -vv


platform linux -- Python 3.12.11, pytest-8.4.2, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /content/drive/MyDrive/dspt25/STAT4160
plugins: typeguard-4.4.4, anyio-4.10.0, langsmith-0.4.28
collected 1 item                                                               [0m

<Dir STAT4160>
  <Dir tests>
    <Module test_no_lookahead.py>
      <Function test_features_no_lookahead>



In [None]:
%%bash
# Run from the project root so that tests/ and the relative data paths resolve
cd "/content/drive/MyDrive/dspt25/STAT4160"
# -q = quiet output; a failing test makes pytest exit non-zero, which the notebook reports as CalledProcessError
pytest -q  # The Test will fail showing the data do not match.

F                                                                        [100%]
__________________________ test_features_no_lookahead __________________________

    def test_features_no_lookahead():
        df = pd.read_parquet("data/processed/features_v1.parquet").sort_values(["ticker","date"])
        # For each ticker, recompute roll_mean_20 with an independent method and compare
        for tkr, g in df.groupby("ticker"):
            s = g["log_return"]
            rm = s.rolling(20, min_periods=20).mean()
            # Our feature should equal this rolling mean (within tol)
            if "roll_mean_20" in g:
>               assert np.allclose(g["roll_mean_20"].values, rm.values, equal_nan=True, atol=1e-7)
E               assert False
E                +  where False = <function allclose at 0x117746976c30>(array([-4.08561481e-03, -4.35253140e-03, -4.84918989e-03, -4.26826347e-03,\n       -3.96254659e-03, -3.56018892e-03, -4...985913e-03, -3.08635482e-03, -3.54444701e-03,\n       -

CalledProcessError: Command 'b'cd "/content/drive/MyDrive/dspt25/STAT4160"\npytest -q  # The Test will fail showing the data do not match. \n'' returned non-zero exit status 1.

# What pytest does

* **Discovers** tests by filename and function/class name patterns.
* **Collects** them into a test suite.
* **Runs** them with helpful assertion introspection, fixtures, and plugins.
* **Reports** results with clear failure diffs and exit codes.

# Test discovery (how it finds your tests)

By default pytest looks for:

* Files named `test_*.py` or `*_test.py`
* Functions named `test_*`
* Classes named `Test*` containing methods `test_*` (no `__init__`)

Typical layout:

```
project/
├─ src/ ... (your code)
├─ tests/
│  ├─ test_features.py
│  └─ test_utils.py
└─ pytest.ini  (optional)
```

If pytest can’t find any tests, you’ll see exit code **5** (“no tests collected”).

# Running tests

From your project root:

```bash
pytest          # default output: one progress character per test
pytest -q       # quiet
pytest -q tests/test_no_lookahead.py::test_features_no_lookahead   # single test
pytest -k "no_lookahead and not slow"   # expression filter
pytest -x       # stop after first failure
pytest --maxfail=2
pytest -s       # don't capture stdout/stderr
pytest --collect-only -vv  # debug discovery
```

# Assertions (no `unittest` needed)

Just use plain `assert`:

```python
def test_sum():
    assert 1 + 2 == 3
```

Pytest rewrites assertions to show **rich diffs** on failure (lists, dicts, numbers, strings). For pandas/numpy, prefer their testing helpers for better messages:

```python
import pandas.testing as pdt
pdt.assert_series_equal(a, b, check_exact=False, atol=1e-7)
```

# Fixtures (dependency injection for tests)

Fixtures provide reusable setup/teardown and test data.

```python
# tests/conftest.py  (auto-discovered by pytest)
import pandas as pd
import pytest

@pytest.fixture
def small_df():
    return pd.DataFrame({"x":[1,2,3], "y":[4,5,6]})
```

Use it in a test by naming the parameter:

```python
def test_shape(small_df):
    assert small_df.shape == (3, 2)
```

Built-ins you’ll use often:

* `tmp_path` – temporary directory as a `Path` object
* `monkeypatch` – temporarily set env vars/attributes
* `capsys` – capture stdout/stderr

# Parametrization (run the same test over many cases)

```python
import pytest

@pytest.mark.parametrize("n, expected", [(1, 1), (2, 4), (3, 9)])
def test_square(n, expected):
    assert n*n == expected
```

# Markers (tag tests)

```python
import pytest

@pytest.mark.slow
def test_big():
    ...
```

Run selected markers:

```bash
pytest -m slow
```

Declare custom markers in `pytest.ini` to silence warnings:

```ini
# pytest.ini
[pytest]
markers =
    slow: long-running tests
```


# Configuration (pytest.ini)

Tweak discovery patterns and defaults:

```ini
[pytest]
testpaths = tests
python_files = test_*.py *_test.py
python_functions = test_*
addopts = -q
```

# Common exit codes

* **0**: all tests passed
* **1**: tests failed
* **2**: interrupted (Ctrl-C)
* **3**: internal error in pytest
* **4**: usage error (bad CLI args)
* **5**: **no tests collected** (usually a file/function naming problem, or pytest was run from the wrong directory)

# Typical debugging flow (for your use case)

1. Ensure you’re in the **project root** (where `tests/` lives).
2. Check discovery:

   ```bash
   pytest --collect-only -vv
   ```
3. Fix names to match patterns (`tests/test_*.py`, `def test_*`).
4. Re-run a specific failing test for speed:

   ```bash
   pytest -q tests/test_no_lookahead.py::test_features_no_lookahead
   ```



# Tiny end-to-end example

```python
# src/mathy.py
def moving_avg(xs, w):
    if w <= 0: raise ValueError("w>0")
    out = []
    for i in range(len(xs)):
        j = max(0, i-w+1)
        out.append(sum(xs[j:i+1])/(i-j+1))
    return out
```

```python
# tests/test_mathy.py
import pytest
from src.mathy import moving_avg

def test_moving_avg_basic():
    assert moving_avg([1,2,3,4], 2) == [1, 1.5, 2.5, 3.5]

def test_moving_avg_bad_window():
    with pytest.raises(ValueError):
        moving_avg([1,2,3], 0)
```

Run:

```bash
pytest -q
```




In [None]:
# src/mathy.py
def moving_avg(xs, w):
    """Return the trailing moving average of ``xs`` with window length ``w``.

    Element ``i`` of the result is the mean of the last ``w`` values ending at
    ``xs[i]``; near the start, where fewer than ``w`` values exist, the mean of
    the values available so far is used. Raises ``ValueError`` if ``w <= 0``.
    """
    if w <= 0:
        raise ValueError("w>0")
    averages = []
    for end in range(1, len(xs) + 1):
        start = max(0, end - w)      # clamp so early windows simply start at index 0
        window = xs[start:end]
        averages.append(sum(window) / len(window))
    return averages


In [None]:
# tests/test_mathy.py
import pytest
import sys, pathlib

# `from src.mathy import ...` is resolved relative to the folder that CONTAINS
# src/ (the project root), so that folder — not src/ itself — must be on
# sys.path. pytest is run from the project root, so the current directory is it.
_PROJECT_ROOT = str(pathlib.Path.cwd())
if _PROJECT_ROOT not in sys.path:
    sys.path.append(_PROJECT_ROOT)

from src.mathy import moving_avg


def test_moving_avg_basic():
    """Window of 2: the first value has only itself to average, later ones average pairs."""
    assert moving_avg([1,2,3,4], 2) == [1, 1.5, 2.5, 3.5]

def test_moving_avg_bad_window():
    """A non-positive window must be rejected with ValueError."""
    with pytest.raises(ValueError):
        moving_avg([1,2,3], 0)


In [None]:
%%bash
# using conftest.py
# (The %%bash cell magic must be the very first line of the cell; comments go below it.)
# Presumably a conftest.py in the repo makes the project importable — verify it exists.
cd "/content/drive/MyDrive/dspt25/STAT4160"
pytest -q tests/test_mathy.py::test_moving_avg_basic


.                                                                        [100%]
1 passed in 0.08s


In [None]:
%%bash
# one-shot env var
# (The %%bash cell magic must be the very first line of the cell; comments go below it.)
# PYTHONPATH=. applies to this single command only and puts the project root on the import path.
cd "/content/drive/MyDrive/dspt25/STAT4160"
PYTHONPATH=. pytest -q tests/test_mathy.py::test_moving_avg_basic


.                                                                        [100%]
1 passed in 0.10s


# 1. What is a Python module vs package?

* **Module** = a single `.py` file that you can import.
  Example: `mathy.py` with `def moving_avg(...): ...`

* **Package** = a folder with an `__init__.py` file, possibly containing many modules.
  Example:

  ```
  src/
  └── mypkg/
      ├── __init__.py
      ├── mathy.py
      └── finance.py
  ```

  You can then import with:

  ```python
  from mypkg.mathy import moving_avg
  ```

---

# 2. Project layout (modern `src/` style)

Here’s a good minimal layout for the STAT4160 project:

```
STAT4160/
├── pyproject.toml
├── src/
│   └── stat4160/
│       ├── __init__.py
│       ├── mathy.py
│       └── features.py
├── tests/
│   ├── test_mathy.py
│   └── test_features.py
```

* `src/stat4160/` is the package.
* `__init__.py` can be empty, or re-export common functions.
* Tests live outside the package in `tests/`.

---

# 3. Write code in your package

`src/stat4160/mathy.py`:

```python
def moving_avg(xs, w):
    if w <= 0:
        raise ValueError("window must be > 0")
    out = []
    for i in range(len(xs)):
        j = max(0, i - w + 1)
        out.append(sum(xs[j:i+1]) / (i - j + 1))
    return out
```

---

# 4. Add `pyproject.toml`

This file tells pip/setuptools how to build and install.

`pyproject.toml`:

```toml
[project]
name = "stat4160"            # package name on PyPI
version = "0.1.0"
description = "Productivity tools for data science (STAT 4160 course)"
authors = [{name = "Yi Wang"}]
requires-python = ">=3.9"
dependencies = [
    "pandas",
    "numpy",
]

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
where = ["src"]
```

---

# 5. Editable install (local dev)

From the project root:

```bash
pip install -e .
```

This tells pip: “install this project in editable/development mode.”
Now you can import anywhere:

```python
from stat4160.mathy import moving_avg
```

---

# 6. Running tests

Because the package is now installed (in editable mode), pytest can import it without any `sys.path` tweaks:

```bash
pytest -q
```

---

# 7. Uploading to PyPI (optional)

If you ever want to share:

1. Build a wheel and sdist:

   ```bash
   pip install build twine
   python -m build
   ```

   → creates `dist/stat4160-0.1.0.tar.gz` (the sdist) and `dist/stat4160-0.1.0-py3-none-any.whl` (the wheel).

2. Upload to TestPyPI:

   ```bash
   python -m twine upload --repository testpypi dist/*
   ```

3. If all good, upload to real PyPI:

   ```bash
   python -m twine upload dist/*
   ```

Then others can install with:

```bash
pip install stat4160
```

---

In summary:

* Module = single file. Package = folder with `__init__.py`.
* Use `src/` layout + `pyproject.toml`.
* Run `pip install -e .` for local editable install.
* Use pytest to test.
* Use build + twine to distribute on PyPI if you want.



###  `__init__.py` can be empty, or re-export common functions

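* A folder is treated as a **regular Python package** when it has an `__init__.py` file. (Since Python 3.3 a folder without one can still be imported as a *namespace package*, but tools and test runners handle those less predictably, so always add the file.)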
* If the file is empty → the package just groups submodules.
* If you want to **make imports shorter**, you can put exports in it.

Example:

```
src/stat4160/
├── __init__.py
├── mathy.py
└── features.py
```

`mathy.py`:

```python
def moving_avg(...): ...
```

`__init__.py` (empty):

```python
# nothing here
```

Now usage:

```python
from stat4160.mathy import moving_avg
```

But if you re-export inside `__init__.py`:

```python
from .mathy import moving_avg
from .features import build_features
```

Then usage becomes shorter:

```python
from stat4160 import moving_avg, build_features
```


* In `src/stat4160/mathy.py`, the **package name** is `stat4160`.
* Everything under that folder is part of the `stat4160` package.
* You import using that folder name:

  ```python
  from stat4160.mathy import moving_avg
  ```
* The `src/` folder itself is not the package, it’s just a container. That’s why you add `src` to `sys.path` (or use the `pyproject.toml` config).

---


The two commands below serve different purposes:

```bash
pip install build twine   # installs the build tools (one-time setup)
python -m build           # actually builds your package into dist/*.whl and dist/*.tar.gz
```

* You only need `pip install build twine` **once** (or whenever you set up a new environment).
* You run `python -m build` **every time** you want to produce new distribution files (e.g., after changing code or version).

If you want to upload to PyPI/TestPyPI, then you’d also use `twine`:

```bash
python -m twine upload --repository testpypi dist/*
```

So the typical workflow is:

1. Write code in `src/<yourpackage>/`.
2. Add `__init__.py`.
3. Write `pyproject.toml`.
4. Run once:

   ```bash
   pip install build twine
   ```
5. Build each time you want new wheels/tarballs:

   ```bash
   python -m build
   ```
6. Upload (optional):

   ```bash
   python -m twine upload dist/*
   ```

A Python package can absolutely contain **subfolders**, and each subfolder can itself be a **subpackage**.

The only requirement: if you want the subfolder to be importable as a package, it needs an `__init__.py` file (even if it’s empty).

---

### Example layout

```
src/
└── stat4160/
    ├── __init__.py
    ├── mathy.py
    ├── features/
    │   ├── __init__.py
    │   ├── timeseries.py
    │   └── transforms.py
    └── finance/
        ├── __init__.py
        └── models.py
```

---

### Imports with subpackages

With that structure:

```python
from stat4160.mathy import moving_avg
from stat4160.features.timeseries import rolling_mean
from stat4160.finance.models import BlackScholes
```

If you re-export inside `features/__init__.py`:

```python
# features/__init__.py
from .timeseries import rolling_mean
from .transforms import normalize
```

Then the user can do:

```python
from stat4160.features import rolling_mean, normalize
```

---

### Why subfolders are useful

* Organize large codebases by domain (e.g., `features/`, `finance/`, `plots/`).
* Keep related files grouped (e.g., `timeseries.py`, `transforms.py`).
* Control what’s exposed at each level via `__init__.py`.


