In [0]:
%pip install yfinance requests

In [0]:
import yfinance as yf
import pandas as pd
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, TimestampType

In [0]:
%sql
-- Ensure bronze.stock_data exists; the ingestion cell below saves the
-- bronze.stock_data.price_history table into it. IF NOT EXISTS keeps re-runs safe.
CREATE SCHEMA IF NOT EXISTS bronze.stock_data;

In [0]:
# Explicit Spark schema applied when the fetched pandas batches are converted
# to a Spark DataFrame. Only the price/corporate-action columns are nullable;
# the identifying columns and the load timestamp must always be present.
schema = (
    StructType()
    .add("symbol", StringType(), nullable=False)
    .add("date", DateType(), nullable=False)
    .add("close", DoubleType(), nullable=True)
    .add("dividends", DoubleType(), nullable=True)
    .add("stock_splits", DoubleType(), nullable=True)
    .add("ingestion_timestamp", TimestampType(), nullable=False)
)

In [0]:
def fetch_stock_data(symbol, period="10y"):
    """Download weekly price history for one ticker via yfinance.

    Parameters
    ----------
    symbol : str
        Ticker symbol passed to ``yf.Ticker``.
    period : str, default "10y"
        Look-back window forwarded to ``Ticker.history``. The default keeps
        the original ten-year behaviour.

    Returns
    -------
    pandas.DataFrame or None
        One row per weekly bar with the columns ``symbol``, ``Date``,
        ``Close``, ``Dividends``, ``Stock Splits`` and ``ingestion_timestamp``
        (in that order). ``Date`` holds plain ``datetime.date`` values.
        ``None`` is returned when yfinance yields no rows or when the
        download raises; failures are printed, not re-raised, so one bad
        ticker does not abort a whole batch.
    """
    try:
        hist = yf.Ticker(symbol).history(period=period, interval="1wk")

        if hist.empty:
            return None

        # Move the date index into a regular 'Date' column.
        hist = hist.reset_index()

        # The index arrives as timestamps (exchange-local and tz-aware in
        # current yfinance releases -- confirm for the installed version).
        # Reduce it to calendar dates here, per ticker, so that frames from
        # exchanges in different time zones concatenate cleanly and the column
        # maps directly onto a Spark DateType field.
        hist['Date'] = hist['Date'].dt.date

        hist['symbol'] = symbol
        # Naive local wall-clock time of the fetch, one value per ticker.
        hist['ingestion_timestamp'] = datetime.now()

        return hist[['symbol', 'Date', 'Close', 'Dividends', 'Stock Splits', 'ingestion_timestamp']]

    except Exception as e:
        # Deliberate best-effort: report and skip this ticker.
        print(f"Failed {symbol}: {e}")
        return None

In [0]:
# Symbols to ingest come from the company reference table.
symbols_pdf = spark.table("bronze.stock_reference.company_info").select("symbol").toPandas()
# Drop nulls and repeated symbols (first occurrence wins, order preserved) so
# a duplicated reference row cannot cause the same history to be downloaded
# and appended twice.
tickers = list(dict.fromkeys(symbols_pdf['symbol'].dropna()))

batch_size = 100
total_rows = 0

# Explicit mapping from the column names returned by fetch_stock_data to the
# target table's column names; safer than renaming by position.
column_names = {
    'Date': 'date',
    'Close': 'close',
    'Dividends': 'dividends',
    'Stock Splits': 'stock_splits',
}

# Fetch and persist in batches so a failure late in the run does not lose
# everything already downloaded.
for i in range(0, len(tickers), batch_size):
    batch = tickers[i:i+batch_size]
    print(f"Batch {i//batch_size + 1}: Processing {len(batch)} stocks")

    # fetch_stock_data returns None for tickers with no data or a failed
    # download; those are skipped.
    fetched = (fetch_stock_data(symbol) for symbol in batch)
    batch_data = [frame for frame in fetched if frame is not None]

    if batch_data:
        combined = pd.concat(batch_data, ignore_index=True).rename(columns=column_names)
        # createDataFrame applies the schema by position, so put the columns
        # in exactly the schema's field order.
        combined = combined[schema.fieldNames()]

        # NOTE(review): mode("append") makes this cell non-idempotent -- every
        # re-run of the notebook appends the full history again. Consider a
        # MERGE on (symbol, date) or a per-symbol overwrite.
        # NOTE(review): partitioning by symbol creates one small partition per
        # ticker; this cannot be changed here without rewriting the existing table.
        spark.createDataFrame(combined, schema=schema).write \
            .format("delta") \
            .mode("append") \
            .partitionBy("symbol") \
            .saveAsTable("bronze.stock_data.price_history")

        total_rows += len(combined)
        print(f"Saved {len(combined)} rows (Total: {total_rows})")

print(f"\n Complete: {total_rows} rows loaded")

In [0]:
%sql
-- Post-load sanity check on the price history table: number of distinct
-- symbols, the covered date range, and how many rows carry a positive dividend.
SELECT
    COUNT(DISTINCT symbol)                            AS stocks_loaded,
    MIN(date)                                         AS earliest_date,
    MAX(date)                                         AS latest_date,
    SUM(CASE WHEN dividends > 0 THEN 1 ELSE 0 END)    AS dividend_payments
FROM bronze.stock_data.price_history