In [2]:
print(os.getenv("TICKERS"))


../../05_src/data/tickers/sp500_wiki.csv


In [1]:
import os
import logging
import yfinance as yf
import dask.dataframe as dd
import pandas as pd
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor

# Load environment variables
load_dotenv()
TICKERS = os.getenv("TICKERS", "AAPL,MSFT,GOOGL").split(",")
DATA_DIR = os.getenv("DATA_DIR", "data")
PARQUET_PATH = os.path.join(DATA_DIR, "stock_data.parquet")

# Ensure the data directory exists
os.makedirs(DATA_DIR, exist_ok=True)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def fetch_stock_data(ticker):
    """Fetch historical stock data for a given ticker using Yahoo Finance."""
    try:
        stock = yf.Ticker(ticker)
        df = stock.history(start="2013-12-01", end="2024-02-01")
        if df.empty:
            logger.warning(f"No data for {ticker}")
            return None
        df["ticker"] = ticker  # Add ticker column for multi-stock processing
        return df
    except Exception as e:
        logger.error(f"Error fetching data for {ticker}: {e}")
        return None

def fetch_all_stocks(tickers):
    """Fetch data for all tickers in parallel using ThreadPoolExecutor."""
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_stock_data, tickers))
    return [df for df in results if df is not None]

def save_to_parquet(dataframes, path):
    """Save stock data to a Parquet file using Dask for efficiency."""
    if not dataframes:
        logger.warning("No valid stock data to save.")
        return
    
    df = pd.concat(dataframes, axis=0)
    ddf = dd.from_pandas(df, npartitions=4)
    ddf.to_parquet(path, engine="pyarrow", write_index=False)
    logger.info(f"Data saved to {path}")

def main():
    """Main function to execute the pipeline."""
    logger.info("Starting stock data pipeline...")
    stock_data = fetch_all_stocks(TICKERS)
    save_to_parquet(stock_data, PARQUET_PATH)
    logger.info("Pipeline execution completed.")

if __name__ == "__main__":
    main()


2025-01-30 18:03:40,798 - INFO - Starting stock data pipeline...
2025-01-30 18:03:42,049 - ERROR - Failed to get ticker '../../05_SRC/DATA/TICKERS/SP500_WIKI.CSV' reason: Expecting value: line 1 column 1 (char 0)
2025-01-30 18:03:42,052 - ERROR - $../../05_SRC/DATA/TICKERS/SP500_WIKI.CSV: possibly delisted; no timezone found
2025-01-30 18:03:42,054 - INFO - Pipeline execution completed.
