# Lab 2: Data Acquisition & APIs

Build a Reliable Pipeline with Free APIs (Colab Version)

> **Lab Versions**
>
> This is the **Colab version** using free APIs (yfinance). For
> Bloomberg Terminal access in the Financial Innovation Lab, see [Lab 2:
> Bloomberg Version](lab02_bloomberg.qmd).
>
> **Expected Time:**
>
> -   FIN510: Seminar hands‑on ≈ 60 min; Directed learning ≈ 90–120 min
> -   FIN720: Computer lab ≈ 120 min

<figure>
<a
href="https://colab.research.google.com/github/quinfer/fin510-colab-notebooks/blob/main/labs/lab02_apis.ipynb"><img
src="https://colab.research.google.com/assets/colab-badge.svg" /></a>
<figcaption>Open in Colab</figcaption>
</figure>

> **Bloomberg Terminal Access**
>
> Ulster students have access to ~20 Bloomberg terminals in the
> Financial Innovation Lab. If you’re interested in comparing
> professional-grade data with free APIs, check out the [Bloomberg
> Terminal version of this lab](lab02_bloomberg.qmd).

## Setup (Colab‑only installs)

In [1]:
try:
    import yfinance, pandas, pandas_datareader
except Exception:
    !pip -q install yfinance pandas pandas-datareader

## Before You Code: The Big Picture

Real-world financial data science starts with data acquisition. APIs are
your gateway to market data, but they’re unreliable—rate limits,
outages, data quality issues. Professional systems need **resilience**:
retry logic, fallback sources, validation, and logging.

> **The Three Pillars of Reliable Data Pipelines**
>
> **1. Resilience** → Handle API failures gracefully (retries,
> fallbacks, synthetic data)  
> **2. Validation** → Check data quality before analysis (missing
> values, outliers, gaps)  
> **3. Provenance** → Log data sources and transformations
> (reproducibility, debugging)
>
> These aren’t optional extras—they’re the difference between research
> toys and production systems.

### What You’ll Build Today

By the end of this lab, you will have:

-   ✅ Robust data fetching with retry logic and fallback sources
-   ✅ Data quality validation pipeline (missing values, outliers, range
    checks)
-   ✅ Provenance logging (what data, from where, when, what issues
    found)
-   ✅ Clean return series ready for analysis

**Time estimate:** 60 minutes (FIN510) \| 90-120 minutes (FIN720 with
extensions)

> **Why This Matters**
>
> In Coursework 2, you’ll build factor models using the JKP dataset. If
> your data pipeline has silent bugs (wrong dates, missing values,
> look-ahead bias), your entire analysis is invalidated. Build good
> habits now.

## Objectives

-   Pull assets with yfinance; validate and log
-   Handle missing values and out‑of‑range returns
-   Understand look-ahead bias and proper time alignment

## Task 1 — Download and Validate

This task implements a **resilient data pipeline**: try yfinance first,
fall back to Stooq if it fails, use synthetic data as last resort. This
three-tier approach ensures your analysis never stops due to API
failures.

> **📚 Professional Practice: Resilient API Design**
>
> **Why we need fallbacks:** - Free APIs have rate limits (yfinance:
> ~2000 calls/hour) - APIs go down (maintenance, outages, deprecated
> endpoints) - Network issues are common in cloud environments
>
> **Three-tier strategy:** 1. **Primary**: yfinance (best for US
> equities, free, widely used) 2. **Fallback**: Stooq via
> pandas-datareader (alternative free source) 3. **Last resort**:
> Synthetic data (ensures code always runs for testing)
>
> This pattern appears in production systems at every scale.

### Step 1: Define robust fetching functions

In [2]:
import os, time, random
import yfinance as yf
import pandas as pd
import numpy as np

symbols = ['AAPL', 'MSFT', 'SPY']

def get_close_from_yf(symbols, period='2y', tries=3):
    """
    Fetch adjusted closing prices from Yahoo Finance with retry logic.
    
    Implements exponential backoff to handle rate limits gracefully. Returns
    adjusted prices (splits/dividends applied) suitable for return calculations.
    
    Parameters
    ----------
    symbols : list of str
        Stock tickers (e.g., ['AAPL', 'MSFT'])
    period : str, default='2y'
        Data period: '1d', '5d', '1mo', '3mo', '6mo', '1y', '2y', '5y', '10y', 'ytd', 'max'
    tries : int, default=3
        Number of retry attempts before raising error
        
    Returns
    -------
    pd.DataFrame
        Adjusted closing prices with DatetimeIndex and symbol columns
        
    Raises
    ------
    RuntimeError
        If all retry attempts fail
        
    Notes
    -----
    - Uses `auto_adjust=True` to get split/dividend-adjusted prices
    - Implements exponential backoff: waits 2, 4, 6 seconds between retries
    - Handles yfinance MultiIndex columns (multiple symbols) vs single-symbol format
    - Returns empty rows are filtered (only keeps days with at least one price)
    
    Examples
    --------
    >>> prices = get_close_from_yf(['AAPL', 'MSFT'], period='1y')
    >>> prices.shape
    (252, 2)  # Roughly 252 trading days in a year
    >>> prices.iloc[-1]  # Most recent closing prices
    AAPL    182.50
    MSFT    415.20
    """
    last_err = None
    
    for attempt in range(tries):
        try:
            # Download with adjusted prices (splits/dividends applied)
            df = yf.download(symbols, period=period, auto_adjust=True, 
                           progress=False, group_by='ticker', threads=True)
            
            # yfinance returns MultiIndex columns when fetching multiple symbols
            if isinstance(df.columns, pd.MultiIndex):
                # Extract 'Close' price for each symbol
                closes = pd.concat(
                    {sym: df[sym]['Close'] for sym in symbols if sym in df.columns.levels[0]}, 
                    axis=1
                )
                closes.columns = [c if isinstance(c, str) else c[0] for c in closes.columns]
            else:
                # Single symbol returns simple columns
                closes = df['Close'].to_frame(symbols[0])
            
            # Only return if we got at least one non-empty row
            if closes.dropna(how='all').shape[0] > 0:
                return closes
                
        except Exception as e:
            last_err = e
        
        # Exponential backoff with jitter to avoid thundering herd
        time.sleep(2 * (attempt + 1) + random.random())
    
    raise RuntimeError(f"yfinance download failed after {tries} tries: {last_err}")


def get_close_from_stooq(symbols, years=2):
    """
    Fetch closing prices from Stooq via pandas-datareader (fallback source).
    
    Stooq provides historical price data for global markets. This function
    serves as fallback when yfinance fails. Fetches data sequentially to
    respect rate limits.
    
    Parameters
    ----------
    symbols : list of str
        Stock tickers (e.g., ['AAPL', 'MSFT'])
    years : int, default=2
        Number of years of historical data to fetch
        
    Returns
    -------
    pd.DataFrame
        Closing prices with DatetimeIndex (sorted) and symbol columns
        
    Raises
    ------
    RuntimeError
        If no symbols successfully fetched
        
    Notes
    -----
    - Fetches symbols sequentially (not parallel) to respect rate limits
    - Waits 0.4 seconds between requests
    - Silently skips symbols that fail (rather than stopping entire process)
    - Returns only successfully fetched symbols
    
    Examples
    --------
    >>> prices = get_close_from_stooq(['AAPL'], years=1)
    >>> prices.index.name
    'Date'
    """
    from datetime import datetime, timedelta
    from pandas_datareader import data as web
    
    # Define date range
    end = datetime.today()
    start = end - timedelta(days=365 * years + 14)  # Extra days for weekends/holidays
    
    series = []
    for sym in symbols:
        try:
            # Fetch from Stooq and extract 'Close' column
            s = web.DataReader(sym, 'stooq', start, end)['Close'].sort_index()
            s.name = sym
            series.append(s)
            
            # Respectful rate limiting
            time.sleep(0.4)
        except Exception:
            # Skip symbols that fail rather than stopping entire fetch
            pass
    
    if not series:
        raise RuntimeError("stooq fallback returned no data")
    
    return pd.concat(series, axis=1)


def synthetic_prices(symbols, periods=252, mu=0.0004, sigma=0.012):
    """
    Generate synthetic price series using geometric Brownian motion.
    
    This is a last-resort fallback when all real APIs fail. Useful for
    testing and development when APIs are unavailable. NOT FOR ANALYSIS.
    
    Parameters
    ----------
    symbols : list of str
        Symbol names for columns (can be anything)
    periods : int, default=252
        Number of business days to generate (~1 year of trading days)
    mu : float, default=0.0004
        Daily drift (mean return): 0.0004 ≈ 10% annual
    sigma : float, default=0.012
        Daily volatility: 0.012 ≈ 19% annual volatility
        
    Returns
    -------
    pd.DataFrame
        Synthetic prices starting at 100, with business day index
        
    Notes
    -----
    - Uses fixed seed (42) for reproducibility
    - Generates prices via: P(t) = 100 * exp(cumsum(returns))
    - Returns ~ Normal(mu, sigma) independently across symbols
    - Index: business days ending at today
    
    Examples
    --------
    >>> synth = synthetic_prices(['SYN1', 'SYN2'], periods=10)
    >>> synth.shape
    (10, 2)
    >>> (synth.iloc[-1] / synth.iloc[0]).mean()  # Typical growth
    1.04  # Roughly 4% over 10 days
    
    Warnings
    --------
    DO NOT use for actual analysis. This is for code testing only.
    """
    rng = np.random.default_rng(42)  # Fixed seed for reproducibility
    
    # Business day index ending today
    dates = pd.bdate_range(end=pd.Timestamp.today().normalize(), periods=periods)
    
    # Generate returns: Normal(mu, sigma)
    shocks = rng.normal(mu, sigma, size=(len(dates), len(symbols)))
    
    # Convert to prices: P(t) = P(0) * exp(cumsum(returns))
    levels = 100 * np.exp(np.cumsum(shocks, axis=0))
    
    return pd.DataFrame(levels, index=dates, columns=symbols)

### Step 2: Execute three-tier fetching strategy

In [3]:
# === Try primary source (yfinance) ===
try:
    prices = get_close_from_yf(symbols)
    source = 'yfinance'
    print(f"✔ Successfully fetched from yfinance")
    
except Exception as e1:
    print(f"⚠ yfinance failed: {e1}")
    
    # === Try fallback source (Stooq) ===
    try:
        prices = get_close_from_stooq(symbols)
        source = 'stooq (pandas-datareader)'
        print(f"✔ Successfully fetched from Stooq fallback")
        
    except Exception as e2:
        print(f"⚠ Stooq failed: {e2}")
        
        # === Last resort: synthetic data ===
        prices = synthetic_prices(symbols)
        source = f'synthetic (fallback due to API failures)'
        print(f"⚠ Using synthetic data (NOT for real analysis!)")

# Display first and last few rows
print(f"\nData shape: {prices.shape}")
print(f"\nFirst 3 rows:")
print(prices.head(3))
print(f"\nLast 3 rows:")
print(prices.tail(3))

⚠ yfinance failed: yfinance download failed after 3 tries: None
✔ Successfully fetched from Stooq fallback

Data shape: (511, 3)

First 3 rows:
               AAPL     MSFT      SPY
Date                                 
2023-09-26  170.709  309.196  417.532
2023-09-27  169.191  309.839  417.701
2023-09-28  169.447  310.681  420.122

Last 3 rows:
              AAPL    MSFT     SPY
Date                              
2025-10-06  256.69  528.57  671.61
2025-10-07  256.48  523.98  669.12
2025-10-08  258.06  524.85  673.11

> **What Just Happened?**
>
> The try-except cascade ensures you **always get data**, even if APIs
> fail. In production, you’d add alerts when fallbacks activate so
> engineers can investigate the primary failure.

### Step 3: Validate data quality

In [4]:
# === Build provenance log ===
log = {}
log['source'] = source
log['symbols_requested'] = len(symbols)
log['symbols_received'] = len(prices.columns)
log['date_range'] = f"{prices.index[0]} to {prices.index[-1]}"
log['trading_days'] = len(prices)

# === Check for missing prices ===
log['missing_prices'] = int(prices.isna().sum().sum())
log['missing_pct'] = f"{(prices.isna().sum().sum() / prices.size * 100):.2f}%"

# === Calculate returns and check quality ===
rets = prices.pct_change()
log['missing_returns'] = int(rets.isna().sum().sum())

# === Flag outliers (|return| > 20% = likely data error or halt) ===
log['out_of_range'] = int((rets.abs() > 0.2).sum().sum())

# Display log
import json
print("\n=== Data Quality Log ===")
print(json.dumps(log, indent=2))

# === Quality gate ===
if prices.dropna(how='all').shape[0] > 0:
    print(f"\n✔ Data source: {source}")
    print(f"✔ Download and validation checks passed")
else:
    print(f"\n⚠ Warning: no data returned from any source")


=== Data Quality Log ===
{
  "source": "stooq (pandas-datareader)",
  "symbols_requested": 3,
  "symbols_received": 3,
  "date_range": "2023-09-26 00:00:00 to 2025-10-08 00:00:00",
  "trading_days": 511,
  "missing_prices": 0,
  "missing_pct": "0.00%",
  "missing_returns": 3,
  "out_of_range": 0
}

✔ Data source: stooq (pandas-datareader)
✔ Download and validation checks passed

> **Quality Checks Explained**
>
> -   **Missing prices**: Gaps in time series (holidays, halts,
>     delisting)
> -   **Missing returns**: First row is always NaN (no prior price to
>     compare)
> -   **Out-of-range**: \|return\| \> 20% often indicates data errors,
>     stock splits, or trading halts
>
> **Rule of thumb:** \<1% missing is acceptable, \>5% requires
> investigation
>
> **Checkpoint:** Look at your log. Which quality issues did you find?
> How would you handle them differently for a production system
> vs. academic analysis?

## Task 2 — Clean and Save

Raw data always has issues. Professional practice: **clean
conservatively** and **document decisions**.

> **Cleaning Strategy**
>
> 1.  **Drop NaN rows** - Can’t calculate returns without prices
> 2.  **Clip outliers** - Cap extreme values at ±20% (likely errors or
>     halts)
> 3.  **Save intermediate output** - Enables reproducibility and
>     debugging
> 4.  **Document transformations** - What did you change and why?

### Step 1: Clean the data

In [5]:
# === Remove rows where all returns are missing ===
clean = rets.dropna()

print(f"Original shape: {rets.shape}")
print(f"After dropna: {clean.shape}")
print(f"Rows removed: {rets.shape[0] - clean.shape[0]}")

# === Clip extreme values (conservative approach) ===
# Instead of deleting outliers, cap them at reasonable limits
# -20% to +20% captures 99%+ of normal daily returns
clean_clipped = clean.clip(lower=-0.2, upper=0.2)

# Count how many values were clipped
clipped_low = (clean < -0.2).sum().sum()
clipped_high = (clean > 0.2).sum().sum()

print(f"\n=== Outlier Treatment ===")
print(f"Values clipped at lower bound (-20%): {clipped_low}")
print(f"Values clipped at upper bound (+20%): {clipped_high}")

# Display sample
print(f"\nCleaned returns (last 5 days):")
print(clean_clipped.tail())

Original shape: (511, 3)
After dropna: (510, 3)
Rows removed: 1

=== Outlier Treatment ===
Values clipped at lower bound (-20%): 0
Values clipped at upper bound (+20%): 0

Cleaned returns (last 5 days):
                AAPL      MSFT       SPY
Date                                    
2025-10-02  0.006577 -0.007639  0.001152
2025-10-03  0.003461  0.003122 -0.000015
2025-10-06 -0.005155  0.021687  0.003586
2025-10-07 -0.000818 -0.008684 -0.003708
2025-10-08  0.006160  0.001660  0.005963

> **Why Clip Instead of Delete?**
>
> -   **Deleting** outliers shortens your time series (breaks
>     continuity)
> -   **Clipping** preserves all dates while limiting extreme values
> -   For academic analysis, document which approach you use and why

### Step 2: Save cleaned data

In [6]:
# === Save to CSV for later use ===
clean_clipped.to_csv('returns_clean.csv')

# === Verify file was created ===
import os
if os.path.exists('returns_clean.csv'):
    file_size = os.path.getsize('returns_clean.csv')
    print(f"✔ Saved returns_clean.csv ({file_size:,} bytes)")
else:
    print("⚠ Warning: File not created")

# === Document the cleaning process ===
cleaning_log = {
    'rows_original': rets.shape[0],
    'rows_after_dropna': clean.shape[0],
    'values_clipped_low': int(clipped_low),
    'values_clipped_high': int(clipped_high),
    'clip_bounds': '[-20%, +20%]',
    'output_file': 'returns_clean.csv'
}

print(f"\n=== Cleaning Summary ===")
print(json.dumps(cleaning_log, indent=2))

✔ Saved returns_clean.csv (38,609 bytes)

=== Cleaning Summary ===
{
  "rows_original": 511,
  "rows_after_dropna": 510,
  "values_clipped_low": 0,
  "values_clipped_high": 0,
  "clip_bounds": "[-20%, +20%]",
  "output_file": "returns_clean.csv"
}

**Deliverable:** Write a short note (100-150 words) describing: - What
issues you found (missing values, outliers) - How you handled them
(dropna, clipping) - Why these choices are appropriate for financial
return data - What trade-offs you made (e.g., information loss
vs. robustness)

> **Troubleshooting**
>
> -   API download empty: try fewer symbols or shorter period.
> -   Many outliers: inspect corporate actions/adjustments; consider
>     `auto_adjust=True`.
> -   CSV not found: ensure current working directory permissions in
>     Colab.

> **Further Reading (Hilpisch 2019)**
>
> -   See: [Hilpisch Code Resources](../resources/hilpisch-code.qmd) —
>     Week 2
> -   Chapter 13 (ML pipelines) shows end‑to‑end workflows (features →
>     pipeline → evaluation) you can mirror with time‑aware splits.

## Mini‑Task — JKP Sample (Primer for Coursework 2)

This short exercise previews the JKP factor dataset used in Coursework
2. Load a small sample CSV, compute quick stats, and (optionally) run a
one‑line CAPM alpha.

In [7]:
# JKP sample (course mirror) — small monthly slice with MKT, SMB, HML, MOM
import pandas as pd, os
import statsmodels.api as sm

# Prefer local file during site build; fall back to raw GitHub if needed
local_path = os.path.join('..','resources','jkp-sample.csv')
if os.path.exists(local_path):
    jkp = pd.read_csv(local_path, parse_dates=['date']).set_index('date').sort_index()
else:
    # Use public notebooks repo URL for Colab
    url = "https://raw.githubusercontent.com/quinfer/fin510-colab-notebooks/main/resources/jkp-sample.csv"
    jkp = pd.read_csv(url, parse_dates=['date']).set_index('date').sort_index()

# Summary stats and quick cumulative return for MOM
summary = jkp[['MKT','SMB','HML','MOM']].describe().round(3)
cum = (1 + jkp['MOM']).cumprod() - 1
summary.tail(3), cum.tail()

# Optional: CAPM alpha (no HAC here — use HAC in the assessment)
ls = jkp['MOM'].dropna()
mkt = jkp['MKT'].reindex(ls.index)
capm = sm.OLS(ls, sm.add_constant(mkt)).fit()
float(capm.params['const']), float(capm.tvalues['const'])

(0.005408343449950338, 1.763810502676042)

Notes - In the assessment you will use a larger CSV downloaded from the
JKP portal and apply HAC standard errors. - Keep scope tight (few
factors, limited window) and focus on quality of evidence.

## Quick Leakage Check (Practice)

In [8]:
# Ensure prediction tasks shift the target correctly
import pandas as pd

# Intentionally wrong design (no shift) for demonstration
X_wrong = jkp[['MKT','SMB','HML','MOM']].dropna()
y_next   = jkp['MOM'].shift(-1)               # next-month target

# Overlap of indices indicates potential leakage if you don't drop/shift properly
overlap = X_wrong.index.intersection(y_next.dropna().index)
print("Potential leakage rows with wrong design:", len(overlap))

# Correct design: predictors at t, target at t+1 → align and drop NA
X = jkp[['MKT','SMB','HML']].shift(0)
y = jkp['MOM'].shift(-1)
df = pd.concat([X, y.rename('y')], axis=1).dropna()
print("Rows after proper shift/drop:", len(df))

Potential leakage rows with wrong design: 9
Rows after proper shift/drop: 9