In [2]:
"""
npx_parser_simple.ipynb
-----------------------
A simplified N-PX parsing demonstration focused on the core algorithm:
1) Read the SEC header to extract file-level metadata.
2) Stream each <DOCUMENT> block to identify "primary_doc.xml" or vote tables.
3) Parse the relevant XML sections using BeautifulSoup (lxml-xml).
4) Output final metadata and vote data as CSV.

This version omits advanced or redundant logic (e.g., iterparse for huge files,
Trino queries, N-14/N-PORT references) to keep the example minimal.
"""

import os
import re
import pandas as pd
from bs4 import BeautifulSoup

# Directory of .txt N-PX filings (already downloaded)
NPX_DOWNLOAD_DIR = "./npx_filings"

In [3]:
# ------------------------------------------------------------------------------
# 1. Read/Parse SEC Header
# ------------------------------------------------------------------------------
def parse_sec_header(filepath):
    """
    Read the filing header and return its basic metadata fields.

    The file is scanned line by line and scanning stops at the first line
    containing <DOCUMENT>. Labels are matched case-insensitively, and the
    stored value is the stripped text that follows the label on that line.

    Args:
        filepath: Path to the filing text file.

    Returns:
        dict with the keys 'accessionNumber', 'filingDate',
        'conformedPeriod', 'headerCik' and 'amendmentNo'. A key keeps its
        default "" when its label never appears. When a label appears on
        several lines, the last occurrence wins.
    """
    header_info = {
        'accessionNumber': "",
        'filingDate': "",
        'conformedPeriod': "",
        'headerCik': "",
        'amendmentNo': ""
    }

    # (label, result key) pairs, listed in priority order: a line is assigned
    # to the first label it contains. The patterns are case-insensitive so the
    # value is extracted with the same rule that detects the label; splitting
    # on an upper-case literal would return the whole line for a mixed-case label.
    field_patterns = [
        (re.compile(re.escape(label) + r"(.*)", re.IGNORECASE), key)
        for label, key in (
            ("AMENDMENT NO:", 'amendmentNo'),
            ("ACCESSION NUMBER:", 'accessionNumber'),
            ("FILED AS OF DATE:", 'filingDate'),
            ("CONFORMED PERIOD OF REPORT:", 'conformedPeriod'),
            ("CENTRAL INDEX KEY:", 'headerCik'),
        )
    ]

    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            # The header ends where the first document block begins.
            if "<DOCUMENT>" in line.upper():
                break

            for pattern, key in field_patterns:
                found = pattern.search(line)
                if found:
                    header_info[key] = found.group(1).strip()
                    break

    return header_info


In [4]:
# ------------------------------------------------------------------------------
# 2. Stream <DOCUMENT> Blocks
# ------------------------------------------------------------------------------
def stream_documents(filepath):
    """
    Generator: yields (doc_type, doc_content) for each <DOCUMENT> block.

    The file is read line by line. A block starts at a line containing
    <DOCUMENT> and ends at the next line containing </DOCUMENT>; both marker
    lines are included in doc_content. Lines outside a block are ignored, and
    a block that is never closed is not yielded.

    doc_type is the upper-cased value of the first <TYPE> tag found inside the
    block, or "NO_TYPE_FOUND" when the block has none. Only the first match is
    used so that a <type> tag appearing later in the block's payload cannot
    overwrite the type declared at the top of the block.

    Args:
        filepath: Path to the filing text file.

    Yields:
        Tuples of (doc_type, doc_content), both str.
    """
    no_type = "NO_TYPE_FOUND"
    type_regex = re.compile(r"<TYPE>(?P<type>[^\r\n<]+)", re.IGNORECASE)

    inside_document = False
    type_found = False
    doc_lines = []
    doc_type = no_type

    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            upper_line = line.upper()

            if "<DOCUMENT>" in upper_line:
                # A new block begins; any unterminated previous block is dropped.
                inside_document = True
                type_found = False
                doc_lines = [line]
                doc_type = no_type

            elif not inside_document:
                continue

            elif "</DOCUMENT>" in upper_line:
                doc_lines.append(line)
                yield (doc_type.upper(), "".join(doc_lines))

                inside_document = False
                type_found = False
                doc_lines = []
                doc_type = no_type

            else:
                doc_lines.append(line)
                if not type_found:
                    match = type_regex.search(line)
                    if match:
                        doc_type = match.group("type").strip()
                        type_found = True


In [5]:
# ------------------------------------------------------------------------------
# 3. Extract & Parse XML Blocks
# ------------------------------------------------------------------------------
def extract_primary_doc_xml(document_text):
    """
    Return the <TEXT> payload of a document block named 'primary_doc.xml'.

    Args:
        document_text: Full text of one <DOCUMENT> block.

    Returns:
        The text between <TEXT> and the first following </TEXT> (unstripped)
        when the block's <FILENAME> value contains 'primary_doc.xml'
        (case-insensitive). Returns None when there is no <FILENAME> tag, the
        filename does not match, or there is no <TEXT>...</TEXT> section.
    """
    # Read the filename up to the end of the line (or the next tag), the same
    # way this file reads <TYPE> and <DESCRIPTION>. Requiring a closing
    # </FILENAME> would miss header lines written without one.
    filename_match = re.search(
        r"<FILENAME>(?P<name>[^\r\n<]+)", document_text, re.IGNORECASE
    )
    if not filename_match:
        return None

    filename = filename_match.group("name").strip()
    if "primary_doc.xml" not in filename.lower():
        return None

    text_match = re.search(
        r"<TEXT>(?P<xml>.*?)</TEXT>", document_text, re.IGNORECASE | re.DOTALL
    )
    return text_match.group("xml") if text_match else None


def extract_vote_table_xml(document_text):
    """
    Return the <TEXT> payload of a document block that holds a vote table.

    A block qualifies when its first <TYPE> value contains
    "PROXY VOTING RECORD" or, failing that, when its first <DESCRIPTION>
    value contains "VOTE TABLE" (both comparisons are case-insensitive).

    Returns the text between <TEXT> and the first following </TEXT>, or None
    when the block does not qualify or has no <TEXT>...</TEXT> section.
    """
    # (header regex, capture group, marker expected in the captured value),
    # evaluated in order; the first rule that matches decides.
    header_rules = (
        (r"<TYPE>(?P<type>[^\r\n<]+)", "type", "PROXY VOTING RECORD"),
        (r"<DESCRIPTION>(?P<desc>[^\r\n<]+)", "desc", "VOTE TABLE"),
    )

    qualifies = False
    for pattern, group_name, marker in header_rules:
        found = re.search(pattern, document_text, re.IGNORECASE)
        value = found.group(group_name).strip() if found else ""
        if marker in value.upper():
            qualifies = True
            break

    if not qualifies:
        return None

    payload = re.search(r"<TEXT>(?P<xml>.*?)</TEXT>", document_text, re.IGNORECASE | re.DOTALL)
    if payload is None:
        return None
    return payload.group("xml")

In [6]:
# ------------------------------------------------------------------------------
# 4. Parsing the Found XML with BeautifulSoup (lxml-xml)
# ------------------------------------------------------------------------------
def parse_primary_npx_xml(xml_str):
    """
    Parse the 'primary_doc.xml' payload and return selected submission fields.

    Args:
        xml_str: XML text to parse with BeautifulSoup's "lxml-xml" parser.

    Returns:
        {} when no <edgarSubmission> element is found. Otherwise a dict with
        the keys 'seriesId', 'periodOfReport', 'submissionType',
        'registrantType' and 'investmentCompanyType'. Each value is the
        stripped text of the first matching element under <edgarSubmission>,
        or "" when that element is absent.
    """
    soup = BeautifulSoup(xml_str, "lxml-xml")

    edgar_sub = soup.find("edgarSubmission")
    if edgar_sub is None:
        return {}

    def _first_text(tag_name):
        # Tag has no find_text() method: attribute access on a Tag is treated
        # as a child-tag lookup, so `edgar_sub.find_text` evaluates to None
        # and calling it raises TypeError. Use find() + get_text() instead.
        node = edgar_sub.find(tag_name)
        return node.get_text(strip=True) if node is not None else ""

    # Example tags we commonly see in N-PX
    field_names = (
        "seriesId",
        "periodOfReport",
        "submissionType",
        "registrantType",
        "investmentCompanyType",
    )
    return {name: _first_text(name) for name in field_names}


def parse_vote_table_xml(xml_str):
    """
    Parse a vote-table XML payload into a list of row dicts.

    One row is produced per <proxyTable> (or <inf:proxyTable>) element. Each
    row holds the stripped text of the first matching child tag for every
    field listed below ("" when the tag is absent), plus a derived
    'forAgainstMgmt' value:
      - "FOR" when howVoted equals managementRecommendation (case-insensitive),
      - "AGAINST" when both are present but differ,
      - "" when either one is missing.
    """
    # Order matters: it defines the key order of every row dict.
    field_names = (
        "issuerName",
        "cusip",
        "isin",
        "figi",
        "meetingDate",
        "sharesVoted",
        "howVoted",
        "managementRecommendation",
        "voteDescription",
    )

    def text_of(parent, tag_name):
        node = parent.find(tag_name)
        return node.get_text(strip=True) if node else ""

    parsed = BeautifulSoup(xml_str, "lxml-xml")

    records = []
    # "proxyTable" or "inf:proxyTable"
    for table in parsed.find_all(["proxyTable", "inf:proxyTable"]):
        record = {name: text_of(table, name) for name in field_names}

        voted = record["howVoted"]
        recommended = record["managementRecommendation"]
        if not (voted and recommended):
            record["forAgainstMgmt"] = ""
        elif voted.upper() == recommended.upper():
            record["forAgainstMgmt"] = "FOR"
        else:
            record["forAgainstMgmt"] = "AGAINST"

        records.append(record)

    return records

In [7]:
# ------------------------------------------------------------------------------
# 5. Main Parsing Function
# ------------------------------------------------------------------------------
def parse_npx_file(filepath):
    """
    Parse one filing text file and return (doc_info, all_votes).

    doc_info starts as a copy of the header fields from parse_sec_header and
    is updated with the fields parsed from any 'primary_doc.xml' payload found
    in a document whose type contains "N-PX". all_votes is the concatenation
    of the rows parsed from every document whose type contains
    "PROXY VOTING RECORD" or "VOTE TABLE".
    """
    vote_type_markers = ("PROXY VOTING RECORD", "VOTE TABLE")

    # Header fields form the base of the metadata record.
    doc_info = dict(parse_sec_header(filepath))
    all_votes = []

    for raw_type, block_text in stream_documents(filepath):
        kind = raw_type.upper()

        # Primary document: merge its submission fields into the metadata.
        if "N-PX" in kind:
            primary_xml = extract_primary_doc_xml(block_text)
            if primary_xml:
                doc_info.update(parse_primary_npx_xml(primary_xml))

        # Vote table: collect its rows.
        if any(marker in kind for marker in vote_type_markers):
            vote_xml = extract_vote_table_xml(block_text)
            if vote_xml:
                all_votes.extend(parse_vote_table_xml(vote_xml))

    return doc_info, all_votes

In [8]:
# ------------------------------------------------------------------------------
# 6. Minimal Main Notebook Flow
# ------------------------------------------------------------------------------
# Parse every .txt filing in NPX_DOWNLOAD_DIR, collect one metadata record per
# file and one row per vote, then write three CSVs: metadata, votes, and the
# votes joined to their file's metadata.
metadata_list = []
votes_list = []

# os.listdir returns entries in arbitrary order; sort so that the row order of
# the output CSVs is the same on every run.
all_txt_files = sorted(
    f for f in os.listdir(NPX_DOWNLOAD_DIR)
    if f.lower().endswith(".txt")
)

for filename in all_txt_files:
    filepath = os.path.join(NPX_DOWNLOAD_DIR, filename)
    doc_info, doc_votes = parse_npx_file(filepath)

    if doc_info:
        doc_info["filename"] = filename
        metadata_list.append(doc_info)

    # Tag each vote row with its source file so it can be joined to metadata.
    for vote_item in doc_votes:
        vote_item["filename"] = filename
        votes_list.append(vote_item)

df_meta = pd.DataFrame(metadata_list)
df_votes = pd.DataFrame(votes_list)

# (Optional) Merge on filename; skipped when either side is empty because an
# empty frame has no "filename" column to join on.
if not df_meta.empty and not df_votes.empty:
    df_final = df_votes.merge(df_meta, on="filename", how="inner")
else:
    df_final = pd.DataFrame()

print(f"Metadata entries: {len(df_meta)}")
print(f"Vote entries: {len(df_votes)}")
print(f"Final joined entries: {len(df_final)}")

df_meta.to_csv("npx_simplified_metadata.csv", index=False)
df_votes.to_csv("npx_simplified_votes.csv", index=False)
df_final.to_csv("npx_simplified_final.csv", index=False)

print("Done! See CSV outputs for the simplified parse approach.")

Metadata entries: 25
Vote entries: 115047
Final joined entries: 115047
Done! See CSV outputs for the simplified parse approach.
