<a href="https://colab.research.google.com/github/kumpaten/masters-thesis-code/blob/main/SentimentAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the input and output paths used later
# in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the third-party packages imported by the analysis cell below.
# The leading "!" is an IPython shell escape, so this line only runs inside a notebook.
!pip install vaderSentiment tiktoken openai

In [None]:
#@title Run GPT-4o-mini on the 10-K (20-F) filings of each company and return sentiment scores for different aspects

# — Imports & setup
from google.colab import userdata
import openai, tiktoken, os, time, pandas as pd, re, random, math
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from openai import RateLimitError, APIError, OpenAIError

# — API client
# The key is fetched through google.colab's `userdata.get` under the name
# 'API_KEY_OPENAI'; `client` is the module-level OpenAI client used for every chat call.
env_key = userdata.get('API_KEY_OPENAI')
client = openai.OpenAI(api_key=env_key)

# Chat model used for every aspect call (also selects the tokenizer in chunk_text).
MODEL = 'gpt-4o-mini'

# The aspects that are scored; these strings are also the score-CSV column names
# and the keys of ASPECT_KEYWORDS.
INTANGIBLE_ASPECTS = [
    "Employee Satisfaction Sentiment",
    "Customer Satisfaction Sentiment",
    "Innovation Sentiment",
    "Intellectual Property Sentiment",
    "Brand Strength Sentiment",
]

# Keywords to extract relevant sentences for each aspect.
# All keywords are lowercase because they are compared against lowercased text.
# The first ten entries of each list are also quoted in the GPT prompt.
ASPECT_KEYWORDS = {
    "Employee Satisfaction Sentiment": [
        "employee", "staff", "workforce", "morale", "engagement", "retention",
        "turnover", "compensation", "benefits", "training", "development",
        # "well-being" is listed with the ASCII hyphen-minus (what ordinary text
        # contains) as well as with U+2010 HYPHEN, the only spelling the list
        # had before and one that never matches ASCII-hyphenated text.
        "culture", "team", "productivity", "well-being", "well\u2010being",
        "leadership", "recognition"
    ],
    "Customer Satisfaction Sentiment": [
        "customer", "client", "satisfaction", "nps", "net promoter", "service",
        "support", "feedback", "loyalty", "experience", "complaint", "resolution",
        "churn", "retention", "adoption", "recommendation", "acquisition"
    ],
    "Innovation Sentiment": [
        "innov", "innovation", "research", "development", "r&d", "patent",
        "breakthrough", "technology", "prototype", "creativity", "disruption",
        "experiment", "iteration", "platform", "solution", "idea"
    ],
    "Intellectual Property Sentiment": [
        "intellectual property", "ip", "patent", "trademark", "copyright",
        "license", "infringement", "portfolio", "rightholder", "licensing",
        "royalties", "trade secret", "proprietary", "filing"
    ],
    "Brand Strength Sentiment": [
        "brand", "reputation", "marketing", "awareness", "equity", "image",
        "positioning", "perception", "branding", "advertising", "pr",
        "recognition", "visibility", "identity", "loyalty"
    ],
}

# Paths
# BASE_DIR is scanned for one sub-directory per company holding the filing .txt files;
# the two OUTPUT_* files are rewritten after every processed company-year.
BASE_DIR      = '/content/drive/MyDrive/10K-20F-Filings-Cleaned'
OUTPUT_SCORES = '/content/drive/MyDrive/intangible_scores.csv'
OUTPUT_JUSTS  = '/content/drive/MyDrive/intangible_justifications.csv'

# — Chunking helper
def chunk_text(text, max_tokens=190000, model=MODEL):
    """Split *text* into consecutive pieces of at most *max_tokens* tokens.

    The tokenizer is looked up from *model*; if tiktoken raises ``KeyError``
    for that name, the ``cl100k_base`` encoding is used instead. Each piece is
    decoded back to a string, and the pieces are returned in document order.
    """
    try:
        encoder = tiktoken.encoding_for_model(model)
    except KeyError:
        encoder = tiktoken.get_encoding("cl100k_base")

    token_ids = encoder.encode(text)

    pieces = []
    for start in range(0, len(token_ids), max_tokens):
        window = token_ids[start:start + max_tokens]
        pieces.append(encoder.decode(window))
    return pieces

# — Extract aspect-relevant sentences with improved filtering
def extract_aspect_text(chunk, aspect):
    """Return the part of *chunk* that is most relevant to *aspect*.

    The chunk is split into sentences on whitespace that follows ``.``, ``!``
    or ``?``. A sentence counts as a keyword hit when it is longer than 30
    characters and contains one of ``ASPECT_KEYWORDS[aspect]`` (case-insensitive).
    Keywords of three characters or fewer ("ip", "pr", "nps", "r&d") must match
    as whole words; longer keywords match as substrings, so stems such as
    "innov" keep working.

    Selection strategy, in order:
      1. 15 or more hits: return only the hit sentences.
      2. 3 to 14 hits: return each hit plus one sentence on either side,
         de-duplicated in first-seen order, provided that yields at least
         5 sentences.
      3. Otherwise: return the first 150 and last 150 sentences, or the whole
         chunk when it has 300 sentences or fewer.

    Args:
        chunk: Text of one filing chunk.
        aspect: A key of ``ASPECT_KEYWORDS``.

    Returns:
        The selected sentences joined with single spaces.

    Raises:
        KeyError: If *aspect* is not a key of ``ASPECT_KEYWORDS``.
    """
    sentences = re.split(r'(?<=[.!?]) +', chunk)

    # Build one alternation pattern per call. Short keywords get word
    # boundaries: as plain substrings "ip" hits "relationship"/"shipping" and
    # "pr" hits "product"/"price", which would mark almost every sentence.
    alternatives = []
    for keyword in ASPECT_KEYWORDS[aspect]:
        escaped = re.escape(keyword.lower())
        alternatives.append(rf'\b{escaped}\b' if len(keyword) <= 3 else escaped)
    keyword_pattern = re.compile('|'.join(alternatives)) if alternatives else None

    # Record positions rather than sentence text: looking positions up later
    # with list.index() would return the first occurrence for repeated
    # sentences and lose the context around the later ones.
    matched_indices = []
    if keyword_pattern is not None:
        matched_indices = [
            idx for idx, sentence in enumerate(sentences)
            if len(sentence) > 30 and keyword_pattern.search(sentence.lower())
        ]

    # Enough direct matches (approx. 500-1000 words): use those alone.
    if len(matched_indices) >= 15:
        return " ".join(sentences[idx] for idx in matched_indices)

    # A handful of matches: widen each by one sentence before and after so
    # context that lacks the keyword itself is captured too.
    if len(matched_indices) >= 3:
        expanded = []
        for idx in matched_indices:
            expanded.extend(sentences[max(0, idx - 1):idx + 2])

        # dict.fromkeys drops duplicates while preserving first-seen order.
        expanded_unique = list(dict.fromkeys(expanded))
        if len(expanded_unique) >= 5:
            return " ".join(expanded_unique)

    # Still not enough: sample the beginning and end of the chunk, where key
    # information often appears. Short chunks are returned whole so that an
    # odd sentence count does not silently drop the middle sentence.
    sample_size = 150
    if len(sentences) <= 2 * sample_size:
        return " ".join(sentences)
    return " ".join(sentences[:sample_size] + sentences[-sample_size:])

# — VADER sentiment analysis with bounded output
def vader_delta(text):
    """Return a bounded VADER-based sentiment value for *text*.

    Empty/None text or text shorter than 50 characters yields 0.0. Otherwise
    the VADER compound score is multiplied by 3 and passed through a logistic
    function rescaled to the open interval (-1, 1), which compresses extreme
    values.
    """
    too_short = (not text) or len(text) < 50
    if too_short:
        return 0.0

    # Stretch the compound score before squashing it.
    compound = detector.polarity_scores(text)['compound']
    stretched = compound * 3.0

    # Logistic squash, then map (0, 1) onto (-1, 1).
    logistic = 1 / (1 + math.exp(-stretched))
    return 2 * logistic - 1

# Initialize VADER
# Single module-level analyzer; vader_delta() reads it through the global name `detector`.
detector = SentimentIntensityAnalyzer()

# — Single-aspect GPT call with robust float parsing
def _parse_delta_response(raw):
    """Parse a '<delta> || <justification>' reply into (delta, justification).

    Returns None when *raw* is empty/None, has no '||' separator, or has no
    number before the separator. The delta is clamped to [-2.0, 2.0].
    """
    if not raw or '||' not in raw:
        return None

    delta_part, justification = raw.split('||', 1)

    # Normalise typographic minus signs (U+2212 minus, U+2013 en dash) so a
    # negative delta is not silently read as positive.
    delta_part = delta_part.replace('\u2212', '-').replace('\u2013', '-')

    # Accept an optional sign and forms like "1", "1.", "1.5" and ".5".
    match = re.search(r'[-+]?(?:\d+\.?\d*|\.\d+)', delta_part)
    if match is None:
        return None

    # Explicitly bound the delta to prevent extreme values.
    delta = float(match.group(0))
    return max(-2.0, min(2.0, delta)), justification.strip()


def analyze_aspect_chunk(text, aspect, prev_score, prev_summary, retries=3):
    """Ask the chat model for the year-over-year sentiment change of one aspect.

    Args:
        text: Aspect-relevant text from this year's filing.
        aspect: A key of ``ASPECT_KEYWORDS``; used in the prompt wording.
        prev_score: Previous year's score, formatted into the prompt with one decimal.
        prev_summary: Previous year's justification, quoted in the prompt.
        retries: Maximum number of API attempts.

    Returns:
        ``(delta, justification)``. ``delta`` is clamped to [-2.0, 2.0].
        Text that is empty or shorter than 100 characters returns
        ``(0.0, "Insufficient relevant content to analyze.")`` without calling
        the API. If no attempt yields a parseable reply the result is
        ``(0.0, "Analysis failed after multiple attempts.")``.
    """
    if not text or len(text) < 100:
        return 0.0, "Insufficient relevant content to analyze."

    # Construct context that includes previous year information
    context = f"""
Analyze the sentiment change for {aspect} in this company's 10-K filing compared to the previous year.

PREVIOUS YEAR:
- Score: {prev_score:.1f} (on scale 0-10)
- Assessment: "{prev_summary}"

INSTRUCTIONS:
1. Evaluate ONLY the sentiment change from last year to this year.
2. Consider both explicit statements and tone regarding {aspect}.
3. Note any significant positives or negatives mentioned.
4. Compare the current language with the previous assessment.

SCALE:
- -2.0: Major negative change (significantly worse sentiment)
- -1.0: Moderate negative change
- 0.0: No significant change
- +1.0: Moderate positive change
- +2.0: Major positive change (significantly better sentiment)

Your response MUST follow this format:
<delta> || <detailed justification>

The justification should be 3-5 sentences explaining your reasoning with specific examples from the text.
"""

    # Add a section with key aspect words to focus on
    context += f"\nKEY FOCUS AREAS FOR {aspect.upper()}:\n"
    context += ", ".join(ASPECT_KEYWORDS[aspect][:10])  # Include top keywords

    # Complete the prompt with the text for analysis
    prompt = context + f"\n\n---\nTHIS YEAR'S 10-K TEXT ABOUT {aspect.upper()}:\n{text}\n---"

    for attempt in range(1, retries + 1):
        is_last_attempt = attempt == retries
        try:
            resp = client.chat.completions.create(
                model=MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=1500
            )
        except RateLimitError:
            # Linear back-off; no point sleeping once no attempt is left.
            if not is_last_attempt:
                time.sleep(5 * attempt)
            continue
        except (APIError, OpenAIError) as err:
            print(f"  API error for {aspect} (attempt {attempt}/{retries}): {err}")
            if not is_last_attempt:
                time.sleep(5)
            continue

        # message.content can be None (e.g. a refused or empty completion);
        # the parser treats that like any other unusable reply.
        parsed = _parse_delta_response(resp.choices[0].message.content)
        if parsed is not None:
            return parsed
        print(f"  Unparseable reply for {aspect} (attempt {attempt}/{retries})")

    # Skip if all retries fail
    return 0.0, "Analysis failed after multiple attempts."

# Function to initialize baseline scores with differentiation
def initialize_baseline_scores():
    """Return a fresh starting score for each of the five aspects.

    Every score is 5.0 plus an independent random offset drawn uniformly from
    [-0.5, 0.5], so aspects start out slightly differentiated. The values come
    from the global ``random`` state, so they differ between runs unless the
    caller seeds it.
    """
    aspect_names = (
        "Employee Satisfaction Sentiment",
        "Customer Satisfaction Sentiment",
        "Innovation Sentiment",
        "Intellectual Property Sentiment",
        "Brand Strength Sentiment",
    )
    # One draw per aspect, in the order listed above.
    return {name: 5.0 + random.uniform(-0.5, 0.5) for name in aspect_names}

# — Improved score calculation that allows for both increases and decreases
def calculate_new_score(prev_score, delta):
    """Apply *delta* to *prev_score* with damping away from the midpoint 5.0.

    A change that pushes the score further from 5.0 is multiplied by
    ``1 - |prev_score - 5| / 10``; a change toward 5.0 (or starting exactly at
    5.0) is applied in full. The result is clamped to [1.0, 9.0] and rounded
    to one decimal place.
    """
    # Damping shrinks as the previous score sits further from the centre.
    damping = 1.0 - (abs(prev_score - 5.0) / 10.0)

    rising_above_centre = delta > 0 and prev_score > 5.0
    falling_below_centre = delta < 0 and prev_score < 5.0

    if rising_above_centre or falling_below_centre:
        # Moving farther from centre is harder.
        step = delta * damping
    else:
        # Moving toward centre is easier.
        step = delta * 1.0

    # Keep the result inside the 1.0-9.0 band.
    capped = min(9.0, prev_score + step)
    floored = max(1.0, capped)
    return round(floored, 1)

# Function to save a summary text file with key findings
def _coerce_score(value):
    """Return *value* as a float, or None when it is missing, NaN or non-numeric."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(number) else number


def save_summary_report(scores_rows, justs_rows, output_path):
    """Write a Markdown summary of scores and notable trends to *output_path*.

    For each company (alphabetical) the report contains a table of scores per
    year and, for every aspect whose score moved by at least 1.0 between its
    first and last scored year, a bullet with the change and the latest
    justification.

    Years are compared as strings, so rows loaded from CSV (where pandas may
    parse the year as an integer) and rows created in the current run (where
    the year is a filename fragment) can be mixed. Missing or non-numeric
    scores are shown as ``N/A`` in the table and ignored for trends.

    Args:
        scores_rows: Dicts with 'Company', 'Year' and one key per aspect in
            ``INTANGIBLE_ASPECTS``.
        justs_rows: Dicts with 'Company', 'Year', 'Aspect', 'Justification'.
        output_path: Destination file; overwritten, written as UTF-8.
    """
    header = "| Year | " + " | ".join(INTANGIBLE_ASPECTS) + " |\n"
    # One cell for the year plus one per aspect; every cell ends with a single "|".
    divider = "|------|" + "------|" * len(INTANGIBLE_ASPECTS) + "\n"

    rows_by_company = {}
    for row in scores_rows:
        rows_by_company.setdefault(row['Company'], []).append(row)

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("# Intangible Asset Sentiment Analysis Summary Report\n\n")

        for company in sorted(rows_by_company):
            f.write(f"## {company}\n\n")

            company_scores = rows_by_company[company]
            company_years = sorted({str(row['Year']) for row in company_scores})

            # Create a summary of score trends
            f.write("### Score Trends\n\n")
            f.write(header)
            f.write(divider)

            for year in company_years:
                # First row for this year wins if the year appears more than once.
                year_data = next(row for row in company_scores if str(row['Year']) == year)
                cells = []
                for asp in INTANGIBLE_ASPECTS:
                    score = _coerce_score(year_data.get(asp))
                    cells.append('N/A' if score is None else f"{score:.1f}")
                f.write(f"| {year} | " + " | ".join(cells) + " |\n")

            f.write("\n### Key Insights\n\n")

            # Find interesting trends
            for asp in INTANGIBLE_ASPECTS:
                asp_scores = []
                for row in company_scores:
                    score = _coerce_score(row.get(asp))
                    if score is not None:
                        asp_scores.append((str(row['Year']), score))

                if len(asp_scores) < 2:
                    continue

                asp_scores.sort()  # Sort by year
                first_year, first_score = asp_scores[0]
                last_year, last_score = asp_scores[-1]

                # Calculate overall trend
                score_diff = last_score - first_score
                if abs(score_diff) < 1.0:
                    continue

                direction = "increased" if score_diff > 0 else "decreased"
                f.write(f"- {asp} {direction} by {abs(score_diff):.1f} points from {first_year} to {last_year}\n")

                # Get the justification for the latest year
                latest_just = next(
                    (row['Justification'] for row in justs_rows
                     if row['Company'] == company
                     and str(row['Year']) == last_year
                     and row['Aspect'] == asp),
                    None,
                )

                # A justification read back from CSV can be NaN; only write real text.
                if isinstance(latest_just, str) and latest_just:
                    f.write(f"  - Latest assessment: {latest_just}\n")

            f.write("\n")

    print(f"Summary report saved to {output_path}")

# — Main loop
def main():
    """Score every filing under BASE_DIR and persist results after each year.

    For each company directory the ``.txt`` filings are processed in sorted
    filename order; the year is the last ``_``-separated part of the file stem.
    Each filing is chunked, every chunk is analysed per aspect (keyword
    extraction, VADER, GPT), the per-chunk deltas are averaged and applied to
    the previous year's score. Scores, justifications and the Markdown summary
    are rewritten after every company-year so an interrupted run can resume.

    Resume behaviour: previously saved CSVs are loaded with ``Company`` and
    ``Year`` as text, so stored years compare equal to filename-derived years.
    A company-year that already has scores is skipped. The comparison baseline
    for a year is the most recent earlier year with scores (stored or computed
    in this run); if there is none, a randomised baseline around 5.0 is used.
    """
    summary_path = '/content/drive/MyDrive/intangible_summary_report.md'
    scores_rows, justs_rows = [], []

    # Check if previous output exists and load it.
    # dtype=str for Year: pandas would otherwise parse "2020" as an integer,
    # which never equals (and cannot be ordered against) the string year
    # taken from a filename.
    existing_scores = {}
    if os.path.exists(OUTPUT_SCORES):
        existing_df = pd.read_csv(OUTPUT_SCORES, dtype={'Company': str, 'Year': str})
        for _, row in existing_df.iterrows():
            existing_scores.setdefault(row['Company'], {})[row['Year']] = {
                asp: row[asp] for asp in INTANGIBLE_ASPECTS
            }

        # Also load existing rows for continuity
        scores_rows = existing_df.to_dict('records')

    existing_justs = {}
    if os.path.exists(OUTPUT_JUSTS):
        justs_df = pd.read_csv(OUTPUT_JUSTS, dtype={'Company': str, 'Year': str})
        for _, row in justs_df.iterrows():
            by_year = existing_justs.setdefault(row['Company'], {})
            by_year.setdefault(row['Year'], {})[row['Aspect']] = row['Justification']

        # Also load existing rows for continuity
        justs_rows = justs_df.to_dict('records')

    for company in sorted(os.listdir(BASE_DIR)):
        comp_dir = os.path.join(BASE_DIR, company)
        if not os.path.isdir(comp_dir):
            continue

        # Initialize with slightly varied baselines for each company
        prev_scores = initialize_baseline_scores()
        prev_summary = {asp: 'Initial baseline assessment.' for asp in INTANGIBLE_ASPECTS}

        # Sort files by name so years are analysed sequentially
        filing_names = [name for name in sorted(os.listdir(comp_dir)) if name.endswith('.txt')]

        for fname in filing_names:
            year = os.path.splitext(fname)[0].split('_')[-1]
            print(f"→ {company} {year}")

            known_years = existing_scores.get(company, {})

            # Skip if already processed this specific year
            if year in known_years:
                print(f"  Already processed {company} {year}, skipping...")
                continue

            # Compare against the most recent earlier year that has scores.
            # Years computed in this run are added to existing_scores below,
            # so they are considered here too.
            earlier_years = [y for y in known_years if isinstance(y, str) and y < year]
            if earlier_years:
                prev_year = max(earlier_years)
                prev_scores = known_years[prev_year]
                stored_summary = existing_justs.get(company, {}).get(prev_year)
                if stored_summary is not None:
                    prev_summary = stored_summary

            # Load and chunk the filing
            with open(os.path.join(comp_dir, fname), encoding='utf-8') as fh:
                text = fh.read()
            chunks = chunk_text(text)

            # Per-aspect storage
            year_deltas = {asp: [] for asp in INTANGIBLE_ASPECTS}
            year_justs = {asp: [] for asp in INTANGIBLE_ASPECTS}

            # Process each chunk
            for chunk_idx, chunk in enumerate(chunks):
                print(f"  Processing chunk {chunk_idx+1}/{len(chunks)}")

                for asp in INTANGIBLE_ASPECTS:
                    # 1) Extract relevant text
                    relevant_text = extract_aspect_text(chunk, asp)

                    # 2) Run VADER sentiment
                    vader_score = vader_delta(relevant_text)

                    # 3) Run GPT for that aspect
                    gpt_delta, just = analyze_aspect_chunk(
                        relevant_text, asp, prev_scores[asp],
                        prev_summary.get(asp, 'No prior assessment available.')
                    )

                    # Failed calls contribute nothing to this year's average.
                    if just == "Analysis failed after multiple attempts.":
                        continue

                    # 4) Calculate combined delta (weighted toward GPT)
                    combined_delta = (gpt_delta * 0.7) + (vader_score * 0.3)

                    # Store results
                    year_deltas[asp].append(combined_delta)
                    year_justs[asp].append(just)

                # Pause to ease rate limiting
                time.sleep(1.5)

            # Calculate new scores and choose justifications
            new_scores, new_summary = {}, {}

            for asp in INTANGIBLE_ASPECTS:
                deltas = year_deltas[asp]
                justs = year_justs[asp]

                if deltas:
                    # Take average of deltas
                    avg_delta = sum(deltas) / len(deltas)

                    # Calculate new score with improved logic
                    new_score = calculate_new_score(prev_scores[asp], avg_delta)

                    # Choose justification closest to average delta
                    idx = min(range(len(deltas)), key=lambda i: abs(deltas[i] - avg_delta))
                    justification = justs[idx]
                else:
                    # If no data, maintain previous score but note the lack of information
                    new_score = prev_scores[asp]
                    justification = "Insufficient data for meaningful analysis."

                new_scores[asp] = new_score
                new_summary[asp] = justification

            # Append results
            scores_row = {'Company': company, 'Year': year}
            scores_row.update(new_scores)
            scores_rows.append(scores_row)

            for asp in INTANGIBLE_ASPECTS:
                justs_rows.append({
                    'Company': company,
                    'Year': year,
                    'Aspect': asp,
                    'Justification': new_summary[asp]
                })

            # Save after each company-year
            pd.DataFrame(scores_rows).to_csv(OUTPUT_SCORES, index=False)
            pd.DataFrame(justs_rows).to_csv(OUTPUT_JUSTS, index=False)

            # Create a summary report
            save_summary_report(scores_rows, justs_rows, summary_path)

            print(f"Saved through {company} {year}")

            # Register this year so later years in the same run compare
            # against it rather than against an older stored year.
            existing_scores.setdefault(company, {})[year] = new_scores
            existing_justs.setdefault(company, {})[year] = new_summary

            # Update baselines for next year
            prev_scores = new_scores
            prev_summary = new_summary

    print("✅ All done!")

# Run the full pipeline when this cell/script is executed directly (not on import).
if __name__ == '__main__':
    main()