In [None]:
import glob
import os
import re

from sentence_transformers import SentenceTransformer, util
from huggingface_hub import login
from dotenv import load_dotenv

# Pull environment variables (notably HF_TOKEN) from a local .env file, if one exists.
load_dotenv()
hf_token = os.getenv("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    # Calling login() without a token falls back to an interactive prompt,
    # which stalls a non-interactive "Restart & Run All". Skip it instead and
    # let the gated-model cell further down take its own fallback path.
    print(
        "Warning: HF_TOKEN is not set; skipping Hugging Face login "
        "(gated models may fail to load unless a cached login exists)."
    )

# Keyword patterns for "guidance" events and "layoff" events.
# Patterns are matched with re.search against lower-cased paragraph text, so
# they are written in lower case. Each stem lists its common inflections
# explicitly; the earlier optional-suffix forms (e.g. `revis(ed|ion)?`) could
# not match plain words such as "revise", "raising" or "restructured".
GUIDANCE_KEYWORDS = [
    r'\bguidance\b',
    r'\bforecast(s|ed|ing)?\b',
    r'\brevis(e[sd]?|ing|ions?)\b',       # revise(s/d), revising, revision(s)
    r'\bupward\b',
    r'\bdownward\b',
    r'\brais(e[sd]?|ing)\b',              # raise(s/d), raising
    r'\blower(s|ed|ing)?\b',
    r'\bexpect(s|ed|ing|ations?)?\b',
]

LAYOFF_KEYWORDS = [
    r'\blay[\s-]?offs?\b',                # layoff(s), lay off, lay-off(s)
    r'\blaid[\s-]?off\b',                 # laid off, laid-off
    r'\bjob cuts?\b',
    # "reduce workforce", "reducing its staff", "reduced the headcount", ...
    r'\breduc(e[sd]?|ing) (the |its |our )?(workforce|staff|headcount)\b',
    r'\b(workforce|staff|headcount) reductions?\b',
    r'\brestructur(e[sd]?|ing)\b',        # restructure(s/d), restructuring
    r'\bdownsiz(e[sd]?|ing)\b',           # downsize(s/d), downsizing
]

# Example sentences per event category; paragraphs are compared against the
# embeddings of these seeds.
SEEDS = dict(
    guidance_up=[
        "We are raising our full-year guidance.",
        "Outlook for the quarter has been increased.",
    ],
    guidance_down=[
        "We are lowering our guidance.",
        "Full-year outlook was reduced.",
    ],
    layoff=[
        "We will reduce headcount across several departments.",
        "The company has initiated workforce reductions.",
    ],
)

# Directory holding the 8-K .txt filings.
INPUT_FOLDER = "wrds_clean_filings_1994"

# A paragraph is flagged when its cosine similarity to a seed reaches this value.
THRESHOLD = 0.6

# Paragraphs shorter than this are ignored.
MIN_LEN = 50

# Numeric weight given to each sentiment label when building a score.
mapping = dict(negative=-10, neutral=0, positive=10)


In [None]:
# 2. Load model and precompute seed embeddings ------------------------
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer, models

# Tokenizer for the FinBERT checkpoint (not referenced again in the visible cells).
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")

# Sentence encoder: FinBERT transformer followed by a pooling layer
# (mean pooling, per the original note) sized to the transformer's output.
word_embedding_model = models.Transformer("ProsusAI/finbert")
pooling_model = models.Pooling(
    word_embedding_model.get_word_embedding_dimension()
)
model = SentenceTransformer(
    modules=[word_embedding_model, pooling_model]
)

# One tensor of embeddings per seed category, keyed by category label.
print("Encoding seed sentences…")
seed_embeddings = {
    label: model.encode(SEEDS[label], convert_to_tensor=True)
    for label in SEEDS
}

In [None]:
def is_guidance_para(para):
    """Return True if any GUIDANCE_KEYWORDS pattern occurs in the paragraph.

    The paragraph is lower-cased before the regex patterns are searched.
    """
    lowered = para.lower()
    return any(re.search(pattern, lowered) for pattern in GUIDANCE_KEYWORDS)

def is_layoff_para(para):
    """Return True if any LAYOFF_KEYWORDS pattern occurs in the paragraph.

    The paragraph is lower-cased before the regex patterns are searched.
    """
    lowered = para.lower()
    return any(re.search(pattern, lowered) for pattern in LAYOFF_KEYWORDS)

Dictionary Lookup Method

In [None]:
# results = {}   # filename -> list of dicts with keys 'paragraph', 'guidance_flag', 'layoff_flag'
# counter = 0
#
# for filepath in glob.glob('wrds_clean_filings_1994/*.txt'):
#     if counter >= 20: break             #Number of 8-Ks to process
#     counter += 1
#     with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
#         text = f.read()
#     # split on two-or-more newlines
#     paras = re.split(r'\n\s*\n', text)
#
#     matches = []
#     for para in paras:
#         # skip very short “paras”
#         if len(para.strip()) < 50:
#             continue
#
#         guidance = is_guidance_para(para)
#         layoff   = is_layoff_para(para)
#         if guidance or layoff:
#             matches.append({
#                 'paragraph': para.strip(),
#                 'guidance_flag': guidance,
#                 'layoff_flag': layoff
#             })
#
#     if matches:
#         results[os.path.basename(filepath)] = matches

In [None]:
# for fname, paras in results.items():
#     print(f'=== {fname} ===')
#     for m in paras:
#         flags = []
#         if m['guidance_flag']: flags.append('GUIDANCE')
#         if m['layoff_flag']:   flags.append('LAYOFF')
#         print(f"[{','.join(flags)}]\n{m['paragraph']}\n")

FinBERT Pipeline

In [None]:
# 3. Process each file ------------------------------------------------

def extract_paragraphs(text):
    """Split text on blank lines and return the stripped paragraphs.

    Paragraphs whose stripped length is below MIN_LEN characters are dropped.
    """
    kept = []
    for chunk in re.split(r"\n\s*\n", text):
        cleaned = chunk.strip()
        if len(cleaned) >= MIN_LEN:
            kept.append(cleaned)
    return kept


MAX_FILES = 20  # number of 8-K filings to process

results = {}  # filename -> list of (paragraph, {label: score})

# glob returns paths in arbitrary order, so sort before slicing: the same
# filings are then selected on every run, matching the sorted(...)[:20]
# selection used by the per-file sentiment cell.
for filepath in sorted(glob.glob(os.path.join(INPUT_FOLDER, "*.txt")))[:MAX_FILES]:
    fname = os.path.basename(filepath)
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        text = f.read()

    paras = extract_paragraphs(text)
    if not paras:
        continue

    # Embed all paragraphs of this file in one call.
    para_embs = model.encode(paras, convert_to_tensor=True)

    matches = []
    for para, emb in zip(paras, para_embs):
        # Best similarity between this paragraph and any seed of each category.
        scores = {
            label: util.pytorch_cos_sim(emb, seed_emb).max().item()
            for label, seed_emb in seed_embeddings.items()
        }
        # Keep only the categories that reach the threshold.
        hits = {lbl: s for lbl, s in scores.items() if s >= THRESHOLD}
        if hits:
            matches.append((para, hits))

    # Files with no flagged paragraph are left out of `results`.
    if matches:
        results[fname] = matches

In [None]:
# Print each filing's flagged paragraphs, prefixed by the labels that matched
# and their similarity scores.
for fname, flagged in results.items():
    print(f"\n=== {fname} ===")
    for para, label_scores in flagged:
        parts = [f"{lbl} ({score:.2f})" for lbl, score in label_scores.items()]
        lbls = ", ".join(parts)
        print(f"[{lbls}]\n{para}\n")


Sentiment analysis with FinBERT and Llama

In [None]:
from transformers import pipeline

# FinBERT sentiment classifier
# return_all_scores=False: the loop below reads a single {'label', 'score'}
# result per paragraph via [0].
finbert_clf = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    tokenizer="ProsusAI/finbert",
    return_all_scores=False
)

# Llama instruction-based sentiment via text-generation
# Attempt to load Llama; if gated, fall back
# On any loading failure the warning is printed and llama_llm is set to None,
# which downstream code checks before calling the model.
# NOTE(review): trust_remote_code=True permits running code shipped with the
# model repo — confirm it is actually required for this checkpoint.
# NOTE(review): use_auth_token may be deprecated in favour of `token` in newer
# transformers releases — confirm against the installed version.
try:
    llama_llm = pipeline(
        "text-generation",
        model="meta-llama/Llama-2-7b-chat-hf",
        tokenizer="meta-llama/Llama-2-7b-chat-hf",
        trust_remote_code=True,
        use_auth_token=True
    )
except Exception as e:
    print("Warning: Could not load Llama-2-7b-chat-hf:", e)
    llama_llm = None

# Compare FinBERT and Llama sentiment on every paragraph flagged in `results`.
print("\n=== Sentiment Comparison ===")
for fname, paras in results.items():
    print(f"\n*** {fname} ***")
    for para, _ in paras:
        # Run FinBERT. Truncation is delegated to the tokenizer so the limit is
        # 512 *tokens*; the earlier para[:512] slice cut at 512 characters.
        fin = finbert_clf(para, truncation=True, max_length=512)[0]

        # Run Llama if available
        if llama_llm is not None:
            prompt = (
                "Classify the sentiment of the following paragraph "
                "as positive, neutral, or negative, and respond with just the label.\n\n"
                f"Paragraph:\n{para}\n\nSentiment:"
            )
            # max_new_tokens bounds only the generated continuation (max_length
            # also counts the prompt, which is already longer than 20 tokens).
            # return_full_text=False drops the echoed prompt so the first line
            # of the output is the model's answer rather than the instruction.
            llama_out = llama_llm(
                prompt,
                max_new_tokens=8,
                return_full_text=False,
            )
            llama_label = llama_out[0]["generated_text"].strip().split("\n")[0]
        else:
            llama_label = "Llama unavailable"

        # Print results
        print("Paragraph:", para)
        print(f"FinBERT -> label: {fin['label']}, score: {fin['score']:.2f}")
        print(f"Llama   -> label: {llama_label}")
        print("-" * 80)

In [None]:
#Return sentiment score for each file

# FinBERT classifier configured to return a score for every sentiment class.
finbert = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    tokenizer="ProsusAI/finbert",
    return_all_scores=True
)

def sentiment_score(text):
    """Return a signed sentiment intensity for `text`.

    Each class score returned by FinBERT is multiplied by the weight that
    `mapping` assigns to its label (negative=-10, neutral=0, positive=10)
    and the products are summed.

    Args:
        text: paragraph to score.

    Returns:
        float: the weighted sum of the per-class scores.

    Raises:
        KeyError: if the classifier emits a label missing from `mapping`.
    """
    # Let the tokenizer truncate to 512 tokens; slicing the string with
    # text[:512] limited the input to 512 characters instead.
    scores = finbert(text, truncation=True, max_length=512)[0]
    # weighted average of our mapping
    return sum(mapping[d["label"]] * d["score"] for d in scores)

# Score the first 20 filings (in sorted filename order) paragraph by paragraph.
for filepath in sorted(glob.glob(f"{INPUT_FOLDER}/*.txt"))[:20]:
    fname = os.path.basename(filepath)
    # Context manager closes the handle; the bare open(...).read() leaked it.
    with open(filepath, encoding="utf-8", errors="ignore") as f:
        txt = f.read()

    # split into paras and drop very short ones
    # NOTE(review): MIN_LEN is applied to the *word* count here, whereas
    # extract_paragraphs applies it to characters — confirm this is intended.
    paras = [p.strip() for p in re.split(r"\n\s*\n", txt)
             if len(p.split()) >= MIN_LEN]

    # compute a sentiment score per para
    para_scores = [sentiment_score(p) for p in paras]
    if not para_scores:
        continue

    # (a) Print all paragraph scores:
    scores_str = ", ".join(f"{s:.2f}" for s in para_scores)
    print(f"{fname}: {scores_str}")

    # Print the file’s average score:
    avg = sum(para_scores)/len(para_scores)
    print(f"{fname}  ⟶  average sentiment intensity: {avg:.2f}\n")