# Stock Market Data ETL Pipeline

This notebook demonstrates how to extract stock market data and fundamentals from Yahoo Finance and store them in a database for further analysis.

## Overview

This notebook covers:
- Connecting to Yahoo Finance through the yfinance library
- Extracting end-of-day market data and fundamentals
- Processing and validating the data
- Storing market data and fundamentals in a database

## Prerequisites

Before running this notebook, ensure you have:
- Required Python packages installed (yfinance, pandas, keyring)
- Database credentials stored in keyring
- The project's `data_engineering` package (database and Yahoo Finance helper modules) and a matching database schema
- Internet connection for Yahoo Finance API access
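To confirm the package prerequisites before running later cells, you can ask `importlib` whether each top-level module can be found. This is a minimal sketch. The helper name `missing_packages` is hypothetical and is not part of the project's code.

```python
import importlib.util

def missing_packages(names):
    """Return the names of top-level modules that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Packages listed in the prerequisites above
print(missing_packages(["yfinance", "pandas", "keyring"]))
```

An empty list means all three are importable. Any name it returns still needs to be installed.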

## Required Libraries

In [3]:
import sys
import os

# Get the current notebook's directory and go up two levels to the project root
current_dir = os.getcwd()
parent_dir = os.path.dirname(os.path.dirname(current_dir))

if parent_dir not in sys.path:
    sys.path.append(parent_dir)

print(f"Current directory: {current_dir}")
print(f"Added to path: {parent_dir}")

# Verify the data_engineering folder exists
data_eng_path = os.path.join(parent_dir, 'data_engineering')
print(f"data_engineering exists: {os.path.exists(data_eng_path)}")

if os.path.exists(data_eng_path):
    print(f"data_engineering contents: {os.listdir(data_eng_path)}")

Current directory: c:\Users\menon\OneDrive\Documents\SourceCode\InvestmentManagement\toolkit\notebooks
Added to path: c:\Users\menon\OneDrive\Documents\SourceCode\InvestmentManagement
data_engineering exists: True
data_engineering contents: ['database', 'eod_data', 'fundamental_data', '__init__.py', '__pycache__']


In [4]:
import datetime as dt

import keyring
import pandas as pd
from IPython.display import display  # display() renders DataFrames in the cells below

from data_engineering.database import db_functions as database
from data_engineering.eod_data import yahoo_functions as yahoo

## Utility Functions

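### Date Configuration Functions

These functions provide flexible date range configuration for different data retrieval scenarios. The end date is treated as exclusive, as in yfinance, so the latest end-of-day window runs from the last trading day to the following calendar day: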

In [5]:
def get_latest_trading_day():
    """
    Get the most recent weekday (Monday to Friday).

    Only weekends are skipped. Exchange holidays are not excluded, and on a
    weekday before the market closes this returns today's date, for which
    end-of-day data may not exist yet.

    Returns:
        str: Latest weekday in 'YYYY-MM-DD' format
    """
    today = dt.datetime.now()
    # If it's a weekend, step back to Friday
    while today.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        today -= dt.timedelta(days=1)
    return today.strftime('%Y-%m-%d')

def configure_date_range(mode='latest_eod', days_back=1):
    """
    Configure the date range for data retrieval.

    end_date is exclusive (the yfinance convention), so every mode returns the
    calendar day after the latest trading day to make sure that day is included.

    Args:
        mode (str): 'latest_eod', 'recent', or 'ytd'. Any other value falls
            back to 'latest_eod'. For a fixed historical window, assign
            start_date and end_date directly (see the examples below).
        days_back (int): Number of calendar days back (used for 'recent' mode)

    Returns:
        tuple: (start_date, end_date) in 'YYYY-MM-DD' format
    """
    latest_day = get_latest_trading_day()
    latest_day_obj = dt.datetime.strptime(latest_day, '%Y-%m-%d')
    # Exclusive end date: the day after the latest trading day
    end_date = (latest_day_obj + dt.timedelta(days=1)).strftime('%Y-%m-%d')

    if mode == 'recent':
        start_date = (latest_day_obj - dt.timedelta(days=days_back)).strftime('%Y-%m-%d')
    elif mode == 'ytd':
        # Use the latest trading day's year so a run on an early-January weekend
        # cannot produce a start date after the end date
        start_date = f"{latest_day_obj.year}-01-01"
    else:
        # 'latest_eod' (and any unrecognised mode): only the latest trading day
        start_date = latest_day
    return start_date, end_date

# Configure default date range (latest end-of-day only)
start_date, end_date = configure_date_range('latest_eod')
print("📅 Default configuration: Latest End-of-Day data")
print(f"📅 Date range: {start_date} to {end_date}")
print(f"📊 Fetching data from {start_date} to {end_date}")

# Alternative configurations (uncomment as needed):
# Recent data (last 7 calendar days)
# start_date, end_date = configure_date_range('recent', 7)

# Recent data (last 30 calendar days)
# start_date, end_date = configure_date_range('recent', 30)

# Year to date
# start_date, end_date = configure_date_range('ytd')

# Historical data (specific dates; the end date is exclusive)
# start_date, end_date = '2023-01-01', '2024-01-01'

print(f"\n🎯 Final date range: {start_date} to {end_date}")

📅 Default configuration: Latest End-of-Day data
📅 Date range: 2025-08-15 to 2025-08-16
📊 Fetching data from 2025-08-15 to 2025-08-16

🎯 Final date range: 2025-08-15 to 2025-08-16
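`get_latest_trading_day()` skips weekends only, so on an exchange holiday it returns a date with no end-of-day bars. It also reads the clock directly, which makes it awkward to unit-test. Below is a minimal sketch of an alternative that takes the reference date and a caller-supplied holiday set as parameters. The names `latest_trading_day` and `eod_window` are hypothetical, and no exchange calendar is built in.

```python
import datetime as dt

def latest_trading_day(today, holidays=frozenset()):
    """Return the most recent date on or before `today` that is a weekday and not a holiday.

    `holidays` is a caller-supplied collection of datetime.date objects; this
    sketch has no built-in exchange calendar.
    """
    day = today
    while day.weekday() >= 5 or day in holidays:  # 5 = Saturday, 6 = Sunday
        day -= dt.timedelta(days=1)
    return day

def eod_window(today, holidays=frozenset()):
    """Return (start, end) as 'YYYY-MM-DD' strings covering the latest trading day.

    The end is the following calendar day because it is treated as exclusive.
    """
    day = latest_trading_day(today, holidays)
    return day.isoformat(), (day + dt.timedelta(days=1)).isoformat()

# The recorded run above happened on Sunday 2025-08-17
print(eod_window(dt.date(2025, 8, 17)))  # ('2025-08-15', '2025-08-16')
```

Passing `{dt.date(2025, 7, 4)}` (Independence Day, an NYSE holiday) with `today=dt.date(2025, 7, 5)` steps back to Thursday 2025-07-03. In the notebook, `today` would be `dt.date.today()`, and the holiday set would come from an exchange calendar source of your choice.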


## Database Connection

### Secure Credential Management

**Note**: Ensure your database credentials are stored in the system keyring under the service name `ihub_sql_connection`, with entries for `db`, `uid`, and `pwd`.

In [6]:
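# Retrieve credentials securely from the system keyring
service_name = "ihub_sql_connection"
db = keyring.get_password(service_name, "db")
db_user = keyring.get_password(service_name, "uid")
db_password = keyring.get_password(service_name, "pwd")

# keyring.get_password returns None when an entry is missing
missing = [key for key, value in {"db": db, "uid": db_user, "pwd": db_password}.items() if value is None]
if missing:
    print(f"⚠️ Missing keyring entries for '{service_name}': {missing}")
else:
    print("Database credentials retrieved from keyring")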

Database credentials retrieved from keyring


### Establishing Database Connection

In [7]:
# Establish database connection
try:
    engine, connection, session = database.get_db_connection()
    print("✅ Database connection established successfully")
    
except Exception as e:
    print(f"❌ Database connection failed: {e}")
    print("Please check your credentials and database availability")

Database connection successful.
✅ Database connection established successfully


## Security Master Data Extraction

### Reading Available Securities

In [8]:
# Read security master data from database
try:
    print("Reading security master data...")
    df_securities = database.read_security_master(orm_session=session, orm_engine=engine)
    print(f"✅ Found {len(df_securities)} total securities")
    
    # Display sample data
    print("\nSecurity Master Data Sample:")
    display(df_securities.head())
    
    # Show active vs inactive breakdown
    active_count = df_securities['is_active'].sum()
    inactive_count = (df_securities['is_active'] == 0).sum()
    print("\n📊 Securities Summary:")
    print(f"   Active securities: {active_count}")
    print(f"   Inactive securities: {inactive_count}")
    print(f"   Total securities: {len(df_securities)}")
    
except Exception as e:
    print(f"❌ Error reading security master data: {e}")
    df_securities = pd.DataFrame()  # Create empty DataFrame for error handling

Reading security master data...
✅ Found 159115 total securities

Security Master Data Sample:


|  | security_id | symbol | name | isin | sedol | cusip | figi | loanxid | country | currency | sector | industry_group | industry | security_type | asset_class | exchange | is_active | source_vendor | upsert_date | upsert_by |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 000002.SZ | China Vanke Co., Ltd. | CNE100001SR9 |  |  |  |  | China | CNY | Real Estate | Real Estate | Real Estate Management & Development | Common Stock | Equity | SHZ | 0 | FinanceDatabase | 2025-06-13 17:10:30 | mf |
| 1 | 2 | 000004.SZ | two |  |  |  |  |  | United States | CNY | Financials | Diversified Financials | Diversified Financial Services | Common Stock | Equity | SHZ | 0 | FinanceDatabase | 2025-06-13 17:10:30 | mf |
| 2 | 3 | 000005.SZ | Shenzhen Fountain Corporation | CNE0000001L7 |  |  |  |  | China | CNY | Real Estate | Real Estate | Real Estate Management & Development | Common Stock | Equity | SHZ | 0 | FinanceDatabase | 2025-06-13 17:10:30 | mf |
| 3 | 4 | 000006.SZ | Shenzhen Zhenye (Group) Co.,Ltd. | CNE000000164 |  |  |  |  | China | CNY | Real Estate | Real Estate | Real Estate Management & Development | Common Stock | Equity | SHZ | 0 | FinanceDatabase | 2025-06-13 17:10:30 | mf |
| 4 | 5 | 000007.SZ | Shenzhen Quanxinhao Co., Ltd. | CNE0000000P0 |  |  |  |  | China | CNY | Consumer Discretionary | Consumer Services | Hotels, Restaurants & Leisure | Common Stock | Equity | SHZ | 0 | FinanceDatabase | 2025-06-13 17:10:30 | mf |



📊 Securities Summary:
   Active securities: 105
   Inactive securities: 159010
   Total securities: 159115


### Active Securities Preparation

Filter and prepare the ticker DataFrame for data retrieval:

In [9]:
# Filter for active securities and prepare ticker DataFrame
if not df_securities.empty:
    df_active_securities = df_securities[df_securities["is_active"] == 1][["symbol", "security_id"]].rename(
        columns={"symbol": "ticker"}
    )
    
    print(f"📋 Active securities for data retrieval: {len(df_active_securities)}")
    print("\nActive Securities Sample:")
    display(df_active_securities.head(10))
    
    # Show ticker distribution by first letter (for curiosity)
    ticker_dist = df_active_securities['ticker'].str[0].value_counts().head()
    print("\n📈 Top ticker prefixes:")
    for letter, count in ticker_dist.items():
        print(f"   {letter}: {count} tickers")
        
else:
    print("⚠️ No securities data available - cannot proceed with data retrieval")
    df_active_securities = pd.DataFrame()

📋 Active securities for data retrieval: 105

Active Securities Sample:


|  | ticker | security_id |
|---|---|---|
| 25522 | AAPL | 25523 |
| 25861 | ABNB | 25862 |
| 26481 | ADBE | 26482 |
| 26591 | ADI | 26592 |
| 26695 | ADP | 26696 |
| 26746 | ADSK | 26747 |
| 26982 | AEP | 26983 |
| 29103 | AMAT | 29104 |
| 29166 | AMD | 29167 |
| 29220 | AMGN | 29221 |



📈 Top ticker prefixes:
   A: 18 tickers
   C: 14 tickers
   M: 11 tickers
   P: 7 tickers
   T: 6 tickers


## Market Data Extraction

### End-of-Day Data Retrieval

**Note**: This process may take several minutes depending on the number of securities and date range.

In [10]:
# Get end-of-day market data using the yahoo_functions helper
if not df_active_securities.empty:
    try:
        print("🔄 Fetching end-of-day market data...")
        print("⏱️ This may take a few minutes depending on the number of securities...")
        print(f"📅 Date range: {start_date} to {end_date}")
        print(f"🎯 Target securities: {len(df_active_securities)}")
        
        df_eod = yahoo.get_stock_data(
            ticker_df=df_active_securities,
            start_date=start_date,
            end_date=end_date,
            interval="1d"
        )
        
        if not df_eod.empty:
            print(f"\n✅ Successfully retrieved {len(df_eod)} market data records")
            print("📊 Data Summary:")
            print(f"   Date range: {df_eod['as_of_date'].min()} to {df_eod['as_of_date'].max()}")
            print(f"   Unique securities: {df_eod['security_id'].nunique()}")
            print(f"   Trading days: {df_eod['as_of_date'].nunique()}")
            print(f"   Average volume: {df_eod['volume'].mean():.0f}")
            
            print("\n📋 Market Data Sample:")
            display(df_eod.head())
            
        else:
            print("❌ No market data retrieved - check tickers and date range")
            
    except Exception as e:
        print(f"❌ Error fetching market data: {e}")
        df_eod = pd.DataFrame()  # Create empty DataFrame for error handling
        
else:
    print("⚠️ Skipping market data retrieval - no active securities available")
    df_eod = pd.DataFrame()

🔄 Fetching end-of-day market data...
⏱️ This may take a few minutes depending on the number of securities...
📅 Date range: 2025-08-15 to 2025-08-16
🎯 Target securities: 105


$ANSS: possibly delisted; no price data found  (1d 2025-08-15 -> 2025-08-16)


No data found for ticker: ANSS


HTTP Error 404: 
$UBER-ORIGINAL: possibly delisted; no timezone found


No data found for ticker: UBER-original
Tickers with no data: ['ANSS', 'UBER-original']

✅ Successfully retrieved 103 market data records
📊 Data Summary:
   Date range: 2025-08-15 to 2025-08-15
   Unique securities: 103
   Trading days: 1
   Average volume: 14158162

📋 Market Data Sample:


|  | as_of_date | security_id | open | high | low | close | adj_close | volume | dividends | stock_splits | dataload_date | interval | Capital Gains |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-08-15 | 25523 | 234.0 | 234.28 | 229.34 | 231.59 | 231.59 | 56010500 | 0.0 | 0.0 | 2025-08-17 21:55:53 | 1d |  |
| 1 | 2025-08-15 | 25862 | 125.0 | 126.345 | 124.33 | 125.1 | 125.1 | 4307800 | 0.0 | 0.0 | 2025-08-17 21:55:53 | 1d |  |
| 2 | 2025-08-15 | 26482 | 348.33 | 357.29 | 348.0 | 354.85 | 354.85 | 3717400 | 0.0 | 0.0 | 2025-08-17 21:55:53 | 1d |  |
| 3 | 2025-08-15 | 26592 | 237.21 | 237.21 | 231.04 | 231.63 | 231.63 | 3826400 | 0.0 | 0.0 | 2025-08-17 21:55:53 | 1d |  |
| 4 | 2025-08-15 | 26696 | 301.31 | 304.03 | 300.06 | 301.79 | 301.79 | 1362800 | 0.0 | 0.0 | 2025-08-17 21:55:53 | 1d |  |
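The run above retrieved 103 of 105 active securities, and the helper only printed which tickers failed. Keeping that list as data makes it easier to review, re-map, or deactivate stale symbols. Below is a minimal sketch that compares the requested and returned `security_id`s. The function name `missing_tickers` is hypothetical.

```python
def missing_tickers(requested, returned_ids):
    """Return tickers whose security_id is absent from the retrieved data.

    requested:    iterable of (ticker, security_id) pairs
    returned_ids: iterable of security_id values present in the EOD data
    """
    returned = set(returned_ids)
    return sorted(ticker for ticker, security_id in requested if security_id not in returned)
```

In the notebook, `missing_tickers(df_active_securities.itertuples(index=False), df_eod["security_id"])` unpacks each row as `(ticker, security_id)`, matching the column order built earlier. For this run it should list the two tickers reported above.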


### Market Data Quality Check

Validate the retrieved data before database storage:

In [11]:
# Perform data quality checks
if not df_eod.empty:
    print("🔍 Data Quality Analysis:")
    print("="*40)
    
    # Check for missing data
    missing_data = df_eod.isnull().sum()
    if missing_data.sum() > 0:
        print("⚠️ Missing data found:")
        for col, count in missing_data[missing_data > 0].items():
            print(f"   {col}: {count} missing values")
    else:
        print("✅ No missing data detected")
    
    # Check for suspicious values
    zero_volume = df_eod[df_eod['volume'] == 0]
    if len(zero_volume) > 0:
        print(f"⚠️ Found {len(zero_volume)} records with zero volume")
    
    negative_prices = df_eod[df_eod['close'] <= 0]
    if len(negative_prices) > 0:
        print(f"❌ Found {len(negative_prices)} records with negative/zero prices")
    else:
        print("✅ All prices are positive")
    
    # Price range analysis
    print("\n💰 Price Analysis:")
    print(f"   Minimum close price: ${df_eod['close'].min():.2f}")
    print(f"   Maximum close price: ${df_eod['close'].max():.2f}")
    print(f"   Average close price: ${df_eod['close'].mean():.2f}")
    
else:
    print("⚠️ No data available for quality check")

🔍 Data Quality Analysis:
⚠️ Missing data found:
   Capital Gains: 102 missing values
✅ All prices are positive

💰 Price Analysis:
   Minimum close price: $11.85
   Maximum close price: $5454.29
   Average close price: $313.30
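The checks above cover missing values, zero volume, and non-positive closes. Another cheap consistency check is that each bar's high is at least its open and close, and its low is at most both. Below is a dependency-free sketch (hypothetical helper `ohlc_issues`) that could be run on `df_eod.to_dict("records")`.

```python
def ohlc_issues(rows):
    """Return (security_id, problem) pairs for rows with internally inconsistent prices.

    Each row is a mapping with 'security_id', 'open', 'high', 'low' and 'close' keys.
    A high below the low is caught by one of the two checks below.
    """
    issues = []
    for row in rows:
        open_px, high_px, low_px, close_px = row["open"], row["high"], row["low"], row["close"]
        if high_px < max(open_px, close_px):
            issues.append((row["security_id"], "high below open/close"))
        if low_px > min(open_px, close_px):
            issues.append((row["security_id"], "low above open/close"))
    return issues
```

An empty list means every bar is internally consistent. Like the other checks in this cell, it only reports problems and does not block the database write.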


## Database Storage Operations

### Market Data Storage

In [12]:
# Write market data to database
if not df_eod.empty:
    try:
        print("💾 Writing market data to database...")
        print(f"📝 Records to write: {len(df_eod)}")
        
        database.write_market_data(df_eod, session)
        print("✅ Market data written to database successfully")
        
    except Exception as e:
        print(f"❌ Error writing market data: {e}")
        print("🔄 Rolling back transaction...")
        session.rollback()
        print("✅ Transaction rolled back")
        
else:
    print("⚠️ Skipping database write - no market data to store")

💾 Writing market data to database...
📝 Records to write: 103
✅ Market data written to database successfully
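Whether re-running this notebook for the same date duplicates or overwrites rows depends on `write_market_data`, whose implementation is not shown here. One common way to make such writes idempotent is an upsert keyed on `(as_of_date, security_id)`. The sketch below illustrates the idea with Python's built-in `sqlite3` (SQLite 3.24 or later) and a hypothetical `market_data_demo` table. The production database may need different syntax (SQL Server, for example, uses `MERGE`).

```python
import sqlite3

def upsert_eod(conn, rows):
    """Insert or update EOD rows keyed on (as_of_date, security_id).

    rows: iterable of (as_of_date, security_id, close, volume) tuples.
    """
    conn.execute(
        """CREATE TABLE IF NOT EXISTS market_data_demo (
               as_of_date  TEXT    NOT NULL,
               security_id INTEGER NOT NULL,
               close       REAL,
               volume      INTEGER,
               PRIMARY KEY (as_of_date, security_id)
           )"""
    )
    # On a key collision, overwrite the stored values with the incoming ones
    conn.executemany(
        """INSERT INTO market_data_demo (as_of_date, security_id, close, volume)
           VALUES (?, ?, ?, ?)
           ON CONFLICT (as_of_date, security_id)
           DO UPDATE SET close = excluded.close, volume = excluded.volume""",
        rows,
    )
    conn.commit()
```

Writing the same `(as_of_date, security_id)` twice leaves one row holding the latest values, so a rerun after a partial failure does not create duplicates.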


## Fundamentals Data Extraction

### Company Fundamentals Retrieval

In [13]:
# Fetch shares outstanding and market cap for the active securities
if not df_active_securities.empty:
    try:
        print("📊 Fetching fundamentals data...")
        print("⏱️ This may take a few minutes...")
        
        df_fundamentals = yahoo.fetch_fundamentals(
            securities_df=df_active_securities,
            metrics=["sharesOutstanding", "marketCap"]
        )
        
        if not df_fundamentals.empty:
            print(f"\n✅ Retrieved fundamentals for {len(df_fundamentals)} records")
            print("\n📋 Fundamentals Data Sample:")
            display(df_fundamentals.head())
            
            # Show fundamentals summary by metric type
            metric_counts = df_fundamentals['metric_type'].value_counts()
            print("\n📊 Metrics Retrieved:")
            for metric, count in metric_counts.items():
                print(f"   {metric}: {count} securities")
            
            # Show market cap analysis if available
            market_cap_data = df_fundamentals[df_fundamentals['metric_type'] == 'marketCap']
            if not market_cap_data.empty:
                print("\n💼 Market Cap Analysis:")
                print(f"   Average Market Cap: ${market_cap_data['metric_value'].mean()/1e9:.2f}B")
                print(f"   Largest Market Cap: ${market_cap_data['metric_value'].max()/1e9:.2f}B")
                print(f"   Smallest Market Cap: ${market_cap_data['metric_value'].min()/1e9:.2f}B")
                
        else:
            print("❌ No fundamentals data retrieved")
            
    except Exception as e:
        print(f"❌ Error fetching fundamentals: {e}")
        df_fundamentals = pd.DataFrame()  # Create empty DataFrame for consistency
        
else:
    print("⚠️ Skipping fundamentals retrieval - no active securities available")
    df_fundamentals = pd.DataFrame()

📊 Fetching fundamentals data...
⏱️ This may take a few minutes...


HTTP Error 404: 



✅ Retrieved fundamentals for 208 records

📋 Fundamentals Data Sample:


|  | security_id | metric_type | metric_value | source_vendor | effective_date | end_date |
|---|---|---|---|---|---|---|
| 0 | 25523 | sharesOutstanding | 14840400000.0 | Yahoo Finance | 2025-08-17 |  |
| 1 | 25523 | marketCap | 3436888000000.0 | Yahoo Finance | 2025-08-17 |  |
| 2 | 25862 | sharesOutstanding | 429080000.0 | Yahoo Finance | 2025-08-17 |  |
| 3 | 25862 | marketCap | 76590730000.0 | Yahoo Finance | 2025-08-17 |  |
| 4 | 26482 | sharesOutstanding | 424200000.0 | Yahoo Finance | 2025-08-17 |  |



📊 Metrics Retrieved:
   sharesOutstanding: 104 securities
   marketCap: 104 securities

💼 Market Cap Analysis:
   Average Market Cap: $323.60B
   Largest Market Cap: $4400.74B
   Smallest Market Cap: $11.94B
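Because market capitalization is roughly shares outstanding times price, the two fundamentals can be cross-checked against the closing prices already retrieved. Below is a minimal sketch (hypothetical helper `market_cap_gap`) that returns the relative gap, using values from the sample rows above.

```python
def market_cap_gap(shares_outstanding, close, reported_market_cap):
    """Relative difference between shares_outstanding * close and the reported market cap."""
    if reported_market_cap <= 0:
        raise ValueError("reported_market_cap must be positive")
    implied = shares_outstanding * close
    return (implied - reported_market_cap) / reported_market_cap

# Sample values from above: 2025-08-15 close, 2025-08-17 fundamentals
print(f"AAPL (25523): {market_cap_gap(14_840_400_000, 231.59, 3_436_888_000_000):+.4%}")
print(f"ABNB (25862): {market_cap_gap(429_080_000, 125.1, 76_590_730_000):+.1%}")
```

For AAPL the implied value matches the reported market cap almost exactly. ABNB's implied value comes out about 30% lower. A large gap flags a row for review rather than proving an error, since a share count that covers only one share class is one possible cause.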


### Fundamentals Data Storage

In [14]:
# Write fundamentals data to database
if not df_fundamentals.empty:
    try:
        print("💾 Writing fundamentals data to database...")
        print(f"📝 Records to write: {len(df_fundamentals)}")
        
        database.write_security_fundamentals(df_fundamentals, session)
        print("✅ Fundamentals data written to database successfully")
        
    except Exception as e:
        print(f"❌ Error writing fundamentals data: {e}")
        print("🔄 Rolling back transaction...")
        session.rollback()
        print("✅ Transaction rolled back")
        
else:
    print("⚠️ Skipping database write - no fundamentals data to store")

💾 Writing fundamentals data to database...
📝 Records to write: 208
No existing records found. Inserting 208 new records.
Security fundamentals data successfully written.
✅ Fundamentals data written to database successfully


## Pipeline Summary and Cleanup

### Execution Summary

In [15]:
# Display comprehensive summary
print("\n" + "="*60)
print("🎯 STOCK MARKET DATA PIPELINE SUMMARY")
print("="*60)

print(f"📅 Date Range: {start_date} to {end_date}")
print(f"🎯 Target Securities: {len(df_active_securities)}")

print("\n📊 Data Retrieved:")
eod_records = len(df_eod)
fund_records = len(df_fundamentals)

print(f"   📈 Market Data Records: {eod_records:,}")
if eod_records > 0:
    print(f"      - Unique Securities: {df_eod['security_id'].nunique()}")
    print(f"      - Trading Days: {df_eod['as_of_date'].nunique()}")

print(f"   📋 Fundamentals Records: {fund_records:,}")

# Note: these statuses only reflect whether records were retrieved;
# they do not confirm that the database writes above succeeded
print("\n💾 Database Operations:")
print(f"   Market Data: {'✅ Success' if eod_records > 0 else '⚠️ No Data'}")
print(f"   Fundamentals: {'✅ Success' if fund_records > 0 else '⚠️ No Data'}")

# Share of active securities with at least one market data record
if not df_active_securities.empty and eod_records > 0:
    success_rate = (df_eod['security_id'].nunique() / len(df_active_securities)) * 100
    print(f"\n📈 Processing Efficiency: {success_rate:.1f}%")

print("="*60)
print(f"⏰ Pipeline completed at: {dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


🎯 STOCK MARKET DATA PIPELINE SUMMARY
📅 Date Range: 2025-08-15 to 2025-08-16
🎯 Target Securities: 105

📊 Data Retrieved:
   📈 Market Data Records: 103
      - Unique Securities: 103
      - Trading Days: 1
   📋 Fundamentals Records: 208

💾 Database Operations:
   Market Data: ✅ Success
   Fundamentals: ✅ Success

📈 Processing Efficiency: 98.1%
⏰ Pipeline completed at: 2025-08-17 21:56:42


### Database Connection Cleanup

In [16]:
# Clean up database connections
try:
    if 'session' in locals() and session:
        session.close()
        print("✅ Database session closed")
        
    if 'connection' in locals() and connection:
        connection.close()
        print("✅ Database connection closed")
        
    if 'engine' in locals() and engine:
        engine.dispose()
        print("✅ Database engine disposed")
        
    print("\n🎉 Pipeline execution completed successfully!")
    
except Exception as e:
    print(f"⚠️ Warning during cleanup: {e}")
    print("🎉 Pipeline execution completed with warnings!")

✅ Database session closed
✅ Database connection closed
✅ Database engine disposed

🎉 Pipeline execution completed successfully!


## Summary

This notebook demonstrates a complete workflow for:

1. **🔗 Database Connection**: Secure connection using keyring credentials
2. **📋 Data Preparation**: Loading and filtering active securities
3. **📈 Market Data Extraction**: Fetching end-of-day prices from Yahoo Finance
4. **📊 Fundamentals Extraction**: Retrieving company fundamental data
5. **🔍 Data Quality Validation**: Checking data integrity before storage
6. **💾 Data Storage**: Persisting processed data for analysis
7. **🧹 Resource Cleanup**: Proper connection management

### Key Features

- **Flexible Date Configuration**: Easy to modify date ranges for different scenarios
- **Robust Error Handling**: Graceful handling of API failures and data issues
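- **Data Quality Checks**: Reports missing values, zero volume, and non-positive close prices before storage (the checks are informational and do not block the database write)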
- **Progress Monitoring**: Clear status updates throughout the process
- **Resource Management**: Proper cleanup of database connections

### Next Steps

Consider enhancing this workflow with:
- **📊 Advanced Analytics**: Add data analysis and visualization cells
- **🔄 Incremental Updates**: Only fetch new/changed data
- **📧 Alert System**: Email notifications for failures or anomalies
- **🤖 Automated Scheduling**: Set up regular execution via cron or task scheduler
- **📝 Detailed Logging**: Replace print statements with proper logging
- **🧪 Unit Tests**: Add validation tests for each component
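For the logging step, Python's standard `logging` module can replace the `print` calls. Below is a minimal sketch (the logger name `stock_etl` and the helper `get_pipeline_logger` are arbitrary, hypothetical choices) that guards against duplicate handlers when a notebook cell is re-run.

```python
import logging

def get_pipeline_logger(name="stock_etl", level=logging.INFO):
    """Return a logger with a single timestamped stream handler."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # re-running the cell must not add a second handler
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate lines if the root logger also has a handler
    return logger
```

A status line such as the market data count would then become `log = get_pipeline_logger()` followed by `log.info("Retrieved %d market data records", len(df_eod))`.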

### Troubleshooting

**Common Issues:**
- **🔐 Authentication Error**: Verify keyring credentials are correctly stored
- **🌐 Network Issues**: Check internet connection for Yahoo Finance API access
- **💾 Database Error**: Confirm database schema and permissions are correct
- **📊 Data Mismatch**: Ensure symbols in the security master are current, valid Yahoo Finance tickers (in the run above, `ANSS` and `UBER-original` returned no data)
- **⏰ Timeout Issues**: Consider reducing batch size or increasing timeout values
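For transient network failures, a small retry wrapper with exponential backoff can help. It will not fix permanent errors such as the 404 responses for delisted tickers above, so restrict `retry_on` to transient exception types. Whether `yahoo.get_stock_data` raises on network errors or handles them internally is not shown in this notebook. Below is a sketch (hypothetical helper `with_retries`). The `sleep` parameter is injectable so the helper can be tested without waiting.

```python
import time

def with_retries(func, attempts=3, base_delay=1.0, retry_on=(Exception,), sleep=time.sleep):
    """Call func() up to `attempts` times, waiting base_delay * 2**n seconds between tries."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
```

For example, you could wrap the market data call in `lambda: yahoo.get_stock_data(...)` and pass `retry_on=(TimeoutError, ConnectionError)`.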

**Performance Tips:**
- Run during off-peak hours for better API response times
- Consider chunking large security lists for better memory management
- Use appropriate date ranges - larger ranges take significantly longer
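A small generator can implement the chunking tip. Below is a sketch (hypothetical helper `chunked`). It requires Python 3.8+ for the `:=` operator. On Python 3.12+, `itertools.batched` behaves the same way but yields tuples.

```python
from itertools import islice

def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk
```

In the notebook, one option is to iterate over `chunked(df_active_securities.index, 25)` and pass `df_active_securities.loc[batch]` to `yahoo.get_stock_data` for each batch. You can then combine the results with `pd.concat`. The batch size of 25 is only an example.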