Extracts MD&A sections from SEC filings via the sec-api Extractor API and saves the extracted text to S3 for covenant analysis.


In [None]:
# !pip install sec-api

from sec_api import ExtractorApi, FullTextSearchApi
import pandas as pd
from tqdm import tqdm
import warnings
import re
import os
import numpy as np
from ratelimit import limits, sleep_and_retry
import boto3
import json
import awswrangler as wr
import logging

# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')
# Register tqdm's pandas integration.
tqdm.pandas()

#setup
# boto3 S3 resource handle; get_mda uses it to upload the extracted text files.
s3_resource = boto3.resource('s3')

# directories
bucket = '[your-bucket-name]'  # Replace with your bucket name
# S3 URI of the bucket root. NOTE(review): not referenced by any of the visible cells.
output_prefix = f's3://{bucket}/'

In [None]:
# load api
# Prefer a key supplied through the SEC_API_KEY environment variable so the
# real credential never has to be saved inside the notebook; when the variable
# is not set, fall back to the placeholder, which must then be replaced by hand.
api_key = os.environ.get('SEC_API_KEY', 'YOUR_SEC_API_KEY')
extractorApi = ExtractorApi(api_key)

# Configure logging to save errors to a text file.
# Only records at ERROR level or above are written.
logging.basicConfig(
    filename='errorlog_sec_api_extract_mda_2.txt',
    level=logging.ERROR
)

In [None]:
# extract MDA using api
@sleep_and_retry
@limits(calls=5, period=1)
def get_mda(row_in):
    """Extract the MD&A section of one filing and upload it to S3 as a text file.

    The section is requested from the module-level ``extractorApi``:
    section ``"7"`` when the filing type contains ``'K'``, section
    ``"part1item2"`` when it contains ``'Q'``. Any other filing type yields
    no text. The uploaded file starts with a small tagged header describing
    the filing, followed by the extracted text.

    Calls are throttled by the ``ratelimit`` decorators (5 calls per period
    of 1, sleeping and retrying when the limit is hit).

    Args:
        row_in: One row of the filing index. Must expose the attributes
            ``cik``, ``company_name``, ``filing_type``, ``filing_date``,
            ``report_date``, ``filing_index``, ``main_url``, ``master_idx``,
            ``year`` and ``quarter``.

    Returns:
        None, always. Outcomes are visible only through the S3 object that
        is written and through the error log.

    Side effects:
        * Writes ``edgar_mda_new_2/<year>/QTR<quarter>/<master_idx>_
          <report_date>_<filing_date>_<filing_type>_<cik>.txt`` to ``bucket``
          when more than 10 characters of text were extracted.
        * Logs an error when the extractor raises, or when the extracted text
          is missing or has 10 characters or fewer.
    """
    # Read every field up front so a malformed row fails before any API call.
    cik = row_in.cik
    company_name = row_in.company_name
    filing_date = row_in.filing_date
    report_date = row_in.report_date
    filing_index = row_in.filing_index
    filing_url = row_in.main_url

    # for indexing files
    master_idx = row_in.master_idx
    yr = row_in.year
    qtr = row_in.quarter

    # '/' and '.' in the filing type (e.g. "10-K/A") would corrupt the S3 key
    # layout, so replace them. str() keeps a missing (NaN) filing type from
    # raising here and aborting the whole batch; such rows fall through to the
    # "no text" branch below and are logged instead.
    filing_type = re.sub('[/.]', '-', str(row_in.filing_type))

    # get text
    try:
        if 'K' in filing_type:
            text = extractorApi.get_section(filing_url, "7", "text")
        elif 'Q' in filing_type:
            text = extractorApi.get_section(filing_url, "part1item2", "text")
        else:
            text = ''
    except Exception as e:
        # Best effort: one failing filing must not stop the batch.
        logging.error(f'Error occurred for CIK {cik} and URL {filing_url}: {str(e)}')
        return None

    # Treat a missing result the same as an (almost) empty one instead of
    # letting len(None) raise.
    if text is None or len(text) <= 10:
        # error log
        logging.error(f'Error occurred for CIK {cik} and URL {filing_url}: Text less than 10 characters.')
        return None

    # Tagged metadata header; each line ends with " \n" and the header is
    # separated from the body by one blank line.
    header_fields = (
        ('master idx', master_idx),
        ('cik', cik),
        ('company name', company_name),
        ('filing type', filing_type),
        ('filing date', filing_date),
        ('report date', report_date),
        ('filing index', filing_index),
        ('filing url', filing_url),
    )
    header = ''.join(
        f'<{tag}>{value!s}</{tag}> \n' for tag, value in header_fields
    )
    textfile = header + '\n' + text

    # save as text file
    savepath = (
        f'edgar_mda_new_2/{yr!s}/QTR{qtr!s}/'
        f'{master_idx!s}_{report_date!s}_{filing_date!s}_{filing_type!s}_{cik!s}.txt'
    )
    s3_object = s3_resource.Object(bucket, savepath)
    s3_object.put(Body=textfile)

    return None

In [None]:
# Read the combined master index (pipe-delimited CSV) from S3.
loadpath = f's3://{bucket}/misc/edgar_masterhtml1_combined.csv'
datdf = wr.s3.read_csv(path=loadpath, sep='|', lineterminator='\n')

# The unnamed first CSV column becomes the master index id.
datdf = datdf.rename(columns={'Unnamed: 0': 'master_idx'})

# Derive calendar year and quarter of the filing date.
datdf['filing_date1'] = pd.to_datetime(datdf['filing_date'])
filing_dt = datdf['filing_date1'].dt
datdf['year'] = filing_dt.year.astype(int)
datdf['quarter'] = filing_dt.quarter.astype(int)

In [None]:
# Run the extraction for every filing in the index, printing a progress
# line whenever the row label is a multiple of 50.
total_rows = len(datdf)
for index, row in datdf.iterrows():
    if not index % 50:
        print(f"Processing idx {index+1} of {total_rows}: {row.year}-QTR{row.quarter}")

    get_mda(row)

In [None]:
# -----------------------------------------------------
# read in the parsed text files and check how many were obtained
# run this after running the cells above
# -----------------------------------------------------

In [None]:
def get_file_index(str_in):
    """Return the leading index token of a file path.

    The last '/'-separated component of ``str_in`` is taken as the file
    name, and everything before its first underscore is returned. If the
    file name contains no underscore, the whole file name is returned.
    """
    file_name = str_in.rsplit('/', 1)[-1]
    index_token, _, _ = file_name.partition('_')
    return index_token

In [None]:
# Collect the master index ids of every text file already saved under
# edgar_mda_new_2/<year>/QTR<quarter>/ in the bucket.
yrlist = range(2000,2022)
qtrlist = range(1,5)

# One client and paginator serve every listing; they do not depend on the prefix.
s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')

collected_indices = []

for yr in yrlist:
    for qtr in qtrlist:

        print(f"collecting file index for {yr}-QTR{qtr}")

        prefix = f"edgar_mda_new_2/{yr}/QTR{qtr}/"
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
        # A page has no 'Contents' key when nothing matches the prefix,
        # so default to an empty list instead of raising KeyError.
        filepath = [
            obj['Key']
            for page in pages
            for obj in page.get('Contents', [])
            if '.txt' in obj['Key']
        ]

        collected_indices.extend(get_file_index(key) for key in filepath)

# Build the Series once at the end: Series.append no longer exists in
# pandas >= 2.0, and growing a Series inside the loop copies it every time.
# The explicit dtype keeps the result well-defined when nothing was found.
index_list_all = pd.Series(collected_indices, dtype=object)


In [None]:
# Flag every filing in the main index for which a text file was found.

# One row per saved file: integer master_idx plus a constant has_text marker.
index_df = pd.DataFrame(index_list_all).rename(columns={0: 'master_idx'})
index_df = index_df.assign(
    master_idx=index_df['master_idx'].astype(int),
    has_text=1,
)

# Left join keeps every filing; filings without a saved file get has_text = 0.
datdf1 = pd.merge(datdf, index_df, how='left', on='master_idx')
datdf1['has_text'] = datdf1['has_text'].fillna(0)

In [None]:
# Report how many 10-K / 10-Q filings have a text file.

subsample = datdf1[datdf1.filing_type.isin(['10-K', '10-Q'])]

n_with_text = subsample.has_text.sum()
n_filings = len(subsample)

print(f"number of filings with text: {n_with_text}")
print(f"total number of filings: {n_filings}")
print(f"percentage: {n_with_text/n_filings}")

In [None]:
# Plot the share of filings with extracted text, by filing quarter.

import matplotlib.pyplot as plt

# Build "<year>-Q<quarter>" labels and convert them to quarter-start timestamps.
year_str = subsample['year'].astype(int).astype(str)
quarter_str = subsample['quarter'].astype(int).astype(str)
yq = year_str + '-Q' + quarter_str
subsample['yq'] = pd.PeriodIndex(yq, freq='Q').to_timestamp()

# Mean of has_text per quarter.
plots = subsample.groupby('yq')['has_text'].mean()
plt.plot(plots)

In [None]:
# Spot check: pick one random filing without a text file and query the
# extractor for it directly, printing whatever comes back.

sample0 = subsample[subsample.has_text == 0].sample(n=1)
url = sample0.main_url.values[0]
filetype = sample0.filing_type.values[0]
print(url)
print(filetype)

# Same section choice as in get_mda: "7" for K-type filings,
# "part1item2" for Q-type filings.
if 'K' in filetype:
    section = "7"
elif 'Q' in filetype:
    section = "part1item2"
else:
    section = None

if section is not None:
    print(extractorApi.get_section(url, section, "text"))