In [None]:
#**********************************************************************
#AUTHOR:     Yumna Hussain, yumnahussain444@gmail.com
#DATE:       12/31/2025
#*********************************************************************

# ====== installing required packages and setting paths  ======

!pip -q install beautifulsoup4 lxml nltk tqdm

import re, time, json, gzip
from io import StringIO

import requests
import pandas as pd
from bs4 import BeautifulSoup
from lxml import etree
from tqdm import tqdm

import nltk
nltk.download("punkt")
nltk.download("punkt_tab")
from nltk.tokenize import sent_tokenize

from google.colab import drive
drive.mount("/content/drive")

# Google Drive locations of the two inputs: the text file read by read_cik_list()
# and the EDGAR index file parsed by load_master_idx_fixedwidth().
CIK_LIST_PATH  = "/content/drive/MyDrive/SEC_10Q/CIK_list.txt"
MASTER_IDX_PATH = "/content/drive/MyDrive/SEC_10Q/form.idx"
# Sent as the User-Agent header on every request made through the shared session.
USER_AGENT = "Yumna Hussain (yumnahussain444@gmail.com)"

In [None]:
# ====== Read the CIK list and parse the SEC index file (form.idx) ======

def read_cik_list(path):
    """Return the unique CIKs found in a text file, in first-seen order.

    Every standalone run of 1-10 digits in the file is treated as a CIK.
    Leading zeros are dropped (``"0000320193"`` -> ``"320193"``) so the values
    compare equal to CIKs written without padding.

    Args:
        path: Path of the text file to scan. Undecodable bytes are ignored.

    Returns:
        A list of CIK strings without duplicates; empty if no digits are found.
    """
    # The context manager closes the handle even if reading raises.
    with open(path, "r", encoding="utf-8", errors="ignore") as fh:
        txt = fh.read()

    # Any numeric token of up to 10 digits counts; longer digit runs never match.
    tokens = re.findall(r"\b\d{1,10}\b", txt)

    # int() -> str() strips zero padding; dict keys keep insertion order, so
    # this de-duplicates while preserving the first occurrence of each CIK.
    normalized = (str(int(tok)) for tok in tokens)
    return list(dict.fromkeys(normalized))


def _read_lines(path):
    # This reads a text file into a list of lines, supporting both plain text and gzip-compressed files.
    if path.lower().endswith(".gz"):
        with gzip.open(path, "rt", encoding="latin1", errors="ignore") as f:
            return f.read().splitlines()

    with open(path, "r", encoding="latin1", errors="ignore") as f:
        return f.read().splitlines()


def load_master_idx_fixedwidth(path):
    """Parse a whitespace-aligned EDGAR index file into a DataFrame.

    The table is located by its header line (the one naming all five columns);
    data rows start after the dashed separator that follows it, or directly
    after the header when no separator is found within the next few lines.
    Lines that do not look like a filing row are skipped.

    Args:
        path: Index file path; ``.gz`` files are handled by ``_read_lines``.

    Returns:
        DataFrame of strings with columns
        ``CIK, Company Name, Form Type, Date Filed, File Name``.

    Raises:
        ValueError: If the header line is missing or no row could be parsed.
    """
    lines = _read_lines(path)

    header_labels = ("Form Type", "Company Name", "CIK", "Date Filed", "File Name")

    def _is_header(line):
        # A BOM may precede the header if it happens to be the first line.
        text = line.replace("\ufeff", "").rstrip()
        return all(label in text for label in header_labels)

    header_i = next((i for i, line in enumerate(lines) if _is_header(line)), None)
    if header_i is None:
        raise ValueError("Could not find the master.idx table header line.")

    # Look for the dashed rule only in a short window right below the header.
    window = range(header_i + 1, min(header_i + 10, len(lines)))
    rule_i = next(
        (j for j in window if re.fullmatch(r"-{10,}\s*", lines[j].strip())),
        None,
    )
    start_i = header_i + 1 if rule_i is None else rule_i + 1

    # Five fields per row: form type, company name, CIK, filing date, file path.
    # The company name is matched lazily so digits inside it are not taken for the CIK.
    row_re = re.compile(
        r"^(\S+)\s+(.*?)\s+(\d{1,10})\s+(\d{4}-\d{2}-\d{2})\s+(edgar/data/\d+/\S+?\.txt)\s*$"
    )

    rows = []
    for line in lines[start_i:]:
        hit = row_re.match(line.rstrip())
        if hit is None:
            # Covers blank lines as well: the pattern needs non-empty content.
            continue
        form_type, company, cik, date_filed, filename = (g.strip() for g in hit.groups())
        rows.append([cik, company, form_type, date_filed, filename])

    if not rows:
        raise ValueError("No filings rows were parsed from master.idx.")

    return pd.DataFrame(rows, columns=["CIK", "Company Name", "Form Type", "Date Filed", "File Name"])


# Load both inputs once; the cells below reuse `cik_list` and `idx`.
cik_list = read_cik_list(CIK_LIST_PATH)
idx = load_master_idx_fixedwidth(MASTER_IDX_PATH)

# Sanity check: how many CIKs were requested and how many index rows were parsed.
print("Loaded CIK count:", len(cik_list))
print("Parsed index row count:", len(idx))

# Preview the first parsed index rows.
idx.head()


In [None]:
# ====== Keep one 10-Q per requested CIK from the index (Q2 2020) ======
# Selection rule per CIK: an original 10-Q beats an amendment (10-Q/A); among
# filings of the same kind the most recent "Date Filed" wins. After sorting,
# the first row of each CIK group is the preferred filing.
sub = (
    idx.loc[idx["CIK"].isin(cik_list) & idx["Form Type"].str.startswith("10-Q", na=False)]
    .assign(is_amendment=lambda d: d["Form Type"].str.contains("/A", na=False).astype(int))
    .sort_values(["CIK", "is_amendment", "Date Filed"], ascending=[True, True, False])
    .groupby("CIK", as_index=False)
    .head(1)
)

print("Matched 10-Q filings:", len(sub), "out of", len(cik_list))
sub.head()


In [None]:
# ====== SEC downloader and word/sentence counts ======
# A single shared session reuses connections and sends these headers with
# every request issued by get_url_text().
session = requests.Session()
session.headers["User-Agent"] = USER_AGENT
session.headers["Accept-Encoding"] = "gzip, deflate"
session.headers["Host"] = "www.sec.gov"

def get_url_text(url, max_tries=6):
    """Fetch ``url`` through the shared ``session`` and return the body as text.

    Transient failures are retried with a linearly growing pause
    (1.5 s, 3 s, 4.5 s, ...). A failure counts as transient when the request
    raises a connection error or timeout, or when the server answers with
    403, 429, 500, 502, 503 or 504.

    Args:
        url: Absolute URL to download.
        max_tries: Maximum number of attempts.

    Returns:
        The decoded text of the first HTTP 200 response.

    Raises:
        requests.HTTPError: Immediately, for an error status outside the retry list.
        RuntimeError: When every attempt failed, or ``max_tries`` is below 1.
    """
    retryable_statuses = (403, 429, 500, 502, 503, 504)
    # Tracked separately from the response so the final message never touches
    # an unbound variable (e.g. max_tries=0, or every attempt raised).
    last_problem = "error: no attempt was made"

    for attempt in range(max_tries):
        try:
            r = session.get(url, timeout=60)
        except (requests.ConnectionError, requests.Timeout) as exc:
            # Dropped connections and timeouts are as transient as a 503.
            last_problem = f"error: {type(exc).__name__}: {exc}"
        else:
            if r.status_code == 200:
                # Fall back to UTF-8 when the server did not declare a charset.
                r.encoding = r.encoding or "utf-8"
                return r.text

            if r.status_code not in retryable_statuses:
                # Permanent client/server errors surface right away.
                r.raise_for_status()

            last_problem = f"status {r.status_code}"

        # Back off before the next attempt; pausing after the last one is pointless.
        if attempt < max_tries - 1:
            time.sleep(1.5 * (attempt + 1))

    raise RuntimeError(f"Failed after retries: {url} (last {last_problem})")

# A "word" is any maximal run of word characters (letters, digits, underscore).
_word_re = re.compile(r"\b\w+\b", flags=re.UNICODE)

def filing_text_metrics(raw_filing_text):
    """Return ``(word_count, sentence_count)`` for a filing's visible text.

    The markup is parsed with BeautifulSoup's lxml parser, reduced to plain text,
    and whitespace runs are collapsed to single spaces before counting. Words
    are counted with ``_word_re``; sentences with NLTK's ``sent_tokenize``.
    Text that is empty after cleaning yields ``(0, 0)``.
    """
    plain = BeautifulSoup(raw_filing_text, "lxml").get_text(" ", strip=True)
    plain = re.sub(r"\s+", " ", plain).strip()

    n_words = sum(1 for _ in _word_re.finditer(plain))

    # The tokenizer is skipped entirely for empty text.
    if plain:
        n_sentences = len(sent_tokenize(plain))
    else:
        n_sentences = 0

    return n_words, n_sentences



In [None]:
def parse_cik_and_accession(filename):
    """Split an EDGAR filing path into ``(cik_directory, accession_without_dashes)``.

    Example:
        ``edgar/data/1652044/0001652044-20-000021.txt``
        -> ``("1652044", "000165204420000021")``

    Returns ``(None, None)`` when the path has fewer than four segments or its
    third-from-last segment is not ``data`` (case-insensitive).
    """
    segments = filename.strip("/").split("/")

    looks_like_filing_path = len(segments) >= 4 and segments[-3].lower() == "data"
    if not looks_like_filing_path:
        return (None, None)

    cik_dir, leaf = segments[-2], segments[-1]
    # Drop the extension first, then the dashes of the accession number.
    accession = leaf.replace(".txt", "").strip().replace("-", "")
    return (cik_dir.strip(), accession)


def get_index_json_url(filename):
    """Return the ``index.json`` URL of the filing folder behind an EDGAR path.

    The JSON directory listing is what ``download_best_instance_xml`` scans for
    the instance XML. Returns ``None`` when the path cannot be split into a CIK
    and an accession number.
    """
    cik, acc = parse_cik_and_accession(filename)
    if cik and acc:
        return f"https://www.sec.gov/Archives/edgar/data/{cik}/{acc}/index.json"
    return None


def download_best_instance_xml(filename):
    """Download the most likely XBRL instance document of a filing.

    The filing folder's ``index.json`` is read, every ``.xml`` entry that looks
    like a linkbase, schema, stylesheet or filing summary is discarded, and the
    largest remaining file is downloaded.

    Linkbases are recognised by their file-name *suffix* (``_cal.xml``,
    ``_def.xml``, ``_lab.xml``, ``_pre.xml``, ``_ref.xml``). Matching those
    tokens anywhere in the name would also reject genuine instance files whose
    name merely contains the letters, e.g. ``calm-20200530.xml``.

    Args:
        filename: EDGAR path of the filing, e.g. ``edgar/data/<cik>/<accession>.txt``.

    Returns:
        The XML text, or ``None`` if the listing is unavailable or malformed,
        no candidate exists, or the download fails (best-effort by design).
    """
    idx_url = get_index_json_url(filename)
    if not idx_url:
        return None

    # Best-effort: any download or JSON problem simply means "no instance found".
    try:
        listing = json.loads(get_url_text(idx_url))
    except Exception:
        return None

    # Tolerate unexpected shapes instead of raising AttributeError further down.
    directory = listing.get("directory") if isinstance(listing, dict) else None
    items = directory.get("item", []) if isinstance(directory, dict) else []
    if isinstance(items, dict):
        items = [items]
    if not items:
        return None

    linkbase_suffix = re.compile(r"[_-](cal|def|lab|pre|ref)\.xml$")
    # These markers are still matched anywhere in the name.
    excluded_tokens = ("xsd", "xsl", "schema", "filingsummary")

    candidates = []
    for it in items:
        if not isinstance(it, dict):
            continue
        name = it.get("name") or ""
        lname = name.lower()

        if not lname.endswith(".xml"):
            continue
        if linkbase_suffix.search(lname) or any(tok in lname for tok in excluded_tokens):
            continue

        # A missing or non-numeric size ranks the file last instead of crashing.
        try:
            size = int(it.get("size") or 0)
        except (TypeError, ValueError):
            size = 0
        candidates.append((size, name))

    if not candidates:
        return None

    # Largest file wins; ties are broken by name, as tuple comparison dictates.
    best_name = max(candidates)[1]

    cik, acc = parse_cik_and_accession(filename)
    xml_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{acc}/{best_name}"

    try:
        return get_url_text(xml_url)
    except Exception:
        return None


def parse_eps_two_latest_quarters(xbrl_xml_text):
    """Return diluted EPS for the two latest quarter-length periods in an XBRL instance.

    Only ``EarningsPerShareDiluted`` facts whose context spans roughly one
    quarter (80–100 days) are considered. One value is kept per period end
    date, and the values for the two most recent end dates are returned.

    Facts reported without dimensions (no ``segment``/``scenario`` in the
    context) are preferred; dimensional facts are used only when no plain fact
    exists, so a per-share-class or per-segment figure cannot displace the
    undimensioned one.

    NOTE(review): the second value is simply the next most recent quarter-length
    period found in the file. A 10-Q presumably carries the comparative quarter
    of the prior year rather than the immediately preceding quarter — confirm
    this matches what "previous quarter" means downstream.

    Args:
        xbrl_xml_text: The instance document as a string.

    Returns:
        ``(latest_eps, earlier_eps)`` as floats; either may be ``None`` when
        the corresponding value is not available.
    """
    parser = etree.XMLParser(recover=True, huge_tree=True)
    try:
        root = etree.fromstring(xbrl_xml_text.encode("utf-8", errors="ignore"), parser=parser)
    except Exception:
        return (None, None)
    # In recover mode lxml can return None instead of raising when nothing is salvageable.
    if root is None:
        return (None, None)

    # context id -> (period end, has_dimensions), for quarter-length contexts only.
    # Dates are parsed once per context here rather than once per fact.
    quarter_ctx = {}
    for ctx in root.findall(".//{*}context"):
        cid = ctx.get("id")
        if not cid:
            continue
        start = ctx.findtext(".//{*}period/{*}startDate")
        end = ctx.findtext(".//{*}period/{*}endDate")
        if not (start and end):
            # Instant contexts have no start/end pair and cannot be a quarter.
            continue
        try:
            period_start = pd.to_datetime(start.strip())
            period_end = pd.to_datetime(end.strip())
            days = (period_end - period_start).days
        except Exception:
            continue
        if not (80 <= days <= 100):
            continue
        has_dimensions = (
            ctx.find(".//{*}segment") is not None
            or ctx.find(".//{*}scenario") is not None
        )
        quarter_ctx[cid] = (period_end, has_dimensions)

    facts = []  # (period_end, has_dimensions, value) in document order
    for el in root.iter():
        try:
            if etree.QName(el).localname != "EarningsPerShareDiluted":
                continue
        except Exception:
            # Comments and processing instructions have no qualified name.
            continue

        ctx_info = quarter_ctx.get(el.get("contextRef"))
        val_txt = (el.text or "").strip()
        if ctx_info is None or not val_txt:
            continue

        try:
            val = float(val_txt)
        except ValueError:
            continue

        facts.append((ctx_info[0], ctx_info[1], val))

    if not facts:
        return (None, None)

    undimensioned = [fact for fact in facts if not fact[1]]
    chosen = undimensioned or facts

    # One value per end date; a later fact in the document replaces an earlier one.
    by_end_date = {}
    for period_end, _, val in chosen:
        by_end_date[period_end.date().isoformat()] = val

    # ISO dates sort chronologically, so the two largest keys are the newest periods.
    newest_first = [by_end_date[key] for key in sorted(by_end_date, reverse=True)[:2]]

    if len(newest_first) == 1:
        return (newest_first[0], None)
    return (newest_first[0], newest_first[1])


def extract_two_quarterly_eps(filename):
    """Return ``(latest_quarter_eps, earlier_quarter_eps)`` for one filing.

    The instance XML is located through the filing's ``index.json`` listing
    only. ``(None, None)`` is returned when no instance document is obtained.
    """
    instance_xml = download_best_instance_xml(filename)
    return parse_eps_two_latest_quarters(instance_xml) if instance_xml else (None, None)



In [None]:
def normalize_master_columns(df):
    """Return a copy of ``df`` whose column names follow one canonical schema.

    Names are compared after trimming, lower-casing, removing spaces and
    dropping trailing periods, so ``"File Name"``, ``" file name "`` and
    ``"File   Name."`` all become ``"Filename"``. Columns without a canonical
    counterpart only have surrounding whitespace trimmed.

    The input frame is left untouched; the previous implementation overwrote
    ``df.columns`` on the caller's object before returning the renamed copy.

    Args:
        df: DataFrame with string column labels.

    Returns:
        A new DataFrame with the renamed columns.
    """
    # Keyed by the normalized label (lowercase, no spaces, no trailing periods).
    canonical = {
        "cik": "CIK",
        "companyname": "Company Name",
        "formtype": "Form Type",
        "datefiled": "Date Filed",
        "filename": "Filename",
        "file name": "Filename",  # literal spelling kept as a direct special case
    }

    rename_map = {}
    for original in df.columns:
        stripped = original.strip()
        cleaned = stripped.lower()
        squashed = cleaned.replace(" ", "").rstrip(".")

        # Prefer the literal lower-cased label, then the space/period-insensitive key.
        if cleaned in canonical:
            target = canonical[cleaned]
        elif squashed in canonical:
            target = canonical[squashed]
        else:
            target = stripped

        if target != original:
            rename_map[original] = target

    # rename() returns a new frame, so the caller's object keeps its labels.
    return df.rename(columns=rename_map)
# Apply the canonical column names to both frames. The download loop below
# reads the "Filename" column, which only exists after this renaming.
idx = normalize_master_columns(idx)
sub = normalize_master_columns(sub)

# Show the resulting schemas so a naming mismatch is visible before downloading.
print("idx columns:", idx.columns.tolist())
print("sub columns:", sub.columns.tolist())



In [None]:
# ====== Download every selected filing and collect text metrics plus diluted EPS ======
# One output row per filing in `sub`. A filing whose text cannot be downloaded
# is reported and kept with empty metrics, so one bad URL does not discard the
# results already gathered in a long run.
rows = []

for _, r in tqdm(sub.iterrows(), total=len(sub)):
    cik = r["CIK"]
    filing_date = r["Date Filed"]
    filename = r["Filename"]
    filing_url = "https://www.sec.gov/Archives/" + filename.lstrip("/")

    try:
        raw_txt = get_url_text(filing_url)
    except (requests.RequestException, RuntimeError) as exc:
        # tqdm.write keeps the progress bar intact while logging.
        tqdm.write(f"Could not download filing for CIK {cik}: {exc}")
        word_count = sentence_count = None
    else:
        word_count, sentence_count = filing_text_metrics(raw_txt)

    # extract_two_quarterly_eps() takes the index path only; passing the raw
    # filing text as a second argument raised TypeError on the first iteration.
    eps_cur, eps_prev = extract_two_quarterly_eps(filename)

    rows.append({
        "CIK": cik,
        "filing_date": filing_date,
        "word_count": word_count,
        "sentence_count": sentence_count,
        "eps_current_q": eps_cur,
        "eps_previous_q": eps_prev,
        "filing_url": filing_url
    })

    # Pause between filings to keep the request rate low.
    time.sleep(0.35)

# Naming the columns keeps sort_values("CIK") valid even when no row was collected.
out = pd.DataFrame(
    rows,
    columns=[
        "CIK", "filing_date", "word_count", "sentence_count",
        "eps_current_q", "eps_previous_q", "filing_url",
    ],
).sort_values("CIK").reset_index(drop=True)
out


In [None]:
# Persist the results to Google Drive as CSV, without the DataFrame index.
OUT_PATH = "/content/drive/MyDrive/edgar_q2_2020_10q_metrics.csv"
out.to_csv(OUT_PATH, index=False)
print("Saved to:", OUT_PATH)

# Additionally hand the saved file to Colab's download helper.
from google.colab import files
files.download(OUT_PATH)
