In [31]:
import os
import json
from dotenv import load_dotenv
from typing import Dict, Any
from datetime import datetime

import requests
import pandas as pd
import yfinance as yf

load_dotenv()
API_KEY = os.getenv("VANTAGE_API_KEY")

## Data Fetching and Caching

In [32]:
class APIRateLimitError(Exception):
    """Custom exception raised when the API rate limit is exceeded."""
    pass

def fetch_income_statement(symbol: str, api_key: str, folder: str = 'data') -> dict:
    file_path = os.path.join(folder, f'{symbol}_income_statement.json')
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            return json.load(f)
    url = f'https://www.alphavantage.co/query?function=INCOME_STATEMENT&symbol={symbol}&apikey={api_key}'
    response = requests.get(url)
    data = response.json()
    if "Information" in data and "rate limit" in data["Information"].lower():
        raise APIRateLimitError(data["Information"])
    os.makedirs(folder, exist_ok=True)
    with open(file_path, 'w') as f:
        json.dump(data, f)
    return data


def fetch_balance_sheet(symbol: str, api_key: str, folder: str = 'data') -> dict:
    file_path = os.path.join(folder, f'{symbol}_balance_sheet.json')
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            return json.load(f)
    url = f'https://www.alphavantage.co/query?function=BALANCE_SHEET&symbol={symbol}&apikey={api_key}'
    response = requests.get(url)
    data = response.json()
    if "Information" in data and "rate limit" in data["Information"].lower():
        raise APIRateLimitError(data["Information"])
    os.makedirs(folder, exist_ok=True)
    with open(file_path, 'w') as f:
        json.dump(data, f)
    return data


def fetch_cash_flow(symbol: str, api_key: str, folder: str = 'data') -> dict:
    file_path = os.path.join(folder, f'{symbol}_cash_flow.json')
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            return json.load(f)
    url = f'https://www.alphavantage.co/query?function=CASH_FLOW&symbol={symbol}&apikey={api_key}'
    response = requests.get(url)
    data = response.json()
    if "Information" in data and "rate limit" in data["Information"].lower():
        raise APIRateLimitError(data["Information"])
    os.makedirs(folder, exist_ok=True)
    with open(file_path, 'w') as f:
        json.dump(data, f)
    return data

## Merging Reports

In [33]:
def merge_reports_df(income_statement: dict, balance_sheet: dict, cash_flow: dict) -> Dict[str, Any]:
    """
    Merge the annual and quarterly reports from the three API responses into pandas DataFrames,
    merging on the 'fiscalDateEnding' column.
    """
    # --- Annual Reports ---
    annual_income = pd.DataFrame(income_statement.get('annualReports', []))
    annual_balance = pd.DataFrame(balance_sheet.get('annualReports', []))
    annual_cash = pd.DataFrame(cash_flow.get('annualReports', []))

    # Drop unwanted duplicate keys
    annual_balance.drop(columns=['reportedCurrency'], errors='ignore', inplace=True)
    annual_cash.drop(columns=['reportedCurrency', 'netIncome'], errors='ignore', inplace=True)

    # Convert fiscalDateEnding to datetime and sort
    for df in (annual_income, annual_balance, annual_cash):
        if 'fiscalDateEnding' in df.columns:
            df['fiscalDateEnding'] = pd.to_datetime(df['fiscalDateEnding'])
            df.sort_values('fiscalDateEnding', inplace=True)
            df.reset_index(drop=True, inplace=True)

    # Merge the three annual DataFrames on 'fiscalDateEnding'
    annual_df = pd.merge(annual_income, annual_balance, on='fiscalDateEnding', how='outer')
    annual_df = pd.merge(annual_df, annual_cash, on='fiscalDateEnding', how='outer')

    # --- Quarterly Reports ---
    quarterly_income = pd.DataFrame(income_statement.get('quarterlyReports', []))
    quarterly_balance = pd.DataFrame(balance_sheet.get('quarterlyReports', []))
    quarterly_cash = pd.DataFrame(cash_flow.get('quarterlyReports', []))

    # Drop unwanted duplicate keys
    quarterly_balance.drop(columns=['reportedCurrency'], errors='ignore', inplace=True)
    quarterly_cash.drop(columns=['reportedCurrency', 'netIncome'], errors='ignore', inplace=True)

    # Convert fiscalDateEnding to datetime and sort
    for df in (quarterly_income, quarterly_balance, quarterly_cash):
        if 'fiscalDateEnding' in df.columns:
            df['fiscalDateEnding'] = pd.to_datetime(df['fiscalDateEnding'])
            df.sort_values('fiscalDateEnding', inplace=True)
            df.reset_index(drop=True, inplace=True)

    # Merge the three quarterly DataFrames on 'fiscalDateEnding'
    quarterly_df = pd.merge(quarterly_income, quarterly_balance, on='fiscalDateEnding', how='outer')
    quarterly_df = pd.merge(quarterly_df, quarterly_cash, on='fiscalDateEnding', how='outer')

    return {
        'symbol': income_statement.get('symbol', 'N/A'),
        'annual': annual_df,
        'quarterly': quarterly_df
    }

## EPS Metrics

In [34]:
def compute_eps_metrics(quarterly_df: pd.DataFrame, annual_df: pd.DataFrame, num_period: int) -> dict:
    """
    Compute EPS metrics using quarterly and annual DataFrames.
      - latest_quarterly_eps: EPS from the most recent quarterly report.
      - ttm_eps: Trailing Twelve Months EPS.
      - annual_eps: Dictionary mapping fiscal year to annual EPS.
    """
    # Ensure necessary columns are numeric
    for col in ['netIncome', 'commonStockSharesOutstanding']:
        if col in quarterly_df.columns:
            quarterly_df[col] = pd.to_numeric(quarterly_df[col], errors='coerce')
        if col in annual_df.columns:
            annual_df[col] = pd.to_numeric(annual_df[col], errors='coerce')

    latest_quarter = quarterly_df.iloc[-1]
    net_income_latest = latest_quarter.get('netIncome')
    shares_latest = latest_quarter.get('commonStockSharesOutstanding')

    if pd.notnull(net_income_latest) and pd.notnull(shares_latest) and shares_latest != 0:
        latest_eps = net_income_latest / shares_latest
    else:
        latest_eps = None
    
    # Compute TTM EPS (using the last 4 quarters)
    if len(quarterly_df) >= 4:
        last_four = quarterly_df.iloc[-4:]
        if last_four['netIncome'].isnull().any():
            ttm_eps = None
        else:
            total_net_income = last_four['netIncome'].sum()
            ttm_eps = total_net_income / shares_latest if pd.notnull(shares_latest) and shares_latest != 0 else None
    else:
        ttm_eps = None

    # Compute EPS for each annual report
    annual_df['eps'] = annual_df.apply(
        lambda row: row['netIncome'] / row['commonStockSharesOutstanding']
        if pd.notnull(row['netIncome']) and pd.notnull(row['commonStockSharesOutstanding']) and row['commonStockSharesOutstanding'] != 0
        else None,
        axis=1
    )
    annual_df['year'] = annual_df['fiscalDateEnding'].dt.year
    selected_annual = annual_df.iloc[-num_period:]
    annual_eps = {row['year']: row['eps'] for _, row in selected_annual.iterrows()}
    
    return {
        'latest_quarterly_eps': latest_eps,
        'ttm_eps': ttm_eps,
        'fiscal_date_ending': latest_quarter['fiscalDateEnding'].strftime('%Y-%m-%d') if pd.notnull(latest_quarter['fiscalDateEnding']) else None,
        'annual_eps': annual_eps
    }

## Financial Metrics

In [35]:
def compute_financial_metrics(annual_df: pd.DataFrame, num_periods: int = 5) -> dict:
    """
    Compute various financial metrics from the annual data using DataFrame operations:
      - Average turnover (revenue) growth.
      - Average FCF growth.
      - Average ROCE using FCF.
      - Latest net debt/FCF ratio.
      - Average stock number increase.
      - Average FCF margin.
    """
    # Convert necessary columns to numeric
    numeric_cols = ['totalRevenue', 'operatingCashflow', 'capitalExpenditures',
                    'totalAssets', 'totalCurrentLiabilities', 'currentDebt',
                    'longTermDebt', 'commonStockSharesOutstanding']
    for col in numeric_cols:
        if col in annual_df.columns:
            annual_df[col] = pd.to_numeric(annual_df[col], errors='coerce')
    
    # For cash, try one field then the other
    if 'cashAndShortTermInvestments' in annual_df.columns:
        annual_df['cash'] = pd.to_numeric(annual_df['cashAndShortTermInvestments'], errors='coerce')
    if 'cashAndCashEquivalentsAtCarryingValue' in annual_df.columns:
        annual_df['cash'] = annual_df.get('cash', pd.Series([None]*len(annual_df))).fillna(
            pd.to_numeric(annual_df['cashAndCashEquivalentsAtCarryingValue'], errors='coerce')
        )
    
    annual_df = annual_df.sort_values('fiscalDateEnding').reset_index(drop=True)
    total_periods = len(annual_df)
    non_growth_df = annual_df.iloc[-num_periods:].copy() if total_periods >= num_periods else annual_df.copy()
    growth_df = annual_df.iloc[-(num_periods + 1):].copy() if total_periods >= (num_periods + 1) else annual_df.copy()
    
    # Create Free Cash Flow (FCF) column
    for df in (non_growth_df, growth_df):
        df['fcf'] = df['operatingCashflow'] - df['capitalExpenditures']
    
    # Compute growth metrics (using percentage change)
    avg_turnover_growth = growth_df['totalRevenue'].pct_change().mean() if len(growth_df) > 1 else None
    avg_FCF_growth = growth_df['fcf'].pct_change().mean() if len(growth_df) > 1 else None
    avg_stock_increase = growth_df['commonStockSharesOutstanding'].pct_change().mean() if len(growth_df) > 1 else None

    # Compute ROCE using FCF = FCF / (totalAssets - totalCurrentLiabilities)
    non_growth_df['capital_employed'] = non_growth_df['totalAssets'] - non_growth_df['totalCurrentLiabilities']
    non_growth_df['roce'] = non_growth_df.apply(
        lambda row: row['fcf'] / row['capital_employed'] if pd.notnull(row['fcf']) and pd.notnull(row['capital_employed']) and row['capital_employed'] != 0 else None,
        axis=1
    )
    avg_roce_fcf = non_growth_df['roce'].dropna().mean() if not non_growth_df['roce'].dropna().empty else None

    # Compute net debt: (currentDebt + longTermDebt) - cash
    non_growth_df['debt'] = non_growth_df['currentDebt'] + non_growth_df['longTermDebt']
    non_growth_df['net_debt'] = non_growth_df.apply(
        lambda row: row['debt'] - row['cash'] if pd.notnull(row['debt']) and pd.notnull(row['cash']) else None,
        axis=1
    )
    non_growth_df['net_debt_to_FCF'] = non_growth_df.apply(
        lambda row: row['net_debt'] / row['fcf'] if pd.notnull(row['net_debt']) and pd.notnull(row['fcf']) and row['fcf'] != 0 else None,
        axis=1
    )
    latest_net_debt_to_FCF = non_growth_df['net_debt_to_FCF'].iloc[-1] if not non_growth_df['net_debt_to_FCF'].empty else None

    # Compute FCF margin = FCF / totalRevenue
    non_growth_df['fcf_margin'] = non_growth_df.apply(
        lambda row: row['fcf'] / row['totalRevenue'] if pd.notnull(row['fcf']) and pd.notnull(row['totalRevenue']) and row['totalRevenue'] != 0 else None,
        axis=1
    )
    avg_FCF_margin = non_growth_df['fcf_margin'].dropna().mean() if not non_growth_df['fcf_margin'].dropna().empty else None

    return {
        'avg_turnover_growth': avg_turnover_growth,
        'avg_FCF_growth': avg_FCF_growth,
        'avg_ROCE_using_FCF': avg_roce_fcf,
        'latest_net_debt_to_FCF': latest_net_debt_to_FCF,
        'avg_stock_increase': avg_stock_increase,
        'avg_FCF_margin': avg_FCF_margin,
    }

## YFinance Price Data Retrieval

In [36]:
def get_yearly_close_and_latest_quote(ticker_symbol: str, num_periods: int) -> dict:
    """
    Retrieve the yearly closing prices for the last num_period complete fiscal years and the latest price quote.
    """
    today = datetime.today()
    current_year = today.year
    last_complete_year = current_year - 1
    first_complete_year = last_complete_year - num_periods  # may include one extra year
    start_date = f"{first_complete_year}-01-01"
    end_date = f"{last_complete_year}-12-31"
    
    ticker = yf.Ticker(ticker_symbol)
    hist = ticker.history(start=start_date, end=end_date)

    yearly_close = {}
    if not hist.empty:
        if not isinstance(hist.index, pd.DatetimeIndex):
            hist.index = pd.to_datetime(hist.index)
        # Resample to get the last closing price of each year
        resampled = hist['Close'].resample('YE').last()
        for ts, price in resampled.items():
            year = ts.year
            if first_complete_year <= year <= last_complete_year:
                yearly_close[year] = price

    latest_quote = ticker.info.get('regularMarketDayHigh') if ticker.info.get('regularMarketDayHigh') else ticker.info.get('previousClose')
    return {
        'yearly_close': yearly_close,
        'latest_quote': latest_quote
    }

## PER Ratios

In [37]:
def compute_per_ratios(ticker_symbol: str, quarterly_df: pd.DataFrame, annual_df: pd.DataFrame, num_period: int) -> dict:
    """
    Compute Price-to-Earnings (PER) ratios:
      - TTM PER: latest price / TTM EPS.
      - Latest PER: latest price / latest quarterly EPS.
      - Average PER: average of (yearly closing price / annual EPS) over the last num_period fiscal years.
      - Latest price quote.
    """
    eps_metrics = compute_eps_metrics(quarterly_df, annual_df, num_period)
    price_data = get_yearly_close_and_latest_quote(ticker_symbol, num_period)
    
    latest_price = price_data.get('latest_quote')
    ttm_eps = eps_metrics.get('ttm_eps')
    latest_quarterly_eps = eps_metrics.get('latest_quarterly_eps') * 4
    annual_eps = eps_metrics.get('annual_eps')  # dict: year -> eps
    yearly_close = price_data.get('yearly_close')  # dict: year -> close
    
    ttm_per = latest_price / ttm_eps if latest_price is not None and ttm_eps is not None and ttm_eps > 0 else None
    latest_per = latest_price / latest_quarterly_eps if latest_price is not None and latest_quarterly_eps is not None and latest_quarterly_eps > 0 else None
    
    per_values = []
    for year, eps in annual_eps.items():
        if eps is not None and eps > 0 and year in yearly_close:
            per_values.append(yearly_close[year] / eps)
    average_per = sum(per_values) / len(per_values) if per_values else None
    
    return {
        'ttm_per': ttm_per,
        'latest_per': latest_per,
        'average_per': average_per,
        'fiscal_date_ending': eps_metrics.get('fiscal_date_ending'),
        'latest_price': latest_price
    }

## Visualization

In [38]:
def visualize_metrics(metrics: dict, ticker: str, num_periods: int = 5) -> None:
    """
    Display the metrics in a formatted text table.
    """
    def format_value(value, percentage=False):
        if isinstance(value, (int, float)):
            return f"{value * 100:.2f}%" if percentage else f"{value:.2f}"
        return "N/A"
    
    output = f"""
--------------------------------------------
            {ticker} (last {num_periods} years)
--------------------------------------------
Metric                        Value
--------------------------------------------
Turnover Growth               {format_value(metrics.get('avg_turnover_growth'), percentage=True)}
FCF Growth                    {format_value(metrics.get('avg_FCF_growth'), percentage=True)}
ROCE (using FCF)              {format_value(metrics.get('avg_ROCE_using_FCF'), percentage=True)}
Net Debt/FCF                  {format_value(metrics.get('latest_net_debt_to_FCF'))}
Share Number Increase         {format_value(metrics.get('avg_stock_increase'), percentage=True)}
FCF Margin                    {format_value(metrics.get('avg_FCF_margin'), percentage=True)}

Average PER (5y)              {format_value(metrics.get('average_per'))}
PER TTM                       {format_value(metrics.get('ttm_per'))}
PER (latest quarter)          {format_value(metrics.get('latest_per'))}
Fiscal date ending            {metrics.get('fiscal_date_ending')}

Stock price                   {format_value(metrics.get('latest_price'))}
--------------------------------------------
"""
    print(output)

## Analyze Ticker Function

In [39]:
ticker = 'ASML'

In [40]:
def analyze_stock(ticker: str, api_key: str, num_periods: int = 5) -> None:
    # Fetch the data
    income_statement = fetch_income_statement(ticker, api_key)
    balance_sheet_data = fetch_balance_sheet(ticker, api_key)
    cash_flow_data = fetch_cash_flow(ticker, api_key)

    # Merge the reports
    merged_data = merge_reports_df(income_statement, balance_sheet_data, cash_flow_data)
    symbol = merged_data.get('symbol', ticker)
    annual_df = merged_data['annual']
    quarterly_df = merged_data['quarterly']

    # Compute metrics
    financial_metrics = compute_financial_metrics(annual_df, num_periods=num_periods)
    per_ratios = compute_per_ratios(ticker, quarterly_df, annual_df, num_period=num_periods)

    # Visualize the combined metrics
    combined_metrics = {**financial_metrics, **per_ratios, **{'analysis_date': datetime.now().strftime('%Y-%m-%d')}}
    visualize_metrics(combined_metrics, symbol, num_periods=num_periods)

    # Save combined metrics to a JSON file
    os.makedirs('analysis', exist_ok=True)
    file_path = os.path.join('analysis', f'{ticker}.json')
    with open(file_path, 'w') as f:
        json.dump(combined_metrics, f, indent=4)

analyze_stock(ticker, API_KEY)


--------------------------------------------
            ASML (last 5 years)
--------------------------------------------
Metric                        Value
--------------------------------------------
Turnover Growth               20.67%
FCF Growth                    27.06%
ROCE (using FCF)              26.82%
Net Debt/FCF                  N/A
Share Number Increase         -1.56%
FCF Margin                    29.34%

Average PER (5y)              46.26
PER TTM                       41.79
PER (latest quarter)          34.85
Fiscal date ending            2024-09-30

Stock price                   735.45
--------------------------------------------

