# Extract Financial Data From XBRL Instance Document

I have been manually copying financial data from companies' annual reports into Google Sheets, then downloading it as a CSV file for data analysis.

I will try to automate this step by extracting the financial data directly from XBRL instance documents.

# XBRL Instance Documents From SEC Website

The XBRL instance documents are data files of companies' annual reports. They can be downloaded from the [SEC](https://www.sec.gov/edgar/searchedgar/companysearch.html) website.

# Create Functions For Extracting Data From XBRL Instance Document

First, I will create the functions needed to extract data from an XBRL instance document. They will extract the following data that I need for analysis:
* Ticker symbol
* Filing type
* Period end date
* Reporting currency
* Cash and cash equivalents
* Short-term investments
* Current portion of debt/notes
* Revenue
* Cost of revenue
* Revenue from one fiscal year ago
* Operating cash flow
* Capital expenditure

In [1]:
import scipy.stats as stats
import xml.etree.ElementTree as ET

def get_context_id_and_currency(root, ns, tag_name, fy_ended):
    """Find the overall context ids of ``tag_name`` and its reporting currency.

    Only facts whose ``contextRef`` has the shortest id length are considered,
    on the assumption that the shortest id is the overall (unsegmented) one.

    Args:
        root: root element of the parsed XBRL instance document.
        ns: prefix -> namespace URI mapping; must contain ``'xbrli'``.
        tag_name: prefixed name of the fact to inspect,
            e.g. ``'us-gaap:AssetsCurrent'``.
        fy_ended: end date of the current fiscal year, written exactly as in
            the document's contexts (ISO ``YYYY-MM-DD`` text).

    Returns:
        Tuple ``(context_id_current_fy, context_id_previous_fy, currency)``.
        The previous fiscal year is the latest period ending before
        ``fy_ended``.

    Raises:
        ValueError: if no fact, no matching context/unit, or no earlier period
            can be found.
    """
    from collections import Counter

    def most_frequent(values):
        # ties are resolved towards the smallest value so the result does not
        # depend on document order
        counts = Counter(values)
        return min(counts, key=lambda value: (-counts[value], value))

    end_date_by_context = {}

    def end_date_of(context_id):
        # each context is resolved once, however many facts reference it
        if context_id not in end_date_by_context:
            context_path = "xbrli:context[@id='{}']".format(context_id)
            node = root.find(context_path + '//xbrli:instant', ns)

            if node is None:
                node = root.find(context_path + '//xbrli:endDate', ns)

            if node is None:
                raise ValueError('No end date found for context {}.'
                                 .format(context_id))

            end_date_by_context[context_id] = node.text

        return end_date_by_context[context_id]

    tags = root.findall(tag_name, ns)

    if not tags:
        raise ValueError('No {} facts found.'.format(tag_name))

    # only process the shortest context ids as those should be the overall ids
    shortest = min(len(tag.attrib['contextRef']) for tag in tags)
    overall = [(tag, end_date_of(tag.attrib['contextRef']))
               for tag in tags
               if len(tag.attrib['contextRef']) == shortest]

    current = [tag for tag, end_date in overall if end_date == fy_ended]

    if not current:
        raise ValueError('No {} fact found for period ended {}.'
                         .format(tag_name, fy_ended))

    context_id_current_fy = most_frequent(tag.attrib['contextRef']
                                          for tag in current)

    # get currency
    unit_id = most_frequent(tag.attrib['unitRef'] for tag in current)
    measure = root.find("xbrli:unit[@id='{}']//xbrli:measure".format(unit_id)
                        , ns)

    if measure is None or not measure.text:
        raise ValueError('No measure found for unit {}.'.format(unit_id))

    # the measure is a QName such as 'iso4217:USD'; keep the part after ':'
    measure_parts = measure.text.split(':')
    currency = measure_parts[1] if len(measure_parts) > 1 else measure_parts[0]

    # get context id for previous FY: ISO dates compare correctly as text, and
    # dates after fy_ended must not be mistaken for the previous year
    earlier_end_dates = {end_date for _, end_date in overall
                         if end_date is not None and end_date < fy_ended}

    if not earlier_end_dates:
        raise ValueError('No {} fact found for a period before {}.'
                         .format(tag_name, fy_ended))

    previous_fy_ended = max(earlier_end_dates)
    context_id_previous_fy = most_frequent(tag.attrib['contextRef']
                                           for tag, end_date in overall
                                           if end_date == previous_fy_ended)

    return (context_id_current_fy, context_id_previous_fy, currency)



def get_value(root, ns, tag_name, context_id):
    """Return the most frequently reported text of a fact in one context.

    Args:
        root: root element of the parsed XBRL instance document.
        ns: prefix -> namespace URI mapping used to resolve ``tag_name``.
        tag_name: prefixed name of the fact, e.g. ``'us-gaap:Revenues'``.
        context_id: value the fact's ``contextRef`` attribute must have.

    Returns:
        The text that occurs most often among the matching direct children of
        ``root``; ties are resolved towards the smallest string.

    Raises:
        ValueError: if no matching fact carries any text.
    """
    from collections import Counter

    xpath = "{tag_name}[@contextRef='{context_id}']" \
                .format(tag_name=tag_name
                        , context_id=context_id)

    # facts without text cannot be converted to a number by the callers
    text_list = [e.text for e in root.findall(xpath, ns) if e.text is not None]

    if not text_list:
        raise ValueError('No value found for {} in context {}.'
                         .format(tag_name, context_id))

    counts = Counter(text_list)

    return min(counts, key=lambda text: (-counts[text], text))



import datetime as dt
import pandas as pd
import re

def _local_name(tag):
    """Return an ElementTree tag without its leading '{namespace-uri}' part."""
    return tag.rsplit('}', 1)[-1]


def _collect_values(section, root, ns, tag_names, context_id
                    , first_only=False, search_document=True):
    """Gather ``(tag_name, value)`` pairs for the candidate ``tag_names``.

    The direct children of ``section`` are searched first. Only when that
    yields nothing, and ``search_document`` is true, are the facts directly
    under ``root`` carrying ``context_id`` consulted through ``get_value``.
    With ``first_only`` the search stops at the first candidate found.
    """
    found = []

    for tag_name in tag_names:
        element = section.find(tag_name, ns)

        if element is not None:
            found.append((tag_name, float(element.text)))

            if first_only:
                return found

    if found or not search_document:
        return found

    # try using a different way if not found
    for tag_name in tag_names:
        xpath = "{}[@contextRef='{}']".format(tag_name, context_id)

        if root.findall(xpath, ns):
            value = get_value(root, ns, tag_name, context_id)
            found.append((tag_name, float(value)))

            if first_only:
                break

    return found


def extract_data_from_XBRL(file_path):
    """Extract key financial figures from one XBRL instance document.

    Args:
        file_path: path of the XBRL instance (XML) file.

    Returns:
        A one-row ``pandas.DataFrame`` indexed by the upper-cased trading
        symbol, with the columns ``filings``, ``fiscal_year_ended``,
        ``reporting_currency``, ``cash_and_equivalents``,
        ``short_term_investments``, ``current_debt``, ``revenue``,
        ``cost_of_revenue``, ``revenue_1_fy_ago``, ``operating_cash_flow``
        and ``capital_expenditure``.

    Raises:
        Exception: when a required item (trading symbol, document type,
            period end date, AssetsCurrent, cash, revenue, cost of revenue,
            capital expenditure) is missing, or when the balance sheet and
            the cash flow statement report different currencies.
    """
    company = {}

    # a single pass yields both the namespace declarations and the parsed tree
    parser = ET.iterparse(file_path, ['start-ns'])
    ns = dict(declaration for _, declaration in parser)
    root = parser.root

    # the XPath expressions below rely on the 'xbrli' prefix, so map it to the
    # XBRL instance namespace whatever prefix the document itself declares
    instance_ns = re.compile(r'^http://www\.xbrl\.org/\d+/instance$')

    if '' in ns and instance_ns.search(ns['']):
        ns['xbrli'] = ns['']
    elif 'xbrli' not in ns:
        for uri in list(ns.values()):
            if instance_ns.search(uri):
                ns['xbrli'] = uri
                break

    # get trading symbol, document type, and period end date
    trading_symbol = root.find('dei:TradingSymbol', ns)

    if trading_symbol is None or not trading_symbol.text:
        raise Exception('Trading symbol not found.')

    document_type = root.find('dei:DocumentType', ns)

    if document_type is None:
        raise Exception('Document type not found.')

    company['filings'] = document_type.text

    document_period_end_date = root.find('dei:DocumentPeriodEndDate', ns)

    if document_period_end_date is None:
        raise Exception('Document period end date not found.')

    company['fiscal_year_ended'] = dt.datetime.strptime(document_period_end_date.text
                                                        , '%Y-%m-%d')

    # get namespace of financial data because not all are 'us-gaap'
    ns_fd = None

    for child in root:
        if '}' not in child.tag or _local_name(child.tag) != 'AssetsCurrent':
            continue

        namespace = child.tag.split('}')[0][1:]

        for prefix, uri in ns.items():
            if uri == namespace:
                ns_fd = prefix

    if ns_fd is None:
        raise Exception('AssetsCurrent not found.')

    def qualify(local_names):
        # prefix each candidate tag with the financial-data namespace prefix
        return [ns_fd + ':' + local_name for local_name in local_names]

    # get context id of balance sheet and balance sheet currency
    bs_id_for_fy, _, currency = get_context_id_and_currency(
        root, ns, ns_fd + ':AssetsCurrent', document_period_end_date.text)
    company['reporting_currency'] = currency

    # get current assets and current liabilities: facts are visited in
    # document order and sliced on the balance sheet's total lines
    reached_total_current_assets = False
    reached_total_current_liabilities = False
    current_liabilities_started = False
    current_asset_list = ET.Element('CurrentAssetList')
    current_liability_list = ET.Element('CurrentLiabilityList')

    for e in root.findall("*[@contextRef='{}']".format(bs_id_for_fy), ns):
        tag_name = _local_name(e.tag)

        if tag_name == 'AssetsCurrent':
            reached_total_current_assets = True
        elif tag_name == 'Assets':
            current_liabilities_started = True
            continue
        elif tag_name == 'LiabilitiesCurrent':
            reached_total_current_liabilities = True

        if not reached_total_current_assets:
            current_asset_list.append(e)

        if current_liabilities_started \
        and not reached_total_current_liabilities:
            current_liability_list.append(e)

        if reached_total_current_liabilities:
            break

    # get cash and cash equivalents value (first candidate tag present wins)
    cash_and_equivalents = _collect_values(
        current_asset_list, root, ns
        , qualify(['CashAndCashEquivalentsAtCarryingValue'
                   , 'CashCashEquivalentsRestrictedCashAndRestrictedCashEquivalents'])
        , bs_id_for_fy, first_only=True)

    if not cash_and_equivalents:
        raise Exception('Cash and cash equivalents not found.')

    company['cash_and_equivalents'] = sum(value
                                          for _, value in cash_and_equivalents)

    # get short-term investments value, some company do not have short-term
    # investment; every candidate tag present is summed
    short_term_investments = _collect_values(
        current_asset_list, root, ns
        , qualify(['MarketableSecuritiesCurrent'
                   , 'AvailableForSaleSecuritiesDebtSecuritiesCurrent'
                   , 'AvailableForSaleSecuritiesCurrent'
                   , 'ShortTermInvestments'
                   , 'HeldToMaturitySecuritiesCurrent'])
        , bs_id_for_fy)

    company['short_term_investments'] = sum(value
                                            for _, value in short_term_investments)

    # get current debt value, some company do not have current debt;
    # every candidate tag present is summed
    current_debt_items = _collect_values(
        current_liability_list, root, ns
        , qualify(['NotesPayableCurrent'
                   , 'ConvertibleDebtCurrent'
                   , 'ConvertibleNotesPayableCurrent'
                   , 'LongTermDebtCurrent'
                   , 'LongTermDebtAndCapitalLeaseObligationsCurrent'
                   , 'LoansPayableToBankCurrent'])
        , bs_id_for_fy)

    company['current_debt'] = sum(value for _, value in current_debt_items)

    # get context id of cash flow and income statement for current and previous FY
    cf_in_id_curr_fy, cf_in_id_prev_fy, currency = get_context_id_and_currency(
        root, ns
        , ns_fd + ':NetCashProvidedByUsedInOperatingActivities'
        , document_period_end_date.text)

    # if currency from income statement is different from balance sheet
    if currency != company['reporting_currency']:
        raise Exception('Currency ' + currency +' from income statement is '
                        + 'different from currency ' + company['reporting_currency']
                        + ' from balance sheet.')

    # get cash flows from investing activities and income statement, again by
    # slicing the facts of the context in document order
    investing_cash_flow_started = False
    end_of_investing_cash_flow = False
    reached_revenue = False
    reached_operating_incomeloss = False

    cf_investing_activity_list = ET.Element('CashFlowInvestingActivities')
    income_statement_items = ET.Element('IncomeStatementItems')

    for e in root.findall("*[@contextRef='{}']".format(cf_in_id_curr_fy), ns):
        tag_name = _local_name(e.tag)

        if tag_name == 'NetCashProvidedByUsedInOperatingActivities':
            investing_cash_flow_started = True
            continue
        elif tag_name == 'NetCashProvidedByUsedInInvestingActivities':
            end_of_investing_cash_flow = True
        elif tag_name.startswith('Revenue') and \
        not reached_operating_incomeloss:
            reached_revenue = True
        elif tag_name == 'OperatingIncomeLoss':
            reached_operating_incomeloss = True

        if investing_cash_flow_started \
        and not end_of_investing_cash_flow:
            cf_investing_activity_list.append(e)

        if reached_revenue and not reached_operating_incomeloss:
            income_statement_items.append(e)

        if end_of_investing_cash_flow:
            break

    # get revenue value (first candidate tag present wins)
    revenue_items = _collect_values(
        income_statement_items, root, ns
        , qualify(['RevenueFromContractWithCustomerExcludingAssessedTax'
                   , 'RevenueFromContractWithCustomerIncludingAssessedTax'
                   , 'Revenues'])
        , cf_in_id_curr_fy, first_only=True)

    if not revenue_items:
        raise Exception('Revenue not found.')

    # remember which tag matched so the previous FY is read from the same tag
    rev_tag_name = revenue_items[0][0]
    company['revenue'] = sum(value for _, value in revenue_items)

    # get cost of revenue value
    # NOTE(review): unlike the other items this one is only looked up in the
    # income statement section, without a document-wide fallback -- confirm
    # that this is intentional.
    cost_of_revenue_items = _collect_values(
        income_statement_items, root, ns
        , qualify(['CostOfRevenue', 'CostOfGoodsAndServicesSold'])
        , cf_in_id_curr_fy, search_document=False)

    if not cost_of_revenue_items:
        raise Exception('Cost of revenue not found.')

    company['cost_of_revenue'] = sum(value for _, value in cost_of_revenue_items)

    # get revenue value of previous FY
    value = get_value(root, ns, rev_tag_name, cf_in_id_prev_fy)

    company['revenue_1_fy_ago'] = float(value)

    # get operating cash flow value
    value = get_value(root, ns
                      , ns_fd + ':NetCashProvidedByUsedInOperatingActivities'
                      , cf_in_id_curr_fy)

    company['operating_cash_flow'] = float(value)

    # get capital expenditure value; every candidate tag present is summed
    cap_ex_items = _collect_values(
        cf_investing_activity_list, root, ns
        , qualify(['PaymentsForCapitalImprovements'
                   , 'PaymentsToAcquirePropertyPlantAndEquipment'
                   , 'PaymentsToDevelopSoftware'
                   , 'PaymentsToAcquireProductiveAssets'
                   , 'PaymentsForSoftware'
                   , 'PaymentsToAcquireIntangibleAssets'
                   , 'PaymentsToAcquireSoftware'
                   , 'PaymentsToAcquireEquipmentOnLease'])
        , cf_in_id_curr_fy)

    if not cap_ex_items:
        raise Exception('Capital expenditure not found.')

    company['capital_expenditure'] = sum(value for _, value in cap_ex_items)

    return pd.DataFrame(company, index=[trading_symbol.text.upper()])


# Process XBRL Instance Documents

Now that I have created the functions, I will process each XBRL instance document by passing its file path to the extraction function.

In [2]:
import glob

# extract every document found in the data folder; a failing document is
# reported and counted instead of stopping the whole run
company_df_list = []
error_count = 0

file_paths = glob.glob('data_files/*.xml')

for file_path in sorted(file_paths):
    print('Processing "{}" ... '.format(file_path), end='')

    try:
        df = extract_data_from_XBRL(file_path)
    except Exception as e:
        print('ERROR OCCURRED:',str(e))
        error_count += 1
    else:
        company_df_list.append(df)
        print('completed.')

print()
print('Out of the {} documents, {} encountered error.'.format(len(file_paths)
                                                              , error_count))

# combine the list of dataframes into a single dataframe; pd.concat rejects an
# empty list, so fall back to an empty dataframe when nothing was extracted
if company_df_list:
    companies_financials = pd.concat(company_df_list).sort_index()
else:
    companies_financials = pd.DataFrame()

Processing "data_files/a201910k-main_htm.xml" ... completed.
Processing "data_files/abmd-20190331.xml" ... completed.
Processing "data_files/adpt-10k_20191231_htm.xml" ... completed.
Processing "data_files/adsk-0131202010xk_htm.xml" ... completed.
Processing "data_files/ayx1231201910-k_htm.xml" ... completed.
Processing "data_files/bl-10k_20191231_htm.xml" ... completed.
Processing "data_files/coup-10k_20200131_htm.xml" ... completed.
Processing "data_files/crwd-20200131_htm.xml" ... completed.
Processing "data_files/d862842d10k_htm.xml" ... completed.
Processing "data_files/ddog-20191231.xml" ... completed.
Processing "data_files/docu-20200131_htm.xml" ... completed.
Processing "data_files/exas-20191231_htm.xml" ... completed.
Processing "data_files/form10-k2019classic_htm.xml" ... completed.
Processing "data_files/form20f_htm.xml" ... completed.
Processing "data_files/form40-f2019_htm.xml" ... completed.
Processing "data_files/fsly-10kx123119_htm.xml" ... completed.
Processing "data_

I noticed that `ERROR OCCURRED` was reported for some of the documents. After investigating, I found the following causes:
* `Cost of revenue not found`: The income statement has no cost of revenue section.
* `Trading symbol not found`: The trading symbol is not present in the XBRL instance document.

I will have to enter the financial data for these companies manually.

# Display Extracted Data For Verification

I will display the extracted data to verify that the correct data are extracted.

In [3]:
# render floats with thousands separators and two decimal places
two_decimals_with_commas = '{:,.2f}'.format
pd.options.display.float_format = two_decimals_with_commas

# last expression of the cell, so the notebook displays the dataframe
companies_financials

Unnamed: 0,filings,fiscal_year_ended,reporting_currency,cash_and_equivalents,short_term_investments,current_debt,revenue,cost_of_revenue,revenue_1_fy_ago,operating_cash_flow,capital_expenditure
ABMD,10-K,2019-03-31,USD,121021000.0,370677000.0,0.0,769432000.0,129567000.0,593749000.0,252197000.0,44004000.0
ADPT,10-K,2019-12-31,USD,96576000.0,480290000.0,0.0,85071000.0,22274000.0,55663000.0,205404000.0,11200000.0
ADSK,10-K,2020-01-31,USD,1774700000.0,69000000.0,449700000.0,3274300000.0,324900000.0,2569800000.0,1415100000.0,53200000.0
AYX,10-K,2019-12-31,USD,409949000.0,376995000.0,68154000.0,417910000.0,39151000.0,253570000.0,34192000.0,11453000.0
BL,10-K,2019-12-31,USD,120232000.0,487515000.0,0.0,288976000.0,58975000.0,227788000.0,29724000.0,9692000.0
COUP,10-K,2020-01-31,USD,268045000.0,499160000.0,187115000.0,389719000.0,139216000.0,260366000.0,68156000.0,11970000.0
CRWD,10-K,2020-01-31,USD,264798000.0,647266000.0,0.0,481413000.0,141627000.0,249824000.0,99943000.0,87487000.0
DDOG,10-K,2019-12-31,USD,597297000.0,176674000.0,0.0,362780000.0,88949000.0,198077000.0,24234000.0,23443000.0
DOCU,10-K,2020-01-31,USD,241203000.0,414939000.0,0.0,973971000.0,243234000.0,700969000.0,115696000.0,72046000.0
EXAS,10-K,2019-12-31,USD,177254000.0,146401000.0,834000.0,876293000.0,216717000.0,454462000.0,-115010000.0,171802000.0


# Insert The Extracted Data Into Database

I will insert the extracted data into a database so that I can use it for analysis later.

In [4]:
import psycopg2

def insert_update_record(row):
    """Insert one company's figures into the ``company`` table, or update them.

    The row is updated when its ticker already exists in the table and
    inserted otherwise. Any error is printed, prefixed with the ticker,
    instead of being raised.

    Args:
        row: mapping-like record holding the ticker under ``'index'`` and one
            entry for each of the other ``company`` columns used below.
    """
    company = {
        'ticker': row['index']
        ,'filings': row['filings']
        ,'fiscal_year_ended': row['fiscal_year_ended']
        ,'reporting_currency': row['reporting_currency']
        ,'cash_and_equivalents': row['cash_and_equivalents']
        ,'short_term_investments': row['short_term_investments']
        ,'current_debt': row['current_debt']
        ,'revenue': row['revenue']
        ,'cost_of_revenue': row['cost_of_revenue']
        ,'revenue_1_fy_ago': row['revenue_1_fy_ago']
        ,'operating_cash_flow': row['operating_cash_flow']
        ,'capital_expenditure': row['capital_expenditure']
    }

    # bound before the try block so that the finally clause can test it even
    # when the connection attempt itself fails
    conn = None

    try:
        conn = psycopg2.connect(user='postgres', host='localhost'
                            , port='5432', dbname='stock')

        # check whether the ticker exist in table
        cur = conn.cursor()
        query = 'SELECT ticker FROM company WHERE ticker = %s;'
        cur.execute(query, (company['ticker'],))
        is_ticker_exists = len(cur.fetchall()) > 0

        # if exists, update existing record. Else, insert record
        if is_ticker_exists:
            upd_statement = 'UPDATE company \
                                SET \
                                    filings = %(filings)s \
                                    , fiscal_year_ended = %(fiscal_year_ended)s \
                                    , reporting_currency = %(reporting_currency)s \
                                    , cash_and_equivalents = %(cash_and_equivalents)s \
                                    , short_term_investments = %(short_term_investments)s \
                                    , current_debt = %(current_debt)s \
                                    , revenue = %(revenue)s \
                                    , cost_of_revenue = %(cost_of_revenue)s \
                                    , revenue_1_fy_ago = %(revenue_1_fy_ago)s \
                                    , operating_cash_flow = %(operating_cash_flow)s \
                                    , capital_expenditure = %(capital_expenditure)s \
                                WHERE ticker = %(ticker)s;'
            cur.execute(upd_statement, company)
        else:
            ins_statement = 'INSERT INTO company \
                                (ticker, filings, fiscal_year_ended, reporting_currency \
                                , cash_and_equivalents, short_term_investments \
                                , current_debt, revenue, cost_of_revenue \
                                , revenue_1_fy_ago, operating_cash_flow \
                                , capital_expenditure) \
                                VALUES \
                                (%(ticker)s, %(filings)s, %(fiscal_year_ended)s \
                                , %(reporting_currency)s, %(cash_and_equivalents)s \
                                , %(short_term_investments)s, %(current_debt)s \
                                , %(revenue)s, %(cost_of_revenue)s, %(revenue_1_fy_ago)s \
                                , %(operating_cash_flow)s, %(capital_expenditure)s);'
            cur.execute(ins_statement, company)

        conn.commit()
    except Exception as e:
        print(str(company['ticker']) + ':', str(e))
    finally:
        if conn is not None:
            conn.close()
        
    
            
# store every extracted company; reset_index() exposes the ticker as the
# 'index' column that insert_update_record reads
records = companies_financials.reset_index()

for _, record in records.iterrows():
    insert_update_record(record)
    