# Stage 04: Data Acquisition and Ingestion Homework

This notebook demonstrates two ways of acquiring data — calling an API and scraping a web page — and then validates each dataset and saves it under a consistent file-naming convention.

## Setup and Imports

In [None]:
import os, json, datetime as dt
from pathlib import Path
from typing import Dict
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import yfinance as yf

# Only reached when every import above resolved without raising.
print("All imports successful!")

## Project Setup

In [None]:
# --- Output directory for raw downloads (created if absent, reused otherwise)
DATA_RAW = Path("data") / "raw"
DATA_RAW.mkdir(exist_ok=True, parents=True)

# --- Secrets: load them via python-dotenv, then read the Alpha Vantage key
# from the process environment (None when the variable is not set)
load_dotenv()
ALPHA_KEY = os.environ.get("ALPHAVANTAGE_API_KEY")

# Report only whether a key is present; the key itself is never printed.
print("Data directory created: " + str(DATA_RAW))
print("Alpha Vantage API key available: " + str(bool(ALPHA_KEY)))

## Helper Functions

In [None]:
# --- Helper functions
def safe_stamp():
    """Return the current local time as a compact ``YYYYMMDD-HHMMSS`` string."""
    now = dt.datetime.now()
    return f"{now:%Y%m%d-%H%M%S}"

def safe_filename(prefix: str, meta: Dict[str, str]) -> str:
    """Build a timestamped CSV file name: ``<prefix>_<key-value>_..._<stamp>.csv``.

    Args:
        prefix: Leading label for the file name (e.g. ``"api"``); used as given.
        meta: Key/value pairs embedded as ``key-value`` segments joined by
            ``_``, in the dict's iteration order. Each value is truncated to
            30 characters after sanitizing.

    Returns:
        The file name only (no directory part), ending in ``.csv``.
    """
    def _clean(text) -> str:
        # Keep letters, digits, '.', '_' and '-'; everything else (spaces,
        # path separators, colons, ...) becomes '-' so a metadata value can
        # never inject a directory component into the file name.
        return "".join(ch if ch.isalnum() or ch in "._-" else "-" for ch in str(text))

    parts = [f"{_clean(k)}-{_clean(v)[:30]}" for k, v in meta.items()]
    # Joining the pieces in one pass avoids a doubled underscore when meta is empty.
    return "_".join([prefix, *parts, safe_stamp()]) + ".csv"

def validate_df(df: pd.DataFrame,
                required_cols: list,
                dtypes_map: Dict[str,str] = None,
                min_rows: int = 1) -> dict:
    """Run lightweight structural checks on ``df`` and return a report dict.

    Nothing is raised for a failed check and ``df`` is never modified; all
    findings are collected in the returned dict.

    Args:
        df: Frame to inspect.
        required_cols: Column names that must be present.
        dtypes_map: Optional ``{column: dtype name}`` mapping. Names starting
            with ``"datetime"`` are trial-parsed with ``pd.to_datetime``,
            ``"float"``/``"float64"`` and ``"int"``/``"int64"`` with
            ``pd.to_numeric`` (ints additionally cast to ``"Int64"``), and
            anything else with ``Series.astype``.
        min_rows: Minimum acceptable number of rows.

    Returns:
        Dict with keys ``missing_cols``, ``bad_dtypes``, ``na_count``,
        ``n_rows``, ``n_cols``, plus ``bad_shape`` only when the frame has
        fewer than ``min_rows`` rows.
    """
    expected_types = dtypes_map if dtypes_map else {}
    row_count, col_count = df.shape
    report = {
        "missing_cols": [name for name in required_cols if name not in df.columns],
        "bad_dtypes": {},
        "na_count": int(df.isna().sum().sum()),
        "n_rows": int(row_count),
        "n_cols": int(col_count),
    }
    dtype_issues = report["bad_dtypes"]

    for name, wanted in expected_types.items():
        if name not in df.columns:
            dtype_issues[name] = f"missing for dtype {wanted}"
            continue
        try:
            # Trial conversion on a derived Series; the caller's frame is untouched.
            column = df[name]
            if wanted.startswith("datetime"):
                converted = pd.to_datetime(column, errors="coerce")
            elif wanted in ("float", "float64"):
                converted = pd.to_numeric(column, errors="coerce")
            elif wanted in ("int", "int64"):
                converted = pd.to_numeric(column, errors="coerce").astype("Int64")
            else:
                converted = column.astype(wanted)
            # More than 10% missing values after conversion is reported as a problem.
            if converted.isna().mean() > 0.1:
                dtype_issues[name] = f"too many NA after cast to {wanted}"
        except Exception as err:
            dtype_issues[name] = f"cast to {wanted} failed: {err}"

    # Minimum-size rule
    if report["n_rows"] < min_rows:
        report["bad_shape"] = f"rows<{min_rows}"
    return report

# Printed once the helper definitions above have executed without error.
print("Helper functions defined successfully!")

## Part 1: API Data Acquisition

In [None]:
SYMBOL = "AAPL"  # any ticker symbol can be substituted here
use_alpha = bool(ALPHA_KEY)  # try Alpha Vantage only when an API key was loaded

if use_alpha:
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": SYMBOL,
        "outputsize": "compact",
        "apikey": ALPHA_KEY,
        "datatype": "json"
    }
    try:
        r = requests.get(url, params=params, timeout=30)
        r.raise_for_status()
        js = r.json()
        # Locate the key holding the time series. An explicit raise (instead of
        # `assert`) keeps the check active even when assertions are disabled;
        # the except clause below turns any failure into the yfinance fallback.
        ts_key = [k for k in js.keys() if "Time Series" in k]
        if not ts_key:
            raise ValueError(f"No time series key in response: keys={list(js.keys())[:5]}")
        ts = js[ts_key[0]]
        # Reshape into a two-column DataFrame (date, adj_close)
        df_api = (
            pd.DataFrame(ts)
              .T.reset_index()
              .rename(columns={"index":"date", "5. adjusted close":"adj_close"})
              [["date","adj_close"]]
        )
        # Type conversion
        df_api["date"] = pd.to_datetime(df_api["date"])
        df_api["adj_close"] = pd.to_numeric(df_api["adj_close"], errors="coerce")
        print(f"✅ Successfully fetched data from Alpha Vantage for {SYMBOL}")
    except Exception as e:
        print("Alpha Vantage failed, falling back to yfinance. Error:", e)
        use_alpha = False

if not use_alpha:
    print(f"📊 Fetching {SYMBOL} data from yfinance...")
    # auto_adjust=False is passed explicitly: recent yfinance releases default
    # to auto_adjust=True, which omits the "Adj Close" column selected below.
    prices_raw = yf.download(SYMBOL, period="6mo", interval="1d", auto_adjust=False)
    # Some yfinance versions return (field, ticker) MultiIndex columns even for
    # a single symbol; keep only the field level so plain names can be selected.
    if isinstance(prices_raw.columns, pd.MultiIndex):
        prices_raw.columns = prices_raw.columns.get_level_values(0)
    df_api = prices_raw.reset_index()[["Date","Adj Close"]]
    df_api.columns = ["date","adj_close"]
    df_api["date"] = pd.to_datetime(df_api["date"])
    df_api["adj_close"] = pd.to_numeric(df_api["adj_close"], errors="coerce")
    print(f"✅ Successfully fetched data from yfinance for {SYMBOL}")

# Sort chronologically, validate, then write to disk
df_api = df_api.sort_values(by="date", ignore_index=True)

api_checks = dict(
    required_cols=["date", "adj_close"],
    dtypes_map={"date": "datetime64[ns]", "adj_close": "float"},
    min_rows=10,
)
msgs = validate_df(df_api, **api_checks)
print("API validation:", msgs)

# The file name records which provider actually delivered the data.
api_meta = {"source": "yfinance" if not use_alpha else "alpha", "symbol": SYMBOL}
fname = safe_filename("api", api_meta)
out_path = DATA_RAW.joinpath(fname)
df_api.to_csv(out_path, index=False)
print("Saved:", out_path)

# Preview of the saved data
print("\nFirst 5 rows of API data:")
print(df_api.head(5))

## Part 2: Web Scraping

In [None]:
# Target page for the scraping demo.
SCRAPE_URL = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"  # can be replaced with any page you have confirmed permits scraping
# Request headers identifying this notebook to the server.
# NOTE(review): the contact address looks like a placeholder — replace it before real use.
headers = {"User-Agent": "AFE-Course-Notebook/1.0 (contact: your_email@example.com)"}

def parse_first_table(html: str) -> pd.DataFrame:
    """Parse the first ``<table>`` found in ``html`` into a DataFrame of strings.

    The first row containing any ``<td>``/``<th>`` cells supplies the column
    names; every later non-empty row becomes a data row. Cell text is taken
    with surrounding whitespace stripped and is not converted to other types.

    Args:
        html: HTML markup to search.

    Returns:
        DataFrame with one column per header cell (zero rows when the table
        has a header row only).

    Raises:
        AssertionError: If the markup contains no ``<table>`` element.
        ValueError: If the table contains no cells at all. pandas may also
            raise it when the data rows' widths cannot be matched to the
            header row.
    """
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table")
    assert table is not None, "No <table> element found"

    rows = []
    for tr in table.find_all("tr"):
        cells = [cell.get_text(strip=True) for cell in tr.find_all(["td", "th"])]
        if cells:  # skip rows that hold no cells
            rows.append(cells)

    # Fail with a descriptive message instead of an opaque unpacking error.
    if not rows:
        raise ValueError("First <table> contains no <td>/<th> cells")

    header, *data = rows
    return pd.DataFrame(data, columns=header)

# Provenance label embedded in the output file name; switched to "demo" below
# when the live request fails, so a demo file is never labelled as Wikipedia data.
scrape_site = "wikipedia"
try:
    print(f"🌐 Attempting to scrape: {SCRAPE_URL}")
    resp = requests.get(SCRAPE_URL, headers=headers, timeout=30)
    resp.raise_for_status()
    df_scrape = parse_first_table(resp.text)
    print(f"✅ Successfully scraped table with {df_scrape.shape[0]} rows and {df_scrape.shape[1]} columns")
except Exception as e:
    # Best-effort fallback: keep the notebook runnable offline with a tiny inline table.
    print("Scrape failed (using inline demo table).", e)
    scrape_site = "demo"
    html = """
    <table>
      <tr><th>Ticker</th><th>Price</th></tr>
      <tr><td>AAA</td><td>101.2</td></tr>
      <tr><td>BBB</td><td>98.7</td></tr>
    </table>
    """
    df_scrape = parse_first_table(html)
    print(f"📋 Using demo table with {df_scrape.shape[0]} rows and {df_scrape.shape[1]} columns")

# Try converting likely-numeric columns (name hints such as "price", or values
# carrying commas / currency symbols) to numbers.
for col in df_scrape.columns:
    if any(x in col.lower() for x in ["price","close","volume","market","cap","weight","%","chg","change"]):
        # Strip everything except digits, '.', and '-' before parsing
        cleaned = (
            df_scrape[col]
            .str.replace(r"[^0-9.\-]", "", regex=True)
            .replace({"": None})
        )
        maybe_num = pd.to_numeric(cleaned, errors="coerce")
        # Replace the column only when more than half of the values parsed
        if maybe_num.notna().mean() > 0.5:
            df_scrape[col] = maybe_num
            print(f"🔢 Converted column '{col}' to numeric")

# min_rows=3 means the 2-row demo table is reported with a "bad_shape" entry.
msgs2 = validate_df(df_scrape, required_cols=list(df_scrape.columns), dtypes_map={}, min_rows=3)
print("SCRAPE validation:", msgs2)

fname2 = safe_filename(prefix="scrape", meta={"site": scrape_site, "table": "first"})
out_path2 = DATA_RAW / fname2
df_scrape.to_csv(out_path2, index=False)
print("Saved:", out_path2)

# Display first few rows
print("\nFirst 5 rows of scraped data:")
print(df_scrape.head())

## Summary

In [None]:
# List every CSV currently in the raw-data directory (this includes files left
# by earlier runs). glob() gives no ordering guarantee, so sort the paths to
# keep the listing stable from run to run.
print("📁 Generated files in data/raw/:")
for file in sorted(DATA_RAW.glob("*.csv")):
    print(f"  - {file.name} ({file.stat().st_size} bytes)")

print("\n✅ Data acquisition and ingestion completed successfully!")
print(f"📊 API data: {df_api.shape[0]} rows, {df_api.shape[1]} columns")
print(f"🌐 Scraped data: {df_scrape.shape[0]} rows, {df_scrape.shape[1]} columns")