In [1]:
import os
import time
import json
import requests
import pandas as pd
from datetime import datetime
import yfinance as yf
from dotenv import load_dotenv

## Fetching Price from alphavantage API

In [None]:
# data/alphavantage.py
RAW_DIR = "../raw"
os.makedirs(RAW_DIR, exist_ok=True)

class AlphaVantagePriceFetcher:
    """
    Fetch daily OHLCV data for stocks, FX, crypto, or commodities.
    Priority:
      1. Local CSV if exists
      2. Alpha Vantage (if API key available)
      3. yfinance fallback
    """
    BASE_URL = "https://www.alphavantage.co/query"
    def __init__(self, api_key_env="ALPHAVANTAGE_API_KEY"):
        self.api_key = os.environ.get(api_key_env, None)
        
    def _load_local_csv(self, symbol):
        csv_path = f"{RAW_DIR}/{symbol}.csv"
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path, parse_dates=["date"])
            return df
        return None
    
    def _save_csv(self, df: pd.DataFrame, symbol: str):
        df.to_csv(f"{RAW_DIR}/{symbol}.csv", index=False)
    
    def _fetch_alpha_vantage(self, symbol: str, asset_type: str):
        """
        asset_type: stock, fx, crypto, commodity
        """
        if not self.api_key:
            return None
        function_map = {
            "stock": "TIME_SERIES_DAILY_ADJUSTED",
            "fx": "FX_DAILY",
            "crypto": "DIGITAL_CURRENCY_DAILY",
            "commodity": "TIME_SERIES_DAILY"
        }

        params = {
            "apikey": self.api_key,
            "function": function_map.get(asset_type, "TIME_SERIES_DAILY_ADJUSTED"),
            "symbol": symbol,
            "outputsize": "full",
            "datatype": "json"
        }
        
        # FX pairs require special params
        if asset_type == "fx":
            if "/" in symbol:
                from_sym, to_sym = symbol.split("/")
            else:
                from_sym, to_sym = symbol[:3], symbol[3:]
            params["from_symbol"] = from_sym
            params["to_symbol"] = to_sym
            params.pop("symbol")
        # Crypto requires special params
        if asset_type == "crypto":
            params["market"] = "USD"
        # Retry with exponential backoff for rate limits
        for retry in range(5):
            r = requests.get(self.BASE_URL, params=params)
            if r.status_code == 200:
                data = r.json()
                print(data)
                if "Error Message" in data or "Note" in data:
                    # Probably rate limited or symbol invalid
                    time.sleep(2 ** retry)
                    continue
                return data
            time.sleep(2 ** retry)
        return None   
    def _parse_alpha_vantage(self, data: dict, symbol: str, asset_type: str):
        """
        Turns AlphaVantage JSON into standardized OHLCV DataFrame.
        """
        key_map = {
            "stock": "Time Series (Daily)",
            "commodity": "Time Series (Daily)",
            "fx": "Time Series FX (Daily)",
            "crypto": "Time Series (Digital Currency Daily)"
        }
        key = key_map.get(asset_type, "Time Series (Daily)")
        if key not in data:
            return None

        raw = data[key]
        records = []
        for date_str, vals in raw.items():
            date = datetime.strptime(date_str, "%Y-%m-%d")

            if asset_type == "crypto":
                row = {
                    "date": date,
                    "open": float(vals["1. open"]),
                    "high": float(vals["2. high"]),
                    "low": float(vals["3. low"]),
                    "close": float(vals["4. close"]),
                    "volume": float(vals["5. volume"])
                }
            elif asset_type == "fx":
                row = {
                    "date": date,
                    "open": float(vals["1. open"]),
                    "high": float(vals["2. high"]),
                    "low": float(vals["3. low"]),
                    "close": float(vals["4. close"]),
                    "volume": float("0")  # FX has no volume
                }
            else:
                row = {
                    "date": date,
                    "open": float(vals["1. open"]),
                    "high": float(vals["2. high"]),
                    "low": float(vals["3. low"]),
                    "close": float(vals["4. close"]),
                    "volume": float(vals.get("6. volume", 0))
                }

            row["symbol"] = symbol
            row["source"] = "alpha_vantage"
            records.append(row)

        df = pd.DataFrame(records)
        df.sort_values("date", inplace=True)
        return df
    
    def _fetch_yfinance(self, symbol: str):
        try:
            df = yf.download(symbol, period="max", interval="1d")
            if df.empty:
                return None

            df = df.rename(columns={
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume"
            })
            df["date"] = df.index
            df["symbol"] = symbol
            df["source"] = "yfinance"
            df = df.reset_index(drop=True)
            return df
        except:
            return None

    def fetch(self, symbol: str, asset_type: str = "stock"):
        # 1. Local CSV
        local_df = self._load_local_csv(symbol)
        if local_df is not None:
            return local_df

        # 2. Alpha Vantage
        data = self._fetch_alpha_vantage(symbol, asset_type)
        if data:
            print("Alpha Vantage data fetched.")
            df = self._parse_alpha_vantage(data, symbol, asset_type)
            if df is not None:
                self._save_csv(df, symbol)
                return df

        # 3. yfinance fallback
        df = self._fetch_yfinance(symbol)
        if df is not None:
            self._save_csv(df, symbol)
            return df

        return None


In [None]:
# data/ingest.py
# from .alphavantage import AlphaVantagePriceFetcher
# Dependency Inversion to bulk download for different assets

def ingest_price(symbol: str, asset_type: str = "stock"):
    fetcher = AlphaVantagePriceFetcher()
    df = fetcher.fetch(symbol, asset_type)
    return df

def batch_ingest(symbols: list, asset_type="stock"):
    results = {}
    for s in symbols:
        results[s] = ingest_price(s, asset_type)
    return results

In [4]:
print(load_dotenv(dotenv_path="../../config.env"))  # Load environment variables from .env file if present
fetcher = AlphaVantagePriceFetcher()
symbol = "BTC"
df = fetcher.fetch(symbol, asset_type="crypto")
print(df.info())

True
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5627 entries, 0 to 5626
Data columns (total 8 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    5627 non-null   datetime64[ns]
 1   open    5627 non-null   float64       
 2   high    5627 non-null   float64       
 3   low     5627 non-null   float64       
 4   close   5627 non-null   float64       
 5   volume  5627 non-null   float64       
 6   symbol  5627 non-null   object        
 7   source  5627 non-null   object        
dtypes: datetime64[ns](1), float64(5), object(2)
memory usage: 351.8+ KB
None


## Fetching several economic indicators 

In [None]:
# data/econ_indicators.py
ECON_DIR = "../econ"
os.makedirs(ECON_DIR, exist_ok=True)

class EconIndicatorsFetcher:
    """
    Fetchers economic indicators: 
        - Fed Funds Rate
        - CPI 
        - PPI 
        - Unemployment Rate
        ....
    Sources:
        - Local CSVs
        - FRED API
        - Alpha Vantage macro endpoints
    """
    FRED_URL = "https://api.stlouisfed.org/fred/series/observations"
    
    def __init__(self, fred_key_env="FRED_API_KEY", alpha_key_env="ALPHAVANTAGE_API_KEY"):
        self.fred_key = os.environ.get(fred_key_env, None)
        print(self.fred_key)
        self.alpha_key = os.environ.get(alpha_key_env, None)
        print(self.alpha_key)
        
    def _local_csv(self, name):
        path = f"{ECON_DIR}/{name}.csv"
        if os.path.exists(path):
            return pd.read_csv(path, parse_dates=["date"])
        return None
    
    def _save_csv(self, df, name):
        df.to_csv(f"{ECON_DIR}/{name}.csv", index=False)
        
    def _fetch_fred(self, series_id: str):
        """
        Fetch from FRED API 
        Args:
            series_id (str): type of data we fetch
        """
        if not self.fred_key:
            return None
        params = {
            "series_id": series_id,
            "api_key": self.fred_key,
            "file_type": "json"
            #"observation_start": "1900-01-01"
        }
        r = requests.get(self.FRED_URL, params=params)
        if r.status_code != 200:
            return None
        data = r.json()
        if "observations" not in data:
            return None
        rows = []
        for obs in data["observations"]:
            date = obs["date"]
            val = obs["value"]
            try:
                val = float(val)
            except:
                continue
            rows.append({"date": pd.to_datetime(date), "value": val})
        df = pd.DataFrame(rows)
        df.sort_values("date", inplace=True)
        return df
    
    def _ffill_daily(self, df, start,end):
        """
        Convert to daily, then forward fill

        Args:
            df (_type_): _description_
            start (_type_): _description_
            end (_type_): _description_
        """
        daily = pd.DataFrame({"date": pd.date_range(start=start, end=end, freq="D")})
        merged = pd.merge(daily, df, on="date", how="left")
        merged["value"] = merged["value"].ffill()
        return merged
    
    def fetch_indicator(self, name:str, fred_series: str):
        """
        General Indicator fetcher with fallback
        Args:
            name (str): _description_
            fred_series (str): _description_
        """
        # Local CSV
        local = self._local_csv(name)
        if local is not None:
            return local

        # FRED API
        df = self._fetch_fred(fred_series)
        if df is not None:
            self._save_csv(df, name)
            return df

        # Fallback: empty CSV for compatibility
        empty = pd.DataFrame({"date": [], "value": []})
        self._save_csv(empty, name)
        return empty
    
    def fetch_all(self):
        indicators = {
        # CORE INTEREST & LABOR
        "fed_funds_rate": "FEDFUNDS",
        "unemployment_rate": "UNRATE",
        "nonfarm_payrolls": "PAYEMS",       # All Employees, Total Nonfarm (Thousands of Persons)

        # PRICE INDICES
        "cpi": "CPIAUCSL",                  # Consumer Price Index for All Urban Consumers: All Items (Seasonally Adjusted)
        "ppi": "PPIACO",                    # Producer Price Index: All Commodities (Not Seasonally Adjusted)
        
        # GROWTH & SPENDING (Quarterly/Monthly)
        "gdp": "GDPC1",                     # Real Gross Domestic Product (Quarterly, Billions of Chained 2017 Dollars)
        "retail_sales": "RSXFS",            # Advance Retail Sales: Retail Trade (Monthly, Seasonally Adjusted)
        "industrial_production": "INDPRO",  # Industrial Production Index (Monthly)
        "building_permits": "PERMIT",       # New Private Housing Units Authorized by Building Permits (Monthly)
        
        # SENTIMENT & TRADE
        "consumer_confidence": "UMCSENT",   # University of Michigan: Consumer Sentiment (Monthly)
        "trade_balance": "NETEXC",          # Net Exports of Goods and Services (Quarterly, Billions of Dollars)
        
        # MONEY SUPPLY (M3 is discontinued, M1/M2 are standard)
        "money_supply_m1": "M1SL",          # M1 Money Stock (Monthly)
        "money_supply_m2": "M2SL",          # M2 Money Stock (Monthly)
        
        # INFLATION (often calculated, but PCE is an alternative index)
        "pce_inflation": "PCEPI",           # Personal Consumption Expenditures Price Index (Monthly)
        }
        out = {}
        for name, series_id in indicators.items():
            df = self.fetch_indicator(name, series_id)
            out[name] = df
        return out
    
    def generate_daily_econ(self, start="2000-01-01", end=None):
        """
        Produces daily versions of all indicators.
        """
        if end is None:
            end = datetime.today().strftime("%Y-%m-%d")

        raw = self.fetch_all()
        daily_frames = []

        for name, df in raw.items():
            if df.empty:
                continue
            ddf = self._ffill_daily(df, start, end)
            ddf["indicator"] = name
            daily_frames.append(ddf)

        if not daily_frames:
            return None

        result = pd.concat(daily_frames, axis=0)
        result = result.sort_values("date")

        out_path = f"{ECON_DIR}/daily_econ.csv"
        result.to_csv(out_path, index=False)
        return result

In [10]:
print(load_dotenv(dotenv_path="../../config.env"))  # Load environment variables from .env file if present
econ = EconIndicatorsFetcher()
df = econ.fetch_all()
print(df)

True
4f284382df023beeb5441b4f78d69fb7
SU3Q55SZG98ZHARI
{'fed_funds_rate':           date  value
0   1954-07-01   0.80
1   1954-08-01   1.22
2   1954-09-01   1.07
3   1954-10-01   0.85
4   1954-11-01   0.83
..         ...    ...
852 2025-07-01   4.33
853 2025-08-01   4.33
854 2025-09-01   4.22
855 2025-10-01   4.09
856 2025-11-01   3.88

[857 rows x 2 columns], 'unemployment_rate':           date  value
0   1948-01-01    3.4
1   1948-02-01    3.8
2   1948-03-01    4.0
3   1948-04-01    3.9
4   1948-05-01    3.5
..         ...    ...
928 2025-05-01    4.2
929 2025-06-01    4.1
930 2025-07-01    4.2
931 2025-08-01    4.3
932 2025-09-01    4.4

[933 rows x 2 columns], 'nonfarm_payrolls':            date     value
0    1939-01-01   29923.0
1    1939-02-01   30100.0
2    1939-03-01   30280.0
3    1939-04-01   30094.0
4    1939-05-01   30299.0
...         ...       ...
1036 2025-05-01  159452.0
1037 2025-06-01  159439.0
1038 2025-07-01  159511.0
1039 2025-08-01  159507.0
1040 2025-09-01  1596