# Stage 04 — Data Acquisition & Ingestion

In [24]:
import os, datetime as dt
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load .env if present
load_dotenv()
API_KEY = os.getenv("ALPHAVANTAGE_API_KEY", "")

# Project paths relative to repo root
ROOT = Path.cwd().parents[0] if Path.cwd().name == "notebooks" else Path.cwd()
RAW_DIR = (ROOT / "data" / "raw")
PROC_DIR = (ROOT / "data" / "processed")
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)

def nowstamp(fmt="%Y%m%d-%H%M"):
    return dt.datetime.now().strftime(fmt)

def save_raw_csv(df: pd.DataFrame, kind: str, source: str, tag: str) -> Path:
    path = RAW_DIR / f"{kind}_{source}_{tag}_{nowstamp()}.csv"
    df.to_csv(path, index=False)
    print("Saved:", path)
    return path

def ensure_dtypes(df: pd.DataFrame, date_cols=None, float_cols=None, int_cols=None):
    df = df.copy()
    date_cols = date_cols or []
    float_cols = float_cols or []
    int_cols = int_cols or []
    for c in date_cols:
        if c in df.columns:
            df[c] = pd.to_datetime(df[c], errors="coerce")
    for c in float_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    for c in int_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int64")
    return df

def validate_df(df: pd.DataFrame, required_cols: list[str], min_rows: int = 1, name: str = "df"):
    if df is None:
        raise ValueError(f"[{name}] is None")
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"[{name}] missing columns: {missing}")
    if len(df) < min_rows:
        raise ValueError(f"[{name}] not enough rows: {len(df)} < {min_rows}")
    na = df[required_cols].isna().sum().to_dict()
    print(f"[{name}] OK — shape={df.shape} | NA(required)={na}")
    return {"ok": True, "missing_cols": [], "na_counts": na, "shape": df.shape}

In [25]:
import pandas as pd

TICKER = "AAPL"

def _unify_cols_api(df: pd.DataFrame) -> pd.DataFrame:
    # Map both "DAILY" and "DAILY_ADJUSTED" schemas to common names
    mapping = {
        "1. open":"open",
        "2. high":"high",
        "3. low":"low",
        "4. close":"close",
        "5. adjusted close":"adj_close",
        "5. volume":"volume",  # DAILY
        "6. volume":"volume",  # DAILY_ADJUSTED
    }
    out = df.rename(columns={k:v for k,v in mapping.items() if k in df.columns})
    if "adj_close" not in out and "close" in out:
        out["adj_close"] = out["close"]
    keep = [c for c in ["open","high","low","close","adj_close","volume"] if c in out.columns]
    return out[keep]

def fetch_prices(symbol: str, function="TIME_SERIES_DAILY", size="compact") -> tuple[pd.DataFrame, str]:
    # Try Alpha Vantage if API_KEY exists
    if API_KEY:
        try:
            url = "https://www.alphavantage.co/query"
            params = {"function": function, "symbol": symbol, "outputsize": size, "apikey": API_KEY}
            r = requests.get(url, params=params, timeout=30)
            r.raise_for_status()
            js = r.json()
            # throttle or error messages
            if any(k in js for k in ("Note","Error Message","Information")):
                raise RuntimeError(js.get("Note") or js.get("Error Message") or js.get("Information"))
            key = next((k for k in js if "Time Series" in k), None)
            if not key:
                raise RuntimeError(f"No time series in payload. Keys: {list(js.keys())[:6]}")
            raw = pd.DataFrame(js[key]).T
            raw.index = pd.to_datetime(raw.index)
            raw = raw.astype(float)
            df = _unify_cols_api(raw).reset_index().rename(columns={"index":"date"})
            df = ensure_dtypes(df, date_cols=["date"], float_cols=["open","high","low","close","adj_close"])
            df = df.sort_values("date").reset_index(drop=True)
            return df, "alphavantage"
        except Exception as e:
            print("Alpha Vantage error → fallback:", e)

    # Fallback: yfinance (no key)
    import yfinance as yf
    yf_df = yf.download(symbol, period="9mo", interval="1d", auto_adjust=False, progress=False)
    if yf_df.empty:
        raise RuntimeError("yfinance returned no rows")
    yf_df = yf_df.rename(columns={
        "Open":"open","High":"high","Low":"low","Close":"close","Adj Close":"adj_close","Volume":"volume"
    })
    df = yf_df.reset_index().rename(columns={"Date":"date"})
    df = df[["date","open","high","low","close","adj_close","volume"]]
    df = ensure_dtypes(df, date_cols=["date"], float_cols=["open","high","low","close","adj_close"])
    df = df.sort_values("date").reset_index(drop=True)
    return df, "yfinance"

# Run ingestion
api_df, api_src = fetch_prices(TICKER, function="TIME_SERIES_DAILY", size="compact")
validate_df(api_df, required_cols=[c for c in ["date","open","high","low","close","volume"] if c in api_df.columns], min_rows=5, name=f"api:{api_src}")
api_csv = save_raw_csv(api_df, kind="api", source=api_src, tag=TICKER.lower())
api_csv

[api:alphavantage] OK — shape=(100, 7) | NA(required)={'date': 0, 'open': 0, 'high': 0, 'low': 0, 'close': 0, 'volume': 0}
Saved: c:\Users\Tracy\bootcamp_yuning_wang\homework\stage04_data-acquisition-and-ingestion\data\raw\api_alphavantage_aapl_20250820-1343.csv


WindowsPath('c:/Users/Tracy/bootcamp_yuning_wang/homework/stage04_data-acquisition-and-ingestion/data/raw/api_alphavantage_aapl_20250820-1343.csv')

In [26]:
import io, re, numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

URL  = "https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)"
SITE = "wikipedia"
TAG  = "gdp_nominal"

# 1) download + locate a wikitable that has IMF / World Bank / United Nations columns
resp = requests.get(URL, headers={"User-Agent":"Stage04-Homework/1.0"}, timeout=30)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "lxml")

# pick the first wikitable whose header row mentions IMF & World Bank
target = None
for tbl in soup.find_all("table", {"class":"wikitable"}):
    first_row = tbl.find("tr")
    if not first_row:
        continue
    head_txt = " ".join(th.get_text(" ", strip=True) for th in first_row.find_all(["th","td"]))
    if ("IMF" in head_txt and "World Bank" in head_txt) or ("United Nations" in head_txt):
        target = tbl
        break
if target is None:
    raise RuntimeError("Could not find expected GDP (nominal) table.")

# 2) read it as a DataFrame with two header rows, then flatten to friendly names
df0 = pd.read_html(io.StringIO(str(target)), header=[0,1])[0]

def friendly_col(col_tup):
    # col_tup is ("Top Header", "Sub Header") or similar
    top  = str(col_tup[0]).strip()
    sub  = str(col_tup[1]).strip() if len(col_tup) > 1 else ""
    # country column shows duplicated label across levels
    if top.lower().startswith("country") and (sub == "" or sub.lower().startswith("country")):
        return "Country/Territory"
    # IMF
    if "IMF" in top:
        return "IMF_Forecast" if "Forecast" in sub else ("IMF_Year" if "Year" in sub else f"IMF_{sub or 'Value'}")
    # World Bank
    if "World Bank" in top:
        return "WB_Estimate" if "Estimate" in sub else ("WB_Year" if "Year" in sub else f"WB_{sub or 'Value'}")
    # United Nations
    if "United Nations" in top:
        return "UN_Estimate" if "Estimate" in sub else ("UN_Year" if "Year" in sub else f"UN_{sub or 'Value'}")
    # fallback: join pieces
    combo = " ".join([p for p in [top, sub] if p and p != "nan"]).strip()
    return re.sub(r"\s+", "_", combo)

df0.columns = [friendly_col(t) for t in df0.columns]

# 3) keep just the small, clear set of columns
wanted = ["Country/Territory", "IMF_Forecast", "IMF_Year", "WB_Estimate", "WB_Year", "UN_Estimate", "UN_Year"]
present = [c for c in wanted if c in df0.columns]
df = df0[present].copy()

# 4) clean text + numerics
# remove footnote brackets like [n 1] anywhere
df["Country/Territory"] = (
    df["Country/Territory"]
    .astype(str)
    .str.replace(r"\[[^\]]*\]", "", regex=True)
    .str.strip()
)

# replace dashes and empty strings with NaN
df.replace({"—": np.nan, "–": np.nan, "": np.nan}, inplace=True)

# coerce numbers
num_cols = [c for c in df.columns if c != "Country/Territory"]
for c in num_cols:
    # keep digits and dots; drop footnote artifacts like [n 1]
    df[c] = (
        df[c].astype(str)
             .str.replace(r"\[[^\]]*\]", "", regex=True)
             .str.replace(r"[^\d\.\-]", "", regex=True)
    )
    # year columns should be integers; estimates are big numbers
    if "Year" in c:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int64")
    else:
        df[c] = pd.to_numeric(df[c], errors="coerce")

# drop rows missing the country name
df = df.dropna(subset=["Country/Territory"]).reset_index(drop=True)

# 5) validate
required = ["Country/Territory"] + [c for c in ["IMF_Forecast","IMF_Year","WB_Estimate","WB_Year","UN_Estimate","UN_Year"] if c in df.columns]
validate_df(df, required_cols=required, min_rows=5, name="scrape:gdp_nominal")

# 6) show a small preview
print("Preview:")
display(df.head(12))  # small preview like your example

# 7) save
scrape_csv = save_raw_csv(df, kind="scrape", source=SITE, tag=TAG)
scrape_csv

[scrape:gdp_nominal] OK — shape=(222, 7) | NA(required)={'Country/Territory': 0, 'IMF_Forecast': 32, 'IMF_Year': 32, 'WB_Estimate': 12, 'WB_Year': 12, 'UN_Estimate': 9, 'UN_Year': 9}
Preview:


Unnamed: 0,Country/Territory,IMF_Forecast,IMF_Year,WB_Estimate,WB_Year,UN_Estimate,UN_Year
0,World,113795678.0,2025,111326370.0,2024,100834796.0,2022
1,United States,30507217.0,2025,29184890.0,2024,27720700.0,2023
2,China,19231705.0,2025,18743803.0,2024,17794782.0,2023
3,Germany,4744804.0,2025,4659929.0,2024,4525704.0,2023
4,India,4187017.0,2025,3912686.0,2024,3575778.0,2023
5,Japan,4186431.0,2025,4026211.0,2024,4204495.0,2023
6,United Kingdom,3839180.0,2025,3643834.0,2024,3380855.0,2023
7,France,3211292.0,2025,3162079.0,2024,3051832.0,2023
8,Italy,2422855.0,2025,2372775.0,2024,2300941.0,2023
9,Canada,2225341.0,2025,2241253.0,2024,2142471.0,2023


Saved: c:\Users\Tracy\bootcamp_yuning_wang\homework\stage04_data-acquisition-and-ingestion\data\raw\scrape_wikipedia_gdp_nominal_20250820-1343.csv


WindowsPath('c:/Users/Tracy/bootcamp_yuning_wang/homework/stage04_data-acquisition-and-ingestion/data/raw/scrape_wikipedia_gdp_nominal_20250820-1343.csv')

# Documentation (Stage 04)
# API Source:
- Base: https://www.alphavantage.co/query
- Params: function=TIME_SERIES_DAILY, symbol=AAPL, outputsize=compact, apikey from .env (ALPHAVANTAGE_API_KEY)
- Fallback: yfinance (period=9mo, interval=1d) if rate-limited or no key
- Output: data/raw/api_<source>_<AAPL>_<YYYYMMDD-HHMM>.csv

# Scrape Source:
- URL: https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)
- Table: IMF / World Bank / United Nations GDP (two-row header → flattened to:
  Country/Territory, IMF_Forecast, IMF_Year, WB_Estimate, WB_Year, UN_Estimate, UN_Year)
- Output: data/raw/scrape_wikipedia_gdp_nominal_<YYYYMMDD-HHMM>.csv

# Assumptions & Risks:
- API may throttle; code falls back to yfinance.
- Wikipedia structure/headers can change; selector + header-flattening may need updates.
- Footnotes/dashes cleaned before numeric coercion; validations enforce required cols and ≥5 rows.
