## Installing & Importing Necessary Libraries

In [None]:
!pip install transformers datasets nltk tqdm scikit-learn openpyxl

In [None]:
import os
import re
import numpy as np
import pandas as pd
import requests
import time
from datetime import datetime
from tqdm import tqdm
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import seaborn as sns
import torch
#from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from datasets import Dataset
#from scipy.stats import pearsonr, ttest_rel
nltk.download('punkt_tab')
nltk.download("stopwords")

# Data Collection

## Downloading .idx files from SEC

In [None]:
# Download the .idx files -- only run this cell if you don't already have the .idx files locally
# Base URL for the SEC EDGAR full index
base_url = 'https://www.sec.gov/Archives/edgar/full-index/'

# Function to download a file; the caller supplies the HTTP headers
def download_file(url, path, headers, timeout=30):
    """Stream the resource at ``url`` into the local file ``path``.

    Args:
        url: Address to fetch.
        path: Destination file; opened in binary write mode, so an existing
            file is overwritten.
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout a stalled connection would block forever.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
        requests.RequestException: For connection problems and timeouts.
        OSError: If ``path`` cannot be opened for writing.
    """
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(path, 'wb') as f:
            # Write in 8 KiB chunks so the whole file is never held in memory
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

# Prompt for start and end year
start_year = int(input("Enter the start year (YYYY): "))
end_year = int(input("Enter the end year (YYYY): "))
save_dir = input('Please Input Path to Your Directory to Download Files:')

# Create the target directory up front; otherwise every download below fails
# with FileNotFoundError and is only reported as "Failed to download".
# An empty answer resolves to the current directory, which needs no creation.
if save_dir:
    os.makedirs(save_dir, exist_ok=True)

# Add your user-agent string here
headers = {'User-Agent': 'simransukhawani3@gmail.com'}

# Iterate over each year and quarter within the specified range
for year in range(start_year, end_year + 1):
    for quarter in ['QTR1', 'QTR2', 'QTR3', 'QTR4']:
        file_url = f"{base_url}{year}/{quarter}/company.idx"
        save_path = os.path.join(save_dir, f"{year}_{quarter}_company.idx")

        print(f"Attempting to download {file_url}...")

        # Best effort: a failed quarter is reported and the loop moves on
        try:
            download_file(file_url, save_path, headers)
            print(f"Successfully downloaded {file_url}")
        except Exception as e:
            print(f"Failed to download {file_url}. Error: {e}")

        # Respect the SEC's rate limiting
        time.sleep(1)  # Sleep for 1 second to avoid hitting rate limit

print("All requested files have been attempted to download.")

## Creating a dataframe using all the .idx files

In [None]:
# This cell builds one DataFrame from all .idx files and saves it as a CSV.
# First, lift every pandas display limit so printed frames are never truncated.
for _display_option in (
    'display.max_rows',      # show every row
    'display.max_columns',   # show every column
    'display.width',         # let pandas work out the display width itself
    'display.max_colwidth',  # show the full content of each cell
):
    pd.set_option(_display_option, None)

# Load data from all EDGAR index files in the specified directory
def load_data_from_directory(source_dir):
    """Read every ``.idx`` file in ``source_dir`` into one combined DataFrame.

    Each file is parsed as a fixed-width table with the columns
    'Company Name', 'Form Type', 'CIK', 'Date Filed' and 'Filename'; the first
    9 lines of every file are skipped as header material.

    Args:
        source_dir: Directory to scan (not recursive). Files whose name does
            not end in '.idx' are ignored.

    Returns:
        The concatenation of all successfully parsed files with a fresh
        0..n-1 index, or an empty DataFrame when nothing could be loaded.
        Files that cannot be parsed are reported on stdout and skipped.
    """
    colspecs = [(0, 62), (62, 74), (74, 86), (86, 98), (98, None)]
    column_names = ['Company Name', 'Form Type', 'CIK', 'Date Filed', 'Filename']
    read_options = {'colspecs': colspecs, 'skiprows': 9, 'names': column_names}
    dataframe_collection = []

    # os.listdir order is arbitrary; sorting makes the combined row order reproducible
    for file_name in sorted(os.listdir(source_dir)):
        if not file_name.endswith('.idx'):
            continue
        file_path = os.path.join(source_dir, file_name)
        try:
            try:
                temp_df = pd.read_fwf(file_path, **read_options)
            except UnicodeDecodeError:
                # The file is not valid UTF-8. Retry as latin-1, which maps every
                # byte to a character, instead of discarding the whole file.
                # NOTE(review): latin-1 is assumed to be the real encoding -- confirm.
                temp_df = pd.read_fwf(file_path, encoding='latin-1', **read_options)
        except Exception as e:
            print(f'An unexpected error occurred while reading {file_name}: {e}')
            continue
        dataframe_collection.append(temp_df)

    if not dataframe_collection:
        print("No data was loaded. Please check your file paths and names.")
        return pd.DataFrame()

    # Concatenate all DataFrames into one DataFrame
    combined_df = pd.concat(dataframe_collection, ignore_index=True)
    combined_df.columns = combined_df.columns.str.strip()  # Strip any leading/trailing whitespace from column names
    return combined_df

def save_to_csv(df, output_path):
    """Write ``df`` to ``output_path`` as CSV, leaving out the index column.

    The outcome is reported on stdout; a failed write is printed, not raised.
    """
    try:
        df.to_csv(output_path, index=False)
    except Exception as err:
        print(f"Failed to save the DataFrame: {err}")
    else:
        print(f"Data saved successfully to {output_path}")

# Main execution logic
if __name__ == "__main__":
    # Ask where the .idx files live and what the CSV (written into that same folder) is called
    source_directory = input('Enter/path/to/data/directory: ')
    csv_name = input('Enter the filename for the CSV (e.g., combined_data.csv): ')
    output_path = os.path.join(source_directory, csv_name)

    # Parse every index file found in the folder
    all_data_df = load_data_from_directory(source_directory)

    # Only write a CSV when something was actually loaded
    if all_data_df.empty:
        print("No data to save.")
    else:
        save_to_csv(all_data_df, output_path)

# Preview the combined index
all_data_df.head()

# Data Modeling & Pre-Processing

In [None]:
# Load the combined_data.csv that contains the filing links of all the companies.
# NOTE(review): this path is hard-coded, while the index-building cell saves to a
# user-supplied directory and filename -- confirm both refer to the same file.
df_idx = pd.read_csv("data/combined_data.csv")

In [None]:
# Preview the first rows of the filing index
display(df_idx.head())

In [None]:
# Load the S&P 500 company list; its "Symbol" and "CIK" columns are used below
df_sp500 = pd.read_excel("data/sp500_cik.xlsx")

In [None]:
# Preview the first rows of the S&P 500 company list
display(df_sp500.head())

In [None]:
# Keep only the Symbol and CIK columns. The explicit .copy() makes df_cik an
# independent frame, so assigning to its CIK column later does not trigger
# pandas' SettingWithCopyWarning.
df_cik = df_sp500[["Symbol", "CIK"]].copy()
df_cik.head()

In [None]:
# Row counts of the S&P 500 list and of the filing index, as a quick sanity check
print(len(df_cik["CIK"]))
print(len(df_idx["CIK"]))

In [None]:
# Making sure that the CIK columns of both dataframes are directly comparable.
def _normalize_cik(values):
    """Return CIKs as canonical digit strings ("320193"); unparseable values become <NA>.

    A plain ``astype(str)`` is not enough: a float column yields "320193.0" and a
    zero-padded string stays "0000320193", and neither matches "320193" in ``isin``.
    """
    numeric = pd.to_numeric(values, errors="coerce")
    return numeric.astype("Int64").astype("string")


df_idx['CIK'] = _normalize_cik(df_idx['CIK'])
# assign() builds a new frame, so this is safe even if df_cik is a slice of df_sp500;
# rows without a usable CIK are dropped so that a missing value can never match.
df_cik = df_cik.assign(CIK=_normalize_cik(df_cik['CIK'])).dropna(subset=['CIK'])

### Filtering out the S&P500 companies

In [None]:
# Keep only the index rows whose CIK belongs to a company on the S&P 500 list
sp500_ciks = df_cik['CIK'].unique()
is_sp500_filing = df_idx['CIK'].isin(sp500_ciks)
filtered = df_idx[is_sp500_filing]

In [None]:
# Print the column summary of the S&P 500 subset
filtered.info()

In [None]:
# Checking the number of unique CIKs in the filtered dataframe
# (despite its name, unique_names holds CIK values, not company names)
unique_names= filtered["CIK"].unique()
print(len(unique_names))

## Extracting HTML contents of the page

In [None]:
#To extract the html content from the SEC page
import requests

import pandas as pd

def _build_index_url(filename):
    """Return the filing-index page URL for an .idx 'Filename' value, or None if it is malformed."""
    path_parts = filename.strip().replace(" ", "").split("/")
    if len(path_parts) < 4:
        return None

    cik = path_parts[2]
    accession_with_dashes = path_parts[3]
    # The last path component carries a ".txt" extension when the entry names the
    # submission text file; it must be removed, otherwise both the folder name and
    # the "-index.htm" file name built below are wrong.
    if accession_with_dashes.lower().endswith(".txt"):
        accession_with_dashes = accession_with_dashes[:-len(".txt")]
    accession_nodashes = accession_with_dashes.replace("-", "")
    index_filename = accession_with_dashes + "-index.htm"
    return f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_nodashes}/{index_filename}"


def _resolve_document_url(href):
    """Turn a document link taken from the index page into a directly downloadable URL."""
    # Links of the form "/ix?doc=/Archives/..." open a viewer page rather than the
    # document; the real document path is the part after "doc=".
    # NOTE(review): assumes the viewer link always has this exact shape -- confirm.
    if "/ix?doc=" in href:
        href = href.split("/ix?doc=", 1)[1]
    return f"https://www.sec.gov/{href.lstrip('/')}"


def extract_filing_html_directly(row, user_agent_email):
    """Download the HTML of the filing described by one row of the index.

    The filing's index page is fetched first; the first ``.htm`` link in its
    ``tableFile`` table (other than the index page itself) is then downloaded.

    Args:
        row: Mapping/Series with a 'Filename' entry such as
            'edgar/data/<cik>/<accession>.txt'.
        user_agent_email: Value sent as the User-Agent header on both requests.

    Returns:
        ``(filing_url, html_text)`` on success, ``(filing_url, None)`` when the
        document URL was found but could not be downloaded, and ``(None, None)``
        for every other failure. Problems are printed, never raised.
    """
    try:
        filename = row['Filename'].strip().replace(" ", "")
        index_url = _build_index_url(filename)
        if index_url is None:
            print(f"Invalid path in Filename: {filename}")
            return None, None

        headers = {"User-Agent": user_agent_email}

        response = requests.get(index_url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Failed to load index page: {index_url}")
            return None, None

        soup = BeautifulSoup(response.text, "html.parser")
        doc_table = soup.find("table", class_="tableFile")
        if doc_table is None:
            print(f"Could not find document table at: {index_url}")
            return None, None

        doc_link_tag = doc_table.find("a", href=lambda href: href and href.endswith(".htm") and not href.endswith("-index.htm"))
        if doc_link_tag is None:
            print(f"No .htm filing document found in index page: {index_url}")
            return None, None

        filing_url = _resolve_document_url(doc_link_tag['href'])

        filing_response = requests.get(filing_url, headers=headers, timeout=15)
        if filing_response.status_code == 200:
            print(f"Downloaded: {filing_url}")
            return filing_url, filing_response.text
        else:
            print(f"Failed to download filing from: {filing_url}")
            return filing_url, None

    except Exception as e:
        # Deliberately best-effort: one bad row must not abort a long download run
        print(f"Exception occurred: {e}")
        return None, None

## Cleaning the HTML content

In [None]:
# Cleaning the HTML content of the filing
from bs4 import BeautifulSoup

def clean_filing_html(filing_html):
    """Convert the raw HTML of a filing into plain text.

    Script, style, header, footer, nav and noscript elements are removed, the
    remaining text is taken from <body> (or the whole document when there is
    no body), every line is stripped and blank lines are dropped.

    Returns the cleaned text, or "" when anything goes wrong (the error is printed).
    """
    unwanted_tags = ["script", "style", "header", "footer", "nav", "noscript"]
    try:
        document = BeautifulSoup(filing_html, "html.parser")

        for element in document(unwanted_tags):
            element.decompose()

        # Prefer the body; fall back to the whole document
        text_root = document.find("body")
        if not text_root:
            text_root = document
        raw_text = text_root.get_text(separator="\n")

        # Strip every line and keep only the non-empty ones
        stripped_lines = (raw_line.strip() for raw_line in raw_text.splitlines())
        return "\n".join(kept for kept in stripped_lines if kept)

    except Exception as e:
        print(f" Error cleaning HTML: {e}")
        return ""

## Downloading multiple 10-K filings based on the user selection

In [None]:
# Downloading multiple 10-K filings
def download_multiple_10k_filings(df, user_agent_email, request_delay=0.2):
    """Interactively download and clean a user-chosen number of 10-K filings.

    Rows whose 'Form Type' equals '10-K' (case-insensitive) are selected, the
    user is asked how many of them to fetch, and the first ``limit`` rows are
    downloaded in order.

    Args:
        df: Filing index with 'Form Type', 'Company Name', 'CIK',
            'Date Filed' and 'Filename' columns.
        user_agent_email: Passed to ``extract_filing_html_directly`` as the
            User-Agent of every request.
        request_delay: Seconds to pause after each filing so that a long run
            does not hammer the SEC servers (the index download loop throttles
            itself as well). Use 0 to disable.

    Returns:
        DataFrame with one row per successfully downloaded filing and the
        columns 'Company Name', 'CIK', 'Date Filed', 'Filing URL',
        'Filing Text' (raw HTML) and 'Cleaned Text'. Empty when there are no
        10-K rows or every download failed.
    """
    tenk_df = df[df['Form Type'].str.upper() == '10-K'].reset_index(drop=True)
    total = len(tenk_df)

    if total == 0:
        print("No 10-K filings found in the dataset.")
        return pd.DataFrame()

    print(f"Found {total} 10-K filings in the dataset.")

    # Keep asking until the answer is an integer within range
    while True:
        try:
            limit = int(input(f"Enter the number of 10-K filings to download (1 to {total}): "))
            if 1 <= limit <= total:
                break
            else:
                print(f"Please enter a number between 1 and {total}.")
        except ValueError:
            print("Please enter a valid integer.")

    results = []
    for _, row in tenk_df.head(limit).iterrows():
        url, html_text = extract_filing_html_directly(row, user_agent_email)
        # Filings that could not be downloaded are skipped
        if html_text:
            cleaned_text = clean_filing_html(html_text)
            results.append({
                "Company Name": row['Company Name'],
                "CIK": row['CIK'],
                "Date Filed": row['Date Filed'],
                "Filing URL": url,
                "Filing Text": html_text,
                "Cleaned Text": cleaned_text
            })
        if request_delay > 0:
            time.sleep(request_delay)

    return pd.DataFrame(results)

In [None]:
#df = pd.read_csv("/data/combined_data.csv")

### Downloading and creating the dataframe of the final cleaned text of the filings

In [None]:
# Run the full process
filings_df = download_multiple_10k_filings(filtered, "simransukhawani3@gmail.com")

# Preview results. An empty result has no columns at all, so indexing it by
# column name would raise KeyError -- report that case instead.
if filings_df.empty:
    print("No filings were downloaded; nothing to preview.")
else:
    print(filings_df[["Company Name", "Filing URL"]].head())
    print("\n Sample Cleaned Filing Text:\n")
    print(filings_df["Cleaned Text"].iloc[0][:2000])

### Creating the csv for the future use

In [None]:
# Saving the DataFrame to a CSV file. index=False (as in the other to_csv calls of
# this notebook) keeps the row index out of the file, so reloading it does not
# produce a stray "Unnamed: 0" column.
filings_df.to_csv("sp500_filings_df.csv", index=False)

### Loading the filings dataframe that was saved

In [None]:
# Load the saved DataFrame.
# NOTE(review): if the CSV was written together with its index, read_csv exposes
# that index as an extra "Unnamed: 0" column.
df_full = pd.read_csv("sp500_filings_df.csv")

In [None]:
# Creating a new DataFrame with selected columns; .copy() makes it independent
# of df_full so columns can be added to it below without affecting df_full
df_section = df_full[["Company Name","CIK","Date Filed","Filing Text", "Cleaned Text"]].copy()

In [None]:
# Preview the working copy used for the section-level analysis
df_section.head()

# Section Wise Sentiment Analysis

In [None]:
# For every wanted section: (pattern of its own header,
#                            pattern of the headers that end it).
# The look-aheads stop "Item 1" from matching "Item 10"/"Item 1A." and
# "Item 7" from matching "Item 7A.".
_ITEM_BOUNDS = {
    "Item 1": (r"item\s*1(?![0-9]|[ab]\b)", r"item\s*(?:1[ab]\b|2(?![0-9]))"),
    "Item 7": (r"item\s*7(?![0-9]|a\b)", r"item\s*(?:7a|8(?![0-9]))"),
    "Item 7A": (r"item\s*7a", r"item\s*8(?![0-9])"),
}


def _header_matches(pattern, text):
    """Return matches of ``pattern`` at the start of a line, or anywhere if no line starts with it."""
    at_line_start = list(
        re.finditer(r"^[ \t]*(?:" + pattern + ")", text, re.IGNORECASE | re.MULTILINE)
    )
    return at_line_start or list(re.finditer(pattern, text, re.IGNORECASE))


def extract_10k_sections(text):
    """Extract the Item 1, Item 7 and Item 7A sections from plain 10-K text.

    Every occurrence of a section header is treated as a candidate start; the
    candidate ends at the next header that terminates the section (Item 1A/1B/2
    for Item 1, Item 7A/8 for Item 7, Item 8 for Item 7A) or at the end of the
    text. The longest candidate wins, so a short table-of-contents entry loses
    against the real section body.

    Matching is case-insensitive and done on ``text`` itself, so the offsets are
    always valid for the string that is sliced. Headers at the start of a line
    are preferred because mid-sentence mentions ("see Item 8") are usually
    cross-references.

    NOTE(review): this is a heuristic. A cross-reference that sits on its own
    line can still start or end a section at the wrong place.

    Args:
        text: Plain text of a filing. Non-strings and strings shorter than 100
            characters yield three empty sections.

    Returns:
        Dict with the keys "Item 1", "Item 7" and "Item 7A"; a section that
        was not found maps to "".
    """
    sections = {"Item 1": "", "Item 7": "", "Item 7A": ""}
    if not isinstance(text, str) or len(text) < 100:
        return sections

    for name, (start_pattern, end_pattern) in _ITEM_BOUNDS.items():
        # finditer yields matches left to right, so this list is already sorted
        end_positions = [m.start() for m in _header_matches(end_pattern, text)]

        best = ""
        for start_match in _header_matches(start_pattern, text):
            # The terminating header must begin after this header's own text
            end = next(
                (pos for pos in end_positions if pos >= start_match.end()),
                len(text),
            )
            candidate = text[start_match.start():end]
            if len(candidate) > len(best):
                best = candidate

        sections[name] = best.strip()

    return sections

In [None]:
# Split every cleaned filing into its sections: one dict per row, expanded into
# a frame with one column per section
extracted_sections = df_section["Cleaned Text"].map(extract_10k_sections)
sections_df = pd.DataFrame(extracted_sections.tolist(), index=df_section.index)

# Attach each extracted section as "<item> Text"
for item_name in ("Item 1", "Item 7", "Item 7A"):
    df_section[f"{item_name} Text"] = sections_df[item_name]

In [None]:
# Every one of these columns must hold real text for a row to be kept
required_text_columns = ["Item 1 Text", "Item 7 Text", "Item 7A Text", "Cleaned Text"]

# 1) no missing values
df_section = df_section.dropna(subset=required_text_columns)

# 2) no empty / whitespace-only strings
has_content = pd.Series(True, index=df_section.index)
for text_column in required_text_columns:
    has_content &= df_section[text_column].str.strip() != ""
df_section = df_section[has_content]

# 3) the cleaned document must be at least 500 characters long (spaces included)
long_enough = df_section["Cleaned Text"].str.len() >= 500
df_section = df_section[long_enough].reset_index(drop=True)

print(f"Final dataset shape: {df_section.shape}")

In [None]:
# Checking the number of null values in the important columns,
# followed by the number of values that are exactly the empty string
print(df_section[["Item 1 Text", "Item 7 Text", "Item 7A Text"]].isnull().sum())
print(df_section[["Item 1 Text", "Item 7 Text", "Item 7A Text"]].eq("").sum())

In [None]:
# A notebook cell only renders its last expression, so the describe() table
# has to be displayed explicitly; info() prints its summary itself.
display(df_section.describe())
df_section.info()

In [None]:
# Preview the rows that remain after filtering
df_section.head()

In [None]:
# Loading the Loughran-McDonald master dictionary (1993-2024); its "Word",
# "Positive" and "Negative" columns are used in the next cell
lm_df = pd.read_csv("Loughran-McDonald_MasterDictionary_1993-2024.csv")

In [None]:
# Build lower-cased lookup sets of the words with a non-zero Positive / Negative flag
is_positive_word = lm_df["Positive"] > 0
is_negative_word = lm_df["Negative"] > 0
positive_words = set(lm_df.loc[is_positive_word, "Word"].str.lower())
negative_words = set(lm_df.loc[is_negative_word, "Word"].str.lower())

In [None]:
# Computing the sentiment score for the Lexicon method.
# English stop words are excluded from the token count in compute_lm_sentiment_percent.
stop_words = set(stopwords.words("english"))

def compute_lm_sentiment_percent(text):
    """Score ``text`` with the lexicon word lists, as a percentage.

    The text is lower-cased and tokenised; only purely alphabetic tokens that
    are not stop words are counted. The score is
    (positive hits - negative hits) / counted tokens * 100.

    Returns 0.0 for non-strings, blank strings and texts without any counted token.
    """
    if not isinstance(text, str) or not text.strip():
        return 0.0

    content_words = [
        token
        for token in word_tokenize(text.lower())
        if token.isalpha() and token not in stop_words
    ]
    if not content_words:
        return 0.0

    positive_hits = sum(1 for token in content_words if token in positive_words)
    negative_hits = sum(1 for token in content_words if token in negative_words)
    return (positive_hits - negative_hits) / len(content_words) * 100

In [None]:
# Score every extracted section with the lexicon method.
# Mapping: section text column -> score column (names are relied on further down).
lm_score_columns = {
    "Item 1 Text": "Item1_LM_Sentiment%",
    "Item 7 Text": "Item7_LM_Sentiments%",
    "Item 7A Text": "Item7A_LM_Sentiments%",
}
for text_column, score_column in lm_score_columns.items():
    df_section.loc[:, score_column] = df_section[text_column].apply(compute_lm_sentiment_percent)

In [None]:
# Preview the lexicon scores next to the filing identifiers
df_section[["Company Name", "CIK","Date Filed", "Item1_LM_Sentiment%","Item7_LM_Sentiments%","Item7A_LM_Sentiments%"]].head()

In [None]:
from transformers import pipeline

# FinBERT tone classifier. Inputs longer than 512 tokens are truncated and
# batches are padded to a common length.
finbert = pipeline(
    "sentiment-analysis",
    model="yiyanghkust/finbert-tone",
    tokenizer="yiyanghkust/finbert-tone",
    # Use the first GPU when one is available, otherwise the CPU (-1);
    # a hard-coded device=0 fails on machines without CUDA.
    device=0 if torch.cuda.is_available() else -1,
    truncation=True,
    padding=True,
    max_length=512
)

In [None]:
from tqdm.notebook import tqdm
tqdm.pandas()

# FinBERT Sentiment Function
def get_finbert_sentiment(text, batch_size=16):
    """Return the net FinBERT sentiment of ``text`` as a percentage.

    The text is split into sentences, grouped into chunks of 3 sentences, and
    every chunk is classified. The result is
    (share of positive chunks - share of negative chunks) * 100, so it lies
    between -100 and 100.

    Args:
        text: Text to score.
        batch_size: Number of chunks handed to the model per call. It is also
            forwarded to the pipeline so each call really runs as one batch.

    Returns:
        The net sentiment, or 0.0 for non-strings, texts shorter than 10
        characters (after stripping), texts without sentences, and texts for
        which no chunk could be classified. A failing batch is printed and
        skipped rather than raised.
    """
    if not isinstance(text, str) or len(text.strip()) < 10:
        return 0.0

    # Local import: the notebook's import cell only brings in word_tokenize
    from nltk.tokenize import sent_tokenize

    sentences = sent_tokenize(text)
    if not sentences:
        return 0.0

    # Using a chunk size of 3 sentences
    chunks = [' '.join(sentences[i:i+3]) for i in range(0, len(sentences), 3)]

    counts = {"positive": 0, "neutral": 0, "negative": 0}

    for i in range(0, len(chunks), batch_size):
        # Drop chunks that are empty after stripping
        batch = [chunk for chunk in chunks[i:i+batch_size] if chunk.strip()]
        if not batch:
            continue
        try:
            results = finbert(batch, batch_size=len(batch))
            for r in results:
                label = r["label"].lower()
                # Substring test tolerates label spellings such as "Positive"
                for name in counts:
                    if name in label:
                        counts[name] += 1
                        break
        except Exception as e:
            print("Error during FinBERT batch:", e)
            continue

    total = sum(counts.values())
    if total == 0:
        return 0.0

    positive_pct = (counts["positive"] / total) * 100
    negative_pct = (counts["negative"] / total) * 100

    return positive_pct - negative_pct  # Only return net sentiment

In [None]:
# Converting the DataFrame into a HuggingFace Dataset so the section-level
# FinBERT scoring can be run with Dataset.map
df_finbert = Dataset.from_pandas(df_section)

In [None]:
# Helper that scores a whole list of texts
def get_batch_sentiment(text_list):
    """Return one FinBERT net-sentiment score per entry of ``text_list``, in order."""
    if not text_list:
        return [0.0] * len(text_list)
    return [get_finbert_sentiment(entry) for entry in text_list]

def apply_finbert_sentiment_batch(batch):
    """Score the three section columns of ``batch``.

    ``batch`` maps column names to lists of values. The result maps
    "<item> FinBERT Sentiment" to the list of scores for "<item> Text",
    for Item 1, Item 7 and Item 7A.
    """
    return {
        f"{item_name} FinBERT Sentiment": get_batch_sentiment(batch[f"{item_name} Text"])
        for item_name in ("Item 1", "Item 7", "Item 7A")
    }

# Filtering based on text length: keep rows whose cleaned text has at least 500 characters
df_finbert = df_finbert.filter(lambda example: len(example["Cleaned Text"]) >= 500)
print(f"Dataset after length filtering: {len(df_finbert)} rows")

# Run the batched map: apply_finbert_sentiment_batch receives up to 32 rows at a
# time (as a dict of column lists) and its three result lists become new columns
df_finbert = df_finbert.map(
    apply_finbert_sentiment_batch,
    batched=True,
    batch_size=32,
    num_proc=1,
    desc="Applying Section-wise FinBERT Net Sentiment"
)

In [None]:
# Converting the Dataset back to a DataFrame
df_sentiment = df_finbert.to_pandas()

In [None]:
# Preview the section-level FinBERT scores next to the filing identifiers
df_sentiment[["Company Name", "CIK","Date Filed","Item 1 FinBERT Sentiment","Item 7 FinBERT Sentiment","Item 7A FinBERT Sentiment"]].head()

In [None]:
# Select only needed columns
selected_columns = df_sentiment[[
    "Company Name",
    "CIK",
    "Date Filed",
    "Item 1 FinBERT Sentiment",
    "Item 7 FinBERT Sentiment",
    "Item 7A FinBERT Sentiment"
]]

# Save the selection to CSV. Previously the full df_sentiment frame was written,
# which left selected_columns unused and put the raw filing HTML and text into the file.
selected_columns.to_csv("sentiment_scores.csv", index=False)

# Applying Document level sentiment

In [None]:
# Keep only the filings whose Cleaned Text has at least 500 characters.
# Rows with a missing Cleaned Text compare as False and are dropped as well.
df_full_filtered = df_full[df_full["Cleaned Text"].str.len() >= 500].copy()

print(f"Dataset after full document length filtering: {len(df_full_filtered)} rows")

In [None]:
# Preview the filings kept for the document-level analysis
df_full_filtered.head()

### Applying Lexicon method at full document level

In [None]:
# Applying the Loughran-McDonald sentiment analysis on the full document text.
# progress_apply is only available after tqdm.pandas() has been called (done in an earlier cell).
df_full_filtered["LM_Sentiment%"] = df_full_filtered["Cleaned Text"].progress_apply(compute_lm_sentiment_percent)

### Applying FinBERT at full document level

In [None]:
# Convert the dataframe to a HuggingFace Dataset so FinBERT can be applied with Dataset.map
hf_full = Dataset.from_pandas(df_full_filtered)

In [None]:
# Mapping function for the document-level run
def apply_finbert_full_document(batch):
    """Score the "Cleaned Text" entries of ``batch`` and return them under a new column name."""
    document_scores = get_batch_sentiment(batch["Cleaned Text"])
    return {"Full Document FinBERT Sentiment": document_scores}

# Run map() on the HuggingFace Dataset: up to 32 rows per call, single process
hf_full = hf_full.map(
    apply_finbert_full_document,
    batched=True,
    batch_size=32,
    num_proc=1,
    desc="Applying FinBERT Sentiment at Full Document Level"
)

# List the columns, which now include "Full Document FinBERT Sentiment"
print(hf_full.column_names)

In [None]:
# Convert to Pandas
df_full_sentiment = hf_full.to_pandas()

# Save to CSV (every column of the frame is written, without the index)
df_full_sentiment.to_csv("full_document_sentiment.csv", index=False)

In [None]:
# Preview the document-level scores
df_full_sentiment.head()

In [None]:
# Columns that identify a filing; they serve as the join key for both merges
filing_keys = ["Company Name", "CIK", "Date Filed"]

# Section-level lexicon scores
section_lm = df_section[filing_keys + [
    "Item1_LM_Sentiment%",
    "Item7_LM_Sentiments%",
    "Item7A_LM_Sentiments%",
]]

# Section-level FinBERT scores
section_finbert = df_sentiment[filing_keys + [
    "Item 1 FinBERT Sentiment",
    "Item 7 FinBERT Sentiment",
    "Item 7A FinBERT Sentiment",
]]

# Document-level scores of both methods
full_document_sentiment = df_full_sentiment[filing_keys + [
    "LM_Sentiment%",   # Full-document LM
    "Full Document FinBERT Sentiment",
]]

# Inner joins: only filings present in all three tables survive
merged_sections = section_finbert.merge(section_lm, on=filing_keys, how="inner")
master_sentiment = merged_sections.merge(full_document_sentiment, on=filing_keys, how="inner")

# Check the final merged master table
print(master_sentiment.head())

In [None]:
# Persist the merged table of all scores (without the index)
master_sentiment.to_csv("master_sentiment.csv",index=False)

## Pearson Correlation Analysis

In [None]:
# pearsonr / ttest_rel are not imported anywhere else in the notebook (the
# import in the first cell is commented out), so bring them in here.
from scipy.stats import pearsonr, ttest_rel

# Columns mapping: FinBERT field -> LM field
field_mapping = {
    "Item 1 FinBERT Sentiment": "Item1_LM_Sentiment%",
    "Item 7 FinBERT Sentiment": "Item7_LM_Sentiments%",
    "Item 7A FinBERT Sentiment": "Item7A_LM_Sentiments%",
    "Full Document FinBERT Sentiment": "LM_Sentiment%"
}

# Prepare a place to store results
comparison_results = []

# Loop over each field pair
for finbert_col, lm_col in field_mapping.items():

    print(f"\n Comparing {finbert_col} vs {lm_col}")

    # Drop any rows with missing values (important)
    temp_df = master_sentiment[[finbert_col, lm_col]].dropna()

    # Both statistics need at least two paired observations
    if len(temp_df) < 2:
        print(f"Skipped: only {len(temp_df)} complete row(s) available")
        continue

    # Pearson Correlation
    r, p_corr = pearsonr(temp_df[finbert_col], temp_df[lm_col])

    # Paired t-test
    t_stat, p_ttest = ttest_rel(temp_df[finbert_col], temp_df[lm_col])

    # Mean and Standard Deviation
    finbert_mean = temp_df[finbert_col].mean()
    finbert_std = temp_df[finbert_col].std()

    lm_mean = temp_df[lm_col].mean()
    lm_std = temp_df[lm_col].std()

    # Collect results
    comparison_results.append({
        # "Item 1", "Item 7", "Item 7A", "Full Document" -- taking the second
        # word of the column name produced "1", "7", "7A" and "Document"
        "Field": finbert_col.replace(" FinBERT Sentiment", ""),
        "Pearson r": r,
        "p-value (corr)": p_corr,
        "t-statistic": t_stat,
        "p-value (t-test)": p_ttest,
        "FinBERT Mean": finbert_mean,
        "FinBERT Std": finbert_std,
        "LM Mean": lm_mean,
        "LM Std": lm_std
    })

# Convert to DataFrame
results_df = pd.DataFrame(comparison_results)

# Print final result table with 4 decimal places
pd.set_option('display.float_format', lambda x: '%.4f' % x)
print(results_df)

In [None]:
# Save the comparison table; index=False matches the other exports of this
# notebook and avoids an unnamed index column in the file
results_df.to_csv("results_sentiment.csv", index=False)

## Classify Sentiment

In [None]:
# Sentiment classification function
def classify_sentiment(score, threshold=2.0):
    """Map a net sentiment percentage to "Positive", "Negative" or "Neutral".

    Args:
        score: Net sentiment in percent.
        threshold: Width of the neutral band. Scores strictly above
            ``threshold`` are "Positive", scores strictly below ``-threshold``
            are "Negative". Defaults to 2.0, the cut-off used throughout this
            notebook.

    Returns:
        The class label. Scores inside the band (boundaries included) and NaN,
        for which both comparisons are false, are "Neutral".
    """
    if score > threshold:
        return "Positive"
    elif score < -threshold:
        return "Negative"
    else:
        return "Neutral"

# Classify every score column of both methods.
# Pairs are (new class column, score column it is derived from), listed in the
# order in which the class columns are appended to the table.
class_column_sources = [
    # Full Document
    ("Full Doc FinBERT Class", "Full Document FinBERT Sentiment"),
    ("Full Doc LM Class", "LM_Sentiment%"),
    # Section Level
    ("Item 1 FinBERT Class", "Item 1 FinBERT Sentiment"),
    ("Item 1 LM Class", "Item1_LM_Sentiment%"),
    ("Item 7 FinBERT Class", "Item 7 FinBERT Sentiment"),
    ("Item 7 LM Class", "Item7_LM_Sentiments%"),
    ("Item 7A FinBERT Class", "Item 7A FinBERT Sentiment"),
    ("Item 7A LM Class", "Item7A_LM_Sentiments%"),
]

for class_column, score_column in class_column_sources:
    master_sentiment[class_column] = master_sentiment[score_column].apply(classify_sentiment)

print(master_sentiment.head())

## Agreement Analysis
* Compare if LM and FinBERT classified the same for each filing.

In [None]:
# Column prefix -> label used when printing the rate
agreement_levels = {
    "Full Doc": "Full Document",
    "Item 1": "Item 1",
    "Item 7": "Item 7",
    "Item 7A": "Item 7A",
}

# A filing "agrees" on a level when both methods assigned it the same class
for column_prefix in agreement_levels:
    finbert_class = master_sentiment[f"{column_prefix} FinBERT Class"]
    lm_class = master_sentiment[f"{column_prefix} LM Class"]
    master_sentiment[f"{column_prefix} Agreement"] = finbert_class == lm_class

# The mean of a boolean column is the share of True values
print("\nAgreement Rates (%):")
for column_prefix, label in agreement_levels.items():
    print(f"{label}:", master_sentiment[f"{column_prefix} Agreement"].mean() * 100)

## Agreement Bar Chart

In [None]:
# Prepare Agreement Rates: share of filings (in %) on which both methods agree, per level
agreement_rates = {
    "Full Document": master_sentiment["Full Doc Agreement"].mean() * 100,
    "Item 1": master_sentiment["Item 1 Agreement"].mean() * 100,
    "Item 7": master_sentiment["Item 7 Agreement"].mean() * 100,
    "Item 7A": master_sentiment["Item 7A Agreement"].mean() * 100
}

# Bar Chart Plot: one bar per level
plt.figure(figsize=(8, 6))
bars = plt.bar(agreement_rates.keys(), agreement_rates.values(), color="skyblue")

# Add values on top of bars (1 unit above the bar, one decimal place)
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval + 1, f'{yval:.1f}%', ha='center', va='bottom', fontsize=10)

plt.ylim(0, 100)
plt.ylabel("Agreement Rate (%)")
plt.title("Agreement Rates Between LM and FinBERT Sentiment Classification")
plt.grid(axis='y', linestyle='--', alpha=0.7)
# Save before show() so the written file contains the rendered figure
plt.savefig("agreement_bar_chart.png", dpi=300, bbox_inches='tight')
plt.show()

## Confusion Matrix (LM vs FinBERT)

In [None]:
# These names are not imported anywhere else in the notebook (the import in the
# first cell is commented out), so bring them in here.
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Create confusion matrix: rows are the LM classes, columns the FinBERT classes
cm = confusion_matrix(
    master_sentiment["Full Doc LM Class"],
    master_sentiment["Full Doc FinBERT Class"],
    labels=["Positive", "Neutral", "Negative"]  # consistent label ordering
)

# Display confusion matrix
disp = ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=["Positive", "Neutral", "Negative"]
)

fig, ax = plt.subplots(figsize=(7, 5))
disp.plot(cmap="Blues", values_format='d', ax=ax)
plt.title("Confusion Matrix: LM vs FinBERT (Full Document)")
plt.savefig("confusion_matrix_full_doc.png", dpi=300, bbox_inches='tight')
plt.show()

## Class Distribution Bar Plots

In [None]:
# Define all sections: panel title -> (FinBERT class column, LM class column)
sections = {
    "Full Document": ("Full Doc FinBERT Class", "Full Doc LM Class"),
    "Item 1": ("Item 1 FinBERT Class", "Item 1 LM Class"),
    "Item 7": ("Item 7 FinBERT Class", "Item 7 LM Class"),
    "Item 7A": ("Item 7A FinBERT Class", "Item 7A LM Class")
}

# Prepare the figure
fig, axes = plt.subplots(2, 2, figsize=(14, 10))  # 2 rows, 2 columns
axes = axes.flatten()  # flatten axes for easy looping

# Plot each section
for idx, (section_name, (finbert_col, lm_col)) in enumerate(sections.items()):
    # Count class distributions
    finbert_counts = master_sentiment[finbert_col].value_counts()
    lm_counts = master_sentiment[lm_col].value_counts()

    # Create comparison DataFrame; reindex fixes the class order and leaves
    # NaN for a class that does not occur
    class_distribution = pd.DataFrame({
        "FinBERT": finbert_counts,
        "LM": lm_counts
    }).reindex(["Positive", "Neutral", "Negative"])

    # Plot on corresponding axis
    class_distribution.plot(kind="bar", ax=axes[idx], color=["skyblue", "salmon"], legend=False)
    axes[idx].set_title(section_name)
    axes[idx].set_ylabel("Number of Documents")
    axes[idx].set_xlabel("")
    axes[idx].grid(axis='y', linestyle='--', alpha=0.7)
    axes[idx].set_xticklabels(["Positive", "Neutral", "Negative"], rotation=0)

# Add one shared legend
# NOTE(review): both the legend and the suptitle are placed at the top centre -- check for overlap
fig.legend(["FinBERT", "LM"], loc="upper center", ncol=2, fontsize="large")

# Tight layout
plt.tight_layout(rect=[0, 0, 1, 0.95])  # leave space for legend
plt.suptitle("Class Distribution Comparison: FinBERT vs LM", fontsize=16)
plt.savefig("class_distribution_all_sections.png", dpi=300, bbox_inches='tight')
plt.show()

## Comparing All 3 different level sentiments

In [None]:
# Define all sections and corresponding columns: panel title -> (LM score column, FinBERT score column)
# Note the tuple order is (LM, FinBERT), the reverse of the class-distribution cell.
sections = {
    "Full Document": ("LM_Sentiment%", "Full Document FinBERT Sentiment"),
    "Item 1": ("Item1_LM_Sentiment%", "Item 1 FinBERT Sentiment"),
    "Item 7": ("Item7_LM_Sentiments%", "Item 7 FinBERT Sentiment"),
    "Item 7A": ("Item7A_LM_Sentiments%", "Item 7A FinBERT Sentiment")
}

# Prepare a subplot grid
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
axes = axes.flatten()  # flatten axes to access easily

# Loop through each section and plot
for idx, (section_name, (lm_col, finbert_col)) in enumerate(sections.items()):

    # Scatter plot: LM score on x, FinBERT score on y
    sns.scatterplot(
        x=master_sentiment[lm_col],
        y=master_sentiment[finbert_col],
        color="blue",
        edgecolor="black",
        alpha=0.7,
        ax=axes[idx]
    )

    # Perfect agreement line (y = x), drawn across the combined value range of both columns
    lims = [
        min(master_sentiment[lm_col].min(), master_sentiment[finbert_col].min()),
        max(master_sentiment[lm_col].max(), master_sentiment[finbert_col].max())
    ]
    axes[idx].plot(lims, lims, 'k--', alpha=0.75)

    # Regression trend line (scatter=False: only the fitted line is drawn)
    sns.regplot(
        x=master_sentiment[lm_col],
        y=master_sentiment[finbert_col],
        scatter=False,
        color="red",
        line_kws={"linewidth":1.5, "linestyle":"-."},
        ax=axes[idx]
    )

    # Labels and titles
    axes[idx].set_xlabel(f"{section_name} LM Sentiment (%)")
    axes[idx].set_ylabel(f"{section_name} FinBERT Sentiment (%)")
    axes[idx].set_title(section_name)
    axes[idx].grid(True, linestyle='--', alpha=0.7)

# Layout adjustment
plt.tight_layout()
plt.suptitle("Correlation Scatter Plots: LM vs FinBERT (Full Document and Section Level)", fontsize=18, y=1.03)
plt.savefig("correlation_scatter_full_and_sections.png", dpi=300, bbox_inches='tight')
plt.show()