In [0]:
import requests
from lxml import html, etree
from lxml.etree import tostring
from bs4 import BeautifulSoup
import re
import csv
import pandas as pd
from tqdm import tqdm_notebook
from datetime import datetime
import collections
from collections import defaultdict
import os

# Utils
#### These functions are shared by the 10-K and 10-Q scrapers: removing page-number footnotes, getting the URLs of the reports, and requesting a URL to get its content

In [0]:
# Module-level settings shared by the 10-K and 10-Q scrapers
BASE_URL = "https://www.sec.gov"
n = datetime.now()
# Default reporting window: the current calendar year and the two years before it
default_year = [n.year - back for back in range(3)]

# Make sure the output folder "risk_factors" is present before any report is written
if not os.path.exists('risk_factors'):
    os.makedirs('risk_factors')

def get_files(cik, doc_type, no_of_documents=1, debug=False):
    '''
    Getting the complete-submission text file urls from sec using cik number
    Args:
        cik : company code
        doc_type : filing type, either '10-K' or '10-Q'
        no_of_documents : maximum number of filings to resolve, taken in the
            order the EDGAR listing returns them (default 1)
        debug : unused, kept so existing calls keep working

    Returns:
        List of url strings, one per filing. The list is empty when doc_type
        is not supported.
    '''
    if doc_type not in ('10-K', '10-Q'):
        print("Input doc type should be '10-K' or '10-Q'")
        # callers iterate over the result, so hand back an empty list rather than None
        return []

    prior_to = ""
    ownership = "include"
    no_of_entries = 100
    listing_url = (
        f"{BASE_URL}/cgi-bin/browse-edgar?action=getcompany&CIK={cik}"
        f"&type={doc_type}&dateb={prior_to}&owner={ownership}&count={no_of_entries}"
    )
    page = requests.get(listing_url, timeout=10)
    tree = html.fromstring(page.content)
    elems = tree.xpath('//*[@id="documentsbutton"]')[:no_of_documents]
    result = []

    # Each "documents" button leads to the page holding the table of documents of one filing.
    # The link in the third cell of the last row of that table is the url we want.
    for elem in elems:
        content_page = get_request(BASE_URL + elem.attrib["href"])
        table = content_page.find_class("tableFile")[0]
        last_row = table.getchildren()[-1]
        href = last_row.getchildren()[2].getchildren()[0].attrib["href"]
        result.append(BASE_URL + href)
    return result

def get_request(href, isxml=False, timeout=10):
    '''
    Get the parsed page content given url
    Args:
        href : given url
        isxml : parse the response as XML (with huge_tree enabled) instead of HTML
        timeout : seconds passed to requests.get so a stalled server cannot
            block the scraper forever (default 10, same value get_files uses)
    Returns:
        lxml element tree root of the page (XML root when isxml is True,
        HTML root otherwise)
    '''
    page = requests.get(href, timeout=timeout)
    if isxml:
        parser = etree.XMLParser(huge_tree=True)
        return etree.fromstring(page.content, parser=parser)
    return html.fromstring(page.content)
    


def is_number(s):
    '''
    Tell whether a string represents a number
    Args:
        s : the given string
    Returns:
        Bool : True when float() accepts the string, or when it is a single
        character with a unicode numeric value; False otherwise
    '''
    import unicodedata

    try:
        float(s)
    except ValueError:
        # not a float literal: fall back to the unicode numeric lookup
        try:
            unicodedata.numeric(s)
        except (TypeError, ValueError):
            return False
    return True

def remove_footnote(text):
    '''
    Drop the lines that only hold a page number
    Args:
        text : extracted risk factor part
    Returns:
        String of the risk factor part without the page-number lines; every
        kept line is terminated with a newline
    '''
    # a line counts as a page number when it is numeric and shorter than 4 characters
    kept_lines = [
        line + "\n"
        for line in text.split("\n")
        if not (is_number(line) and len(line) < 4)
    ]
    return "".join(kept_lines)

## 10_K
#### The following functions are written specifically to scrape the 'Risk Factors' section of 10-K reports from the EDGAR database


In [0]:
def get_company_url_10k(company_list=None, year=default_year):
    '''
    getting list of urls for all 10Ks
    Args:
        company_list : default is all company in ticker.txt, or user can provide a list of tickers
        year: list of years to keep, default is the nearest 3 year
    Returns:
        list of tuples: (ticker, url)
    Raises:
        KeyError when a ticker in company_list is not present in ticker.txt

    NOTE: `year` is extended in place with max(year)+1, so the list owned by the
    caller (the module-level default_year when the default is used) grows by one
    entry per call. scraper_10K reads the extended list afterwards, which is why
    this side effect is kept.
    '''
    ticker_dict = dict() # a dictionary to store the cik codes as {ticker: cik_code}

    # read the ticker.txt downloaded form the SEC at: https://www.sec.gov/about/webmaster-faq.htm#developers
    with open('ticker.txt', mode='r') as infile:
        for line in infile:
            item = line.split('\t')
            # skip blank or malformed lines instead of failing with IndexError
            if len(item) < 2:
                continue
            ticker_dict[item[0]] = item[1].strip().zfill(7) # left padding with zero

    # if company_list is not provided, use every (ticker, cik_code) pair of the file
    if not company_list:
        mylist = list(ticker_dict.items())
    # if the company_list is provided, only get the cik codes of those company provided
    else:
        mylist = [(ticker, ticker_dict[ticker]) for ticker in company_list]

    # Two documents per year leave room for an amended report next to the original.
    # The span counts the current year as well (e.g. 6 documents for the default 3 years),
    # matching how the 10-Q url lookup counts years; it is clamped so a window that lies
    # in the future requests nothing.
    n = datetime.now()
    no_of_documents = max(n.year + 1 - min(year), 0) * 2

    url_dict = {} # temp dict to store the urls as {ticker: [list of urls]}
    for ticker, cik in tqdm_notebook(mylist):
        url_dict[ticker] = get_files(cik, "10-K", no_of_documents=no_of_documents)

    # some company may have 10k filing in the later year (ex. 2018Q4 filled in 2019);
    # to avoid missing the report we add 1 to the max year (in place, see NOTE above)
    year.append(max(year)+1)

    url_10k = []
    for ticker, urls in url_dict.items():
        for url in urls:
            # the second dash-separated field of the url is read as a two-digit year
            year_k = int(url.split("-")[1]) + 2000
            # get rid of reports not in the range of input or default year
            if year_k in year:
                url_10k.append((ticker, url))
    return url_10k

def parser_10k(url):
    '''
    Download a complete submission text file and pull out the 10-K document
    Args:
        url : 10K report url
    Returns:
        Tuple (document text, document type, filing date):
        the text between the <DOCUMENT> tags of the last section whose type is
        10-K, 10-K/A, 10-KT or 10-KT/A, that type, and the first 8 characters
        following the "FILED AS OF DATE:" label
    Raises:
        IndexError when no "FILED AS OF DATE:" line is present
        KeyError when no section of a wanted type is present
    '''
    response = requests.get(url)
    raw_10k = response.text

    # offsets right after each <DOCUMENT> and right before each </DOCUMENT>
    start_offsets = [m.end() for m in re.finditer(r'<DOCUMENT>', raw_10k)]
    end_offsets = [m.start() for m in re.finditer(r'</DOCUMENT>', raw_10k)]
    # the rest of every <TYPE> line, i.e. the section type
    type_labels = [tag[len('<TYPE>'):] for tag in re.findall(r'<TYPE>[^\n]+', raw_10k)]
    # filing date taken from the first "FILED AS OF DATE:" line
    date_lines = re.findall(r'FILED AS OF DATE:.+', raw_10k)
    doc_date = date_lines[0][len('FILED AS OF DATE:\t\t'):][:8]

    wanted_types = ('10-K', '10-K/A', '10-KT', '10-KT/A')
    document = {}
    d_type = ""
    # walk the sections in file order; the last wanted one decides what is returned
    for label, begin, finish in zip(type_labels, start_offsets, end_offsets):
        if label in wanted_types:
            document[label] = raw_10k[begin:finish]
            d_type = label

    return document[d_type], d_type, doc_date

def get_rf_section_10k(text):
    '''
    get the text of the risk factor section
    the pattern for regular expression failed to handle some exceptions, such as companies having their risk factor section under different title other than item 1A, or having rare spacing characters.
    Args:
        String : raw text of the 10-K returned by the parse 10-K function
    Returns:
        String of raw risk factor section (from the "Item 1A" heading up to the
        next different heading), or "No Risk Factor Part" when there is no
        Item 1A heading or nothing follows it
    '''
    # headings that delimit the sections we care about
    regex = re.compile(r'(>Item(\s|&#160;|&nbsp;|\n    )(1a|1A|1b|1B|2|3|4)\.{0,1})|(ITEM(\s|&#160;|&nbsp;|\n    )(1a|1A|1b|1B|2|3|4))|(PART II)|(>tem(&#160;| )(1A|1B|2|3|4))|(>TEM(&#160;| )(1A|1B|2|3|4))|((>&#32;Item(\s|&#160;|&nbsp;|\n    )(1a|1A|1b|1B|2|3|4)))')

    # scan the text once and keep every heading match
    all_match = list(regex.finditer(text))

    # return "No Risk Factor Part" if no matches
    if len(all_match) == 0:
        return "No Risk Factor Part"

    # one row per heading: matched text, start offset, end offset
    test_df = pd.DataFrame(
        [(m.group(), m.start(), m.end()) for m in all_match],
        columns=['item', 'start', 'end'],
    )

    # normalise the heading text so that every spelling collapses to e.g. "item1a";
    # the substitutions are applied one after another, in this order
    cleanup_steps = (
        ('&#160;', ' '),
        ('&#32;', ''),
        ('&nbsp;', ' '),
        (' ', ''),
        (r'\.', ''),
        ('>', ''),
        ('\n', ''),
        ('^tem', 'item'),
        ('iitem', 'item'),
    )
    item = test_df['item'].str.lower()
    for pattern, replacement in cleanup_steps:
        item = item.str.replace(pattern, replacement, regex=True)
    test_df['item'] = item

    if not (test_df['item'] == 'item1a').any():
        return "No Risk Factor Part"

    # collapse runs of the same heading, then keep only the last occurrence of
    # each heading (earlier ones are dropped), ordered by position
    pos_dat = test_df.loc[test_df['item'].shift(1) != test_df['item']]
    pos_dat = pos_dat.sort_values('start', ascending=True).drop_duplicates(subset=['item'], keep='last')
    pos_dat = pos_dat.reset_index(drop=True)

    item_1a_index = pos_dat[pos_dat['item'] == 'item1a'].index.values[0]
    # Item 1A is the last heading: there is no following heading to cut at
    if item_1a_index == pos_dat.shape[0] - 1:
        return "No Risk Factor Part"

    section_start = pos_dat['start'].loc[item_1a_index]
    section_end = pos_dat['start'].loc[item_1a_index + 1]
    return text[section_start:section_end]

def content_refine_10k(text):
    '''
    Turn the raw risk factor markup into plain text.
    Args:
        String : raw risk factor String returned by method "get_rf_section_10k"
    Returns:
        String of refined risk factor section, or None when the input is empty
    '''
    if text:
        # parse the markup with BeautifulSoup and join the text pieces with a
        # blank line ("\n\n") so the result reads more cleanly
        soup = BeautifulSoup(text, 'lxml')
        return soup.get_text("\n\n")
    return None

def get_rf_10k(url_10k, item_dict, year):
    '''
    Combine report processing methods above into single function and write the result to text file in risk_factors folder.
    Args:
        url_10k : tuple (ticker, url) of one 10K
        item_dict : dictionary to keep track of written files (avoid duplicate file e.g. if amend file has risk factor section, we take the content in the amend file)
        year : list of years to be scraped (default - the recent 3 year)
    Returns:
        String "<ticker>_<filing date>" identifying the written report, or None
        when the report is skipped (filing year not requested, or an amended
        report without a risk factor section)
    '''
    # get the raw text of the full report with the doc_type and doc_date
    raw_text, doc_type, doc_date = parser_10k(url_10k[1])
    # get document year
    doc_year = int(doc_date[:4])
    # if document year is not in year list return None
    if doc_year not in year:
        return None
    # get the raw risk factor text
    raw_rf_text = get_rf_section_10k(raw_text)
    # if amend report (10-K/A or 10-KT/A) does not contain risk factor section, return None
    if raw_rf_text == "No Risk Factor Part" and doc_type in ("10-K/A", "10-KT/A"):
        return None
    # refine the raw risk factor text
    rf = content_refine_10k(raw_rf_text)
    # remove footnote
    rf_ = remove_footnote(rf)
    # get company ticker stored previously in url_10k
    company = url_10k[0]
    # key used to keep track of the text written to output directory
    file = company + '_' + str(doc_date)
    # if the key already exists (the amend report is scraped first due to the ordering and
    # contained a risk factor section), the current file is the original 10-K, so it is
    # stored as "ticker_filling date_10k_org" instead of overwriting the amended one
    if file in item_dict:
        file_name = "risk_factors/{0}_{1}_10k_org.txt".format(company, str(doc_date))
    else:
        file_name = "risk_factors/{0}_{1}_10k.txt".format(company, str(doc_date))
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(rf_)
    return file

def scraper_10K(company_list=None, year=default_year):
    '''
    Combine the code for getting 10K urls and text processing.
    Args:
        company_list : list containing tickers of the companies to be scraped (default: all company)
        year : list containing years to be scraped (default: most recent 3 years)
    Returns:
        List of failed cases, i.e. the (ticker, url) tuples whose processing raised.
    '''
    # get_company_url_10k appends an extra year to the list it receives. Work on a
    # private copy so neither the caller's list nor the module-level default grows
    # from one run to the next, and reuse that same copy for the filing-date check.
    years = list(year)
    # get the urls
    url_10k = get_company_url_10k(company_list, years)
    # create a dictionary to keep track on scraped reports
    item_dict = collections.defaultdict(int)
    # create a list to store failed cases
    failed_case = []
    # loop through the urls to get all 10Ks' risk factor section
    for item in tqdm_notebook(url_10k):
        try:
            file = get_rf_10k(item, item_dict, years)
        except Exception:
            # best effort: remember the failure and move on; unlike a bare except this
            # still lets KeyboardInterrupt stop a long run
            failed_case.append(item)
            continue
        if file:
            item_dict[file] += 1
    return failed_case

## 10_Q
#### The following functions are written specifically to scrape the 'Risk Factors' section of 10-Q reports from the EDGAR database

In [0]:
def get_company_url_10q(ticker=None, yearl=default_year):
    '''
    Get urls of 10q files for certain tickers and years
    Args:
        ticker : given ticker list (default: every ticker of ticker.txt)
        yearl : given year list
    Returns:
        List of tuples (ticker, url, year) of the 10q files
    Raises:
        KeyError when a given ticker is not present in ticker.txt
    '''
    mydict = {}
    # read the file that matches the tickers and cik number, since we use cik number to get the url
    with open('ticker.txt', mode='r') as infile:
        for line in infile:
            item = line.split('\t')
            # skip blank or malformed lines instead of failing with IndexError
            if len(item) < 2:
                continue
            mydict[item[0]] = item[1].strip().zfill(7)
    if ticker is None:
        ticker = mydict.keys()

    # the number of urls to request is the span from the earliest year to the current year
    # multiplied by 6 (in one year, the maximum number of 10q is 3 quarters * 2 types of
    # 10q -- 10q and 10q/a); it is the same for every ticker, so compute it once
    curr_year = datetime.now().year
    no_of_documents = (curr_year + 1 - min(yearl)) * 3 * 2

    url_10q = []
    for symbol in tqdm_notebook(ticker):
        for url in get_files(mydict[symbol], "10-Q", no_of_documents=no_of_documents):
            if url == "":
                continue
            # the second dash-separated field of the url is read as a two-digit year
            yearq = int(url.split("-")[1]) + 2000
            # to check whether the year of the document is within the given list of years
            if yearq in yearl:
                url_10q.append((symbol, url, yearq))
    return url_10q

def parser_10q(url):
    '''
    Download a complete submission text file and pull out the 10-Q document
    Args:
        url : 10q report url
    Returns:
        Tuple (document text, document type, filing date) for the first section
        whose type is 10-Q or 10-Q/A; (None, None, None) when there is none.
        The date is the last tab-separated field of the 8th line of the file.
    '''
    response = requests.get(url)
    raw_10q = response.text
    # the filing date is read from the 8th line of the submission
    date_fields = raw_10q.split("\n")[7].split("\t")

    # offsets right after each <DOCUMENT> and right before each </DOCUMENT>
    start_offsets = [m.end() for m in re.finditer(r'<DOCUMENT>', raw_10q)]
    end_offsets = [m.start() for m in re.finditer(r'</DOCUMENT>', raw_10q)]
    # the rest of every <TYPE> line, i.e. the section type
    type_labels = [tag[len('<TYPE>'):] for tag in re.findall(r'<TYPE>[^\n]+', raw_10q)]

    # return the content of the first section typed 10-Q or 10-Q/A
    for label, begin, finish in zip(type_labels, start_offsets, end_offsets):
        if label in ('10-Q', '10-Q/A'):
            return raw_10q[begin:finish], label, date_fields[-1]

    return None, None, None

def get_rf_section_10q(text):
    '''
    Use regular expression to extract risk factor part in the raw text (However, our regular expression is hard to capture all the risk factor structures)
    Args:
        text : extracted raw text, or None
    Returns:
        String of the risk factor part (from the "Item 1A" heading up to the
        next different heading) or "No Risk Factor Part"
    '''
    if text is None:
        return "No Risk Factor Part"
    # Write the regex (This regex cannot capture all the risk factor patterns, since there are
    # a lot of different structures of documents and we now check them manually.)
    regex = re.compile(r'(> *Item(\s|&#160;|&nbsp;|&#32;)(1A|1B|2|3|4|5|6)\.{0,1})|(> *ITEM(\s|&#160;|&nbsp;|&#32;)(1A|1B|2|3|4|5|6))')
    matches = regex.finditer(text)

    try:
        rows = [(m.group(), m.start(), m.end()) for m in matches]
        # no heading at all: nothing to extract
        if not rows:
            return "No Risk Factor Part"

        # one row per heading: matched text, start offset, end offset
        test_df = pd.DataFrame(rows, columns=['item', 'start', 'end'])

        # normalise the heading text so that every spelling collapses to e.g. "item1a";
        # the substitutions are applied one after another, in this order
        cleanup_steps = (
            ('&#160;', ' '),
            ('&#32;', ''),
            ('&nbsp;', ' '),
            (' ', ''),
            (r'\.', ''),
            ('>', ''),
            ('\n', ''),
            ('^tem', 'item'),
            ('iitem', 'item'),
        )
        item = test_df['item'].str.lower()
        for pattern, replacement in cleanup_steps:
            item = item.str.replace(pattern, replacement, regex=True)
        test_df['item'] = item

        if not (test_df['item'] == 'item1a').any():
            return "No Risk Factor Part"

        # collapse runs of the same heading, then keep only the last occurrence of
        # each heading (earlier ones are dropped), ordered by position
        pos_dat = test_df.loc[test_df['item'].shift(1) != test_df['item']]
        pos_dat = pos_dat.sort_values('start', ascending=True).drop_duplicates(subset=['item'], keep='last')
        pos_dat = pos_dat.reset_index(drop=True)
        item_1a_index = pos_dat[pos_dat['item'] == 'item1a'].index.values[0]
        # Item 1A is the last heading: there is no following heading to cut at
        if item_1a_index == pos_dat.shape[0] - 1:
            return "No Risk Factor Part"

        section_start = pos_dat['start'].loc[item_1a_index]
        section_end = pos_dat['start'].loc[item_1a_index + 1]
        return text[section_start:section_end]
    except Exception:
        # extraction is best effort: any unexpected document layout counts as "not found"
        return "No Risk Factor Part"

def content_refine_10q(raw_text):
    '''
    Turn the raw risk factor markup into plain text, one text piece per line
    Args:
        raw_text : extracted risk factor part
    Returns:
        String of the refined risk factor part; the "No Risk Factor Part"
        marker is handed back untouched
    '''
    if raw_text != "No Risk Factor Part":
        # parse the markup with BeautifulSoup and join the text pieces with newlines
        return BeautifulSoup(raw_text, 'lxml').get_text("\n")
    return raw_text

def write_10q(outputfile, text):
    '''
    Store the risk factor text in a UTF-8 file, replacing any previous content
    Args:
        outputfile : the name of the output file
        text : extracted text to be written
    '''
    handle = open(outputfile, 'w', encoding='utf-8')
    try:
        handle.write(text)
    finally:
        handle.close()
        
def get_rf_10q(url_10q, flag):
    '''
    Integrate the functions to get the risk factor by one given url
    Args:
        url_10q : tuple describing one 10q file; element 0 is the ticker and element 1 the url
        flag : integer used to mark whether we will pass the next file
    Returns:
        Tuple (output file name, new flag). The file name is None and the flag -1
        when nothing was written; the flag is 1 when an amended report supplied
        the risk factors (the next file should be skipped) and 0 otherwise.
    Raises:
        TypeError when no 10-Q document was found while flag is already -1,
        because there is no filing date to build the file name from
    '''
    # Define the path of the files
    base = "risk_factors/"
    # replace the "/" in name to "_"
    name = url_10q[0].replace("/", "_")
    raw_text, doctype, date = parser_10q(url_10q[1])
    raw_rf = get_rf_section_10q(raw_text)
    no_rf = raw_rf == "No Risk Factor Part"

    # After we get the risk factor part, there is a strategy to get the correct risk factor:
    # if there is no risk factor part in 10q/a, we should find the next file(10q), so set a flag to mark this.
    # if there is risk factor part in 10q/a, just write this part into output and pass the next file, also set a flag to mark this.
    # if the type is 10q, no matter what we get, write the result into output.
    if no_rf and (doctype == "10-Q/A" or doctype is None) and flag != -1:
        return (None, -1)
    if not no_rf and doctype == "10-Q/A":
        flag = 1
    else:
        # only reset the flag when the amended-report case above did not apply
        flag = 0

    file_name = base + name + "_" + date + "_10q.txt"
    if no_rf:
        write_10q(file_name, raw_rf)
    else:
        # Refine the risk factor part and remove the page number in foot note.
        rf = content_refine_10q(raw_rf)
        rf = remove_footnote(rf)
        write_10q(file_name, rf)
    return file_name, flag

def scraper_10Q(ticker=None, yearl=default_year):
    '''
    Put all the functions of 10q part together and write all the scraping result
    Args:
        ticker : given ticker list
        yearl : given year list
    Returns:
        List of failed cases, i.e. the (ticker, url, year) tuples whose processing raised
    '''
    failed_case = []
    # {ticker: {year: counter of files handled for that ticker and year}}
    filenames = {}
    flag = 0
    # get all the urls for given input
    url_10q = get_company_url_10q(ticker, yearl)

    # Since sometimes we need to pass some file (as described in function "get_rf_10q"),
    # and ensure there are no more than 3 files in the year of given ticker, we count
    # the files per ticker and year in the dictionary filenames
    for item in tqdm_notebook(url_10q):
        name = item[0]
        year = item[2]
        per_year = filenames.setdefault(name, {})
        if year not in per_year:
            # first file of this ticker/year: start counting and forget any pending skip
            per_year[year] = 0
            flag = 0
        else:
            per_year[year] += 1

        if per_year[year] >= 3:
            continue

        # if the flag is 1 in the last round, we should pass the current file.
        if flag == 1:
            flag = 0
            continue
        try:
            file, flag = get_rf_10q(item, flag)
        except Exception:
            # best effort: remember the failure and move on; unlike a bare except this
            # still lets KeyboardInterrupt stop a long run
            failed_case.append(item)
            continue
        # if the flag is 1 or -1, that means we pass one file, so the count should be -1.
        if flag == -1 or flag == 1:
            per_year[year] -= 1
    return failed_case

## Get All Risk Factors

In [0]:
def Risk_Factor_Scraper(company_list=None, year=default_year):
    '''
    Run the 10q scraper and then the 10k scraper on the same inputs
    Args:
        company_list : given ticker list
        year : given year list
    Returns:
        Tuple (list of 10k failed cases, list of 10q failed cases)
    '''
    # the 10-Q pass runs first, the 10-K pass second; the results are returned 10-K first
    quarterly_failures = scraper_10Q(company_list, year)
    annual_failures = scraper_10K(company_list, year)
    return annual_failures, quarterly_failures

In [0]:
# In practice, you only need to change the inputs -- ticker_list and year_list --
# and run this cell; the extracted risk factor parts are written into the "risk_factors" folder
ticker_list = ["aapl"]
year_list = [2018, 2019, 2020]

# year_list has to be passed explicitly, otherwise the scraper silently falls back to default_year
failed_case_10k, failed_case_10q = Risk_Factor_Scraper(ticker_list, year_list)

HBox(children=(IntProgress(value=0, max=1), HTML(value='')))




HBox(children=(IntProgress(value=0, max=7), HTML(value='')))




HBox(children=(IntProgress(value=0, max=1), HTML(value='')))




HBox(children=(IntProgress(value=0, max=2), HTML(value='')))


