# Stage 04 — Data Acquisition & Ingestion (ASML)
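Pull daily ASML prices from the Alpha Vantage API (falling back to yfinance when no API key is available or the call fails), scrape one table from the ASML Wikipedia page, validate the results, and save them as CSV files under `project/data/raw`.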


Imports, paths, utils

In [1]:
from __future__ import annotations
import os, sys, time, json, re
from pathlib import Path
from datetime import datetime

import requests
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Resolve the project root: when the working directory is named "notebooks",
# step up one level; otherwise treat the working directory itself as the root.
project_root = Path.cwd().resolve().parents[0] if Path.cwd().name == "notebooks" else Path.cwd().resolve()

# Make <project_root>/src importable. The membership check keeps this cell
# idempotent: re-running it must not pile duplicate entries onto sys.path.
if str(project_root / "src") not in sys.path:
    sys.path.append(str(project_root / "src"))

# Raw, unprocessed downloads live under <project_root>/data/raw (created on demand).
DATA_DIR = project_root / "data"
RAW_DIR = DATA_DIR / "raw"
RAW_DIR.mkdir(parents=True, exist_ok=True)

# Load <project_root>/.env into the environment, then read the Alpha Vantage key
# from it. ALPHA_KEY is None when the variable is not set.
load_dotenv(project_root / ".env")
ALPHA_KEY = os.getenv("ALPHAVANTAGE_API_KEY")

def now_stamp():
    """Return the current local time as a compact ``YYYYMMDD-HHMM`` string (used in file names)."""
    current = datetime.now()
    return f"{current:%Y%m%d-%H%M}"

def _strip_numeric_noise(s: pd.Series) -> pd.Series:
    """Return ``s`` as text with ``[...]`` markers, commas, ``%``, ``$`` and outer whitespace removed."""
    return (
        s.astype(str)
         .str.replace(r"\[.*?\]", "", regex=True)
         .str.replace(",", "", regex=False)
         .str.replace("%", "", regex=False)
         .str.replace("$", "", regex=False)
         .str.strip()
    )


def validate_df(df: pd.DataFrame, required_cols: list[str] | None = None, type_hints: dict[str, str] | None = None) -> tuple[pd.DataFrame, dict]:
    """Run minimal schema / type / completeness checks and coerce hinted columns.

    Parameters
    ----------
    df : DataFrame to check. It is modified in place (coerced columns are
        written back) and also returned.
    required_cols : column names that must be present; absences are reported
        under ``msgs["missing_cols"]`` (nothing is raised).
    type_hints : mapping ``column -> type name``. Recognised names:
        anything starting with ``"datetime"``, ``"float"``/``"float64"`` and
        ``"int"``/``"int64"``. Other names leave the column untouched; hinted
        columns that are absent from ``df`` are skipped.

    Returns
    -------
    (df, msgs) where ``msgs`` always contains ``"shape"`` and ``"na_counts"``,
    and may contain ``"missing_cols"`` and a ``"type_errors"`` list.
    """
    msgs = {}

    # 1) Schema
    if required_cols:
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            msgs["missing_cols"] = f"Missing columns: {missing}"

    # 2) Types/coercion (defensive)
    for col, t in (type_hints or {}).items():
        if col not in df.columns:
            continue

        s = df[col]

        # A label can select several columns (duplicate names / MultiIndex).
        # Always take the first one explicitly: squeeze() would collapse the
        # *row* axis of a single-row frame and hand back a mis-shaped Series.
        # NOTE: the write-back below then fills every column under this label
        # with the coerced first column.
        if isinstance(s, pd.DataFrame):
            s = s.iloc[:, 0]

        if t.startswith("datetime"):
            try:
                s = pd.to_datetime(s, errors="coerce")
            except Exception as e:
                msgs.setdefault("type_errors", []).append(f"{col}: {e}")

        elif t in ("float", "float64", "int", "int64"):
            # Non-numeric input: clean common symbols first so values such as
            # "1,234" or "12%" survive the coercion (same rule for float and int).
            if not pd.api.types.is_numeric_dtype(s):
                s = _strip_numeric_noise(s)
            s = pd.to_numeric(s, errors="coerce")

            if t in ("int", "int64"):
                # Nullable Int64 keeps missing values; a column holding
                # non-integral numbers cannot be cast safely, so report it
                # and keep the numeric column instead of crashing.
                try:
                    s = s.astype("Int64")
                except (TypeError, ValueError) as e:
                    msgs.setdefault("type_errors", []).append(f"{col}: {e}")

        # write back
        df[col] = s

    # 3) Completeness & shape
    msgs["shape"] = f"{df.shape}"
    msgs["na_counts"] = {k: int(v) for k, v in df.isna().sum().to_dict().items()}

    return df, msgs


# Echo the resolved configuration so a reader can confirm paths and key availability.
# Only the presence of the key is shown, never its value.
for label, value in (
    ("Project root:", project_root),
    ("Using RAW_DIR:", RAW_DIR),
    ("ALPHAVANTAGE key present:", bool(ALPHA_KEY)),
):
    print(label, value)


Project root: C:\Users\melin\OneDrive\Desktop\nyu\python\bootcamp_panagiotis_housos\project
Using RAW_DIR: C:\Users\melin\OneDrive\Desktop\nyu\python\bootcamp_panagiotis_housos\project\data\raw
ALPHAVANTAGE key present: False


API pull (Alpha Vantage primary, yfinance fallback)

In [5]:
def fetch_alphavantage_daily(symbol: str, api_key: str, outputsize: str = "compact") -> pd.DataFrame:
    """
    Fetch TIME_SERIES_DAILY_ADJUSTED for a US symbol (e.g., ASML) from Alpha Vantage.

    Parameters
    ----------
    symbol : ticker to query.
    api_key : Alpha Vantage API key.
    outputsize : 'compact' (100) or 'full' (20+ years).

    Returns
    -------
    DataFrame sorted by ascending ``date`` with columns
    date, open, high, low, close, adjusted_close, volume (all values float,
    ``date`` as datetime). Dividend amount and split coefficient are not
    returned. An empty time series yields an empty frame with these columns.

    Raises
    ------
    ValueError : the payload has no "Time Series (Daily)" entry. The message
        includes the service's own explanation when one is present, with the
        API key redacted.
    Whatever ``raise_for_status()`` raises for an HTTP error status.
    """
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": symbol,
        "outputsize": outputsize,
        "datatype": "json",
        "apikey": api_key,
    }
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    js = r.json()

    if not isinstance(js, dict) or "Time Series (Daily)" not in js:
        keys = list(js.keys())[:5] if isinstance(js, dict) else []
        message = f"Unexpected response keys: {keys}"
        # Rate-limit / premium / bad-symbol replies carry their explanation under
        # one of these keys; surface it so the failure is diagnosable.
        detail = ""
        if isinstance(js, dict):
            detail = next(
                (str(js[k]) for k in ("Error Message", "Note", "Information") if k in js),
                "",
            )
        if detail:
            # The explanation may echo the caller's key; never let it reach
            # logs or notebook output.
            if api_key:
                detail = detail.replace(api_key, "***")
            message += f" ({detail})"
        raise ValueError(message)

    # Output column -> key used in each daily record of the response.
    field_map = {
        "open": "1. open",
        "high": "2. high",
        "low": "3. low",
        "close": "4. close",
        "adjusted_close": "5. adjusted close",
        "volume": "6. volume",
    }
    recs = [
        {"date": day, **{name: float(vals[key]) for name, key in field_map.items()}}
        for day, vals in js["Time Series (Daily)"].items()
    ]

    # Passing `columns` keeps the schema stable even when there are no records
    # (otherwise the frame would have no "date" column and the lines below fail).
    df = pd.DataFrame(recs, columns=["date", *field_map])
    df = df.astype({name: "float64" for name in field_map})
    df["date"] = pd.to_datetime(df["date"])
    return df.sort_values("date").reset_index(drop=True)

def fetch_yfinance_daily(symbol: str, period: str = "5y", interval: str = "1d") -> pd.DataFrame:
    """
    Download price history through yfinance and return it in the notebook's common layout.

    The result keeps whichever of date, open, high, low, close, adjusted_close,
    volume are available (in that order), with ``date`` parsed as datetime, the
    value columns coerced to numeric, and rows sorted by ascending ``date``.
    ``auto_adjust=False`` is requested so the unadjusted close and the adjusted
    close are both returned.
    """
    import yfinance as yf  # imported lazily: only needed when this fallback runs

    frame = yf.download(
        symbol,
        period=period,
        interval=interval,
        auto_adjust=False,
        progress=False,
        group_by="column",  # price field first; flattened just below
    )

    # yfinance may hand back MultiIndex headers such as ('Close', 'ASML').
    if isinstance(frame.columns, pd.MultiIndex):
        one_ticker = (
            frame.columns.nlevels == 2
            and len(frame.columns.get_level_values(1).unique()) == 1
        )
        if one_ticker:
            # Single ticker: the second level carries no information, drop it.
            frame.columns = frame.columns.get_level_values(0)
        else:
            # Several tickers / levels: join the parts with underscores.
            frame.columns = ["_".join(map(str, tup)).strip() for tup in frame.columns.to_flat_index()]

    frame = frame.reset_index()

    # snake_case every header
    frame.columns = [str(label).strip().lower().replace(" ", "_") for label in frame.columns]

    # Map known synonyms onto the canonical names, never clobbering a column
    # that already carries the canonical name.
    for old_name, new_name in (
        ("adj_close", "adjusted_close"),
        ("datetime", "date"),
        ("index", "date"),
    ):
        if old_name in frame.columns and new_name not in frame.columns:
            frame = frame.rename(columns={old_name: new_name})

    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")

    value_cols = ["open", "high", "low", "close", "adjusted_close", "volume"]
    for name in value_cols:
        if name in frame.columns:
            frame[name] = pd.to_numeric(frame[name], errors="coerce")

    # Restrict to the canonical columns that actually exist, oldest row first.
    keep = [name for name in ["date", *value_cols] if name in frame.columns]
    return frame[keep].sort_values("date").reset_index(drop=True)


# Choose the price source: Alpha Vantage when a key is configured, yfinance otherwise
# (or whenever the Alpha Vantage call fails).
SYMBOL = "ASML"
try:
    if not ALPHA_KEY:
        raise RuntimeError("No ALPHAVANTAGE_API_KEY; using yfinance fallback.")
    api_df = fetch_alphavantage_daily(SYMBOL, ALPHA_KEY, outputsize="compact")
    src = "alphavantage"
except Exception as e:
    # Deliberately broad: any problem with the primary source (missing key,
    # HTTP error, unexpected payload) should trigger the fallback, and the
    # reason is printed rather than swallowed.
    print("Alpha Vantage failed or key missing →", e)
    api_df = fetch_yfinance_daily(SYMBOL, period="5y", interval="1d")
    src = "yfinance"

# Validate
req = ["date","close","adjusted_close","volume"]
types = {
    "date": "datetime64[ns]",
    "close": "float",
    "adjusted_close": "float",
    # "float" only guarantees a numeric column: values that are already
    # integers keep their integer dtype. Cast explicitly later if needed.
    "volume": "float",
}
api_df, msgs = validate_df(api_df, req, types)
print(api_df.dtypes)
print("Validation:", msgs)

# Fail loudly instead of writing an unusable file: validate_df only *reports*
# problems, so an empty download or a missing required column must stop here.
if api_df.empty:
    raise ValueError(f"No rows returned for {SYMBOL} from {src}; nothing was saved.")
if "missing_cols" in msgs:
    raise ValueError(f"{src} data for {SYMBOL} failed validation: {msgs['missing_cols']}")

# Save a timestamped raw CSV; the path is the cell's displayed result.
stamp = now_stamp()
api_path = RAW_DIR / f"api_{src}_{SYMBOL}_{stamp}.csv"
api_df.to_csv(api_path, index=False)
api_path


Alpha Vantage failed or key missing → No ALPHAVANTAGE_API_KEY; using yfinance fallback.
date              datetime64[ns]
open                     float64
high                     float64
low                      float64
close                    float64
adjusted_close           float64
volume                     int64
dtype: object
Validation: {'shape': '(1256, 7)', 'na_counts': {'date': 0, 'open': 0, 'high': 0, 'low': 0, 'close': 0, 'adjusted_close': 0, 'volume': 0}}


WindowsPath('C:/Users/melin/OneDrive/Desktop/nyu/python/bootcamp_panagiotis_housos/project/data/raw/api_yfinance_ASML_20250816-0017.csv')

Scrape a small permitted table (Wikipedia)

In [3]:
from io import StringIO

WIKI_URL = "https://en.wikipedia.org/wiki/ASML_Holding"
# A text column is converted to numbers only when at least this share of its
# cells parse as numeric.
NUMERIC_RATIO_MIN = 0.6

headers = {"User-Agent": "Mozilla/5.0 (compatible; IngestionBot/0.1; +edu-project)"}
resp = requests.get(WIKI_URL, headers=headers, timeout=30)
resp.raise_for_status()

# Use pandas to parse HTML tables (requires lxml). Wrapping the markup in
# StringIO passes it as a file-like object rather than a literal string.
tables = pd.read_html(StringIO(resp.text))
print(f"Found {len(tables)} tables.")

# Heuristic table choice: among tables with at least 2 columns and at least
# 3 rows, take the one with the most cells (rows * columns); if none
# qualifies, fall back to the first table on the page.
cand = sorted(
    (t for t in tables if isinstance(t, pd.DataFrame) and t.shape[1] >= 2 and t.shape[0] >= 3),
    key=lambda x: (x.shape[0]*x.shape[1]),
    reverse=True
)
scrape_df = cand[0].copy() if cand else tables[0].copy()

# Basic cleaning of text columns.
for c in scrape_df.columns:
    raw_col = scrape_df[c]
    if not (pd.api.types.is_object_dtype(raw_col) or pd.api.types.is_string_dtype(raw_col)):
        continue

    # Text form: drop reference markers like [1] and surrounding whitespace only.
    text = (
        raw_col.astype(str)
        .str.replace(r"\[.*?\]", "", regex=True)
        .str.strip()
    )
    # Numeric probe: additionally drop thousands separators and percent signs.
    numeric_try = pd.to_numeric(
        text.str.replace(",", "", regex=False).str.replace("%", "", regex=False).str.strip(),
        errors="coerce",
    )
    ratio_numeric = numeric_try.notna().mean()  # fraction successfully parsed

    if ratio_numeric >= NUMERIC_RATIO_MIN:
        scrape_df[c] = numeric_try
    else:
        # Keep prose intact: commas and "%" belong to the text, and cells that
        # were missing stay missing instead of becoming the string "nan".
        scrape_df[c] = text.where(raw_col.notna())


# Minimal validation: non-empty, at least 2 cols
if scrape_df.empty or scrape_df.shape[1] < 2:
    raise ValueError("Scraped table is empty or too narrow.")

# Save raw CSV; the path is the cell's displayed result.
scrape_path = RAW_DIR / f"scrape_wikipedia_ASML_{now_stamp()}.csv"
scrape_df.to_csv(scrape_path, index=False)
scrape_path


Found 15 tables.


WindowsPath('C:/Users/melin/OneDrive/Desktop/nyu/python/bootcamp_panagiotis_housos/project/data/raw/scrape_wikipedia_ASML_20250816-0010.csv')

In [6]:
# NOTE(review): pprint is not used in the visible cells; the import is kept in
# case a later cell relies on it.
import json, pprint

# Run metadata for this ingestion. Source, symbol and URL are taken from the
# variables the earlier cells actually used, so the record cannot drift from
# what was really fetched (e.g. when Alpha Vantage is used instead of yfinance).
doc = {
    "api_source": src,                  # "yfinance" or "alphavantage"
    "symbol": SYMBOL,
    "api_file": str(api_path),
    "scrape_source": "wikipedia",
    "scrape_url": WIKI_URL,
    "scrape_file": str(scrape_path),
    "run_timestamp": now_stamp(),
    "validation_api": msgs,             # from validate_df
    "assumptions_risks": [
        "EOD data; educational use only.",
        "API schema/limits may change; yfinance fallback used.",
        "Scraped HTML structure may shift; table selection is heuristic.",
    ],
}
# Show at most the first 1200 characters of the JSON rendering.
print(json.dumps(doc, indent=2)[:1200])


{
  "api_source": "yfinance",
  "symbol": "ASML",
  "api_file": "C:\\Users\\melin\\OneDrive\\Desktop\\nyu\\python\\bootcamp_panagiotis_housos\\project\\data\\raw\\api_yfinance_ASML_20250816-0017.csv",
  "scrape_source": "wikipedia",
  "scrape_url": "https://en.wikipedia.org/wiki/ASML_Holding",
  "scrape_file": "C:\\Users\\melin\\OneDrive\\Desktop\\nyu\\python\\bootcamp_panagiotis_housos\\project\\data\\raw\\scrape_wikipedia_ASML_20250816-0010.csv",
  "run_timestamp": "20250816-0019",
  "validation_api": {
    "shape": "(1256, 7)",
    "na_counts": {
      "date": 0,
      "open": 0,
      "high": 0,
      "low": 0,
      "close": 0,
      "adjusted_close": 0,
      "volume": 0
    }
  },
  "assumptions_risks": [
    "EOD data; educational use only.",
    "API schema/limits may change; yfinance fallback used.",
    "Scraped HTML structure may shift; table selection is heuristic."
  ]
}
