In [None]:
import requests
import bs4 as bs
import pandas as pd
import yfinance as yf
import numpy as np
from io import StringIO
import transformers
import torch
from huggingface_hub import login
import re

In [None]:
# @title headers for interacting with SEC EDGAR DB
# Request headers passed to the SEC EDGAR requests made in this notebook.
# NOTE(review): the User-Agent value is a hard-coded personal e-mail address;
# consider loading it from configuration instead of committing it — confirm with the owner.
headers = {
    "User-Agent": "oarnst04@gmail.com"
}

## Extract Financial Tables from 10-Q and 10-K Forms





In [None]:
# @title custom_string_similarity
def custom_string_similarity(choice, query):
    """
      Count the distinct tokens shared by two strings.

      Both strings are lower-cased and split on single spaces before the
      comparison, so the match is case-insensitive and duplicates count once.

      Args:
          choice (str): text the query is being compared to
          query (str): text from a filing

      Returns:
          int: number of distinct tokens present in both strings
    """
    shared_tokens = set(choice.lower().split(" ")) & set(query.lower().split(" "))
    return len(shared_tokens)

In [None]:
# @title clean_df
def clean_df(statement, year, quarter):

  """
    Reduce an extracted financial statement table to its label column plus the
    first reporting-period column belonging to the filing year (or the year before).

    The last row of the table is always dropped. Columns holding fewer than five
    non-null values are ignored. A column qualifies when its header, split on
    single spaces, contains one of the month abbreviations below together with
    ``year`` or ``year - 1`` (``year`` wins when both are present). Rows with a
    missing value are removed from the result.

    Args:
      statement (pandas.DataFrame): A table of a financial statement
      year (str | int): Year the document was filed in
      quarter (str): Quarter the document was filed in (not used by the cleaning logic)

    Returns:
      tuple | None: ``(cleaned_statement, year_end, month_end)`` where
        cleaned_statement (pandas.DataFrame) holds the label column and the selected period column,
        year_end (int) is the year found in the selected column header and
        month_end (int) is the month number found in the selected column header.
        ``None`` is returned, after printing the reason, when no column qualifies
        or any error occurs.
  """

  months = {"Jan.": 1, "Feb.": 2, "Mar.": 3, "Apr.": 4,
            "May": 5, "May.": 5, "Jun.": 6, "Jul.": 7, "Aug.": 8,
            "Sep.": 9, "Oct.": 10, "Nov.": 11, "Dec.": 12}

  try:
    statement = statement.drop(statement.index[-1])
    label_column = statement.iloc[:, 0]

    current_year = str(year)
    prev_year = str(int(year) - 1)

    for col in statement.columns:

      # Sparse columns (footnote markers, spacer columns) cannot be the period column.
      if statement[col].dropna().shape[0] < 5:
        continue

      if isinstance(col, tuple):
        # Multi-level header: levels are not guaranteed to be strings.
        col_word_list = " ".join(str(level) for level in col).split(" ")
      elif isinstance(col, str):
        col_word_list = col.split(" ")
      else:
        col_word_list = []

      if current_year in col_word_list:
        year_end = current_year
      elif prev_year in col_word_list:
        year_end = prev_year
      else:
        continue

      for month, month_end in months.items():
        if month in col_word_list:
          cleaned_statement = pd.concat([label_column, statement[col]], axis=1).dropna()
          return cleaned_statement, int(year_end), month_end

    # Make the "nothing matched" case explicit instead of failing on int("").
    print("Error in clean_df: ", f"no column header contains a month and {current_year} or {prev_year}")
    return None

  except Exception as e:

    print("Error in clean_df: ", e)

    return None

In [None]:
# @title get_statement_filepaths
def get_statement_filepaths(headers, dir):

  """
    Locate the income statement, balance sheet and cash flow reports of a filing.

    Downloads ``<dir>/FilingSummary.xml`` and scans its ``Report`` entries. A
    report is considered when its lower-cased ``ShortName`` has at most 12
    space-separated words and contains one of the generic statement terms. For
    each statement type, the matching report with the shortest ``ShortName``
    wins.

    Args:
      headers (dict): Required headers to access SEC EDGAR DB API
      dir (str): Path to a specific 10-Q/10-K/8-K document

    Returns:
      dict | None: maps "income statement", "balance sheet" and "cash flow" to a
        two-item list ``[html_file_name, short_name_length]``. A statement that
        was never matched keeps ``["", inf]``. ``None`` is returned, after
        printing the error, when the request or the parsing fails.
  """

  statement_filters1 = ["condensed", "consolidated", "statement", "statements"]

  # statement key -> (terms that identify it, term that disqualifies a report or None)
  statement_rules = {
      "income statement": (["income", "earnings", "profit loss", "operations"], "comprehensive"),
      "balance sheet": (["balance sheet", "financial condition", "financial position"], "parenthetical"),
      "cash flow": (["cash flow"], None),
  }

  try:

    # A timeout keeps a stalled connection from hanging the notebook indefinitely.
    response = requests.get(dir + "/FilingSummary.xml", headers=headers, timeout=30)
    response.raise_for_status()

    statement_paths = {statement: ["", float('inf')] for statement in statement_rules}

    filing_summary_soup = bs.BeautifulSoup(response.text, "lxml-xml")
    for report in filing_summary_soup.find_all("Report"):

      short_name_tag = report.find("ShortName")
      html_file_tag = report.find("HtmlFileName")
      if short_name_tag is None or html_file_tag is None:
        # Without a file name the report cannot be downloaded, so it must not win.
        continue

      short_name = short_name_tag.text.lower()
      statement_path = html_file_tag.text
      if not short_name or not statement_path:
        continue

      if len(short_name.split(" ")) > 12:
        continue

      if not any(term in short_name for term in statement_filters1):
        continue

      for statement, (terms, excluded_term) in statement_rules.items():
        best = statement_paths[statement]
        if excluded_term is not None and excluded_term in short_name:
          continue
        if len(short_name) < best[1] and any(term in short_name for term in terms):
          best[0] = statement_path
          best[1] = len(short_name)

    return statement_paths

  except Exception as e:

    print("Error in get_statement_filepaths: ", e)

    return None

In [None]:
# @title extract_statement_df
def extract_statement_df(headers, dir, path, year, quarter):

  """
    Download one statement page of a filing and return its cleaned table.

    The first ``<table>`` of the page is parsed with pandas and passed to
    ``clean_df``.

    Args:
      headers (dict): Required headers to access SEC EDGAR DB API
      dir (str): Path to a specific 10-Q/10-K document
      path (str): Path to a statement table within dir
      year (str): Year the document was filed in
      quarter (str): Quarter the document was filed in

    Returns:
      tuple | None: ``(df_statement, month_end, year_end)`` where
        df_statement (pandas.DataFrame) is the extracted and cleaned statement,
        month_end (int) is the month in which the metrics are recorded and
        year_end (int) is the year in which the metrics are recorded.
        Note the order: month comes before year. ``None`` is returned, after
        printing the error, when the download, parsing or cleaning fails.
  """

  try:

    # A timeout keeps a stalled connection from hanging the notebook indefinitely.
    response = requests.get(dir + f"/{path}", headers=headers, timeout=30)
    response.raise_for_status()

    soup_statement = bs.BeautifulSoup(response.text, "lxml-xml")
    table_statement = soup_statement.find("table")
    if table_statement is None:
      raise ValueError(f"no <table> element found in {path}")

    df_statement = pd.read_html(StringIO(str(table_statement)))[0]

    cleaned = clean_df(df_statement, year, quarter)
    if cleaned is None:
      raise ValueError(f"clean_df could not process {path}")

    # clean_df yields (table, year, month).
    df_statement, period_year, period_month = cleaned

    # Returned as (table, month, year): test_statement_extraction forwards these
    # two values positionally and generate_metrics_df unpacks them as
    # (month_end, year_end), so this order must not change.
    return df_statement, period_month, period_year

  except Exception as e:

    print("Error in extract_statement_df: ", e)
    return None

In [None]:
# @title test_statement_extraction
def test_statement_extraction(headers, dir, year, quarter):

  """
    Test the extraction of the income, balance sheet, and cash flow statements for the given year and quarter

    Args:
      headers (dict): Required headers to access SEC EDGAR DB API
      dir (str): Path to a specific 10-Q/10-K document
      year (str): Year the document was filed in
      quarter (str): Quarter the document was filed in

    Returns:
      tuple: ``(statement_tables, month_end, year_end)`` where
        statement_tables (dict) holds the income statement, balance sheet and cash flow dataframes,
        month_end (int) is the month in which the metrics are recorded and
        year_end (int) is the year in which the metrics are recorded.
        The two period values are the ones reported for the last statement
        processed and are forwarded in the order extract_statement_df returns
        them (month first). On any failure the reason is printed and
        ``(False, False, False)`` is returned.
  """

  try:

    statement_paths = get_statement_filepaths(headers, dir)
    if statement_paths is None:
      raise Exception("Could not read the filing summary")

    statement_tables = dict()
    for statement, path_info in statement_paths.items():
      path = path_info[0]
      if not path:
        # Avoid a pointless request to "<dir>/" when no report was matched.
        raise Exception(f"No report found for the {statement}")

      extracted = extract_statement_df(headers, dir, path, year, quarter)
      if extracted is None:
        raise Exception(f"Could not extract the {statement}")

      statement_tables[statement], month_end, year_end = extracted

    if len(statement_tables["income statement"].shape) != 2:
      raise Exception("Income statement is missing information")

    if len(statement_tables["balance sheet"].shape) != 2:
      raise Exception("Balance sheet is missing information")

    if len(statement_tables["cash flow"].shape) != 2:
      raise Exception("Cash flow statement is missing information")

    return statement_tables, month_end, year_end

  except Exception as e:

    print("Failed")
    print(f"Error: {e}" )
    print(dir, year, quarter)

    return False, False, False

In [None]:
# statements, _, _ = test_statement_extraction(headers, "https://www.sec.gov/Archives/edgar/data/320187/000032018716000242", 2016, 1)

In [None]:
# statements["balance sheet"]

Unnamed: 0,Unaudited Condensed Consolidated Balance Sheets - USD ($) $ in Millions,"Nov. 30, 2015"
1,Cash and equivalents (Note 4),"$ 3,851"
2,Short-term investments (Note 4),2265
3,"Accounts receivable, net",3437
4,Inventories (Note 2),4600
5,Deferred income taxes (Note 7),405
6,Prepaid expenses and other current assets (Not...,2197
7,Total current assets,16755
8,"Property, plant and equipment, net",3235
9,"Identifiable intangible assets, net",281
10,Goodwill,131


## Process Financial Tables to Create Tables of Key Financial Metrics and Ratios

In [None]:
# @title extract_values
def extract_values(response):
    """
    Pull the per-category answers out of the LLM's reply.

    For every category the first occurrence of ``"<category>: <text>"`` in the
    reply is located and the text up to the end of that line is kept.

    Args:
      response (str): LLM response

    Returns:
      values (dict): category name -> captured text (str), or None when the
        category does not appear in the response
    """

    categories = (
        "Basic EPS",
        "Total Common Shares Outstanding",
        "Total Assets",
        "Total Liabilities",
        "Dividends",
        "Revenue",
        "Net Income",
        "Capital Expenditure",
        "Operating Cash Flow",
    )

    # Start with every category unset so the key order is fixed up front.
    values = dict.fromkeys(categories)

    for category in categories:
        found = re.search(f"{category}: (.+)", response)
        if found is not None:
            values[category] = found.group(1)

    return values

In [None]:

# @title extract_metrics
def extract_metrics(prompt, ex_prompt, ex_answer, explanation):
    """
    Prompt the LLM to extract the nine metrics listed in the system message
    (basic EPS, total common shares outstanding, total assets, total liabilities,
    dividends, revenue, net income, capital expenditure and operating cash flow).

    Args:
      prompt (str): Filtered rows from financial statements to extract from
      ex_prompt (str): An example prompt
      ex_answer (str): Correct answer to the example prompt
      explanation (str): Explanation of the example answer, inserted into the system message

    Returns:
      list: The values parsed by extract_values, in the key order of the dict it
        returns (each item is the captured text or None)
    """

    # Two-message chat: the system message carries the instructions and the
    # worked example, the user message carries the options to choose from.
    message = [
        {
            "role": "system",
            "content": f"""

      You are a financial analyst trying to help the user extract information. For each of the following requested categories, carefully review the provided options
      and extract the numerical value that corresponds to the category.

      IMPORTANT: Your response should contain values found in the user's prompt.

      Categories:

      Basic EPS: Use the basic earnings per share row. It should be a smaller float value.
      Total Common Shares Outstanding: Look for the most up-to-date information on common shares issued and/or outstanding. Authorized shares are not important.
      Total Assets: Use the 'total assets' row
      Total Liabilities: Use the 'total liabilities' row
      Dividends: Look for dividends paid on common stock
      Revenue: Look for 'revenue' row. If there is no revenue row, look for 'sales' row
      Net Income: Look for 'Net Income' row
      Capital Expenditure: Look for either capital expenditure row or purchases of plant, property, and equipment row
      Operating Cash Flow: Look for net cash flow for operating activities row

      ---Example---

      Example Prompt:

      {ex_prompt}

      Example Answer:

      {ex_answer}

      Explanation:

      {explanation}

      ---End of Example---

      IMPORTANT: Do not use any data from the example. It is to help you extract the relevant information.

      Only provide a single numerical value for each category. Your output should be structured in the following format, containing nothing else (9 lines in total):

      Basic EPS: <Number>
      Total Common Shares Outstanding: <Number>
      Total Assets: <Number>
      Total Liabilities: <Number>
      Dividends: <Number>
      Revenue: <Number>
      Net Income: <Number>
      Capital Expenditure: <Number>
      Operating Cash Flow: <Number>

      """,
        },
        {"role": "user", "content": prompt},
    ]

    # NOTE(review): `pipe` is not defined in this function; presumably a
    # transformers text-generation pipeline created in another cell — confirm.
    outputs = pipe(
        message,
        max_new_tokens=256,
    )

    # Reads the "content" of the last entry in the first output's
    # "generated_text" list; presumably the assistant's reply — confirm.
    response = outputs[0]["generated_text"][-1]["content"]
    values = extract_values(response)

    return list(values.values())

In [None]:
# @title stringify_terms
def stringify_terms(terms_dict):
  """
    Render a dict as a numbered list, one "<n>. <key>: <value>" line per entry.

    Args:
      terms_dict (dict): option label -> option value

    Returns:
      str: the numbered lines (numbering starts at 1, each line ends with a
        newline); an empty string for an empty dict
  """
  numbered_lines = (
      f"{position}. {label}: {amount}\n"
      for position, (label, amount) in enumerate(terms_dict.items(), start=1)
  )
  return "".join(numbered_lines)

In [None]:
# @title generate_prompt
def generate_prompt(statements, omit_shares, omit_capex):

  """
    Filter rows from financial statements and generate a prompt for the LLM to extract values from

    The prompt starts with the title of each statement (the header of its first
    column) and then lists, per category, the rows whose label matched that
    category's keyword filters.

    Args:
      statements (dict): Income, balance sheet, and cash flow statements, keyed
        "income statement", "balance sheet" and "cash flow"
      omit_shares (bool): When true, share-count options are listed with an empty value
      omit_capex (bool): When true, capital expenditure options are listed with an empty value

    Returns:
      prompt (str): prompt for the LLM
  """

  # Alter filters if needed

  basic_eps_filters1 = ["basic", "net income", "net earnings", "net loss", "earnings", "profit"]
  basic_eps_filters2 = ["per share", "eps", "per common share"]

  assets_filters = ["total assets"]

  liabilities_filters = ["total liabilities", "equity", "stockholders"]

  total_shares_filters1 = ["common stock", "shares"]
  total_shares_filters2 = ["outstanding", "issued"]

  dividends_filters = ["dividends", "dividend"]

  revenue_filters = ["revenue", "sales"]

  net_income_filters = ["net operating income", "net income", "net loss", "net earnings", "profit"]

  capital_expenditure_filters1 = ["capital expenditure", "capital expenditures"]
  capital_expenditure_filters2 = ["plant", "property", "equipment", "ppe"]
  capital_expenditure_filters3 = ["addition", "additions", "purchase", "purchases", "acquisition", "acquisitions", "payment", "payments", "investment", "investments"]
  operating_cash_flow_filters1 = ["net cash", "cash"]
  operating_cash_flow_filters2 = ["operations", "operating"]


  # Per-category options: cleaned row label -> cleaned value.
  basic_eps_terms = {}
  assets_terms = {}
  liabilities_terms = {}
  total_shares_terms = {}
  dividends_terms = {}
  revenue_terms = {}
  # Single (label, value) pair rather than a dict: only one net income row is kept.
  net_income = ("", "")
  capital_expenditure_terms = {}
  operating_cash_flow_terms = {}

  total_assets = 0
  # True while the most recently accepted net income row was labelled as a loss.
  loss = False
  total_liabilities_present = False

  prompt = f"Titles:\n\n"

  # Title of a statement = header of its first column (first level for tuple headers).
  for statement in statements.values():
    if type(statement.columns[0]) == tuple:
      col = statement.columns[0][0]
    else:
      col = statement.columns[0]
    prompt += f"{col}\n"

  income_statement = statements["income statement"]
  balance_sheet = statements["balance sheet"]
  cash_flow = statements["cash flow"]

  # itertuples() yields (index, first column, second column, ...): row[1] is
  # used as the row label and row[2] as its value.
  for row in income_statement.itertuples():

    # Parentheses, commas, dollar signs and spaces are removed, so a value
    # written as "(615)" becomes "615".
    val = str(row[2]).replace("(", "").replace(")", "").replace(",", "").replace("$", "").replace(" ", "")
    category = row[1].lower().strip().replace("(", "").replace(")", "").replace(",", "").replace("$", "")

    if (any(term in category for term in basic_eps_filters1) and any(term in category for term in basic_eps_filters2)):
      # Only values containing a decimal point are accepted as EPS candidates.
      if "." in val and category not in basic_eps_terms:
        if loss:
            # Relies on the net income row having been seen earlier in this loop.
            basic_eps_terms[category.replace('loss', '').strip()] = f"-{val}"
        else:
            basic_eps_terms[category] = val

    if any(term in category for term in revenue_filters):
      revenue_terms[category] = val

    # Keep the matching row with the shortest label.
    if any(term in category for term in net_income_filters) and (len(category) < len(net_income[0]) or net_income[0] == ""):
      if "loss" in category:
          net_income = (category, f"-{val}")
          loss = True
      else:
          net_income = (category, val)
          loss = False

    if "shares" in category or category == "basic":
      if omit_shares:
        total_shares_terms[category] = ""
      else:
        total_shares_terms[category] = val

  for row in balance_sheet.itertuples():

    val = str(row[2]).replace("(", "").replace(")", "").replace(",", "").replace("$", "").replace(" ", "")
    category = row[1].lower().strip().replace("(", "").replace(")", "").replace(",", "").replace("$", "")

    if any(term in category for term in assets_filters) or category == "assets":
      if category not in assets_terms:
        # total_assets keeps the value of the last newly added assets row.
        total_assets = round(float(val))
        assets_terms[category] = total_assets

    # Liabilities/equity rows are collected only until an explicit total row is
    # found; that row resets the dict before being stored.
    if not total_liabilities_present:
      if any(term in category for term in liabilities_filters) or category == "liabilities":
        if category == "total liabilities" or category == "liabilities":
          total_liabilities_present = True
          liabilities_terms = {"total liabilities": val}
        liabilities_terms[category] = val

    if (any(term in category for term in total_shares_filters1) and any(term in category for term in total_shares_filters2)):
      if omit_shares:
        total_shares_terms[category] = ""
      else:
        total_shares_terms[category] = val


  for row in cash_flow.itertuples():

    val = str(row[2]).replace("(", "").replace(")", "").replace(",", "").replace("$", "").replace(" ", "")
    category = row[1].lower().strip().replace("(", "").replace(")", "").replace(",", "").replace("$", "")

    if any(term in category for term in dividends_filters) and ("payable" not in category):
      dividends_terms[category] = val

    # Either an explicit "capital expenditure(s)" label, or an asset word
    # (plant/property/equipment/ppe) combined with an acquisition-type word.
    if any(term in category for term in capital_expenditure_filters1) or (any(term in category for term in capital_expenditure_filters2) and any(term in category for term in capital_expenditure_filters3)):
      if omit_capex:
        capital_expenditure_terms[category] = ""
      else:
        capital_expenditure_terms[category] = val

    if any(term in category for term in operating_cash_flow_filters1) and any(term in category for term in operating_cash_flow_filters2):
      if "financing" not in category and "investing" not in category:
        operating_cash_flow_terms[category] = val

  # No explicit total liabilities row: derive it from total assets and a
  # "total ... equity/stockholders" row (added for deficit/loss labels,
  # subtracted otherwise). Rows equal to total assets are skipped — presumably
  # to ignore a "total liabilities and equity" line; confirm.
  if not total_liabilities_present:
    total_liabilities = 0
    for term in liabilities_terms:
      val = round(float(liabilities_terms[term]))
      if "total" in term and ("equity" in term or "stockholders" in term) and val != total_assets:
        if "deficit" in term or "loss" in term:
          total_liabilities = total_assets + val
        else:
          total_liabilities = total_assets - val

    liabilities_terms = {"total liabilities": total_liabilities}

  prompt += "\n\nHere are the options for each category:\n\n"

  # Dividends and capital expenditure fall back to a single zero-valued option
  # when no row matched.
  prompt += f"""
Basic EPS Options:
{stringify_terms(basic_eps_terms)}
Total Assets Options:
{stringify_terms(assets_terms)}
Total Liabilities Options:
{stringify_terms(liabilities_terms)}
Total Common Shares Outstanding Options:
{stringify_terms(total_shares_terms)}
Dividends Options:
{stringify_terms(dividends_terms) if stringify_terms(dividends_terms) != '' else '1. dividends paid: 0'}\n
Revenue Options:
{stringify_terms(revenue_terms)}
Net Income Options:
1. {net_income[0]}: {net_income[1]}\n
Capital Expenditure:
{stringify_terms(capital_expenditure_terms) if stringify_terms(capital_expenditure_terms) != '' else '1. capital expenditure: 0'}\n
Operating Cash Flow:
{stringify_terms(operating_cash_flow_terms)}
"""

  return prompt

In [None]:
# print(generate_prompt(statements, False, False))

Titles:

Unaudited Condensed Consolidated Statements Of Income - USD ($)  $ in Millions
Unaudited Condensed Consolidated Balance Sheets - USD ($)  $ in Millions
Unaudited Condensed Consolidated Statements of Cash Flows - USD ($)  $ in Millions


Here are the options for each category:


Basic EPS Options:
1. basic in dollars per share: 0.46

Total Assets Options:
1. total assets: 22583

Total Liabilities Options:
1. total liabilities: 9178

Total Common Shares Outstanding Options:

Dividends Options:
1. dividends — common and preferred: 479


Revenue Options:
1. revenues: 7686
2. cost of sales: 4185

Net Income Options:
1. net income: 785

Capital Expenditure:
1. additions to property plant and equipment: 615
2. non-cash additions to property plant and equipment: 201


Operating Cash Flow:
1. cash provided by operations: 1036




In [None]:
# @title get_adjusted_share_prices
def get_adjusted_share_prices(ticker):

  """
    Download the monthly adjusted closing prices of a company from Alpha Vantage.

    Args:
      ticker (str): ticker symbol of the company

    Returns:
      adjusted_prices_df (pandas.DataFrame): one row per entry of the
        "Monthly Adjusted Time Series" with columns "Year" (int), "Month" (int)
        and "Price" (the "5. adjusted close" value exactly as returned by the API)

    Raises:
      requests.HTTPError: when the API answers with an error status
      KeyError: when the response has no "Monthly Adjusted Time Series" entry;
        the message includes the payload that was returned instead
  """
  import os

  # create your own free api key: https://www.alphavantage.co/support/#support
  # FIXME(security): the literal fallback is a key committed to source control.
  # Set ALPHAVANTAGE_API_KEY in the environment, then rotate and delete the fallback.
  alpha_vantage_key = os.environ.get("ALPHAVANTAGE_API_KEY", "4SHH6RWLX44J8ZJZ")

  # Passing the query as params lets requests URL-encode the ticker.
  url = 'https://www.alphavantage.co/query'
  params = {
      "function": "TIME_SERIES_MONTHLY_ADJUSTED",
      "symbol": ticker,
      "apikey": alpha_vantage_key,
  }
  r = requests.get(url, params=params, timeout=30)
  r.raise_for_status()
  data = r.json()

  if "Monthly Adjusted Time Series" not in data:
    # Show what the API returned instead of failing with a bare KeyError.
    raise KeyError(f"No 'Monthly Adjusted Time Series' returned for {ticker}: {data}")

  adjusted = data["Monthly Adjusted Time Series"]

  # Collect the rows first and build the frame once instead of growing it per row.
  price_rows = []
  for date_key, monthly_values in adjusted.items():
    year, month, _day = date_key.split("-")
    price_rows.append([int(year), int(month), monthly_values['5. adjusted close']])

  adjusted_prices_df = pd.DataFrame(price_rows, columns=["Year", "Month", "Price"])

  return adjusted_prices_df

In [None]:
# @title get_answer
def get_answer(eps, shares, assets, liabilities, dividends, revenue, income, capex, op_cash_flow):

  """
    Format the nine metric values as the example answer the LLM should imitate.

    Args:
      eps (str): basic earnings per share
      shares (str): total common shares outstanding
      assets (str): total assets
      liabilities (str): total liabilities
      dividends (str): dividends paid
      revenue (str): revenue
      income (str): net income
      capex (str): capital expenditure
      op_cash_flow (str): operating cash flow

    Returns:
      answer (str): a leading newline followed by one "<label>: <value>" line
        per metric, each ending with a newline
  """

  labelled_values = (
      ("Basic EPS", eps),
      ("Total Common Shares Outstanding", shares),
      ("Total Assets", assets),
      ("Total Liabilities", liabilities),
      ("Dividends", dividends),
      ("Revenue", revenue),
      ("Net Income", income),
      ("Capital Expenditure", capex),
      ("Operating Cash Flow", op_cash_flow),
  )

  return "\n" + "".join(f"{label}: {value}\n" for label, value in labelled_values)

In [None]:
# @title get_example
def _ask_yes_no(question):
  """Return True only when the reply to `question` is "y" (case-insensitive)."""
  return input(question).lower() == "y"


def _ask_scale(question, scales):
  """Re-ask `question` until the upper-cased reply is a key of `scales`; return its multiplier."""
  while True:
    reply = input(question).strip().upper()
    if reply in scales:
      return scales[reply]
    print(f"Invalid input, expected one of: {'/'.join(scales)}")


def get_example(ticker, headers=headers):

  """
    generates an example prompt and answer for LLM to follow

    The first filing listed for the ticker in "quarterly_and_annual_forms.csv"
    is extracted and shown to the user, who then types in the correct value for
    every metric and the scales the reported figures are expressed in.

    Args:
      ticker (str): the company being processed
      headers (dict): Required headers to access SEC EDGAR DB API

    Returns:
      company_filings (pd.DataFrame): Quarterly filings associated with ticker
      ex_prompt (str): example prompt
      ex_answer (str): example answer
      shares_scale (int): magnitude to scale shares
      dollars_scale (int): magnitude to scale dollars
      omit_shares (bool): whether share-count values are blanked in the prompt
      omit_capex (bool): whether capital expenditure values are blanked in the prompt

    Raises:
      IndexError: when the CSV holds no filing for the ticker
      RuntimeError: when the statements of the first filing cannot be extracted
  """

  scales = {
      "N": 1,
      "T": 1000,
      "M": 1000000,
      "B": 1000000000
  }

  filings_df = pd.read_csv("quarterly_and_annual_forms.csv")
  company_filings = filings_df[filings_df["Ticker"] == ticker]
  if company_filings.empty:
    raise IndexError(f"No filings found for ticker {ticker!r} in quarterly_and_annual_forms.csv")

  url = company_filings["URLs"].iloc[0]
  year = company_filings["Year"].iloc[0]
  quarter = company_filings["Quarter"].iloc[0]

  first_statements, _, _ = test_statement_extraction(headers, url, year, quarter)
  if first_statements is False:
    # test_statement_extraction signals failure with False; stop before asking
    # the user to type in values for a prompt that cannot be built.
    raise RuntimeError(f"Could not extract the financial statements from {url}")

  ex_prompt = generate_prompt(first_statements, False, False)
  print(f"Here is the example prompt:\n\n{ex_prompt}\n\n")

  omit_shares = _ask_yes_no("Do you want to omit the right hand side of common shares options? (y/n)\n")
  omit_capex = _ask_yes_no("Do you want to omit the right hand side of capital expenditure options? (y/n)\n")

  ex_prompt = generate_prompt(first_statements, omit_shares, omit_capex)
  print(ex_prompt)

  basic_eps = input("Enter Basic EPS: ")
  assets = input("Enter total assets: ")
  liabilities = input("Enter total liabilities: ")
  shares = input("Enter total common shares outstanding: ")
  dividends = input("Enter dividends paid: ")
  revenue = input("Enter revenue: ")
  net_income = input("Enter net income: ")
  capex = input("Enter capital expenditure (may appear as plant, property, and equipment): ")
  op_cash_flow = input("Enter net operating cash flow: ")

  # Re-ask on a typo instead of raising KeyError after all values were entered.
  shares_scale = _ask_scale("Shares scale (N/T/M/B): ", scales)
  dollars_scale = _ask_scale("Dollars scale (N/T/M/B): ", scales)

  ex_answer = get_answer(basic_eps, shares, assets, liabilities, dividends, revenue, net_income, capex, op_cash_flow)

  return company_filings, ex_prompt, ex_answer, shares_scale, dollars_scale, omit_shares, omit_capex

In [None]:
# @title generate_metrics_df
def generate_metrics_df(ticker, filings, ex_prompt, ex_answer, explanation, omit_shares, omit_capex):

  """
    Generate a dataframe of key financial metrics for each filing

    For every filing the three statements are extracted, a prompt is generated
    and sent to the LLM, and the adjusted share price for the reporting month is
    looked up.

    Args:
      ticker (str): ticker symbol used to download the adjusted share prices
      filings (pd.DataFrame): Quarterly filings for a company
      ex_prompt (str): Prompt for first filing
      ex_answer (str): Example answer for first filing
      explanation (str): Explanation of the example answer
      omit_shares (bool): whether share-count values are blanked in the prompts
      omit_capex (bool): whether capital expenditure values are blanked in the prompts

    Returns:
      metrics_df (pd.DataFrame): dataframe of key financial metrics for each filing
      prompts (list): all prompts

    Raises:
      RuntimeError: when the statements of a filing cannot be extracted
      IndexError: when no adjusted price exists for the reporting month
  """

  ready_columns = ["CompanyID", "Year", "Quarter", "Form_Type", "Adjusted_Price_Per_Share"]
  extracted_columns = ["Basic_EPS", "Total_Common_Shares_Outstanding", "Total_Assets", "Total_Liabilities",
                       "Dividends", "Revenue", "Net_Income", "Capital_Expenditure", "Operating_Cash_Flow"]
  adjusted_price_per_share = get_adjusted_share_prices(ticker)

  # Rows are collected in lists and turned into frames once, after the loop.
  ready_rows = []
  extracted_rows = []
  prompts = []

  for i in range(len(filings)):

    url = filings.iloc[i]["URLs"]
    quarter = filings.iloc[i]["Quarter"]
    year = filings.iloc[i]["Year"]
    company_id = filings.iloc[i]["Company ID"]
    form_type = filings.iloc[i]["Form Type"]

    # test_statement_extraction yields the month before the year.
    statements, month_end, year_end = test_statement_extraction(headers, url, year, quarter)
    if statements is False:
      # Failure is signalled with False; stop with a clear message instead of
      # crashing inside generate_prompt.
      raise RuntimeError(f"Could not extract the financial statements from {url} ({year} Q{quarter})")

    prompt = generate_prompt(statements, omit_shares, omit_capex)
    metrics = extract_metrics(prompt, ex_prompt, ex_answer, explanation)

    # When the price history has no entry for that year, January of the
    # following year is used instead.
    if year_end not in adjusted_price_per_share["Year"].values:
      year_end = int(year_end) + 1
      month_end = 1
    pps_row = adjusted_price_per_share[(adjusted_price_per_share["Year"] == year_end) & (adjusted_price_per_share["Month"] == month_end)]
    if pps_row.empty:
      raise IndexError(f"No adjusted share price for {ticker} in {year_end}-{month_end}")
    price_per_share = pps_row["Price"].iloc[0]

    ready_rows.append([company_id, year, quarter, form_type, price_per_share])
    extracted_rows.append(metrics)
    prompts.append(prompt)

    print(f"{i}/{len(filings)-1}")

  ready_info_df = pd.DataFrame(ready_rows, columns=ready_columns)
  extracted_info_df = pd.DataFrame(extracted_rows, columns=extracted_columns)
  metrics_df = pd.concat([ready_info_df, extracted_info_df], axis=1)

  return metrics_df, prompts


In [None]:
def scale_rows(metrics, which_to_scale, scale, start, end):
  """
    Multiply metric values by a factor for a range of row labels, in place.

    Args:
      metrics (pd.DataFrame): table of extracted metrics; it is modified in place
      which_to_scale (str): "dollars" scales every dollar-denominated column;
        any other value scales only Total_Common_Shares_Outstanding
      scale (float): factor each value is multiplied by
      start (int): first row label to scale
      end (int): last row label to scale (inclusive)

    Returns:
      metrics (pd.DataFrame): the same dataframe object that was passed in
  """
  if which_to_scale == "dollars":
    target_columns = (
        "Total_Assets",
        "Total_Liabilities",
        "Dividends",
        "Revenue",
        "Net_Income",
        "Capital_Expenditure",
        "Operating_Cash_Flow",
    )
  else:
    target_columns = ("Total_Common_Shares_Outstanding",)

  for row_label in range(start, end + 1):
    for column in target_columns:
      current_value = float(metrics[column].loc[row_label])
      metrics.loc[row_label, column] = current_value * scale

  return metrics


In [None]:
# @title extract_and_process_metrics
def _prompt_number(message, cast):
    """Ask `message` until the reply can be converted with `cast` (int or float)."""
    while True:
        try:
            return cast(input(message))
        except ValueError:
            print("Invalid input")


def _prompt_row(message, row_count):
    """Ask `message` until the reply is a row number in [0, row_count - 1]."""
    while True:
        row = _prompt_number(message, int)
        if 0 <= row < row_count:
            return row
        print("Invalid input")


def extract_and_process_metrics(ticker):
    """
    Extract the metrics of every filing of a company and let the user correct them.

    After the automatic extraction the table is printed and the user can
    repeatedly scale a range of rows or overwrite a single value until they
    confirm with "y" (or type "q" when asked for a new value). Finally every row
    is multiplied by the share and dollar scales entered in get_example.

    Args:
      ticker (str): the company being processed

    Returns:
      metrics (pd.DataFrame): reviewed and scaled metrics, one row per filing
    """

    # Short name typed by the user -> column it edits.
    editable_columns = {
        "eps": "Basic_EPS",
        "assets": "Total_Assets",
        "liab": "Total_Liabilities",
        "shares": "Total_Common_Shares_Outstanding",
        "div": "Dividends",
        "rev": "Revenue",
        "capex": "Capital_Expenditure",
        "income": "Net_Income",
        "op_cash": "Operating_Cash_Flow",
    }

    filings, ex_prompt, ex_answer, shares_scale, dollars_scale, omit_shares, omit_capex = get_example(ticker)
    explanation = input("Explain how you got the answer (briefly)\n")

    metrics, prompts = generate_metrics_df(ticker, filings, ex_prompt, ex_answer, explanation, omit_shares, omit_capex)

    print(metrics)
    print(metrics["Total_Common_Shares_Outstanding"])
    print(metrics["Total_Assets"])

    print("\nCheck the table fix any discrepancies\n")

    while True:

        user_input = input("\nDoes everything look good? (y/n)\n")
        if user_input == "y":
            break

        type_of_change = input(
            "\nDo you want to scale rows (s) or change their value (c)?\n"
        )

        if type_of_change == "s":

            # scale_rows treats anything other than "dollars" as shares, so a
            # typo must be rejected here rather than silently scaling shares.
            which_to_scale = input("\nWhich do you want to scale? (shares/dollars)\n")
            while which_to_scale not in ("shares", "dollars"):
                print("Invalid input")
                which_to_scale = input("\nWhich do you want to scale? (shares/dollars)\n")

            start = _prompt_row("\nStart of rows to scale:\n", len(metrics))
            end = _prompt_row("\nEnd of rows to scale:\n", len(metrics))

            scale = _prompt_number("\nHow to scale:", float)

            scale_rows(metrics, which_to_scale, scale, start, end)

        else:

            category_question = "\nWhich category do you want to change? (eps, assets, liab, shares, div, rev, capex, income, op_cash)\n"
            which_to_change = input(category_question).lower()
            # An unknown category used to overwrite Operating_Cash_Flow silently.
            while which_to_change not in editable_columns:
                print("Invalid input")
                which_to_change = input(category_question).lower()

            row_num = _prompt_row("\nWhich row do you want to change: ", len(metrics))

            print(prompts[row_num])

            new_val = (
                input(
                    f"\nEnter the correct value for category {which_to_change} in row {row_num} (type 'q' to quit): "
                )
            )
            if new_val == "q":
                # Leaves the review loop; the final scaling below still runs.
                break

            try:
                new_val = float(new_val) if "." in new_val else int(new_val)
            except ValueError:
                # Keep the session (and the extracted data) alive on a typo.
                print("Invalid input")
                continue

            metrics.loc[row_num, editable_columns[which_to_change]] = new_val

        print(metrics)
        print(metrics["Total_Common_Shares_Outstanding"])
        print(metrics["Total_Assets"])

    metrics = scale_rows(metrics, "shares", shares_scale, 0, len(metrics)-1)
    metrics = scale_rows(metrics, "dollars", dollars_scale, 0, len(metrics)-1)

    return metrics

In [None]:
# @title convert_to_float
def convert_to_float(val):

  """
    Turn a value into a float, ignoring thousands separators.

    The value is converted to its string form, every comma is removed and the
    remainder is parsed as a float.

    Args:
      val (any): value to convert into a float

    Returns:
      val (float): transformed input
  """

  without_commas = "".join(str(val).split(","))
  return float(without_commas)

In [None]:
# @title calculate_ratios
def _safe_divide(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    if denominator == 0:
        return np.nan
    return numerator / denominator


def calculate_ratios(metrics):
    """
    Generate a dataframe of key financial ratios for each row in metrics

    The first three accepted rows only seed the trailing EPS/dividend
    windows and produce no output row. While seeding, 10-K rows whose
    quarter is not 1 are skipped entirely. Every later row yields one
    row of ratios. Any ratio whose denominator is zero is reported as NaN
    instead of raising ZeroDivisionError.

    Args:
      metrics (pd.DataFrame): Financial metrics for quarterly form(s).
        Columns read: CompanyID, Year, Quarter, Form_Type,
        Adjusted_Price_Per_Share, Basic_EPS, Total_Assets,
        Total_Liabilities, Dividends, Total_Common_Shares_Outstanding,
        Capital_Expenditure, Operating_Cash_Flow

    Returns:
      ratios_df (pd.DataFrame): dataframe of key financial ratios with
        columns CompanyID, Year, Quarter, Price_To_Earnings_Ratio,
        Price_To_Book_Ratio, Debt_To_Equity_Ratio, Dividend_Yield,
        Free_Cash_Flow
    """

    columns = [
        "CompanyID",
        "Year",
        "Quarter",
        "Price_To_Earnings_Ratio",
        "Price_To_Book_Ratio",
        "Debt_To_Equity_Ratio",
        "Dividend_Yield",
        "Free_Cash_Flow",
    ]

    trailing_basic_eps = []
    trailing_dividends = []
    # Collect rows in a list and build the frame once, rather than growing
    # a DataFrame row by row.
    rows = []

    initial = 0
    for i in range(0, len(metrics)):

        company_id = metrics["CompanyID"].iloc[i]
        year = metrics["Year"].iloc[i]
        quarter = metrics["Quarter"].iloc[i]
        form_type = metrics["Form_Type"].iloc[i]

        price_per_share = convert_to_float(metrics["Adjusted_Price_Per_Share"].iloc[i])
        basic_eps = convert_to_float(metrics["Basic_EPS"].iloc[i])
        assets = convert_to_float(metrics["Total_Assets"].iloc[i])
        liabilities = convert_to_float(metrics["Total_Liabilities"].iloc[i])
        dividends = convert_to_float(metrics["Dividends"].iloc[i])
        shares = convert_to_float(metrics["Total_Common_Shares_Outstanding"].iloc[i])
        capex = convert_to_float(metrics["Capital_Expenditure"].iloc[i])
        op_cash_flow = convert_to_float(metrics["Operating_Cash_Flow"].iloc[i])

        market_cap = shares * price_per_share

        # Warm-up: the first three accepted filings only seed the windows.
        if initial < 3:
            if form_type == "10-K" and quarter != 1:
                continue
            trailing_basic_eps.append(basic_eps)
            trailing_dividends.append(dividends)
            initial += 1
            continue

        if form_type == "10-K":
            # A 10-K carries full-year figures; subtract the three quarters
            # already in the window to get the final quarter, floored at 0.
            trailing_basic_eps.append(max(basic_eps - sum(trailing_basic_eps), 0))
            trailing_dividends.append(max(dividends - sum(trailing_dividends), 0))
        else:
            trailing_basic_eps.append(basic_eps)
            trailing_dividends.append(dividends)

        # The window now holds four quarters of EPS.
        pe_ratio = _safe_divide(price_per_share, sum(trailing_basic_eps))

        # Slide both windows back to three entries. The dividend window was
        # previously never trimmed, so its 10-K residual subtracted the
        # entire history instead of the last three quarters.
        trailing_basic_eps.pop(0)
        trailing_dividends.pop(0)

        book_value = assets - liabilities
        pb_ratio = _safe_divide(market_cap, book_value)
        de_ratio = _safe_divide(liabilities, book_value)

        # NOTE(review): the yield uses this filing's dividends only, not the
        # trailing window maintained above -- confirm whether a trailing
        # twelve-month yield was intended.
        dividends_per_share = _safe_divide(dividends, shares)
        div_yield = _safe_divide(dividends_per_share, price_per_share) * 100

        fcf = op_cash_flow - capex

        rows.append(
            [
                company_id,
                year,
                quarter,
                pe_ratio,
                pb_ratio,
                de_ratio,
                div_yield,
                fcf,
            ]
        )

    ratios_df = pd.DataFrame(rows, columns=columns)

    ratios_df["Quarter"] = ratios_df["Quarter"].astype(int)
    ratios_df["Year"] = ratios_df["Year"].astype(int)
    ratios_df["CompanyID"] = ratios_df["CompanyID"].astype(int)

    return ratios_df

## Script (make sure you have quarterly_and_annual_forms.csv imported from Google Drive)

In [None]:
if __name__ == "__main__":

    import getpass
    import os

    # Never hardcode credentials in a notebook. Read the Hugging Face token
    # from the HF_TOKEN environment variable, or prompt for it without echo.
    # NOTE(review): a token was previously committed in this cell; it should
    # be revoked in the Hugging Face account settings.
    HF_token = os.environ.get("HF_TOKEN") or getpass.getpass(
        "Enter your Hugging Face access token: "
    )
    login(token=HF_token)

    # Text-generation pipeline, loaded once before the interactive loop.
    model_id = "meta-llama/Llama-3.2-1B-Instruct"
    pipe = transformers.pipeline(
        "text-generation",
        model=model_id,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )

    # to_csv does not create missing parent directories.
    os.makedirs("metrics", exist_ok=True)
    os.makedirs("ratios", exist_ok=True)

    # Interactive loop: one ticker per iteration until the user enters 'q'.
    while True:

        try:

            ticker = input(
                "Enter the ticker of the company you want to analyze (all caps) or enter 'q' to quit:\n"
            )

            if ticker == "q":
                break

            metrics = extract_and_process_metrics(ticker)
            metrics.to_csv(f"metrics/{ticker}_metrics.csv")

            print(metrics)

            ratios = calculate_ratios(metrics)
            ratios.to_csv(f"ratios/{ticker}_ratios.csv")

            print(ratios)

        except Exception as e:

            # Best-effort: report the failure and move on to the next ticker.
            print(f"Error: {e}")

        finally:

            # Release cached GPU memory after failed tickers as well.
            torch.cuda.empty_cache()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/877 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.47G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/189 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/54.5k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/296 [00:00<?, ?B/s]