In [1]:
#globals

# Fallback OpenAI key, used only when OPENAI_API_KEY cannot be read from the environment
MY_API_KEY = "insert_your_OpenAI_API_key_here_if_you_don't_want_to_set_it_as_an_environmental_var"

# Filing that the notebook downloads and parses
test_url = "https://www.sec.gov/Archives/edgar/data/1373715/000137371520000072/now-20191231x10kxinpro.htm"

# For each statement: substrings that must all occur within MAX_DISTANCE_* characters
# after a matching header for that window of text to be kept
MANDATORY_FIELDS_BS = ['asset', 'cash', 'share', 'liabilit', 'equit', 'par val']
MANDATORY_FIELDS_INCOME = ['net income', 'share', 'diluted']
MANDATORY_FIELDS_CASHOPERATING = ['operating activit']
MAX_DISTANCE_BS = 2000
MAX_DISTANCE_INCOME = 2000
MAX_DISTANCE_CASHOPERATING = 2000

PLAY_NICE = 1     # argument passed to time.sleep() between consecutive API calls
GPT_ATTEMPTS = 3  # maximum number of prompts / retries per question

# dict to hold values retrieved from 10-K or 10-Q form; every field starts out as None
val_dict = {
    'sums': dict.fromkeys(
        ('cash', 'debt', 'earnings', 'EPS', 'earn_months', 'cash_op', 'cash_op_months')
    ),
    'share_cnt': dict.fromkeys(('authorized', 'issued', 'outstanding')),
}


In [2]:
#import libraries for global use
import os
import time
from collections import Counter

import numpy as np
import openai
import requests
from bs4 import BeautifulSoup
from openai import OpenAI

In [3]:
#get form content from EDGAR

# Use the key from the environment when it is set (and non-empty); otherwise fall back
# to the notebook-level MY_API_KEY. This replaces a bare `except:` around the constructor.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY") or MY_API_KEY)


# NOTE(review): SEC's EDGAR access guidance asks automated clients to identify themselves
# in the User-Agent; this browser-style header may need to be replaced - confirm.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "Referer": "https://www.sec.gov",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "DNT": "1",  
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1"
}

# Bind the name up front so a failed request cannot leave `response` undefined
# (or silently pointing at a stale value from an earlier run of this cell).
response = None
try:
    # The timeout keeps the cell from hanging indefinitely on a stalled connection.
    response = requests.get(test_url, headers=headers, timeout=30)
    # requests.get() does not raise on 4xx/5xx by itself; without this call the
    # HTTPError handler below could never run.
    response.raise_for_status()

except requests.exceptions.ConnectionError:
    print("Error: No internet connection.")
except requests.exceptions.HTTPError as http_err:
    print(f"HTTP error occurred: {http_err}")
except requests.exceptions.RequestException as err:
    print(f"An error occurred: {err}")

if response is None:
    # Nothing was downloaded, so the following cells have nothing to parse.
    raise RuntimeError(f"Could not download {test_url} - see the message printed above")

if not response.ok:
    print(response.status_code)


In [4]:
#get relevant text blocks from form

def _find_all(haystack, needles):
    """Return the sorted start offsets of every occurrence of any string in `needles`."""
    offsets = set()
    for needle in needles:
        pos = haystack.find(needle)
        while pos != -1:
            offsets.add(pos)
            pos = haystack.find(needle, pos + 1)  # +1 so overlapping matches are kept
    return sorted(offsets)


def _collect_sections(haystack, offsets, max_distance, mandatory_fields):
    """Join the windows starting at `offsets` that mention every mandatory field.

    Each window is `max_distance` characters long (shorter at the end of the text) and is
    kept, followed by a newline, only when all `mandatory_fields` occur in it
    (case-insensitive).
    """
    sections = []
    for start in offsets:
        window = haystack[start:start + max_distance]
        lowered = window.lower()  # lower-case once per window instead of once per field
        if all(field.lower() in lowered for field in mandatory_fields):
            sections.append(window + "\n")
    return ''.join(sections)


soup = BeautifulSoup(response.content, 'html.parser')

all_text = soup.text.replace('\xa0', ' ')

# Headers (exact, case-sensitive) that mark the start of each statement
BALANCE_SHEET_HEADERS = ('BALANCE SHEET',)
INCOME_HEADERS = (
    'STATEMENT OF OPERATION',
    'STATEMENTS OF OPERATION',
    'STATEMENTS OF COMPREHENSIVE INCOME',
    'STATEMENT OF COMPREHENSIVE INCOME',
    'STATEMENTS OF INCOME',
    # The original compared a slice of this length against 'STATEMENTS OF INCOME',
    # which can never be equal, so the singular form was silently never matched.
    'STATEMENT OF INCOME',
)
CASH_FLOW_HEADERS = ('CASH FLOW', 'CASH BALANCE')

#find instances of each header in all-text version of soup:
balance_sheets_indices = _find_all(all_text, BALANCE_SHEET_HEADERS)
net_income_indices = _find_all(all_text, INCOME_HEADERS)
cash_operating_indices = _find_all(all_text, CASH_FLOW_HEADERS)

#look for mandatory fields after each time you encounter a header:
balance_sheet_text = _collect_sections(
    all_text, balance_sheets_indices, MAX_DISTANCE_BS, MANDATORY_FIELDS_BS)
net_income_text = _collect_sections(
    all_text, net_income_indices, MAX_DISTANCE_INCOME, MANDATORY_FIELDS_INCOME)
cash_operating_text = _collect_sections(
    all_text, cash_operating_indices, MAX_DISTANCE_CASHOPERATING, MANDATORY_FIELDS_CASHOPERATING)

print(len(balance_sheet_text), len(net_income_text), len(cash_operating_text)) #sanity check


2001 2001 2001


In [5]:
#function to get sums (in millions of USDs) from requested fields; returns None in case of error / no value found
def get_sums(text, requested_values, var_names):
    """Ask the chat model for one or more sums found in `text`.

    Args:
        text: statement excerpt; it is embedded in the system prompt.
        requested_values: ';'-separated descriptions of the values to extract.
        var_names: ';'-separated keys of val_dict['sums'], one per requested value.

    Returns:
        A list with one entry per requested value (a float, or None where the model
        answered 'NA'); or None when the reply is empty, has the wrong number of
        ';'-separated fields, or contains a field that is not a number. The prompt
        asks the model to answer in millions of US dollars.

    Raises:
        ValueError: if the number of names differs from the number of requested values,
            or if a name is not a key of val_dict['sums'].
        openai.InternalServerError: re-raised once GPT_ATTEMPTS calls have failed.
    """

    counter = 0

    input_len = len(requested_values.split(';')) #number of values requested by user
    var_names = tuple(name.strip() for name in var_names.split(';'))
    if len(var_names) != input_len:
        raise ValueError(f"Wrong number of titles for the values requested ({len(var_names)} and {input_len}, respectively) - make sure to separate titles with ';' !")
    
    for name in var_names:
        if name not in val_dict['sums'].keys():
            raise ValueError(f"Requested value name {name} not in val_dict['sums'].keys(): {val_dict['sums'].keys()}")
        
    while True: 

        try:

            completion = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": f"""You are to help the user with extracting financial information from reports submitted by public companies to the SEC. 
You will recognize the header of this section (hereafter: 'header') by the fact that it is written in all caps.
You will be asked to extract specific values the provided text {text}, under the header, for the latest period reported. 
Your answer should include only a number, in in millions of US dollars. Be aware that the specification related to units might not be placed directly next to the sum - it is usually specified soon after the header.
Therefore, if it is specified that values are given in thousands, then the reported sum(s) should be divided by 1000.
When calculating sums, do not include in the calculation any items that were not specified by the user. 

If the user asks for several values (separated by ';'), return all requested values separated by ';'. 
If a value is within parentheses (e.g., '(X)'), then return it as a negative value (e.g., '-X').
For each requested value, if you fail to find the answer or are not confident about it, return 'NA'. However, if you are confident that the requested value is not reported, return '0'.
Otherwise, return numerical value only (in millions of US dollars), without text or symbols. Do not comment on the values or explain them in text.
                    """},
                    {"role": "user", "content": f"""Return the values of the following fields:
                    {requested_values}"""}
                ]
                )
            
            break
        
        # The exception classes live on the `openai` module, not on the `OpenAI` client
        # class, so the original `except OpenAI.InternalServerError` could not work.
        except openai.InternalServerError:
            counter += 1
            if counter >= GPT_ATTEMPTS:
                # Re-raise the server's own error: this exception type cannot be built
                # from a message alone.
                raise
            time.sleep(PLAY_NICE)

    
    gpt_output = completion.choices[0].message.content
    if not gpt_output: #no text in the reply
        return None

    output_list = gpt_output.split(';')
    if len(output_list) != input_len:
        return None

    returned_list = []
    for value in output_list:
        value = value.strip()
        if value == 'NA':
            returned_list.append(None)
            continue
        try:
            returned_list.append(float(value.replace(',', '')))
        except ValueError: #the field is not a number - reject the whole reply
            return None

    return returned_list

In [6]:
#repeat each prompt to increase confidence in the response given - specific for the sums dict
def repeat_gpt_sums(balance_sheet_text, net_income_text, cash_operating_text): 
    """Fill the None entries of val_dict['sums'] by asking the model repeatedly.

    Three questions are asked (cash/debt, earnings/EPS/period, operating cash flow/period),
    each through get_sums(). A question is only sent while at least one of its keys is
    still None, so a re-run never spends calls on values that are already known and never
    overwrites them.

    The first ceil(GPT_ATTEMPTS / 2) rounds are always run; a key whose answers all agree
    is accepted. If any key is still None afterwards, the remaining rounds are run and
    each undecided key takes its most common answer (which may itself be None).

    Args:
        balance_sheet_text: text passed to get_sums() for the cash/debt question.
        net_income_text: text passed to get_sums() for the earnings question.
        cash_operating_text: text passed to get_sums() for the operating cash question.

    Returns:
        None; val_dict['sums'] is updated in place.
    """

    print("Populating val_dicts['sums']..........")

    sums = val_dict['sums']
    votes = {key: [] for key in sums} #answers collected per key

    debt_fields = """'Convertible senior notes', 'Operating lease liabilities, less current portion', 'Term Loans', 'Bonds Payable', 'Debentures', 'Mortgage Payable',
    'Capital Lease Obligations', 'Finance Lease Obligations', 'Notes Payable', 'Subordinated Debt', 'Other long-term liabilities', and similar long-term debt instruments"""
    cash_debt_prompt = f"""Under 'current assets', the reported sum(s) of cash + 'cash items' or 'cash equivalents' + marketable securities or short-term investments, in millions of USD; 
    long-term (non-current) debt (including {debt_fields}), in millions of USD"""    
    cash_debt_titles = "cash; debt"
    earnings_prompt = "'Net income' or 'Net earnings' (or loss, as a negative value); 'net income' or 'net earnings' (or loss, as a negative value) per share - diluted; duration of latest reported period, in months (e.g., 3 for 'three months', 12 for 'yearly')"
    earnings_titles = "earnings; EPS; earn_months"
    cashop_prompt = "(Net) cash provided by operating activities, in millions of USD; duration of latest reported period, in months (e.g., 3 for 'three months', 12 for 'yearly')"
    cashop_titles = "cash_op; cash_op_months"

    queries = (
        (balance_sheet_text, cash_debt_prompt, cash_debt_titles),
        (net_income_text, earnings_prompt, earnings_titles),
        (cash_operating_text, cashop_prompt, cashop_titles),
    )

    initial_attempts = int(np.ceil(GPT_ATTEMPTS/2))

    calls_made = 0

    def _keys(titles):
        """Split a ';'-separated title string into val_dict['sums'] keys."""
        return [title.strip() for title in titles.split(';')]

    def _poll(text, prompt, titles):
        """Send one question and record the answer for every key that is still None."""
        nonlocal calls_made
        if calls_made: #pause between consecutive API calls, but not before the first one
            time.sleep(PLAY_NICE)
        calls_made += 1
        keys = _keys(titles)
        answers = get_sums(text, prompt, titles) or [None] * len(keys)
        for key, answer in zip(keys, answers):
            if sums[key] is None:
                votes[key].append(answer)

    def _run_round(attempt_no):
        """Ask every question that still has at least one undecided key."""
        print(f'Prompt {attempt_no} out of maximum {GPT_ATTEMPTS} attempts................', end='\r')
        for text, prompt, titles in queries:
            # `is None` rather than truthiness: 0 is a legitimate answer, and every key
            # of the group is checked (not just some of them).
            if any(sums[key] is None for key in _keys(titles)):
                _poll(text, prompt, titles)

    for attempt_no in range(1, initial_attempts + 1): #to save API calls, first try half and see if they're the same
        _run_round(attempt_no)

    for key, ballots in votes.items():
        # `ballots` is empty for keys that were already populated on entry
        if ballots and all(ballot == ballots[0] for ballot in ballots[1:]):
            sums[key] = ballots[0]

    if all(value is not None for value in sums.values()): #no more Nones in sums dict
        print(f"Values confirmed after {initial_attempts} prompts - no additional attempts will be made\n")
        return #val_dict['sums'] has been filled, skip the rest

    for attempt_no in range(initial_attempts + 1, GPT_ATTEMPTS + 1): #no decision made yet for at least one var, get more votes
        _run_round(attempt_no)

    for key, ballots in votes.items():
        if sums[key] is None and ballots: #only decide keys that are still open
            sums[key] = Counter(ballots).most_common(1)[0][0]


In [7]:
#function to get number of authorized, issued, and outstanding shares (in millions); returns None in case of error / no value found
def get_share_counts(balance_sheet_text): 
    """Ask the chat model for the authorized / issued / outstanding share counts.

    The model is asked for a 4-tuple (NUM_AUTHORIZED, NUM_ISSUED, NUM_OUTSTANDING, UNIT);
    the three counts are then converted to millions of shares according to UNIT.

    Args:
        balance_sheet_text: balance-sheet excerpt; it is embedded in both prompts.

    Returns:
        A tuple (authorized, issued, outstanding) in millions of shares, where an entry
        is None if the model's field was not an integer (e.g. 'NA'); or None when the
        reply is empty, does not contain exactly three commas, or names no known unit.

    Raises:
        openai.InternalServerError: re-raised once GPT_ATTEMPTS calls have failed.
    """

    counter = 0

    while True: 
        
        try:

            completion = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": f"""You are to help the user with extracting financial information from reports submitted by public companies to the SEC. 
You will be asked to extract the number of authorized shares and shares outstanding from the provided text {balance_sheet_text}, which was retrieved from the balance sheet table of a K-10 or Q-10 form. 
You might encounter cases where sums/words are fused together where there should be a space - this is usually the result of a new line or tab not being correctly recognized; 
add the new line or tab as required to make better sense of the data.
You should approach the text in the following way:
1. Find out which units (thousands/millions/absolute number of shares) are used for reporting the number of shares - this is usually specified in the beginning of the text.
For example, you may see a statement like '(In millions, except number of shares, which are reflected in thousands, and par value)'. This means that the number of shares in the text is given in thousands. 
2. Find a sentence referring to the number of shares authorized and outstanding - if the form refers to preferred and common stocks, focus on the common stock. 
Note the structure of this sentence. Often it will be something like:
'X shares authorized; Y and Z shares issued and outstanding, respectively' - in such a case, X refers to authorized shares, Y refers to stocks issued, and Z refers to shares outstanding. 
In other cases, the phrasing might be something like:
'Common stock [irrelevant sum] par value; X shares authorized; Y and Z shares issued and outstanding at [later date] and [earlier date], respectively' 
- in such a case, X refers to authorized shares and Y refers both to shares issued and shares outstanding.
"""},
                    {"role": "user", "content": 
f"""NUM_AUTHORIZED = the number of common shares authorized, as reported in {balance_sheet_text} (the absolute number as it appears in the text, without conversions, but without any separators such as commas). 
Return 'NA' if you're unsure about which value should be extracted. 
NUM_ISSUED = the number of shares issued, as reported in {balance_sheet_text} (the absolute number as it appears in the text, without conversions, but without any separators such as commas). 
Return 'NA' if you're unsure about which value should be extracted. 
NUM_OUTSTANDING = the number of shares outstanding / outstanding shares reported in {balance_sheet_text} (the absolute number as it appears in the text, without conversions, but without any separators such as commas). 
Return 'NA' if you're unsure about which value should be extracted. 
UNIT = which units is the number of shares outstanding reported in {balance_sheet_text}? Possible values are: 'thousands', 'millions', 'absolute number', or 'NA' if you're unsure.
Return a tuple (NUM_AUTHORIZED, NUM_ISSUED, NUM_OUTSTANDING, UNIT), without any additional text, symbols, or units. 
"""}
                ]
                )
            
            break
        
        # The exception classes live on the `openai` module, not on the `OpenAI` client
        # class, so the original `except OpenAI.InternalServerError` could not work.
        except openai.InternalServerError:
            counter += 1
            if counter >= GPT_ATTEMPTS:
                # Re-raise the server's own error: this exception type cannot be built
                # from a message alone.
                raise
            time.sleep(PLAY_NICE)

    
    gpt_output = completion.choices[0].message.content

    # Exactly three commas means exactly four fields once split below.
    if not gpt_output or gpt_output.count(',') != 3:
        return None
    
    gpt_tuple = gpt_output.replace("(", "").replace(")", "").split(',')

    units = gpt_tuple[-1].strip().lower() #lower-cased so 'Thousands' etc. are recognized

    #divisor that converts the reported number into millions of shares
    if 'thousand' in units:
        div = 1000
    elif 'million' in units:
        div = 1
    elif 'absolute' in units:
        div = 10**6
    else: 
        return None

    def _in_millions(raw):
        """Convert one reported count to millions; None if it is not an integer."""
        try:
            return int(raw.strip().strip("'\"")) / div #tolerate a quoted number
        except ValueError:
            return None

    authorized, issued, outstanding = (_in_millions(raw) for raw in gpt_tuple[:3])

    return authorized, issued, outstanding   
    

In [8]:
#repeat each prompt to increase confidence in the response given - specific for the share_cnt dict
def repeat_gpt_share_cnt(balance_sheet_text): 
    """Fill the None entries of val_dict['share_cnt'] by asking the model repeatedly.

    get_share_counts() is called for the first ceil(GPT_ATTEMPTS / 2) rounds; a key whose
    answers all agree is accepted. If any key is still None afterwards, the remaining
    rounds are run and each undecided key takes its most common answer (which may itself
    be None). Keys that already hold a value are never overwritten, and no call is made
    when nothing is missing.

    Args:
        balance_sheet_text: text passed on to get_share_counts().

    Returns:
        None; val_dict['share_cnt'] is updated in place.
    """

    print("Populating val_dicts['share_cnt']..........")

    counts = val_dict['share_cnt']
    votes = {key: [] for key in counts} #answers collected per key

    initial_attempts = int(np.ceil(GPT_ATTEMPTS/2))

    calls_made = 0

    def _run_round(attempt_no):
        """Ask once and record the answer for every key that is still None."""
        nonlocal calls_made
        print(f'Prompt {attempt_no} out of maximum {GPT_ATTEMPTS} attempts................', end='\r')
        if all(value is not None for value in counts.values()):
            return #nothing left to ask for
        if calls_made: #pause between consecutive API calls, but not before the first one
            time.sleep(PLAY_NICE)
        calls_made += 1
        answers = get_share_counts(balance_sheet_text) or [None] * len(votes)
        # get_share_counts() returns (authorized, issued, outstanding), the same order
        # as the keys of val_dict['share_cnt']
        for (key, current), answer in zip(counts.items(), answers):
            if current is None:
                votes[key].append(answer)

    for attempt_no in range(1, initial_attempts + 1): #to save API calls, first try half and see if they're the same
        _run_round(attempt_no)

    for key, ballots in votes.items():
        # `ballots` is empty for keys that were already populated on entry
        if ballots and all(ballot == ballots[0] for ballot in ballots[1:]):
            counts[key] = ballots[0]

    # Decide on the stored values: the original tested whether the vote lists were
    # non-empty, which is always the case after the first round.
    if all(value is not None for value in counts.values()):
        print(f"Values confirmed after {initial_attempts} prompts - no additional attempts will be made\n")
        return #val_dict['share_cnt'] has been filled, skip the rest

    for attempt_no in range(initial_attempts + 1, GPT_ATTEMPTS + 1): #no decision made yet for at least one var, get more votes
        _run_round(attempt_no)

    for key, ballots in votes.items():
        if counts[key] is None and ballots: #only decide keys that are still open
            counts[key] = Counter(ballots).most_common(1)[0][0]


In [9]:
#call repeat_gpt functions
# Populate val_dict['sums'] from the three text blocks extracted from the form.
repeat_gpt_sums(balance_sheet_text, net_income_text, cash_operating_text)
# Pause for PLAY_NICE seconds between the two batches of API calls.
time.sleep(PLAY_NICE)
# Populate val_dict['share_cnt'] from the balance-sheet text.
repeat_gpt_share_cnt(balance_sheet_text)
# Last expression of the cell: display the populated dict.
val_dict

Populating val_dicts['sums']..........
Populating val_dicts['share_cnt'].................
Values confirmed after 2 prompts - no additional attempts will be made



{'sums': {'cash': 1691.095,
  'debt': 1101.666,
  'earnings': 626.698,
  'EPS': 3.18,
  'earn_months': 12.0,
  'cash_op': 1.236,
  'cash_op_months': 12.0},
 'share_cnt': {'authorized': 600.0, 'issued': 189.461, 'outstanding': 189.461}}

In [10]:
#if there are None values in any of the sub-dicts, rerun the appropriate repeat_gpt function(s) to try and populate the fields that contain None
if None in val_dict['sums'].values():
    print("Repeating GPT calls to try to fill-in None in 'sums' dict.............")
    time.sleep(PLAY_NICE)
    repeat_gpt_sums(balance_sheet_text, net_income_text, cash_operating_text)
# The test must look at the values: `None in some_dict` checks the keys, so the original
# condition was never true and the share counts were never retried.
if None in val_dict['share_cnt'].values():
    print("Repeating GPT calls to try to fill-in None in 'share_cnt' dict.............")
    time.sleep(PLAY_NICE)
    repeat_gpt_share_cnt(balance_sheet_text)
val_dict

{'sums': {'cash': 1691.095,
  'debt': 1101.666,
  'earnings': 626.698,
  'EPS': 3.18,
  'earn_months': 12.0,
  'cash_op': 1.236,
  'cash_op_months': 12.0},
 'share_cnt': {'authorized': 600.0, 'issued': 189.461, 'outstanding': 189.461}}

In [11]:
#calculate net cash and net cash per share
def get_net_cash():
    """Compute net cash and net cash per share from val_dict.

    Net cash is val_dict['sums']['cash'] minus val_dict['sums']['debt'], rounded to 4
    decimals. Net cash per share divides that by val_dict['share_cnt']['outstanding']
    and rounds to 2 decimals.

    Returns:
        A tuple (net_cash, net_cash_per_share); net_cash_per_share is None when the
        number of outstanding shares is missing or zero. Returns None (not a tuple)
        when cash or debt is missing or the subtraction is not possible.
    """

    cash = val_dict['sums']['cash']
    debt = val_dict['sums']['debt']

    # Compare against None explicitly: a value of 0 (e.g. no long-term debt) is a valid
    # input and must not be treated as "missing".
    if cash is None or debt is None: #net cash can't be calculated, return None
        return None
    
    try:
        net_cash = np.round(cash - debt, 4)
    except TypeError: #values that cannot be subtracted
        return None

    outstanding = val_dict['share_cnt']['outstanding']
    if not outstanding: #None, or zero shares (would divide by zero)
        net_cash_per_share = None
    else:
        net_cash_per_share = np.round(net_cash / outstanding, 2)

    return net_cash, net_cash_per_share

In [12]:
# get_net_cash() returns None instead of a tuple when net cash cannot be calculated;
# unpacking that directly would raise TypeError, so fall back to a pair of Nones.
net_cash_result = get_net_cash()
net_cash, net_cash_per_share = net_cash_result if net_cash_result is not None else (None, None)
net_cash, net_cash_per_share #test

(589.429, 3.11)