## Fetching Financial Statements (0-10 Years Ago)

This next step will fetch 10 years of quarterly financial data for every symbol in our `fetchable_symbols.csv` list. This is a long-running process, but it has been redesigned to be **robust and resumable**.

Here’s how it works:
1.  **Pre-Flight Check:** It first runs a quick test on a single symbol to ensure the API is responding correctly.
2.  **Individual Symbol Processing:** It fetches, processes, and saves the data for one symbol at a time to a temporary `partials` folder.
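3.  **Automatic Resuming:** If the script is stopped and restarted, it will automatically skip any symbol that already has a saved file in the `partials` folder, picking up where it left off. Symbols for which nothing was saved (for example, because the API returned no data) are retried on the next run.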
4.  **Clear Error Logging:** If a symbol fails repeatedly, it will now print a clear error message instead of failing silently.

After this cell completes, a final script in the next cell will assemble these individual files into the four alphabetized batch files.

In [None]:
import pandas as pd
import requests
import os
from dotenv import load_dotenv
import asyncio
import aiohttp
from tqdm.notebook import tqdm
import nest_asyncio

# Apply nest_asyncio to allow running asyncio event loop in Jupyter
# (patches asyncio so an already-running event loop can be re-entered).
nest_asyncio.apply()

async def fetch_endpoint(session, url, params, symbol_for_logging):
    """Fetch one API endpoint as JSON, retrying on rate limits and errors.

    Up to three attempts are made. An HTTP 429 response waits for the number
    of seconds given in the ``Retry-After`` header (5 when the header is
    absent or not an integer); any exception waits 2 and then 4 seconds
    between attempts.

    Args:
        session: Open ``aiohttp.ClientSession`` used to issue the GET request.
        url: Endpoint URL; its last path segment is used as the endpoint
            name in log messages.
        params: Query parameters sent with the request.
        symbol_for_logging: Ticker symbol shown in log messages.

    Returns:
        The decoded JSON body when it is a list. An empty list when every
        attempt failed or when the body is not a list, so callers can always
        treat the result as a list of records.
    """
    retries = 3
    endpoint_name = url.split('/')[-1]
    last_error = None

    for attempt in range(retries):
        attempts_left = attempt < retries - 1
        try:
            async with session.get(url, params=params, timeout=30) as response:
                if response.status == 429:
                    # Remember the reason so an exhausted retry budget is reported, not silent.
                    last_error = "HTTP 429 (rate limited)"
                    if attempts_left:
                        try:
                            retry_after = int(response.headers.get("Retry-After", 5))
                        except (TypeError, ValueError):
                            # Retry-After may also be an HTTP date; fall back to the default wait.
                            retry_after = 5
                        await asyncio.sleep(retry_after)
                    continue
                response.raise_for_status()
                payload = await response.json()
        except Exception as e:
            last_error = e
            if attempts_left:
                await asyncio.sleep((attempt + 1) * 2)
            continue

        if isinstance(payload, list):
            return payload
        if payload:
            # A JSON object here is not statement data and would break the
            # DataFrame construction downstream.
            tqdm.write(f"ERROR for {symbol_for_logging}: Unexpected {endpoint_name} response (expected a list of records): {str(payload)[:200]}")
        return []

    tqdm.write(f"ERROR for {symbol_for_logging}: Failed to fetch {endpoint_name} data after {retries} retries. Final error: {last_error}")
    return []

def _indexed_statement_frame(records):
    """Turn one endpoint's rows into a frame indexed by (date, symbol, period).

    Returns ``None`` when ``records`` is not a non-empty list or the rows lack
    any of the three key columns, so the caller can simply skip that endpoint.
    """
    key_columns = ['date', 'symbol', 'period']
    if not isinstance(records, list) or not records:
        return None
    frame = pd.DataFrame(records)
    if frame.empty or not set(key_columns).issubset(frame.columns):
        return None
    frame = frame.set_index(key_columns)
    # The column-wise concat needs unique index labels when the frames differ,
    # so keep only the first row for each (date, symbol, period).
    return frame[~frame.index.duplicated(keep='first')]


async def fetch_and_process_symbol(session, symbol, api_key, partials_dir):
    """Fetch, merge, and save the quarterly statements for a single symbol.

    Requests the income statement, balance sheet, and ratios endpoints
    concurrently (40 quarterly records each), joins them on
    (date, symbol, period), keeps and renames the columns of interest, and
    writes ``<symbol>.csv`` into ``partials_dir``. Nothing is written when no
    endpoint returned usable rows.

    Args:
        session: Open ``aiohttp.ClientSession`` shared by all requests.
        symbol: Ticker symbol to fetch.
        api_key: API key sent as the ``apikey`` query parameter.
        partials_dir: Directory that receives the per-symbol CSV file.
    """
    base_url = "https://financialmodelingprep.com/stable"
    base_params = {"period": "quarter", "limit": 40, "apikey": api_key, "symbol": symbol}

    endpoint_names = ("income-statement", "balance-sheet-statement", "ratios")
    payloads = await asyncio.gather(*[
        fetch_endpoint(session, f"{base_url}/{name}", base_params, symbol)
        for name in endpoint_names
    ])

    all_dfs = [
        frame
        for frame in (_indexed_statement_frame(records) for records in payloads)
        if frame is not None
    ]
    if not all_dfs:
        return

    merged_df = pd.concat(all_dfs, axis=1).reset_index()

    columns_to_keep = {
        'date': 'exactDate', 'symbol': 'symbol', 'revenue': 'revenue', 'netIncome': 'netIncome',
        'ebitda': 'ebitda', 'eps': 'eps', 'priceToEarningsRatio': 'pToE', 'totalCurrentAssets': 'totalCurrentAssets',
        'totalAssets': 'totalAssets', 'totalLiabilities': 'totalLiabilities', 'totalDebt': 'totalDebt',
        'debtToAssetsRatio': 'debtToAssets', 'priceToBookRatio': 'pToB', 'debtToEquityRatio': 'debtToEquity',
        'debtToCapitalRatio': 'debtToCapital', 'revenuePerShare': 'revenuePerShare'
    }

    # Endpoints share some column names; keep the first occurrence of each.
    merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
    existing_columns = {k: v for k, v in columns_to_keep.items() if k in merged_df.columns}

    if not existing_columns:
        return

    final_df = merged_df[list(existing_columns.keys())].rename(columns=existing_columns)

    if not final_df.empty:
        output_path = os.path.join(partials_dir, f"{symbol}.csv")
        # Write to a temporary name first: an interrupted run must not leave a
        # truncated "<symbol>.csv" that a later run would treat as completed.
        temp_path = f"{output_path}.tmp"
        final_df.to_csv(temp_path, index=False)
        os.replace(temp_path, output_path)

async def main_fetcher_resumable():
    """Fetch statements for every symbol that has no partial file yet.

    Loads ``API_KEY`` from the environment, runs a one-record pre-flight
    request for AAPL, then fetches each symbol from
    ``../output/fetchable_symbols.csv`` that does not already have a CSV in
    the partials directory. At most 20 symbols are processed at once. A
    failure on one symbol is logged and does not stop the others.
    """
    load_dotenv()
    api_key = os.getenv('API_KEY')
    if not api_key:
        print("API_KEY not found.")
        return

    partials_dir = '../output/Financial Statements/partials'
    os.makedirs(partials_dir, exist_ok=True)

    async with aiohttp.ClientSession() as session:
        print("--- Running pre-flight API check... ---")
        test_symbol = 'AAPL'
        test_url = "https://financialmodelingprep.com/stable/income-statement"
        test_params = {"symbol": test_symbol, "period": "quarter", "limit": 1, "apikey": api_key}
        test_data = await fetch_endpoint(session, test_url, test_params, test_symbol)
        if not test_data:
            print(f"--- Pre-flight check FAILED. Could not fetch data for {test_symbol}. Please check your API key and network connection. ---")
            return
        print("--- Pre-flight check PASSED. ---")

        # Read symbols as text and treat only empty cells as missing, so that
        # tickers such as "NA" are not converted to NaN.
        symbols_df = pd.read_csv(
            '../output/fetchable_symbols.csv',
            dtype={'symbol': str},
            keep_default_na=False,
            na_values=[''],
        )
        all_symbols = symbols_df['symbol'].dropna().unique().tolist()

        # Strip only the ".csv" extension; tickers may themselves contain dots.
        completed_symbols = {
            os.path.splitext(f)[0] for f in os.listdir(partials_dir) if f.endswith('.csv')
        }
        symbols_to_fetch = [s for s in all_symbols if s not in completed_symbols]

        print(f"Found {len(all_symbols)} total symbols. {len(completed_symbols)} already completed.")
        print(f"Fetching financial data for {len(symbols_to_fetch)} new symbols...")

        sem = asyncio.Semaphore(20)

        async def fetch_with_semaphore(symbol):
            async with sem:
                try:
                    await fetch_and_process_symbol(session, symbol, api_key, partials_dir)
                except Exception as e:
                    # Keep the run alive; the symbol has no file and is retried next run.
                    tqdm.write(f"ERROR for {symbol}: Skipped after unexpected failure: {e!r}")

        tasks = [fetch_with_semaphore(symbol) for symbol in symbols_to_fetch]

        for f in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Fetching Financials"):
            await f

    print("--- Financial data fetching complete. ---")

# Run the main fetching process
# NOTE: top-level `await` works here because this is a notebook cell; in a plain
# Python script the coroutine would have to be run via asyncio.run(...).
await main_fetcher_resumable()

## Assemble Final Financial Reports

After the fetching process is complete, run this cell to combine all the individual symbol files from the `partials` directory into the four final alphabetized CSV files.


In [None]:
import pandas as pd
import os

def assemble_final_reports():
    """
    Assembles individual symbol CSVs from the 'partials' directory
    into four final alphabetized batch files.

    Every ``*.csv`` in the partials directory is read, the rows are combined,
    and they are split by the upper-cased first character of ``symbol`` into
    the ranges A-F, G-L, M-R and S-Z. Each non-empty range is written to
    ``financials_<range>.csv`` in the output directory. Rows without a symbol
    fall into no range and are not written.
    """
    partials_dir = '../output/Financial Statements/partials'
    output_dir = '../output/Financial Statements'

    if not os.path.exists(partials_dir):
        print(f"Directory with partial files not found: {partials_dir}")
        return

    # Sorted so the row order of the output does not depend on directory listing order.
    all_files = sorted(
        os.path.join(partials_dir, f) for f in os.listdir(partials_dir) if f.endswith('.csv')
    )
    if not all_files:
        print("No partial symbol files were found to assemble.")
        return

    print(f"Assembling data from {len(all_files)} individual symbol files...")

    frames = []
    for path in all_files:
        try:
            # Read symbols as text and treat only empty cells as missing;
            # otherwise tickers such as "NA" or "TRUE" are parsed as NaN/bool
            # and their rows end up in no chunk.
            frames.append(
                pd.read_csv(path, dtype={'symbol': str}, keep_default_na=False, na_values=[''])
            )
        except pd.errors.EmptyDataError:
            print(f"Skipping empty partial file: {path}")

    if not frames:
        print("No partial symbol files were found to assemble.")
        return

    full_df = pd.concat(frames, ignore_index=True)

    if 'symbol' not in full_df.columns:
        print("Error: 'symbol' column not found in the assembled data. Cannot create batches.")
        return

    # Split into alphabetical chunks by the upper-cased first character.
    first_char = full_df['symbol'].str[0].str.upper()
    symbol_chunks = {
        "A-F": full_df[first_char <= 'F'],
        "G-L": full_df[(first_char >= 'G') & (first_char <= 'L')],
        "M-R": full_df[(first_char >= 'M') & (first_char <= 'R')],
        "S-Z": full_df[first_char >= 'S']
    }

    for chunk_name, chunk_df in symbol_chunks.items():
        if not chunk_df.empty:
            output_path = os.path.join(output_dir, f"financials_{chunk_name}.csv")
            chunk_df.to_csv(output_path, index=False)
            print(f"Saved chunk '{chunk_name}' with {len(chunk_df)} records to {output_path}")
        else:
            print(f"No data for chunk '{chunk_name}'.")

    print("--- Assembly of final reports is complete. ---")


# Run the assembly function
# (reads the CSVs in the partials directory and writes the financials_<range>.csv batch files).
assemble_final_reports()



## Fetching Financial Statements (Last 50 Years)

This cell will fetch 50 years of quarterly financial data for every symbol in our `fetchable_symbols.csv` list. This is a long-running process that is robust and resumable. It will produce the final financial statement files with an A-D, E-J, K-P, Q-Z split.


In [7]:
import pandas as pd
import requests
import os
from dotenv import load_dotenv
import asyncio
import aiohttp
from tqdm.notebook import tqdm
import nest_asyncio
from datetime import datetime, timedelta

# Apply nest_asyncio to allow running asyncio event loop in Jupyter
# (patches asyncio so an already-running event loop can be re-entered).
nest_asyncio.apply()

async def fetch_endpoint_50_years(session, url, params, symbol_for_logging):
    """Fetch one API endpoint as JSON, retrying on rate limits and errors.

    Up to three attempts are made. An HTTP 429 response waits for the number
    of seconds given in the ``Retry-After`` header (5 when the header is
    absent or not an integer); any exception waits 2 and then 4 seconds
    between attempts.

    Args:
        session: Open ``aiohttp.ClientSession`` used to issue the GET request.
        url: Endpoint URL; its last path segment is used as the endpoint
            name in log messages.
        params: Query parameters sent with the request.
        symbol_for_logging: Ticker symbol shown in log messages.

    Returns:
        The decoded JSON body when it is a list. An empty list when every
        attempt failed or when the body is not a list, so callers can always
        treat the result as a list of records.
    """
    retries = 3
    endpoint_name = url.split('/')[-1]
    last_error = None

    for attempt in range(retries):
        attempts_left = attempt < retries - 1
        try:
            async with session.get(url, params=params, timeout=30) as response:
                if response.status == 429:
                    # Remember the reason so an exhausted retry budget is reported, not silent.
                    last_error = "HTTP 429 (rate limited)"
                    if attempts_left:
                        try:
                            retry_after = int(response.headers.get("Retry-After", 5))
                        except (TypeError, ValueError):
                            # Retry-After may also be an HTTP date; fall back to the default wait.
                            retry_after = 5
                        await asyncio.sleep(retry_after)
                    continue
                response.raise_for_status()
                payload = await response.json()
        except Exception as e:
            last_error = e
            if attempts_left:
                await asyncio.sleep((attempt + 1) * 2)
            continue

        if isinstance(payload, list):
            return payload
        if payload:
            # A JSON object here is not statement data and would break the
            # DataFrame construction downstream.
            tqdm.write(f"ERROR for {symbol_for_logging}: Unexpected {endpoint_name} response (expected a list of records): {str(payload)[:200]}")
        return []

    tqdm.write(f"ERROR for {symbol_for_logging}: Failed to fetch {endpoint_name} data after {retries} retries. Final error: {last_error}")
    return []

def _indexed_statement_frame_50_years(records, latest_date):
    """Turn one endpoint's rows into a frame indexed by (date, symbol, period).

    Rows dated after ``latest_date`` are dropped. Returns ``None`` when
    ``records`` is not a non-empty list, the rows lack any of the three key
    columns, or no row remains after the date filter.
    """
    key_columns = ['date', 'symbol', 'period']
    if not isinstance(records, list) or not records:
        return None
    frame = pd.DataFrame(records)
    if frame.empty or not set(key_columns).issubset(frame.columns):
        return None
    # Filter out any data with future dates
    frame['date'] = pd.to_datetime(frame['date'])
    frame = frame[frame['date'] <= latest_date]
    if frame.empty:
        return None
    frame = frame.set_index(key_columns)
    # The column-wise concat needs unique index labels when the frames differ,
    # so keep only the first row for each (date, symbol, period).
    return frame[~frame.index.duplicated(keep='first')]


async def fetch_and_process_symbol_50_years(session, symbol, api_key, partials_dir):
    """Fetch, merge, and save 50 years of quarterly statements for one symbol.

    Requests the income statement, balance sheet, and ratios endpoints
    concurrently (up to 200 quarterly records each), drops rows dated in the
    future, joins the rest on (date, symbol, period), keeps and renames the
    columns of interest, and writes ``<symbol>.csv`` into ``partials_dir``.
    Nothing is written when no endpoint returned usable rows.

    Args:
        session: Open ``aiohttp.ClientSession`` shared by all requests.
        symbol: Ticker symbol to fetch.
        api_key: API key sent as the ``apikey`` query parameter.
        partials_dir: Directory that receives the per-symbol CSV file.
    """
    base_url = "https://financialmodelingprep.com/stable"

    # Define date range for last 50 years (365-day years, so leap days are not counted).
    now = datetime.now()
    today = now.strftime('%Y-%m-%d')
    fifty_years_ago = (now - timedelta(days=365*50)).strftime('%Y-%m-%d')

    base_params = {"period": "quarter", "from": fifty_years_ago, "to": today, "limit": 200, "apikey": api_key, "symbol": symbol}

    endpoint_names = ("income-statement", "balance-sheet-statement", "ratios")
    payloads = await asyncio.gather(*[
        fetch_endpoint_50_years(session, f"{base_url}/{name}", base_params, symbol)
        for name in endpoint_names
    ])

    today_dt = pd.to_datetime(now.date())
    all_dfs = [
        frame
        for frame in (_indexed_statement_frame_50_years(records, today_dt) for records in payloads)
        if frame is not None
    ]
    if not all_dfs:
        return

    merged_df = pd.concat(all_dfs, axis=1).reset_index()

    columns_to_keep = {
        'date': 'exactDate', 'symbol': 'symbol', 'revenue': 'revenue', 'netIncome': 'netIncome',
        'ebitda': 'ebitda', 'eps': 'eps', 'priceToEarningsRatio': 'pToE', 'totalCurrentAssets': 'totalCurrentAssets',
        'totalAssets': 'totalAssets', 'totalLiabilities': 'totalLiabilities', 'totalDebt': 'totalDebt',
        'debtToAssetsRatio': 'debtToAssets', 'priceToBookRatio': 'pToB', 'debtToEquityRatio': 'debtToEquity',
        'debtToCapitalRatio': 'debtToCapital', 'revenuePerShare': 'revenuePerShare',
        'period': 'quarter', 'grossProfit': 'grossProfit', 'stockholdersEquity': 'stockholdersEquity'
    }

    # Endpoints share some column names; keep the first occurrence of each.
    merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
    existing_columns = {k: v for k, v in columns_to_keep.items() if k in merged_df.columns}

    if not existing_columns:
        return

    final_df = merged_df[list(existing_columns.keys())].rename(columns=existing_columns)

    if not final_df.empty:
        output_path = os.path.join(partials_dir, f"{symbol}.csv")
        # Write to a temporary name first: an interrupted run must not leave a
        # truncated "<symbol>.csv" that a later run would treat as completed.
        temp_path = f"{output_path}.tmp"
        final_df.to_csv(temp_path, index=False)
        os.replace(temp_path, output_path)

async def main_fetcher_50_years_resumable():
    """Fetch 50-year statements for every symbol that has no partial file yet.

    Loads ``API_KEY`` from the environment, runs a pre-flight request for KO
    that must return more than 40 records, then fetches each symbol from the
    fetchable-symbols list that does not already have a CSV in
    ``Statement Data/partials_50_years``. Task start-up is spaced by a fixed
    interval and at most 20 symbols are processed at once. A failure on one
    symbol is logged and does not stop the others.
    """
    load_dotenv()
    api_key = os.getenv('API_KEY')
    if not api_key:
        print("API_KEY not found.")
        return

    partials_dir = 'Statement Data/partials_50_years'
    os.makedirs(partials_dir, exist_ok=True)

    async with aiohttp.ClientSession() as session:
        # Pre-flight check for KO to ensure the 'limit' parameter is working
        print("--- Running pre-flight API check for KO... ---")
        test_symbol = 'KO'
        test_url = "https://financialmodelingprep.com/stable/income-statement"
        test_params = {"period": "quarter", "limit": 200, "apikey": api_key, "symbol": test_symbol}
        test_data = await fetch_endpoint_50_years(session, test_url, test_params, test_symbol)

        # Count defensively so an empty or null payload reports 0 instead of raising.
        record_count = len(test_data) if test_data else 0
        if record_count <= 40:
            print(f"--- Pre-flight check FAILED. Expected >40 records for {test_symbol} but got {record_count}. Check API 'limit' parameter. ---")
            return
        print(f"--- Pre-flight check PASSED. Found {record_count} records for KO. ---")

        # Read symbols as text and treat only empty cells as missing, so that
        # tickers such as "NA" are not converted to NaN.
        symbols_df = pd.read_csv(
            '../Ticker Symbols/Symbol Lists/fetchable_symbols.csv',
            dtype={'symbol': str},
            keep_default_na=False,
            na_values=[''],
        )
        all_symbols = symbols_df['symbol'].dropna().unique().tolist()

        # Strip only the ".csv" extension; tickers may themselves contain dots.
        completed_symbols = {
            os.path.splitext(f)[0] for f in os.listdir(partials_dir) if f.endswith('.csv')
        }
        symbols_to_fetch = [s for s in all_symbols if s not in completed_symbols]

        if not symbols_to_fetch:
            print("All required symbols have already been processed.")
            return

        print(f"Found {len(all_symbols)} total symbols. {len(completed_symbols)} already completed for 50-year fetch.")
        print(f"Fetching 50-year financial data for {len(symbols_to_fetch)} new symbols...")

        # --- Rate Limiting and Concurrency Control ---
        # The interval spaces out task creation per symbol; each symbol then
        # issues three endpoint requests.
        API_CALL_INTERVAL = 0.077
        CONCURRENCY_LIMIT = 20
        sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

        with tqdm(total=len(symbols_to_fetch), desc="Fetching 50-Year Financials") as pbar:
            async def fetch_with_sem(symbol):
                try:
                    async with sem:
                        await fetch_and_process_symbol_50_years(session, symbol, api_key, partials_dir)
                except Exception as e:
                    # Keep the run alive; the symbol has no file and is retried next run.
                    tqdm.write(f"ERROR for {symbol}: Skipped after unexpected failure: {e!r}")
                finally:
                    pbar.update(1)

            tasks = []
            for symbol in symbols_to_fetch:
                tasks.append(asyncio.create_task(fetch_with_sem(symbol)))
                await asyncio.sleep(API_CALL_INTERVAL)

            await asyncio.gather(*tasks)

    print("--- 50-year financial data fetching complete. ---")

def assemble_50_years_reports():
    """Combine the 50-year per-symbol CSVs into four alphabetized batch files.

    Every ``*.csv`` in ``Statement Data/partials_50_years`` is read, the rows
    are combined and deduplicated on (symbol, exactDate), and they are split
    by the upper-cased first character of ``symbol`` into the ranges A-D,
    E-J, K-P and Q-Z. Each non-empty range is written to
    ``financials_<range>.csv`` in ``Statement Data``. Rows without a symbol
    fall into no range and are not written.
    """
    partials_dir = 'Statement Data/partials_50_years'
    output_dir = 'Statement Data'

    if not os.path.exists(partials_dir):
        print(f"Directory with partial files not found: {partials_dir}")
        return

    # Sorted so deduplication and output order do not depend on directory listing order.
    all_files = sorted(
        os.path.join(partials_dir, f) for f in os.listdir(partials_dir) if f.endswith('.csv')
    )
    if not all_files:
        print("No partial 50-year symbol files were found to assemble.")
        return

    print(f"Assembling 50-year data from {len(all_files)} individual symbol files...")

    frames = []
    for path in all_files:
        try:
            # Read symbols as text and treat only empty cells as missing;
            # otherwise tickers such as "NA" or "TRUE" are parsed as NaN/bool
            # and their rows end up in no chunk.
            frames.append(
                pd.read_csv(path, dtype={'symbol': str}, keep_default_na=False, na_values=[''])
            )
        except pd.errors.EmptyDataError:
            print(f"Skipping empty partial file: {path}")

    if not frames:
        print("No partial 50-year symbol files were found to assemble.")
        return

    full_df = pd.concat(frames, ignore_index=True)

    if 'symbol' not in full_df.columns:
        print("Error: 'symbol' column not found in the assembled data. Cannot create batches.")
        return

    print(f"Total records before deduplication: {len(full_df)}")

    # Deduplicate based on symbol and date
    if 'exactDate' in full_df.columns:
        full_df = full_df.drop_duplicates(subset=['symbol', 'exactDate'])
    else:
        print("Warning: 'exactDate' column not found; skipping deduplication.")

    print(f"Total records after deduplication: {len(full_df)}")

    # Split into alphabetical chunks by the upper-cased first character.
    first_char = full_df['symbol'].str[0].str.upper()
    new_symbol_chunks = {
        "A-D": full_df[first_char <= 'D'],
        "E-J": full_df[(first_char >= 'E') & (first_char <= 'J')],
        "K-P": full_df[(first_char >= 'K') & (first_char <= 'P')],
        "Q-Z": full_df[first_char >= 'Q']
    }

    for chunk_name, chunk_df in new_symbol_chunks.items():
        if not chunk_df.empty:
            output_path = os.path.join(output_dir, f"financials_{chunk_name}.csv")
            chunk_df.to_csv(output_path, index=False)
            print(f"Saved chunk '{chunk_name}' with {len(chunk_df)} records to {output_path}")
        else:
            print(f"No data for chunk '{chunk_name}'.")

    print("--- Assembly of 50-year reports is complete. ---")

# Run the main fetching process and then assemble
# NOTE: top-level `await` works here because this is a notebook cell; in a plain
# Python script the coroutine would have to be run via asyncio.run(...).
await main_fetcher_50_years_resumable()
assemble_50_years_reports()


--- Running pre-flight API check for KO... ---
--- Pre-flight check PASSED. Found 160 records for KO. ---
Found 12800 total symbols. 11144 already completed for 50-year fetch.
Fetching 50-year financial data for 1656 new symbols...


Fetching 50-Year Financials:   0%|          | 0/1656 [00:00<?, ?it/s]

--- 50-year financial data fetching complete. ---
Assembling 50-year data from 11147 individual symbol files...
Total records before deduplication: 603279
Total records after deduplication: 603248
Saved chunk 'A-D' with 174236 records to Statement Data\financials_A-D.csv
Saved chunk 'E-J' with 125034 records to Statement Data\financials_E-J.csv
Saved chunk 'K-P' with 144851 records to Statement Data\financials_K-P.csv
Saved chunk 'Q-Z' with 159075 records to Statement Data\financials_Q-Z.csv
--- Assembly of 50-year reports is complete. ---
