In [None]:
import os
import re
import warnings

warnings.filterwarnings("ignore")

import altair as alt
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import pysentiment2 as ps
import seaborn as sns
import spacy
from dotenv import find_dotenv, load_dotenv
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer
from rich import print
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import pipeline

from sentibignomics import senti_bignomics

# Load the `rich` IPython extension (this notebook also imports rich's print).
%load_ext rich


# Load environment variables from the .env file located by find_dotenv().
load_dotenv(find_dotenv())


In [None]:
# NLTK data used further down: the English stop-word list, the VADER lexicon,
# the Punkt tokenizer models and WordNet (for the lemmatizer).
# "punkt_tab" is the Punkt data package looked up by newer NLTK releases when
# sent_tokenize / word_tokenize are called; requesting it alongside "punkt"
# keeps the notebook runnable on both old and new NLTK versions.
for resource in ("stopwords", "vader_lexicon", "punkt", "punkt_tab", "wordnet"):
    nltk.download(resource)

## Preprocessing

### Load documents


In [None]:
# Read every extracted ".txt" file into a dict keyed by the part of the file
# name that precedes the first underscore (used as the ticker, e.g. "AAPL").
documents_list = os.listdir("./extracted/")

docs = {}
for doc in documents_list:
    if not doc.endswith(".txt"):
        continue
    # The context manager closes each file handle as soon as it has been read.
    with open(f"./extracted/{doc}", "r") as handle:
        docs[doc.split("_")[0]] = handle.read()

### Preprocess documents


In [None]:
# Sanity check of the loader: print the raw text stored for one ticker.
sample_document = docs["AAPL"]
print(sample_document)

In [None]:
# Character count of every document, then the ones shorter than 1000 characters.
doc_lengths = {ticker: len(text) for ticker, text in docs.items()}
short_documents = [(ticker, size) for ticker, size in doc_lengths.items() if size < 1000]
print(short_documents)
docs_to_remove = [ticker for ticker, _ in short_documents]


The tickers above do not have relevant content in their MD&A, possibly because the section only cross-references other parts of the filing or because the document was unavailable. We drop these tickers from the list.


In [None]:
# Remove the short documents in place; tickers that are already gone are skipped.
for ticker in docs_to_remove:
    if ticker in docs:
        del docs[ticker]

### Preprocessing Text

In [None]:
# Module-level NLP resources: the small English spaCy pipeline with the parser
# and NER components disabled, a WordNet lemmatizer, and the NLTK English stop
# words extended with the "table_end" / "table_start" tokens.
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
lemmatizer = WordNetLemmatizer()
stop_words = {*stopwords.words("english"), "table_end", "table_start"}


class Tokenizer:
    """Clean, tokenize and lemmatize raw text.

    ``tokenize(text)`` lower-cases the text, strips dollar amounts,
    percentages, standalone numbers and punctuation, splits the result into
    word tokens, drops stop words and lemmatizes what is left.
    """

    # Patterns are compiled once, when the class is created, rather than on
    # every call to tokenize().
    #
    # Dollar amounts: "$5", "$5.25", "$1,234", "$5m", "$5 million", "$2 billion".
    # Thousands separators are accepted so that "$1,234 million" is removed as
    # a whole, and the word boundary after each suffix stops the bare "m"
    # from swallowing the first letter of a following word ("$5million").
    _DOLLAR_RE = re.compile(r"\$\d[\d,]*(?:\.\d+)?(?:m\b| million\b| billion\b)?")
    _PERCENT_RE = re.compile(r"\d+(?:\.\d+)?%")
    _NUMBER_RE = re.compile(r"\b\d+(?:\.\d+)?\b")
    _WHITESPACE_RE = re.compile(r"\s+")
    _PUNCTUATION_RE = re.compile(r"[^\w\s]")

    def __init__(self):
        # The spaCy pipeline is not used by tokenize(); it is still loaded so
        # that the ``nlp`` attribute stays available on instances.
        self.nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words("english"))
        self.stop_words.update(["table_end", "table_start"])

    def tokenize(self, text):
        """Return the list of cleaned, lemmatized tokens found in ``text``.

        Args:
            text: Raw text (a sentence or a whole document).

        Returns:
            A list of lower-case lemmatized tokens with stop words removed.
        """
        text = text.lower()
        text = self._DOLLAR_RE.sub(" ", text)  # dollar amounts
        text = self._PERCENT_RE.sub(" ", text)  # percentages
        text = self._NUMBER_RE.sub(" ", text)  # standalone numbers
        text = self._WHITESPACE_RE.sub(" ", text).strip()  # collapse whitespace
        # Punctuation is deleted (not replaced by a space), so "u.s." -> "us".
        text = self._PUNCTUATION_RE.sub("", text)

        tokens = [
            word for word in nltk.word_tokenize(text) if word not in self.stop_words
        ]

        return [self.lemmatizer.lemmatize(token) for token in tokens]

We define the preprocessing pipeline as follows:
- Convert to lowercase
- Remove mention of dollar amounts
- Remove percentages
- Remove standalone numbers
- Remove extra whitespace and punctuation
- Tokenize by word and remove stopwords
- Lemmatize words using WordNetLemmatizer

Since the focus is on a qualitative analysis of the MD&A, we remove dollar amounts, percentages and numbers, as they are not relevant to the analysis. We also remove stopwords and lemmatize the words to reduce the dimensionality of the data and to focus on the content of the documents.

In [None]:
# One shared tokenizer instance, reused by every preprocessing call below.
tokenizer = Tokenizer()


def preprocess_text(text):
    """Return ``text`` run through the shared tokenizer, joined by single spaces."""
    cleaned_tokens = tokenizer.tokenize(text)
    return " ".join(cleaned_tokens)


### TF-IDF


In [None]:
# TF-IDF over the cleaned documents: at most 2000 features, keeping terms that
# occur in at least 5 documents and in no more than 95% of them.
# NOTE(review): a custom `preprocessor` replaces scikit-learn's built-in
# preprocessing step; confirm whether `strip_accents` still has any effect here.
tf_idf_vectorizer = TfidfVectorizer(
    preprocessor=preprocess_text,
    strip_accents="unicode",
    max_features=2000,
    min_df=5,
    max_df=0.95,
)

tf_idf = tf_idf_vectorizer.fit_transform(docs.values())

In [None]:
# Dense document-by-term table of TF-IDF weights, one row per document key.
tf_idf_df = pd.DataFrame(
    data=tf_idf.toarray(),
    index=list(docs),
    columns=tf_idf_vectorizer.get_feature_names_out(),
)

# Helper row holding, for each term, the number of documents with a non-zero weight.
document_frequency = tf_idf_df.gt(0).sum()
tf_idf_df.loc["0_DOC_FREQ"] = document_frequency
tf_idf_df.sort_index().round(2)

#### Top 50 most frequent words

In [None]:
# Document frequency per term, computed from the document rows only, so the
# cell gives the same answer whether or not the helper "0_DOC_FREQ" row is
# still present (i.e. it can safely be re-run).
document_rows = tf_idf_df.drop(index="0_DOC_FREQ", errors="ignore")
doc_freq = document_rows.gt(0).sum()
print(doc_freq.sort_values(ascending=False).head(50).index)

# Remove the helper row; errors="ignore" avoids a KeyError on a second run.
tf_idf_df = tf_idf_df.drop(index="0_DOC_FREQ", errors="ignore")

In [None]:
# Reshape to long format (one row per document/term pair) and name the columns.
tfidf_long = tf_idf_df.stack().reset_index()
tfidf_long = tfidf_long.rename(
    columns={
        "level_0": "document",
        "level_1": "term",
        "level_2": "term",
        0: "tfidf",
    }
)

# Within each document order the terms by decreasing TF-IDF and keep the first 10.
tfidf_ranked = tfidf_long.sort_values(
    by=["document", "tfidf"], ascending=[True, False]
)
top_tfidf = tfidf_ranked.groupby(["document"]).head(10)

top_tfidf

In [None]:
# Add a tiny jitter so that terms with identical TF-IDF values receive distinct
# ranks. The generator is seeded so the jitter, and therefore the chart, is the
# same on every run.
tie_break_rng = np.random.default_rng(0)
top_tfidf_plusRand = top_tfidf.copy()
top_tfidf_plusRand["tfidf"] = (
    top_tfidf_plusRand["tfidf"] + tie_break_rng.random(top_tfidf.shape[0]) * 0.0001
)

# Base chart: one row per document, one column per within-document rank.
base = (
    alt.Chart(top_tfidf_plusRand)
    .encode(x="rank:O", y="document:N")
    .transform_window(
        rank="rank()",
        sort=[alt.SortField("tfidf", order="descending")],
        groupby=["document"],
    )
)

# Heatmap cells coloured by TF-IDF value.
heatmap = base.mark_rect().encode(color="tfidf:Q")
# Term labels drawn on top; white text on cells at or above 0.23, black otherwise.
text = base.mark_text(baseline="middle").encode(
    text="term:N",
    color=alt.condition(
        alt.datum.tfidf >= 0.23, alt.value("white"), alt.value("black")
    ),
)

(heatmap + text).properties(width=1000)

The chart above shows the 10 words with the highest TF-IDF scores for each document. We can see that the words are relevant to the content and industry of each stock ticker. For example, Boeing (BA) has words like "aircraft", "airline", "contract", while Tesla (TSLA) has words like "automotive", "energy", "vehicle".

This indicates that the TF-IDF scores are capturing the relevant content of the documents.

## Sentiment analysis


To perform sentiment analysis using the lexicons, we tokenize each document into sentences, apply the preprocessing pipeline, and then calculate the sentiment scores for each sentence using the lexicons.

### Using VADER Dictionary (Generic)

In [None]:
vader = SentimentIntensityAnalyzer()

# Score every sentence of every document with VADER after preprocessing it;
# the original (unprocessed) sentence is stored alongside its scores.
vader_rows = []
for ticker, document_text in docs.items():
    for sentence in nltk.sent_tokenize(document_text):
        polarity = vader.polarity_scores(preprocess_text(sentence))
        vader_rows.append(
            {
                "Document": ticker,
                "Sentence": sentence,
                "VADER_Negative": polarity["neg"],
                "VADER_Neutral": polarity["neu"],
                "VADER_Positive": polarity["pos"],
                "VADER_Compound": polarity["compound"],
            }
        )

print("Processed using VADER")
VADER_res = pd.DataFrame(vader_rows)

### Using Financial Dictionaries

#### Loughran and McDonald Financial Sentiment Dictionaries

Loughran, Tim and McDonald, Bill, When is a Liability not a Liability? Textual Analysis, Dictionaries, and 10-Ks (March 4, 2010). Journal of Finance, Forthcoming, Available at SSRN: https://ssrn.com/abstract=1331573

In [None]:
# Loughran-McDonald dictionary scorer that tokenizes with the shared tokenizer.
lm = ps.LM(tokenizer=tokenizer)

# One row of LM scores per sentence of every document.
lm_rows = []
for ticker, document_text in docs.items():
    for sentence in nltk.sent_tokenize(document_text):
        lm_scores = lm.get_score(lm.tokenize(sentence))
        lm_rows.append(
            {
                "Document": ticker,
                "Sentence": sentence,
                "LM_Positive": lm_scores["Positive"],
                "LM_Negative": lm_scores["Negative"],
                "LM_Polarity": lm_scores["Polarity"],
                "LM_Subjectivity": lm_scores["Subjectivity"],
            }
        )

print("Processed using LM financial dictionary")
LM_res = pd.DataFrame(lm_rows)


#### SentiBigNomics (VADER-based)

Consoli, S., Barbaglia, L., & Manzan, S. (2022). Fine-grained, aspect-based sentiment analysis on economic and financial lexicon. Knowledge-Based Systems, 247, 108781. https://doi.org/10.1016/j.knosys.2022.108781

In [None]:
# A second VADER analyzer whose lexicon is extended with the SentiBigNomics entries.
sbn_vader = SentimentIntensityAnalyzer()
sbn_vader.lexicon.update(senti_bignomics)

# One row of scores per sentence of every document (sentence preprocessed first).
sbn_rows = []
for ticker, document_text in docs.items():
    for sentence in nltk.sent_tokenize(document_text):
        polarity = sbn_vader.polarity_scores(preprocess_text(sentence))
        sbn_rows.append(
            {
                "Document": ticker,
                "Sentence": sentence,
                "SBN_Negative": polarity["neg"],
                "SBN_Neutral": polarity["neu"],
                "SBN_Positive": polarity["pos"],
                "SBN_Compound": polarity["compound"],
            }
        )

print("Processed using SentiBigNomics")

SBN_res = pd.DataFrame(sbn_rows)


### Using pre-trained model from HuggingFace

[FinBERT](https://huggingface.co/ProsusAI/finbert): Araci, D. (2019). FinBERT: Financial Sentiment Analysis with Pre-trained Language Models. arXiv (Cornell University). https://doi.org/10.48550/arxiv.1908.10063


In [None]:
# FinBERT sentiment classifier from the Hugging Face hub.
# NOTE(review): device=-1 is understood to select the CPU — confirm if a GPU is intended.
pipe = pipeline(task="sentiment-analysis", model="ProsusAI/finbert", device=-1)

In [None]:
# Classify each raw sentence with FinBERT; top_k=None asks the pipeline for the
# score of every label rather than only the best one.
finbert_results = []

for ticker, doc in docs.items():
    finbert_results.extend(
        {
            "Ticker": ticker,
            "Sentence": sentence,
            "Sentiment": pipe(
                sentence, padding=True, truncation=True, max_length=512, top_k=None
            ),
        }
        for sentence in nltk.sent_tokenize(doc)
    )

    print(f"Processed {ticker} using FinBERT")

print("Processed using FinBERT")

In [None]:
# Flatten the per-sentence label/score records into long format ...
finbert_long = pd.json_normalize(
    finbert_results, record_path="Sentiment", meta=["Ticker", "Sentence"]
)

# ... then spread the labels into columns, one row per (ticker, sentence).
finbert_results_df = finbert_long.pivot_table(
    index=["Ticker", "Sentence"],
    columns="label",
    values="score",
)

finbert_results_df

In [None]:
# Average FinBERT label scores per ticker.
finbert_results_df.groupby("Ticker")[["negative", "neutral", "positive"]].mean()

### COVID-related disclosure frequency

Dutta, S., Kumar, A., Pant, P., Walsh, C., & Dutta, M. (2023). Using 10-K text to gauge COVID-related corporate disclosure. PLOS ONE, 18(3), e0283138. https://doi.org/10.1371/journal.pone.0283138

In [None]:
# COVID-related vocabulary taken from the paper, one term per line.
covid_word_list = """pandemic
epidemic
contagious
disease
infectious
coronavirus
covid
strain
outbreak
resurgence
health
crisis
"""

# str.split() without an argument splits on any whitespace and discards empty
# strings, so the trailing newline does not produce a spurious "" entry.
covid_word_list = covid_word_list.split()

# Split the VADER lexicon by the sign of each entry's score.
positive_words = [term for term, valence in vader.lexicon.items() if valence > 0]
negative_words = [term for term, valence in vader.lexicon.items() if valence < 0]

In [None]:
def sentence_contains_wordlist(words, wordlist):
    """Return True if at least one item of ``words`` is found in ``wordlist``.

    Args:
        words: Iterable of tokens to look up.
        wordlist: Container supporting the ``in`` operator.

    Returns:
        True on the first match, False if nothing matches or ``words`` is empty.
    """
    for candidate in words:
        if candidate in wordlist:
            return True
    return False


# Membership is tested for every token of every sentence, so the word lists are
# converted to sets once up front (constant-time lookups instead of list scans).
covid_terms = set(covid_word_list)
covid_terms.discard("")  # guard against an empty entry in the word list
positive_terms = set(positive_words)
negative_terms = set(negative_words)

covid_rows = []

for ticker, document_text in docs.items():
    for sentence in nltk.sent_tokenize(document_text):
        tokens = tokenizer.tokenize(sentence)

        is_covid_related = sentence_contains_wordlist(tokens, covid_terms)
        # Positive/negative context is only counted for sentences that contain
        # a COVID-related term; all other sentences get 0 in every column.
        has_positive_context = is_covid_related and sentence_contains_wordlist(
            tokens, positive_terms
        )
        has_negative_context = is_covid_related and sentence_contains_wordlist(
            tokens, negative_terms
        )

        covid_rows.append(
            {
                "Document": ticker,
                "Sentence": sentence,
                "COVID_Related_Frequency": int(is_covid_related),
                "COVID_Contextual_Positive_Frequency": int(has_positive_context),
                "COVID_Contextual_Negative_Frequency": int(has_negative_context),
            }
        )


print("Processed using COVID-19 related words")
covid_res = pd.DataFrame(covid_rows)

## Comparing Lexicons


### Comparing LM, VADER, SentiBigNomics, and COVID-19 dictionary

In [None]:
# Put the four sentence-level result tables side by side, aligned on the
# (Document, Sentence) pair.
sentence_key = ["Document", "Sentence"]
nlp_results = pd.concat(
    [frame.set_index(sentence_key) for frame in (LM_res, VADER_res, SBN_res, covid_res)],
    axis=1,
).reset_index()

# Lexicon scores are averaged per document; COVID flags are summed.
mean_columns = [
    "LM_Positive",
    "LM_Negative",
    "LM_Polarity",
    "LM_Subjectivity",
    "VADER_Negative",
    "VADER_Neutral",
    "VADER_Positive",
    "VADER_Compound",
    "SBN_Negative",
    "SBN_Neutral",
    "SBN_Positive",
    "SBN_Compound",
]
covid_columns = [
    "COVID_Related_Frequency",
    "COVID_Contextual_Positive_Frequency",
    "COVID_Contextual_Negative_Frequency",
]

aggregations = {column: "mean" for column in mean_columns}
aggregations.update({column: "sum" for column in covid_columns})
aggregations["Sentence"] = "size"  # number of sentences per document

nlp_results_stats = nlp_results.groupby("Document").agg(aggregations)

# Turn the COVID counts into shares of the document's sentences.
nlp_results_stats[covid_columns] = nlp_results_stats[covid_columns].div(
    nlp_results_stats["Sentence"], axis=0
)

nlp_results_stats

### Adding FinBERT results

In [None]:
# Per-ticker mean FinBERT scores, renamed and joined to the lexicon statistics.
finbert_column_names = {
    "negative": "FinBERT_Negative",
    "neutral": "FinBERT_Neutral",
    "positive": "FinBERT_Positive",
}

finbert_means = finbert_results_df.groupby("Ticker")[list(finbert_column_names)].mean()

results_df = (
    finbert_means.rename_axis("Document")
    .rename(columns=finbert_column_names)
    .join(nlp_results_stats)
)

results_df


### Correlation matrix


In [None]:
# Pairwise correlations between all document-level scores (sentence count excluded).
correlation_matrix = results_df.drop(columns="Sentence").corr()

fig, ax = plt.subplots(figsize=(20, 10))
sns.heatmap(correlation_matrix, annot=True, ax=ax)

plt.show()


### Comparing sentiment score


In [None]:
# Positive-score columns of every method, compared for the first five rows.
positive_columns = [
    "FinBERT_Positive",
    "LM_Positive",
    "VADER_Positive",
    "SBN_Positive",
    "COVID_Contextual_Positive_Frequency",
]

ax = results_df.head(5).plot.bar(y=positive_columns, figsize=(20, 10))
ax.set(
    ylabel="Positive Sentiment Score",
    title="Sentiment Score comparison for first 5 companies",
)
plt.show()


In [None]:
# Negative-score columns of every method, compared for the first five rows.
negative_columns = [
    "FinBERT_Negative",
    "LM_Negative",
    "VADER_Negative",
    "SBN_Negative",
    "COVID_Contextual_Negative_Frequency",
]

ax = results_df.head(5).plot.bar(y=negative_columns, figsize=(20, 10))
ax.set(
    ylabel="Negative Sentiment Score",
    title="Sentiment Score comparison for first 5 companies",
)
plt.show()


### Checking COVID-19 mentions

In [None]:
# Documents without a single COVID-related sentence (expected to be empty).
results_df[results_df["COVID_Related_Frequency"] == 0]

All MD&A sections mention terms related to COVID-19.

## Comparing Stock Prices with Results from Sentiment Analysis


### Loading Stock Data


In [None]:
# Stock price table; the "Date" column is parsed into datetimes on load.
stock_prices_path = "./data/stock_prices.csv"
stock_df = pd.read_csv(stock_prices_path, parse_dates=["Date"])
stock_df

### Loading Industry Data

In [None]:
ticker_data = pd.read_csv("./data/constituents.csv")

# Replace "." with "-" in the symbols. regex=False makes the replacement
# literal on every pandas version (as a regex, "." would match any character).
# NOTE(review): presumably this aligns the symbols with the document keys — confirm.
ticker_data["Symbol"] = ticker_data["Symbol"].str.replace(".", "-", regex=False)

# Keep only the tickers that have sentiment results, with their name and sector.
ticker_data = (
    ticker_data.set_index("Symbol").rename_axis("Ticker").join(results_df, how="inner")
)[["Name", "Sector"]].sort_index()

ticker_data


### Merging Datasets


In [None]:
# Year of each price observation, used to pivot to one column per year.
stock_df["Year"] = stock_df["Date"].dt.year

yearly_close = stock_df.pivot(index="Ticker", columns="Year", values="Close")

# pct_change() returns a fractional change (0.25 for +25%). The column is named
# "Price_Change_%" and plotted on axes labelled "(%)", so it is scaled to
# percent here. The 2020 column has no previous year to compare against and is
# dropped, leaving a single change column.
price_change_df = (
    yearly_close.pct_change(axis=1)
    .drop(columns=2020)
    .mul(100)
    .reset_index()
)
price_change_df.columns = ["Ticker", "Price_Change_%"]
price_change_df = price_change_df.set_index("Ticker").rename_axis("Document")

# Sentiment results + price change + company name/sector, one row per document.
merged_df = results_df.join(price_change_df).join(ticker_data).reset_index()

merged_df

### VADER vs. Stock Price Change

In [None]:
# Split the rows by the sign of the price change so that each group is drawn
# in its own colour and gets its own legend entry (a single scatter call takes
# one label, not one label per colour).
price_fell = merged_df["Price_Change_%"] < 0

fig, ax = plt.subplots(figsize=(10, 6))
for mask, colour, legend_label in (
    (price_fell, "red", "Negative Change"),
    (~price_fell, "green", "Positive Change"),
):
    ax.scatter(
        merged_df.loc[mask, "VADER_Compound"],
        merged_df.loc[mask, "Price_Change_%"],
        color=colour,
        alpha=0.5,
        label=legend_label,
    )

ax.set_xlabel("Compound Sentiment Score")
ax.set_ylabel("Stock Price Change (%)")
ax.set_title("VADER Compound Sentiment Score vs. Stock Price Change")

ax.grid(True)
ax.legend()
# plt.savefig("./plots/VADER_Compound_vs_Price_Change.jpg", dpi=300, bbox_inches="tight")
plt.show()

### Loughran and McDonald vs. Stock Price Change

In [None]:
# Scatter plot of the Loughran-McDonald polarity against the price change.
# Rows are split by the sign of the price change so that each group is drawn
# in its own colour and gets its own legend entry (a single scatter call takes
# one label, not one label per colour).
price_fell = merged_df["Price_Change_%"] < 0

fig, ax = plt.subplots(figsize=(10, 6))
for mask, colour, legend_label in (
    (price_fell, "red", "Negative Change"),
    (~price_fell, "green", "Positive Change"),
):
    ax.scatter(
        merged_df.loc[mask, "LM_Polarity"],
        merged_df.loc[mask, "Price_Change_%"],
        color=colour,
        alpha=0.5,
        label=legend_label,
    )

ax.set_xlabel("Loughran and McDonald Polarity Score")
ax.set_ylabel("Stock Price Change (%)")
ax.set_title("Loughran and McDonald Polarity Score vs. Stock Price Change")

ax.grid(True)
ax.legend()
# plt.savefig("./plots/LM_Compound_vs_Price_Change.jpg", dpi=300, bbox_inches="tight")
plt.show()

### SentiBigNomics vs. Stock Price Change


In [None]:
# Split the rows by the sign of the price change so that each group is drawn
# in its own colour and gets its own legend entry (a single scatter call takes
# one label, not one label per colour).
price_fell = merged_df["Price_Change_%"] < 0

fig, ax = plt.subplots(figsize=(10, 6))
for mask, colour, legend_label in (
    (price_fell, "red", "Negative Change"),
    (~price_fell, "green", "Positive Change"),
):
    ax.scatter(
        merged_df.loc[mask, "SBN_Compound"],
        merged_df.loc[mask, "Price_Change_%"],
        color=colour,
        alpha=0.5,
        label=legend_label,
    )

ax.set_xlabel("Compound Sentiment Score")
ax.set_ylabel("Stock Price Change (%)")
ax.set_title("SBN Compound Sentiment Score vs. Stock Price Change")

ax.grid(True)
ax.legend()
# plt.savefig("./plots/SBN_Compound_vs_Price_Change.jpg", dpi=300, bbox_inches="tight")
plt.show()

### FinBERT vs. Stock Price Change


Since most sentences have a neutral sentiment, we denote an overall document as "positive" if the average positive sentiment score is greater than the average negative sentiment score, and vice versa. We then calculate the stock price change for each ticker and compare it with the sentiment analysis results.

In [None]:
# Name of the larger of the two mean scores per document, reduced to the part
# after the underscore ("Negative" or "Positive").
dominant_column = merged_df[["FinBERT_Negative", "FinBERT_Positive"]].idxmax(axis=1)
merged_df["FinBERT_Class"] = dominant_column.str.split("_").str.get(1)

merged_df

In [None]:
# Scatter plot of the mean FinBERT positive score against the price change.
# Rows are split by the sign of the price change so that each group is drawn
# in its own colour and gets its own legend entry (a single scatter call takes
# one label, not one label per colour).
price_fell = merged_df["Price_Change_%"] < 0

fig, ax = plt.subplots(figsize=(10, 6))
for mask, colour, legend_label in (
    (price_fell, "red", "Negative Change"),
    (~price_fell, "green", "Positive Change"),
):
    ax.scatter(
        merged_df.loc[mask, "FinBERT_Positive"],
        merged_df.loc[mask, "Price_Change_%"],
        color=colour,
        alpha=0.5,
        label=legend_label,
    )

ax.set_xlabel("Positive Sentiment Score")
ax.set_ylabel("Stock Price Change (%)")
ax.set_title("FinBERT Positive Sentiment Score vs. Stock Price Change")

ax.grid(True)
ax.legend()
# plt.savefig("./plots/FinBERT_Positive_vs_Price_Change.jpg", dpi=300, bbox_inches="tight")
plt.show()

In [None]:
# Scatter plot of the mean FinBERT negative score against the price change.
# Rows are split by the sign of the price change so that each group is drawn
# in its own colour and gets its own legend entry (a single scatter call takes
# one label, not one label per colour).
price_fell = merged_df["Price_Change_%"] < 0

fig, ax = plt.subplots(figsize=(10, 6))
for mask, colour, legend_label in (
    (price_fell, "red", "Negative Change"),
    (~price_fell, "green", "Positive Change"),
):
    ax.scatter(
        merged_df.loc[mask, "FinBERT_Negative"],
        merged_df.loc[mask, "Price_Change_%"],
        color=colour,
        alpha=0.5,
        label=legend_label,
    )

ax.set_xlabel("Negative Sentiment Score")
ax.set_ylabel("Stock Price Change (%)")
ax.set_title("FinBERT Negative Sentiment Score vs. Stock Price Change")

ax.grid(True)
ax.legend()
# plt.savefig("./plots/FinBERT_Negative_vs_Price_Change.jpg", dpi=300, bbox_inches="tight")
plt.show()

In [None]:
# Number of documents per sector within each FinBERT class, as a flat table.
sector_counts = merged_df.groupby("FinBERT_Class")["Sector"].value_counts()
industry_count = sector_counts.to_frame().reset_index()

# Horizontal count plot: one bar per sector and FinBERT class.
fig, ax = plt.subplots(figsize=(20, 10))
sns.countplot(
    data=merged_df,
    y="Sector",
    hue="FinBERT_Class",
    palette={"Negative": "red", "Positive": "green"},
    ax=ax,
)

ax.set_title("Industry Count by FinBERT Sentiment Class")
ax.set_ylabel("Industry")
ax.set_xlabel("Count")
ax.legend(title="FinBERT Sentiment Class")

fig.tight_layout()
# plt.savefig("./plots/Industry_Count_by_FinBERT_Sentiment_Class.jpg", dpi=300, bbox_inches="tight")
plt.show()


### COVID related analysis 

In [None]:
# Sector averages of the COVID-related frequency and of its positive and
# negative contextual frequencies, reshaped to long format for plotting.
frequency_columns = [
    "COVID_Related_Frequency",
    "COVID_Contextual_Positive_Frequency",
    "COVID_Contextual_Negative_Frequency",
]

sector_means = merged_df.groupby("Sector")[frequency_columns].mean().reset_index()

covid_contextual_freq = sector_means.melt(
    id_vars="Sector",
    value_vars=frequency_columns,
    var_name="Frequency_Type",
    value_name="Frequency",
)

# Grouped horizontal bars: one group per sector, one bar per frequency type.
fig, ax = plt.subplots(figsize=(20, 10))
sns.barplot(
    data=covid_contextual_freq,
    x="Frequency",
    y="Sector",
    hue="Frequency_Type",
    palette="viridis",
    ax=ax,
)

ax.set_title("Average COVID-19 Related and Contextual Frequency by Sector")
ax.set_xlabel("Frequency")
ax.set_ylabel("Sector")
ax.legend(title="Frequency Type")

fig.tight_layout()
# plt.savefig("./plots/Average_COVID-19_Related_and_Contextual_Frequency_by_Sector.jpg", dpi=300, bbox_inches="tight")
plt.show()
