Reproducible steps to create a corpus from [EDGAR](https://www.sec.gov/edgar) 10-K filings.

# Example

Retrieve a single file by hand.

1. Navigate to [here](https://www.sec.gov/edgar/searchedgar/companysearch)
2. Enter "ENV" into the search box.
3. On the right-hand side, expand "10-K (annual reports) and 10-Q (quarterly reports)"
4. Get whatever is on top.

As of 2024/01/03

"ENV" is:

* https://www.sec.gov/edgar/browse/?CIK=1337619
* https://www.sec.gov/ix?doc=/Archives/edgar/data/1337619/000133761923000012/env-20221231.htm

"MSFT" is:

* https://www.sec.gov/edgar/browse/?CIK=789019
* https://www.sec.gov/ix?doc=/Archives/edgar/data/789019/000095017023035122/msft-20230630.htm


# Pseudocode

Below is a list of the steps we take.
Keep in mind that these steps are a 10,000-foot view.
The implementation will be commented to a more detailed level.

1. Get the tickers from the [SEC](https://www.sec.gov/file/company-tickers)
2. Using the retrieved data, get the accession documents for the 10-Ks (`form_type`) from the past 20 (`limit`) years
3. Using the retrieved data, get the XHTML files
4. For each retrieved XHTML, extract the "Item 7: Management's Discussion ..." section to a TXT file

You can force a full re-download by deleting everything in _./data/10-K_.
Otherwise the script will do a checkpoint evaluation of how far it has processed.
It will skip steps it thinks are done.

In [None]:
from pathlib import Path

# Where the ticker list comes from and how we identify ourselves to the server.
tickers_url = 'https://www.sec.gov/files/company_tickers.json'
user_agent = 'TextCorpusLabs/EDGAR'

# Which filings to collect, and how many per company.
form_type = '10-K'
limit = 20

# On-disk layout; every path hangs off `data_folder`.
data_folder = Path('./data/')
tickers_file = data_folder / './tickers.csv'
form_type_folder = data_folder / f'./{form_type}'
accessions_file = form_type_folder / './accession.metadata.csv'
raw_folder = form_type_folder / './raw'
corpus_folder = form_type_folder / './corpus'

# Step 1

1. Get the list of tickers from the SEC
2. Convert the tickers into an array, then sort it.
3. Save the tickers to a CSV

In [None]:
import requests
import pandas as pd
import json

def get_tickers(tickers_file: Path, tickers_url: str, user_agent: str, ) -> pd.DataFrame:
    """
    Load the ticker table, downloading and caching it on the first call.

    When `tickers_file` does not exist, the JSON at `tickers_url` is fetched,
    its entries are sorted by ticker and saved to `tickers_file` as a CSV with
    the columns `CIK`, `Ticker` and `Name`.
    The returned frame is always read back from that CSV, so the first run and
    every later (cached) run return the same thing.

    Parameters
    ----------
    tickers_file : Path
        Location of the cached CSV
    tickers_url : str
        URL of the JSON document listing the tickers
    user_agent : str
        Value sent as the `User-Agent` header

    Raises
    ------
    RuntimeError
        The download did not answer with HTTP 200
    """
    if not tickers_file.exists():
        with requests.Session() as session:
            session.headers['User-Agent'] = user_agent
            with session.get(tickers_url) as result:
                if result.status_code != 200:
                    raise RuntimeError('Error retrieving tickers')
                payload = json.loads(result.text)
        # the payload is a dict of records; only the values are of interest
        records = sorted(payload.values(), key = lambda record: record['ticker'])
        rows = [(x['cik_str'], x['ticker'], x['title']) for x in records]
        df = pd.DataFrame(rows, columns = ['CIK', 'Ticker', 'Name'])
        tickers_file.parent.mkdir(parents = True, exist_ok = True)
        df.to_csv(tickers_file, index = False)
    # Without `keep_default_na = False` pandas turns text such as "NA", "NULL" or "NaN" into a float NaN.
    # Those are perfectly legal ticker symbols / names, so read the text columns verbatim.
    return pd.read_csv(tickers_file, dtype = {'Ticker': str, 'Name': str}, keep_default_na = False) #type: ignore

# Run Step 1: load the ticker table; it is downloaded and cached in `tickers_file` when that file does not exist yet.
tickers = get_tickers(tickers_file, tickers_url, user_agent)

# Step 2

For each CIK in _tickers.csv_ (Step 1)

1. Get the accessions for the past 20 10-Ks

Save all the accessions for all the CIKs to disk

**Note 1**: Notice `tickers.CIK.unique()`.
The data pull needs to be done on CIK, not ticker.
A single company can have more than one ticker (AACI vs AACIU), but only one CIK (1844817).

**Note 2**: Notice `except ValueError: pass`.
It is possible for a CIK (or ticker) to have no associated documents of a particular type (10-K).
`get_filing_metadatas()` responds to this case by throwing an error.
On our side, it just means skip the record.


In [None]:
#type: ignore
#cSpell: ignore tqdm, metadatas, dtype
from sec_downloader import Downloader
from tqdm.notebook import tqdm
import sec_downloader.types as sec_t
import typing as t

def get_accession_metadata(accessions_file: Path, tickers: pd.DataFrame, form_type: str, limit: int, user_agent: str) -> pd.DataFrame:
    """
    Load the accession table, building and caching it on the first call.

    When `accessions_file` does not exist, the filing metadata is requested
    once per unique CIK in `tickers` and the combined result is saved to
    `accessions_file` with the columns `CIK`, `Accession Number`,
    `Report Date` and `URL`.
    The returned frame is always read back from that CSV, with `Report Date`
    converted to datetimes.

    Parameters
    ----------
    accessions_file : Path
        Location of the cached CSV
    tickers : pd.DataFrame
        Ticker table; only its `CIK` column is used
    form_type : str
        Form type forwarded to the downloader
    limit : int
        Maximum number of filings requested per CIK
    user_agent : str
        Passed to the `Downloader` constructor

    Raises
    ------
    RuntimeError
        Not a single filing could be collected
    """
    if not accessions_file.exists():
        collected: t.List[sec_t.FilingMetadata] = []
        downloader = Downloader(user_agent, '')
        for cik in tqdm(tickers.CIK.unique()):
            try:
                collected.extend(downloader.get_filing_metadatas(sec_t.RequestedFilings(ticker_or_cik  = cik, form_type = form_type, limit = limit)))
            except ValueError:
                # per the notebook notes: the CIK has no filing of this type, so move on
                continue
        if not collected:
            raise RuntimeError('Error retrieving accessions')
        # keys are the metadata fields we keep, values are their CSV headers
        column_names = {'cik': 'CIK', 'accession_number': 'Accession Number', 'report_date': 'Report Date', 'primary_doc_url': 'URL'}
        frame = pd.DataFrame(collected)[list(column_names)].rename(columns = column_names)
        target_folder = accessions_file.parent
        if not target_folder.exists():
            target_folder.mkdir(parents = True)
        frame.to_csv(accessions_file, index = False)
    column_types = {'CIK': int, 'Accession Number': str, 'Report Date': str, 'URL': str}
    result = pd.read_csv(accessions_file, dtype = column_types)
    result['Report Date'] = pd.to_datetime(result['Report Date'])
    return result

# Run Step 2: load the accession table; it is built and cached in `accessions_file` when that file does not exist yet.
accessions = get_accession_metadata(accessions_file, tickers, form_type, limit, user_agent)

# Step 3

For each accession in _accession.metadata.csv_ (Step 2)

1. Get the XHTML document
2. Save it to disk as _./data/10-K/raw/{year}/{cik}.{accession number}.xhtml_

In [None]:
#cSpell: ignore tqdm
#type: ignore
from datetime import datetime
from dataclasses import dataclass

def get_accession_metadata(raw_folder: Path, accessions: pd.DataFrame, user_agent: str) -> None:
    """
    Download the document behind every accession URL into `raw_folder`.

    Each document is stored as `{year}/{CIK}.{Accession Number}.xhtml`, where
    `year` comes from the report date (`0000` when the date is missing).
    Documents that are already on disk are skipped, so the function can be
    re-run to resume an interrupted download.
    A request that does not answer with HTTP 200 is reported with `print` and
    otherwise ignored.

    NOTE: this definition reuses the name of the Step 2 function and rebinds it.
    The name is kept so the existing call keeps working.

    Parameters
    ----------
    raw_folder : Path
        Root folder for the downloaded documents
    accessions : pd.DataFrame
        Table with the columns `CIK`, `Accession Number`, `Report Date` and `URL`
    user_agent : str
        Value sent as the `User-Agent` header
    """
    @dataclass
    class Accession:
        # one row of `accessions`, exposed as attributes
        CIK: int
        AccessionNumber: str
        ReportDate: datetime
        URL: str
        def __init__(self, record: t.Dict[str, t.Union[int, str, datetime]]):
            self.CIK = record['CIK']
            self.AccessionNumber = record['Accession Number']
            self.ReportDate = record['Report Date']
            self.URL = record['URL']
    def get_filing_xhtml(session: requests.Session, accession: Accession) -> t.Union[None, str]:
        # the body of the response, or None for anything but HTTP 200
        with session.get(accession.URL) as response:
            if response.status_code == 200:
                return response.text
        return None
    def get_xhtml_file_path(raw_folder: Path, accession: Accession) -> Path:
        year = str(accession.ReportDate.year)
        # a missing report date surfaces here as the text 'nan'
        year = year if year != 'nan' else '0000'
        return raw_folder.joinpath(f'{year}/{accession.CIK}.{accession.AccessionNumber}.xhtml')
    with requests.Session() as session:
        session.headers['User-Agent'] = user_agent
        for accession in tqdm([Accession(x) for x in accessions.to_dict('records')]):
            xhtml_file = get_xhtml_file_path(raw_folder, accession)
            if xhtml_file.exists():
                continue
            xhtml = get_filing_xhtml(session, accession)
            if xhtml is None:
                print(f'{accession.CIK}.{accession.AccessionNumber} failed')
                continue
            # only create the year folder once there is something to put in it
            xhtml_file.parent.mkdir(parents = True, exist_ok = True)
            # Step 4 parses these files as UTF-8, so do not leave the encoding to the platform default
            with open(xhtml_file, mode = 'w', encoding = 'utf-8') as fp:
                fp.write(xhtml)

# Run Step 3: download the document of every accession into `raw_folder`.
# This resolves to the three-argument definition in this cell, which rebinds the name also used in Step 2.
get_accession_metadata(raw_folder, accessions, user_agent)

# Step 4

For each XHTML document:

1. Find "Item 7: Management's Discussion ..."
2. Find the next section.
3. Extract the IDs for both.
4. Extract the HTML between the IDs
5. Convert to TXT

In [None]:
#cSpell: ignore lxml, tqdm, xpaths
#type: ignore
from lxml import etree

def extract_text(raw_folder: Path, corpus_folder: Path) -> None:
    """
    Extract one section of text from every XHTML file below `raw_folder`.

    For each `*.xhtml` file the table row mentioning
    "Financial Condition and Results of Operations" is located.
    The anchor in that row and the anchor of the next non-empty row give the
    IDs that delimit the section.
    The text between those two IDs is written to
    `corpus_folder/{parent folder name}/{file name}.txt`.
    Files whose TXT already exists are skipped.
    Files where the row, the anchors or the IDs cannot be found produce no output.

    Parameters
    ----------
    raw_folder : Path
        Folder that is searched recursively for `*.xhtml` files
    corpus_folder : Path
        Root folder for the extracted TXT files
    """
    def get_txt_file_path(corpus_folder: Path, xhtml_file: Path) -> Path:
        return corpus_folder.joinpath(xhtml_file.parent.name, xhtml_file.name.replace('.xhtml', '.txt'))
    def get_ref_ids(node: etree.Element) -> t.Union[None, t.Tuple[str, str]]:
        # you need to include the NS in every part of the `xpath`
        # https://stackoverflow.com/questions/38936185/etree-xpath-return-entire-html-instead-of-text
        ns_map = {'x':'http://www.w3.org/1999/xhtml'}
        # older 10-ks have a different structure.
        # they are missing the modern cross linking that we need
        # around 2015, we start to see cross linking, but it is all over the place
        # we find "Financial Condition...", go to its containing tr
        # id1 is the first href
        # looking at all the following sibling trs, looking at the non-empty first tds, id2 is the first href
        xpath_0 = ".//x:table/x:tr/x:td//*[contains(text(),'Financial Condition and Results of Operations')]/ancestor::x:tr"
        xpath_1 = ".//x:a[contains(@href, '#')]"
        xpath_2 = "following-sibling::x:tr/x:td[1]//*[normalize-space(text())]/ancestor-or-self::x:a[contains(@href, '#')]"
        rows: list[etree.Element] = node.xpath(xpath_0, namespaces = ns_map)
        if not rows:
            return None
        section_links: list[etree.Element] = rows[0].xpath(xpath_1, namespaces = ns_map)
        next_links: list[etree.Element] = rows[0].xpath(xpath_2, namespaces = ns_map)
        if not section_links or not next_links:
            return None
        # keep what follows the '#', so both '#abc' and 'doc.htm#abc' give 'abc'
        start_id = section_links[0].attrib['href'].split('#', 1)[1]
        end_id = next_links[0].attrib['href'].split('#', 1)[1]
        return (start_id, end_id)
    def get_text(node: etree.Element, ids: t.Tuple[str, str]) -> t.Union[None, str]:
        # Collects the text found after the opening tag of the first descendant with id == ids[0]
        # up to the opening tag of the next element with id == ids[1].
        # Both the text directly inside an element (`.text`) and the text following its closing tag (`.tail`) are kept.
        # Without the tails, everything that comes after an inline child element would be lost.
        # Returns None when either ID is missing.
        start_id, end_id = ids
        chunks: list[str] = []
        collecting = False
        found_end = False
        for event, elm in etree.iterwalk(node, events = ('start', 'end')):
            if elm is node:
                # only the descendants of `node` are considered
                continue
            if event == 'start':
                if collecting:
                    if elm.get('id') == end_id:
                        found_end = True
                        break
                    if elm.text is not None:
                        chunks.append(elm.text)
                elif elm.get('id') == start_id:
                    # the text of the start element itself is not part of the section
                    collecting = True
            elif collecting and elm.tail is not None:
                chunks.append(elm.tail)
        if not found_end:
            return None
        return '\n'.join(chunks)
    parser = etree.XMLParser(encoding = 'utf-8', recover = True, ns_clean = True)
    for xhtml_file in tqdm([x for x in raw_folder.rglob('*.xhtml') if x.is_file()]):
        txt_file = get_txt_file_path(corpus_folder, xhtml_file)
        if txt_file.exists():
            continue
        with open(xhtml_file, mode = 'rb') as fp:
            xhtml = fp.read()
        root: etree.Element = etree.fromstring(xhtml, parser)
        if root is None:
            # NOTE(review): a recovering parser may hand back no tree for an unusable file - confirm; skip it either way
            continue
        ids = get_ref_ids(root)
        if ids is None:
            continue
        text = get_text(root, ids)
        if text is None:
            continue
        # only create the folder once there is something to put in it
        txt_file.parent.mkdir(parents = True, exist_ok = True)
        with open(txt_file, mode = 'w', encoding = 'utf-8') as fp:
            fp.write(text)
# Run Step 4: write a TXT file into `corpus_folder` for every XHTML file in `raw_folder` that yields a section.
extract_text(raw_folder, corpus_folder)