# Separated Stock Price and Financial Statement Data Collection

This notebook collects financial data for **US stocks with market cap > $1B** and separates:
1. **Stock price data** - Aligned to calendar quarter ends (March 31, June 30, Sept 30, Dec 31)
2. **Financial statement data** - Aligned to company fiscal quarters with calendar date mapping

**Key features:**
1. **Market cap filter**: Only collects data for stocks with market cap > $1B
2. **Rate limiting**: 750 API calls per minute
3. **Year-by-year collection**: Each year saved to separate files
4. **Fiscal/Calendar alignment**: Properly handles different fiscal year ends
5. **Enhanced metadata**: Includes company name, ETF/Fund flags
6. **Error tracking**: Detailed logs for debugging

**Output files per year:**
- `stock_prices_YYYY.csv`: ticker, company_name, quarter_end_date, stock_price, market_cap, mkt_cap_rank, industry, sector, isETF, isFund
- `financial_statements_YYYY.csv`: ticker, company_name, fiscal_quarter, calendar_date, debt_to_assets, book_to_market, earnings_yield, industry, sector


## Setup and Configuration


In [None]:
import requests
import pandas as pd
import time
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
import json
import os
from dotenv import load_dotenv
import numpy as np

load_dotenv(".env")
API = os.getenv("API")  

# Rate limiting configuration
API_CALLS_PER_MINUTE = 750
SECONDS_PER_CALL = 60 / API_CALLS_PER_MINUTE  # 0.08 seconds per call

# Session and timer for rate limiting
session = requests.Session()
LAST_API_CALL = 0.0

# Market cap threshold (1 billion)
MARKET_CAP_THRESHOLD = 1e9

print(f"Rate limit configured: {API_CALLS_PER_MINUTE} calls/minute ({SECONDS_PER_CALL:.2f} seconds/call)")
print(f"Market cap filter: > ${MARKET_CAP_THRESHOLD/1e9:.0f}B")


## Helper Functions


In [None]:
def get_json(url: str, params: Dict[str, Any] = {}) -> Optional[Any]:
    """Safely get JSON data from API with error handling and rate limit retry"""
    global LAST_API_CALL, session
    try:
        params['apikey'] = API
        elapsed = time.time() - LAST_API_CALL
        if elapsed < SECONDS_PER_CALL:
            time.sleep(SECONDS_PER_CALL - elapsed)
        response = session.get(url, params=params, timeout=10)
        LAST_API_CALL = time.time()
        if response.status_code == 429:
            print('⚠️  Rate limit hit! Waiting 30 seconds...')
            time.sleep(30)
            return get_json(url, params)
        response.raise_for_status()
        js = response.json()
        if isinstance(js, dict) and 'historical' in js:
            return js['historical']
        elif isinstance(js, list):
            return js
        else:
            return js
    except requests.exceptions.HTTPError as e:
        print(f'HTTP Error {e.response.status_code}: {e}')
        return None
    except Exception as e:
        print(f'Error fetching data: {e}')
        return None


In [None]:
def get_quarter_end_dates(year: int) -> List[datetime]:
    """Get calendar quarter end dates for a given year"""
    return [
        datetime(year, 3, 31),   # Q1
        datetime(year, 6, 30),   # Q2
        datetime(year, 9, 30),   # Q3
        datetime(year, 12, 31)   # Q4
    ]


In [None]:
def check_market_cap(ticker: str, year: int, precomputed: Optional[float] = None) -> Tuple[bool, Optional[float]]:
    """Check if ticker had market cap above threshold in given year"""
    if precomputed is not None:
        return precomputed > MARKET_CAP_THRESHOLD, precomputed
    try:
        start_date = f'{year}-01-01'
        end_date = f'{year}-12-31'
        mc_data = get_json(
            f'https://financialmodelingprep.com/api/v3/historical-market-capitalization/{ticker}',
            {'from': start_date, 'to': end_date}
        )
        if not mc_data:
            return False, None
        mc_df = pd.DataFrame(mc_data)
        avg_market_cap = mc_df['marketCap'].mean()
        return avg_market_cap > MARKET_CAP_THRESHOLD, avg_market_cap
    except Exception as e:
        print(f'Error checking market cap for {ticker}: {e}')
        return False, None


In [None]:
def get_bulk_profiles(tickers: List[str]) -> Dict[str, Any]:
    """Fetch company profiles in bulk."""
    data = get_json(f'https://financialmodelingprep.com/api/v3/profile/{','.join(tickers)}')
    profiles = {}
    if isinstance(data, list):
        for item in data:
            symbol = item.get('symbol')
            profiles[symbol] = item
    return profiles


In [None]:
def get_historical_tickers(year: int) -> List[str]:
    """Get list of US tickers that existed in a specific year"""
    print(f"Fetching ticker list for year {year}...")
    
    # Try to get historical ticker list from end of previous year
    date = f"{year-1}-12-31"
    
    # First try to get available stocks for that date
    available_stocks = get_json(
        f"https://financialmodelingprep.com/api/v3/available-traded/list",
        {"date": date}
    )
    
    if available_stocks:
        # Filter for US exchanges
        us_tickers = [
            stock["symbol"] for stock in available_stocks 
            if stock.get("exchangeShortName") in ["NYSE", "NASDAQ", "AMEX"]
            and len(stock["symbol"]) <= 5
            and "." not in stock["symbol"]
        ]
        print(f"✅ Found {len(us_tickers)} US tickers for {year}")
        return us_tickers
    
    # Fallback: use current ticker list with a warning
    print(f"⚠️  Could not get historical ticker list for {year}, using current list")
    tickers_data = get_json("https://financialmodelingprep.com/api/v3/stock/list")
    
    if tickers_data:
        # Filter for US exchanges and remove penny stocks
        us_tickers = [
            d["symbol"] for d in tickers_data 
            if d["exchangeShortName"] in ["NYSE", "NASDAQ"] 
            and (d.get("price") is not None and d.get("price", 0) > 5)
            and len(d["symbol"]) <= 5
            and "." not in d["symbol"]
        ]
        
        print(f"✅ Found {len(us_tickers)} current US tickers")
        return us_tickers
    else:
        print("❌ Failed to fetch ticker list. Using sample tickers.")
        return ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "JPM", "JNJ", "V"]


## Stock Price Collection Functions


In [None]:
def collect_stock_prices_for_ticker(ticker: str, year: int, profile_data: Optional[Dict[str, Any]] = None) -> Tuple[Optional[pd.DataFrame], Dict[str, Any], int]:
    """Collect stock price data aligned to calendar quarter ends"""
    error_log = {'ticker': ticker, 'year': year, 'errors': []}
    api_calls = 0
    
    try:
        # Check market cap
        is_large_cap, avg_market_cap = check_market_cap(ticker, year)
        api_calls += 1
        
        if not is_large_cap:
            error_log['errors'].append(f'Market cap below threshold (avg: ${avg_market_cap:,.0f})')
            return None, error_log, api_calls
        
        # Get profile data if not provided
        if profile_data is None:
            profile = get_json(f'https://financialmodelingprep.com/api/v3/profile/{ticker}')
            api_calls += 1
            profile_data = profile[0] if profile and len(profile) > 0 else {}
        
        # Extract profile information
        company_name = profile_data.get('companyName', 'Unknown')
        industry = profile_data.get('industry', 'Unknown')
        sector = profile_data.get('sector', 'Unknown')
        is_etf = profile_data.get('isEtf', False)
        is_fund = profile_data.get('isFund', False)
        
        # Get historical prices for the year
        start_date = f'{year}-01-01'
        end_date = f'{year}-12-31'
        
        px_data = get_json(
            f'https://financialmodelingprep.com/api/v3/historical-price-full/{ticker}',
            {'from': start_date, 'to': end_date}
        )
        api_calls += 1
        
        mc_data = get_json(
            f'https://financialmodelingprep.com/api/v3/historical-market-capitalization/{ticker}',
            {'from': start_date, 'to': end_date}
        )
        api_calls += 1
        
        if not px_data or not mc_data:
            error_log['errors'].append('Missing price or market cap data')
            return None, error_log, api_calls
        
        # Convert to DataFrames
        px_df = pd.DataFrame(px_data)
        px_df['date'] = pd.to_datetime(px_df['date'])
        
        mc_df = pd.DataFrame(mc_data)
        mc_df['date'] = pd.to_datetime(mc_df['date'])
        
        # Get quarter end dates
        quarter_ends = get_quarter_end_dates(year)
        
        # Collect data for each quarter end
        quarter_data = []
        for q_idx, q_end in enumerate(quarter_ends, 1):
            # Find closest trading day to quarter end (within 7 days)
            date_range = pd.date_range(q_end - timedelta(days=7), q_end)
            
            # Get price data
            px_match = px_df[px_df['date'].isin(date_range)].sort_values('date', ascending=False)
            if len(px_match) == 0:
                continue
                
            px_row = px_match.iloc[0]
            
            # Get market cap data
            mc_match = mc_df[mc_df['date'].isin(date_range)].sort_values('date', ascending=False)
            if len(mc_match) == 0:
                continue
                
            mc_row = mc_match.iloc[0]
            
            quarter_data.append({
                'ticker': ticker,
                'company_name': company_name,
                'quarter_end_date': q_end.strftime('%Y-%m-%d'),
                'actual_date': px_row['date'].strftime('%Y-%m-%d'),
                'stock_price': px_row['adjClose'],
                'market_cap': mc_row['marketCap'],
                'industry': industry,
                'sector': sector,
                'isETF': is_etf,
                'isFund': is_fund,
                'quarter': f'{year}Q{q_idx}'
            })
        
        if not quarter_data:
            error_log['errors'].append('No valid quarter-end data found')
            return None, error_log, api_calls
            
        return pd.DataFrame(quarter_data), error_log, api_calls
        
    except Exception as e:
        error_log['errors'].append(f'Exception: {str(e)}')
        return None, error_log, api_calls


## Financial Statement Collection Functions


In [None]:
def collect_financial_statements_for_ticker(ticker: str, year: int, profile_data: Optional[Dict[str, Any]] = None) -> Tuple[Optional[pd.DataFrame], Dict[str, Any], int]:
    """Collect financial statement data aligned to fiscal quarters"""
    error_log = {'ticker': ticker, 'year': year, 'errors': []}
    api_calls = 0
    
    try:
        # Check market cap
        is_large_cap, avg_market_cap = check_market_cap(ticker, year)
        api_calls += 1
        
        if not is_large_cap:
            error_log['errors'].append(f'Market cap below threshold (avg: ${avg_market_cap:,.0f})')
            return None, error_log, api_calls
        
        # Get profile data if not provided
        if profile_data is None:
            profile = get_json(f'https://financialmodelingprep.com/api/v3/profile/{ticker}')
            api_calls += 1
            profile_data = profile[0] if profile and len(profile) > 0 else {}
        
        # Extract profile information
        company_name = profile_data.get('companyName', 'Unknown')
        industry = profile_data.get('industry', 'Unknown')
        sector = profile_data.get('sector', 'Unknown')
        
        # Get financial statements
        bs = get_json(
            f'https://financialmodelingprep.com/api/v3/balance-sheet-statement/{ticker}',
            {'period': 'quarter', 'limit': 20}
        )
        api_calls += 1
        
        inc = get_json(
            f'https://financialmodelingprep.com/api/v3/income-statement/{ticker}',
            {'period': 'quarter', 'limit': 20}
        )
        api_calls += 1
        
        if not bs or not inc:
            error_log['errors'].append('Missing financial statement data')
            return None, error_log, api_calls
        
        # Convert to DataFrames and filter for year
        bs_df = pd.DataFrame(bs)
        bs_df['date'] = pd.to_datetime(bs_df['date'])
        bs_df = bs_df[bs_df['date'].dt.year == year]
        
        inc_df = pd.DataFrame(inc)
        inc_df['date'] = pd.to_datetime(inc_df['date'])
        inc_df = inc_df[inc_df['date'].dt.year == year]
        
        if len(bs_df) == 0 or len(inc_df) == 0:
            error_log['errors'].append(f'No financial data for year {year}')
            return None, error_log, api_calls
        
        # Get stock prices for book-to-market and earnings yield calculations
        start_date = f'{year}-01-01'
        end_date = f'{year}-12-31'
        px_data = get_json(
            f'https://financialmodelingprep.com/api/v3/historical-price-full/{ticker}',
            {'from': start_date, 'to': end_date}
        )
        api_calls += 1
        
        if not px_data:
            error_log['errors'].append('Missing price data for ratios')
            return None, error_log, api_calls
            
        px_df = pd.DataFrame(px_data)
        px_df['date'] = pd.to_datetime(px_df['date'])
        
        # Process each fiscal quarter
        statement_data = []
        
        for idx, bs_row in bs_df.iterrows():
            # Find matching income statement
            inc_match = inc_df[inc_df['date'] == bs_row['date']]
            if len(inc_match) == 0:
                continue
                
            inc_row = inc_match.iloc[0]
            
            # Calculate debt to assets
            total_debt = (bs_row.get('shortTermDebt', 0) or 0) + (bs_row.get('longTermDebt', 0) or 0)
            total_assets = bs_row.get('totalAssets', 0)
            debt_to_assets = total_debt / total_assets if total_assets > 0 else None
            
            # Get stock price near statement date for ratios
            statement_date = bs_row['date']
            date_range = pd.date_range(statement_date - timedelta(days=7), statement_date + timedelta(days=7))
            px_match = px_df[px_df['date'].isin(date_range)].sort_values('date')
            
            if len(px_match) > 0:
                stock_price = px_match.iloc[len(px_match)//2]['adjClose']  # Use middle date
                
                # Calculate book to market
                book_value = bs_row.get('totalStockholdersEquity', 0)
                shares = inc_row.get('weightedAverageShsOut', 0)
                book_per_share = book_value / shares if shares > 0 else None
                book_to_market = book_per_share / stock_price if book_per_share and stock_price > 0 else None
                
                # Calculate earnings yield
                eps = inc_row.get('eps', 0)
                earnings_yield = eps / stock_price if stock_price > 0 else None
            else:
                book_to_market = None
                earnings_yield = None
            
            # Determine fiscal quarter
            fiscal_period = bs_row.get('period', '')
            fiscal_year = bs_row.get('calendarYear', year)
            
            statement_data.append({
                'ticker': ticker,
                'company_name': company_name,
                'fiscal_quarter': f'{fiscal_year}-{fiscal_period}',
                'calendar_date': statement_date.strftime('%Y-%m-%d'),
                'debt_to_assets': debt_to_assets,
                'book_to_market': book_to_market,
                'earnings_yield': earnings_yield,
                'industry': industry,
                'sector': sector,
                'total_assets': total_assets,
                'total_debt': total_debt,
                'book_value': book_value,
                'eps': eps,
                'shares_outstanding': shares
            })
        
        if not statement_data:
            error_log['errors'].append('No valid statement data found')
            return None, error_log, api_calls
            
        df = pd.DataFrame(statement_data)
        # Keep only required columns for output
        output_cols = ['ticker', 'company_name', 'fiscal_quarter', 'calendar_date', 
                      'debt_to_assets', 'book_to_market', 'earnings_yield', 'industry', 'sector']
        return df[output_cols], error_log, api_calls
        
    except Exception as e:
        error_log['errors'].append(f'Exception: {str(e)}')
        return None, error_log, api_calls


## Main Collection Functions


In [None]:
def collect_year_data_separated(tickers: List[str], year: int, max_tickers: Optional[int] = None, 
                               save_progress: bool = True, progress_interval: int = 100, 
                               batch_size: int = 50) -> Tuple[pd.DataFrame, pd.DataFrame, List[Dict]]:
    """Collect both stock price and financial statement data for a year"""
    # Initialize collections
    all_price_data = []
    all_statement_data = []
    all_errors = []
    
    successful_tickers = []
    failed_tickers = []
    skipped_tickers = []
    total_api_calls = 0
    
    tickers_to_process = tickers[:max_tickers] if max_tickers else tickers
    total_tickers = len(tickers_to_process)
    
    print(f"\n{'='*70}")
    print(f"  COLLECTING SEPARATED DATA FOR YEAR {year}")
    print(f"{'='*70}")
    print(f"Total tickers to check: {total_tickers}")
    print(f"Market cap filter: >${MARKET_CAP_THRESHOLD/1e9:.0f}B")
    print(f"API rate limit: {API_CALLS_PER_MINUTE} calls/minute")
    print(f"Batch size: {batch_size} tickers")
    print(f"Progress saves: Every {progress_interval} tickers")
    print(f"{'='*70}\n")
    
    start_time = time.time()
    
    # Process tickers in batches
    for batch_start in range(0, total_tickers, batch_size):
        batch_end = min(batch_start + batch_size, total_tickers)
        batch_tickers = tickers_to_process[batch_start:batch_end]
        
        # Progress update
        if batch_start > 0:
            elapsed = time.time() - start_time
            avg_time = elapsed / batch_start
            remaining = (total_tickers - batch_start) * avg_time
            
            print(f"\n[Progress: {batch_start}/{total_tickers} ({batch_start/total_tickers*100:.1f}%)]")
            print(f"  Time: {elapsed/60:.1f}min elapsed, ~{remaining/60:.1f}min remaining")
            print(f"  Success: {len(successful_tickers)}, Failed: {len(failed_tickers)}, Skipped (small cap): {len(skipped_tickers)}")
            print(f"  API calls: {total_api_calls} ({total_api_calls/elapsed*60:.0f}/minute avg)")
        
        print(f"\n  Processing batch {batch_start//batch_size + 1}: tickers {batch_start+1}-{batch_end}")
        
        # Get bulk profiles for the batch
        profiles = get_bulk_profiles(batch_tickers)
        total_api_calls += 1
        
        # Process each ticker in the batch
        for ticker in batch_tickers:
            profile_data = profiles.get(ticker)
            
            # Collect stock price data
            price_data, price_error, price_calls = collect_stock_prices_for_ticker(ticker, year, profile_data)
            total_api_calls += price_calls
            
            # Collect financial statement data
            statement_data, statement_error, statement_calls = collect_financial_statements_for_ticker(ticker, year, profile_data)
            total_api_calls += statement_calls
            
            # Determine success/failure
            has_price_data = price_data is not None and len(price_data) > 0
            has_statement_data = statement_data is not None and len(statement_data) > 0
            
            if has_price_data or has_statement_data:
                successful_tickers.append(ticker)
                if has_price_data:
                    all_price_data.append(price_data)
                if has_statement_data:
                    all_statement_data.append(statement_data)
                print("✓", end="", flush=True)
            elif any("Market cap below threshold" in err for err in price_error.get("errors", [])):
                skipped_tickers.append(ticker)
                print("○", end="", flush=True)
            else:
                failed_tickers.append(ticker)
                # Combine errors from both collections
                combined_errors = {
                    'ticker': ticker,
                    'year': year,
                    'price_errors': price_error.get('errors', []),
                    'statement_errors': statement_error.get('errors', [])
                }
                all_errors.append(combined_errors)
                print("✗", end="", flush=True)


In [None]:
        # Save progress periodically
        if save_progress and (batch_end % progress_interval == 0 or batch_end == total_tickers):
            if all_price_data:
                temp_price_df = pd.concat(all_price_data, ignore_index=True)
                temp_price_df['mkt_cap_rank'] = temp_price_df.groupby('quarter')['market_cap'].rank(method='dense', ascending=False).astype(int)
                price_filename = f"progress_prices_{year}_tickers_{batch_end}.csv"
                temp_price_df.to_csv(price_filename, index=False)
                print(f"\n  💾 Price data saved: {price_filename} ({len(temp_price_df)} rows)")
            
            if all_statement_data:
                temp_statement_df = pd.concat(all_statement_data, ignore_index=True)
                statement_filename = f"progress_statements_{year}_tickers_{batch_end}.csv"
                temp_statement_df.to_csv(statement_filename, index=False)
                print(f"  💾 Statement data saved: {statement_filename} ({len(temp_statement_df)} rows)")
    
    # Final summary
    total_time = time.time() - start_time
    
    print(f"\n\n{'='*70}")
    print(f"  YEAR {year} COLLECTION COMPLETE")
    print(f"{'='*70}")
    print(f"Total time: {total_time/60:.1f} minutes ({total_time/3600:.2f} hours)")
    print(f"Successful: {len(successful_tickers)} tickers")
    print(f"Failed: {len(failed_tickers)} tickers")
    print(f"Skipped (small cap): {len(skipped_tickers)} tickers")
    print(f"Total API calls: {total_api_calls:,} ({total_api_calls/total_time*60:.0f}/minute avg)")
    
    # Prepare final DataFrames
    if all_price_data:
        final_price_df = pd.concat(all_price_data, ignore_index=True)
        final_price_df = final_price_df.drop_duplicates(['ticker', 'quarter_end_date'], keep='last')
        final_price_df['mkt_cap_rank'] = final_price_df.groupby('quarter')['market_cap'].rank(method='dense', ascending=False).astype(int)
        final_price_df = final_price_df.sort_values(['ticker', 'quarter_end_date']).reset_index(drop=True)
        # Drop the quarter column used for ranking
        final_price_df = final_price_df.drop('quarter', axis=1)
    else:
        final_price_df = pd.DataFrame()
    
    if all_statement_data:
        final_statement_df = pd.concat(all_statement_data, ignore_index=True)
        final_statement_df = final_statement_df.drop_duplicates(['ticker', 'fiscal_quarter', 'calendar_date'], keep='last')
        final_statement_df = final_statement_df.sort_values(['ticker', 'calendar_date']).reset_index(drop=True)
    else:
        final_statement_df = pd.DataFrame()
    
    print(f"\n📊 Final datasets:")
    print(f"   Stock prices: {len(final_price_df)} rows, {final_price_df['ticker'].nunique() if len(final_price_df) > 0 else 0} tickers")
    print(f"   Financial statements: {len(final_statement_df)} rows, {final_statement_df['ticker'].nunique() if len(final_statement_df) > 0 else 0} tickers")
    
    # Save error log
    if all_errors:
        error_filename = f"errors_separated_{year}.json"
        with open(error_filename, 'w') as f:
            json.dump(all_errors, f, indent=2, default=str)
        print(f"\n📝 Error log saved: {error_filename} ({len(all_errors)} errors)")
    
    # Clean up progress files
    if save_progress:
        for progress_file in [f for f in os.listdir('.') if f.startswith(f'progress_') and str(year) in f]:
            os.remove(progress_file)
        print(f"🧹 Cleaned up progress files")
    
    return final_price_df, final_statement_df, all_errors
