# **Brand Sentiment Analysis using OLMo 2 Pre-training Dataset** (OLMo Mix 1124)

# Table of Contents
- [0. Setup](#0-setup)  
- [1. Data Loading](#1-data-loading)  
- [2. Data Exploration](#2-data-exploration)  
- [3. Data Preprocessing](#3-data-preprocessing)  
    - [3.1 Data Cleaning](#31-data-cleaning)   
    - [3.2 Feature Engineering](#32-feature-engineering)  
- [4. Brand Sentiment Analysis](#4-brand-sentiment-analysis)  
    - [4.1 Lexicon-Based](#41-lexicon-based)  
    - [4.2 Transformer-based](#42-transformer-based)
- [5. Visualisations](#5-visualisations)  

# 0. Setup

In [1]:
!pip install -r requirements.txt



In [2]:
import pandas as pd
import re
import os
import time
import torch
import nltk
import zstandard as zstd

In [3]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col, udf, pandas_udf, PandasUDFType, arrays_zip, array_contains, substring, length, explode, first, avg, when, monotonically_increasing_id
from pyspark.sql.functions import to_date, dayofmonth, month, year
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType, BooleanType, ArrayType, StructType, StructField
from pyspark.ml.feature import Tokenizer
from functools import reduce
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from collections import defaultdict
from emoji import demojize
from urllib.parse import urlparse
from nltk.tokenize import word_tokenize
from nltk.sentiment.vader import SentimentIntensityAnalyzer

nltk.download('wordnet')
nltk.download('punkt')
nltk.download('vader_lexicon')

[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

# 1. Data Loading and Exploration

In [4]:
# Spark initialisation
spark = SparkSession.builder \
    .appName("Olmo Mix 1124 Sentiment Analysis") \
    .master("local[*]") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/06 00:06:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 33610)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.12/socketserver.py", line 761, in __init__
    self.handle()
  File "/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py", li

In [5]:
# Decompressses files
input_dir = "./data"  # folder containing .jsonl.zstd files
output_dir = "./data/temp_jsonl"  # temporary location for decompressed files
os.makedirs(output_dir, exist_ok=True)

decompressed_files = []
for filename in os.listdir(input_dir):
    if filename.endswith(".jsonl.zstd"):
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename.replace(".zstd", ""))
        decompressed_files.append(output_path)

        with open(input_path, 'rb') as compressed:
            dctx = zstd.ZstdDecompressor()
            with open(output_path, 'wb') as out:
                dctx.copy_stream(compressed, out)

# Aligns metadata schema 
metadata_schema = T.StructType([
    T.StructField("Content-Length", T.StringType(), True),
    T.StructField("Content-Type", T.StringType(), True),
    T.StructField("WARC-Block-Digest", T.StringType(), True),
    T.StructField("WARC-Concurrent-To", T.StringType(), True),
    T.StructField("WARC-Date", T.StringType(), True),
    T.StructField("WARC-IP-Address", T.StringType(), True),
    T.StructField("WARC-Payload-Digest", T.StringType(), True),
    T.StructField("WARC-Record-ID", T.StringType(), True),
    T.StructField("WARC-Target-URI", T.StringType(), True),
    T.StructField("WARC-Truncated", T.StringType(), True),
    T.StructField("WARC-Type", T.StringType(), True),
    T.StructField("WARC-Warcinfo-ID", T.StringType(), True),
    T.StructField("WARC-Identified-Payload-Type", T.StringType(), True),
])

# Pad metadata fields to align schemas
def pad_metadata_column(df):
    existing_fields = df.select("metadata.*").columns

    new_metadata_cols = []
    for field in metadata_schema:
        if field.name in existing_fields:
            new_metadata_cols.append(F.col("metadata").getField(field.name).alias(field.name))
        else:
            new_metadata_cols.append(F.lit(None).cast(field.dataType).alias(field.name))

    return df.withColumn("metadata", F.struct(new_metadata_cols))

df_list = []
for file_path in decompressed_files:
    df = spark.read.json(file_path)
    if "metadata" in df.columns:
        df = pad_metadata_column(df)
    else:
        # if metadata column missing entirely, add it as null struct
        df = df.withColumn("metadata", F.lit(None).cast(metadata_schema))
    df_list.append(df)

                                                                                

In [6]:
# Combining all dataframes into a combined dataframe
df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), df_list)
df.show()

# Can convert to Parquet format
# df.write.mode("overwrite").parquet("data/combined_data.parquet")

+---------------------------------------+---------------------------------------------------------------+-------------------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|bff_contained_ngram_count_before_dedupe|fasttext_openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train_prob|language_id_whole_page_fasttext|            metadata|previous_word_count|                text|                 url|            warcinfo|
+---------------------------------------+---------------------------------------------------------------+-------------------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|                                     25|                                           0.028186440467834473|           {0.9871476292610168}|{90145, applicati...|               1052|Lavender Scare: U...|http://abcnews.go...|robots: classic\r...|
|                               

| Column Name | Description |
|-------------|-------------|
| `bff_contained_ngram_count_before_dedupe` | Number of known n-grams from the BFF (Base Filtered Fragments) list that were found before deduplication. Higher counts imply overlap with previously seen content, used in deduplication. |
| `fasttext_openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train_prob` | FastText model output indicating the likelihood this text resembles a specific user-generated corpus (computed based on bigram training probabilities). |
| `language_id_whole_page_fasttext` | Probability or confidence that the whole page is in English. |
| `metadata` | Structured metadata `{doc_id, source_tag, timestamp, ...}`. |
| `previous_word_count` | Word count of this document or segment before it was deduplicated. |
| `text` | The main text body extracted from the document.|
| `url` | The original URL source of the content. Can be used for classifying content type (e.g., forums, news articles, advertising, social media, etc.). |
| `warcinfo` | Extracted metadata from the WARC.|

In [7]:
df.printSchema()

root
 |-- bff_contained_ngram_count_before_dedupe: long (nullable = true)
 |-- fasttext_openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train_prob: double (nullable = true)
 |-- language_id_whole_page_fasttext: struct (nullable = true)
 |    |-- en: double (nullable = true)
 |-- metadata: struct (nullable = false)
 |    |-- Content-Length: string (nullable = true)
 |    |-- Content-Type: string (nullable = true)
 |    |-- WARC-Block-Digest: string (nullable = true)
 |    |-- WARC-Concurrent-To: string (nullable = true)
 |    |-- WARC-Date: string (nullable = true)
 |    |-- WARC-IP-Address: string (nullable = true)
 |    |-- WARC-Payload-Digest: string (nullable = true)
 |    |-- WARC-Record-ID: string (nullable = true)
 |    |-- WARC-Target-URI: string (nullable = true)
 |    |-- WARC-Truncated: string (nullable = true)
 |    |-- WARC-Type: string (nullable = true)
 |    |-- WARC-Warcinfo-ID: string (nullable = true)
 |    |-- WARC-Identified-Payload-Type: string (nullable = true)
 |-- pr

In [8]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")



Number of rows: 392833
Number of columns: 8


                                                                                

# 3. Data Preprocessing

## 3.1 Data Cleaning

In [9]:
# Filter for non-null/non-empty text
df = df.filter(col("text").isNotNull() & (col("text") != ""))

# Rename for usability
df = (df
      .withColumnRenamed("fasttext_openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train_prob", "user_gen_prob")
      .withColumnRenamed("language_id_whole_page_fasttext", "lang_en_prob"))

In [10]:
# Filter to only high-confidence English content
df = df.filter(col("lang_en_prob.en") >= 0.95)

# Filter low-signal/high-duplicate content
df = df.filter(col("bff_contained_ngram_count_before_dedupe") < 1000)

In [11]:
df = df.drop("bff_contained_ngram_count_before_dedupe", "warcinfo", "lang_en_prob")

In [12]:
# word_count
df = df.withColumnRenamed("previous_word_count", "word_count")
df = df.filter(col("word_count") >= 10)

In [13]:
# Tokenise text
def clean_text(text):
    if not isinstance(text, str):
        return ""
    
    text = re.sub(r'[\n\r]', ' ', text)     # Remove newlines and carriage returns
    text = re.sub(r'[^\w\s]', '', text.lower())     # Remove punctuation and lowercase
    text = re.sub(r'\d+', '', text)   # Remove digits
    text = re.sub(r'\s+', ' ', text).strip() # Normalize whitespace
    text = demojize(text)  # Convert emojis to text (e.g., 😊 → :smiling_face:)
    text = re.sub(r'[^\w\s:]', '', text.lower())  # Preserve emoji tokens
    return text

def nltk_tokenize(text):
    if not isinstance(text, str) or text.strip() == "":
        return []
    return word_tokenize(text)

clean_udf = udf(clean_text, StringType())
tokenize_udf = udf(nltk_tokenize, ArrayType(StringType()))

# Clean text
df = df.withColumn("clean_text", clean_udf(col("text")))

# Tokenise text
df = df.withColumn("tokens", tokenize_udf(col("clean_text")))


In [14]:
df.show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|       user_gen_prob|            metadata|word_count|                text|                 url|          clean_text|              tokens|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|0.028186440467834473|{90145, applicati...|      1052|Lavender Scare: U...|http://abcnews.go...|lavender scare us...|[lavender, scare,...|
| 0.04570215940475464|{45887, applicati...|       807|Mentor vs. pupil ...|http://articles.c...|mentor vs pupil i...|[mentor, vs, pupi...|
|0.021046340465545654|{44841, applicati...|       854|Judge refuses to ...|http://articles.m...|judge refuses to ...|[judge, refuses, ...|
| 0.09183675050735474|{34772, applicati...|      2334|Join 3,380 reader...|http://ask.metafi...|join readers in h...|[join, readers, i...|
| 0.02820640802383423|{9044

                                                                                

In [15]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")



Number of rows: 182600
Number of columns: 7


                                                                                

## 3.2 Feature Engineering

In [16]:
# UK banks
BRANDS = ["barclays", "lloyds", "natwest", "hsbc", "monzo", "revolut", "tsb", "santander", "starling", "nationwide"]

# Banking-related context terms to confirm brand relevance
BANKING_CONTEXT = [
    "finance", "financial", "bank", "banking", "account", "savings", "current", "mortgage", "loan", "credit", "debit", "card",
    "app", "mobile", "online", "branch", "atm", "transfer", "fees", "overdraft", "service", "support"
]

# Negative context terms to exclude false positives
NEGATIVE_CONTEXT = {
    "nationwide": ["global", "worldwide", "across the nation", "nationwide coverage"],
    "revolut": ["revolution", "revolutionary", "national revolution"],
    "barclays": ["barclays center", "barclays arena"],
    "lloyds": ["lloyds of london"],
    "natwest": [],  
    "hsbc": [],    
    "monzo": [],   
    "tsb": [],    
    "santander": [], 
    "starling": ["starling bird", "starlings"],
    "nationwide": ["nationwide insurance"]  # excludes US insurance company
}

### Extraction of Brand-related Content: *brand_name* and *mention_count*

In [17]:
def extract_brands_and_counts(text):
    if not isinstance(text, str):
        return [], []
    text_lower = text.lower()
    tokens = word_tokenize(text_lower)
    
    brands_found = []
    counts = []
    
    for brand in BRANDS:
        brand_count = 0
        
        # Check for brand in tokens with word boundaries
        brand_pattern = r'\b' + re.escape(brand) + r'\b'
        matches = re.findall(brand_pattern, text_lower)
        brand_count += len(matches)
        
        # Validating with banking context (at least 1 banking term nearby)
        has_banking_context = False
        for context in BANKING_CONTEXT:
            if context in text_lower:
                has_banking_context = True
                break
        
        # Check for negative context to exclude false positives
        has_negative_context = False
        for negative_term in NEGATIVE_CONTEXT.get(brand, []):
            if negative_term in text_lower:
                has_negative_context = True
                break
        
        # Only include brand if it has banking context and no negative context
        if brand_count > 0 and has_banking_context and not has_negative_context:
            brands_found.append(brand)
            counts.append(brand_count)
    
    return brands_found, counts

@udf(ArrayType(StringType()))
def extract_brands(text):
    brands, _ = extract_brands_and_counts(text)
    return brands

@udf(ArrayType(IntegerType()))
def extract_mention_counts(text):
    _, counts = extract_brands_and_counts(text)
    return counts

df = df.withColumn("brand_name", extract_brands(col("clean_text")))
df = df.withColumn("mention_count", extract_mention_counts(col("clean_text")))

# Filter rows with at least 1 valid brand mention
df = df.filter(col("brand_name").isNotNull() & (col("brand_name").getItem(0).isNotNull()))
df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+-------------+
|       user_gen_prob|            metadata|word_count|                text|                 url|          clean_text|              tokens|  brand_name|mention_count|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+-------------+
| 0.12067514657974243|{281399, applicat...|      6572|Forgot your passw...|http://slashdot.o...|forgot your passw...|[forgot, your, pa...|[nationwide]|          [1]|
|0.024826467037200928|{157993, applicat...|       929|Golf Digest edito...|http://www.golfdi...|golf digest edito...|[golf, digest, ed...|      [hsbc]|          [1]|
| 0.12538021802902222|{123284, applicat...|      5279|Friday, June 18, ...|http://hotinterne...|friday june write...|[friday, june, wr...|      [hsbc]|          [1]|
| 0.

                                                                                

### *url_domain*

In [18]:
# url_domain
@udf(StringType())
def extract_domain(url):
    if not isinstance(url, str):
        return ""
    parsed = urlparse(url)
    return parsed.netloc or ""

df = df.withColumn("url_domain", extract_domain(col("url")))

### Classification of Brand-related Content: *content_type*

In [19]:
def classify_content(url, prob, text, content_type):
    if not isinstance(url, str):
        url = ""
    if not isinstance(text, str):
        text = ""
    if not isinstance(content_type, str):
        content_type = ""
    
    url = url.lower()
    text = text.lower()
    content_type = content_type.lower()
    prob = prob if prob is not None else 0.0
    
    # Social media: High FastText score or social platforms
    social_media_domains = [
        "reddit", "twitter", "x.com", "facebook", "linkedin", "instagram",
        "tiktok", "pinterest", "forum", "discuss", "community"
    ]
    if prob >= 0.5 or any(domain in url for domain in social_media_domains):
        return "user_generated"
    
    # News article: Reputable news sources
    news_domains = [
        "bbc", "guardian", "telegraph", "ft.com", "reuters", "bloomberg",
        "cnn", "nytimes", "independent", "dailymail", "sky.com", "news"
    ]
    if any(domain in url for domain in news_domains) or "news" in text:
        return "news_article"
    
    # Customer review: Review platforms or review-related keywords
    review_domains = ["trustpilot", "feefo", "reviews", "yelp", "google.com/reviews"]
    review_keywords = ["review", "rated", "rating", "customer feedback", "complaint", "testimonial"]
    if any(domain in url for domain in review_domains) or any(keyword in text for keyword in review_keywords):
        return "customer_review"
    
    # Blog post: Blog platforms or keywords
    blog_domains = ["medium", "wordpress", "blogger", "tumblr", "substack"]
    blog_keywords = ["blog", "post", "article by", "opinion piece"]
    if any(domain in url for domain in blog_domains) or any(keyword in text for keyword in blog_keywords):
        return "blog_post"
    
    # Regulatory document: Official or compliance-related sources
    regulatory_domains = ["fca.org.uk", "bankofengland", "gov.uk", "regulations", "compliance"]
    if any(domain in url for domain in regulatory_domains) or content_type == "application/pdf":
        return "regulatory_document"
    
    # Advertising content: Promotional keywords
    advertising_keywords = ["ads", "campaign", "promo", "sponsor", "advert", "promotion"]
    if any(term in url for term in advertising_keywords) or any(term in text for term in advertising_keywords):
        return "advertising_content"
    
    # Owned media: Brand or institutional domains
    owned_media_domains = [
        "gov.uk", "ac.uk", "co.uk", "barclays", "lloyds", "natwest", "hsbc",
        "monzo", "revolut", "tsb", "santander", "starling", "nationwide"
    ]
    if any(domain in url for domain in owned_media_domains):
        return "owned_media"
    
    # Forum post: Specific forum platforms
    forum_domains = ["moneysavingexpert", "thestudentroom", "forums", "discussion"]
    if any(domain in url for domain in forum_domains) and prob >= 0.3:
        return "forum_post"
    
    # FAQ/Knowledge base: Support or informational pages
    faq_keywords = ["faq", "help", "support", "knowledge base", "how to", "guide"]
    if any(keyword in url for keyword in faq_keywords) or any(keyword in text for keyword in faq_keywords):
        return "faq_knowledge_base"
    
    # Default: Other
    return "miscellaneous"

content_type_udf = udf(classify_content, StringType())
df = df.withColumn("content_type", content_type_udf(
    col("url"),
    col("user_gen_prob"),
    col("clean_text"),
    col("metadata.Content-Type")
))

In [20]:
df.show()

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------------+------------------+
|       user_gen_prob|            metadata|word_count|                text|                 url|          clean_text|              tokens|  brand_name|mention_count|          url_domain|      content_type|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------------+------------------+
| 0.12067514657974243|{281399, applicat...|      6572|Forgot your passw...|http://slashdot.o...|forgot your passw...|[forgot, your, pa...|[nationwide]|          [1]|        slashdot.org|      news_article|
|0.024826467037200928|{157993, applicat...|       929|Golf Digest edito...|http://www.golfdi...|golf digest edito...|[golf, digest, ed...|      [hsbc]|          [1]|  www.golfd

                                                                                

### Temporal columns

In [21]:
df = df.withColumn("date", to_date(col("metadata.WARC-Date")))
df = df.withColumn("day", dayofmonth(col("date")))
df = df.withColumn("month", month(col("date")))
df = df.withColumn("year", year(col("date")))

### Drop Irrelevant Columns

In [22]:
df = df.drop("user_gen_prob", "metadata")

### Summary of Final Columns

| Column Name         | Data Type          | Description                                      |
|---------------------|--------------------|--------------------------------------------------|
| `brand_name`        | Array of String    | List of UK bank brands mentioned in the text (e.g., ["barclays", "nationwide"]) |
| `mention_count`     | Array of Integer   | Number of mentions for each brand in `brand_name` (e.g., [2, 1]) |
| `content_type`      | String             | Classification of content (e.g., social_media, news_article, customer_review) |
| `raw_text`          | String             | Original, unprocessed text from the dataset       |
| `clean_text`        | String             | Preprocessed text for analysis (lowercase, no punctuation) |
| `tokens`            | Array of String    | Tokenized words from `clean_text` for further processing |
| `url`               | String             | Original URL of the content source               |
| `url_domain`        | String             | Root domain extracted from `url` (e.g., x.com)   |
| `word_count`        | Long               | Number of words in the content                   |
| `date`              | Date               | Full crawl timestamp (e.g., 2024-11-24)          |
| `day`               | Integer            | Day of the month from metadata        |
| `month`             | Integer            | Month from crawl metadata                   |
| `year`              | Integer            | Year from crawl metadata                    |

In [23]:
df = df.selectExpr(
    "brand_name",
    "mention_count",
    "content_type",
    "text AS raw_text",
    "clean_text",
    "tokens",
    "url",
    "url_domain",
    "word_count",
    "date",
    "day",
    "month",
    "year",
)

df.printSchema()

root
 |-- brand_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mention_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- content_type: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- url: string (nullable = true)
 |-- url_domain: string (nullable = true)
 |-- word_count: long (nullable = true)
 |-- date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [24]:
df.show()

[Stage 15:>                                                         (0 + 1) / 1]

+------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+---+-----+----+
|  brand_name|mention_count|      content_type|            raw_text|          clean_text|              tokens|                 url|          url_domain|word_count|      date|day|month|year|
+------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+---+-----+----+
|[nationwide]|          [1]|      news_article|Forgot your passw...|forgot your passw...|[forgot, your, pa...|http://slashdot.o...|        slashdot.org|      6572|2014-07-14| 14|    7|2014|
|      [hsbc]|          [1]|      news_article|Golf Digest edito...|golf digest edito...|[golf, digest, ed...|http://www.golfdi...|  www.golfdigest.com|       929|2014-07-14| 14|    7|2014|
|      [hsbc]|          [1]|      news_article|Fri

                                                                                

In [None]:
# num_rows = df.count()
# num_cols = len(df.columns)
# print(f"Number of rows: {num_rows}")
# print(f"Number of columns: {num_cols}")



In [25]:
df.cache()

DataFrame[brand_name: array<string>, mention_count: array<int>, content_type: string, raw_text: string, clean_text: string, tokens: array<string>, url: string, url_domain: string, word_count: bigint, date: date, day: int, month: int, year: int]

# 4. Brand Sentiment Analysis

In this section, sentiment analysis is performed on UK bank brand mentions using a hybrid approach combining lexicon-based (VADER) and a transformer-based model (FinBERT).

### 4.1 Lexicon-Based (VADER)

The following code performs brand sentiment analysis using NLTK's VADER (Valence Aware Dictionary and sEntiment Reasoner), a lexicon-based tool specifically designed for detecting sentiment in user-generated texts.

VADER calculates 4 sentiment metrics for each text input:
- `vader_score` (compound score): A normalized weighted composite score ranging from -1 (negative) to +1 (positive). Derived from the sum of valence scores of individual words, adjusted for modifiers (e.g., "very good" amplifies positivity).
- `positive_score`, `neutral_score`, `negative_score`: Proportional metrics representing the text's positive, neutral, and negative sentiment (each ranges 0–1). The 3 scores sum to 1.

`sentiment_label` is assigned based on the compound `vader_score`.

In [26]:
# Initialise VADER
sid = SentimentIntensityAnalyzer()

# Calculates VADER sentiment
def vader_sentiment(text):
    if not isinstance(text, str) or text.strip() == "":
        return {"compound": 0.0, "positive": 0.0, "neutral": 0.0, "negative": 0.0}
    scores = sid.polarity_scores(text)
    return scores

# Schema for VADER output
vader_schema = StructType([
    StructField("compound", FloatType(), nullable=True),
    StructField("pos", FloatType(), nullable=True),
    StructField("neu", FloatType(), nullable=True),
    StructField("neg", FloatType(), nullable=True)
])

vader_udf = udf(vader_sentiment, vader_schema)
df = df.withColumn("vader_sentiment", vader_udf(col("clean_text")))

### Sentiment Scores and Label

In [27]:
df = df.withColumn("vader_score", col("vader_sentiment.compound"))
df = df.withColumn("positive_score", col("vader_sentiment.pos"))
df = df.withColumn("neutral_score", col("vader_sentiment.neu"))
df = df.withColumn("negative_score", col("vader_sentiment.neg"))

# Sentiment Label
df = df.withColumn("sentiment_label",
    when(col("vader_score") > 0.05, "Positive")
    .when(col("vader_score") < -0.05, "Negative")
    .otherwise("Neutral"))

df = df.drop("vader_sentiment")

print("\nVADER Sentiment Scores and Labels:")
df.select(
    "brand_name", "mention_count", "content_type", "clean_text", "url_domain", "vader_score", "positive_score",
    "neutral_score", "negative_score", "sentiment_label"
).show(10, truncate=50)


VADER Sentiment Scores and Labels:


ERROR:root:KeyboardInterrupt while sending command.               (17 + 1) / 24]
Traceback (most recent call last):
  File "/opt/conda/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
25/06/06 00:33:19 ERROR Executor: Exception in task 17.0 in stage 16.0 (TID 96): Python worker exited unexpectedly (crashed)


KeyboardInterrupt: 

### Overall Sentiment Aggregation: *avg_vader_score*

In [None]:
# Associates sentiment with each brand
df_exploded = df.select(
    explode(col("brand_name")).alias("brand"),
    col("vader_score"),
    col("content_type"),
    col("year"),
    col("month"),
    col("day")
)

# Sentiment by brand and content_type
sentiment_summary = df_exploded.groupBy("brand", "content_type").agg(
    avg("vader_score").alias("avg_vader_score")
).orderBy("brand", "content_type")

# Sentiment label
sentiment_summary = sentiment_summary.withColumn(
    "avg_sentiment_label",
    when(col("avg_vader_score") > 0.05, "Positive")
    .when(col("avg_vader_score") < -0.05, "Negative")
    .otherwise("Neutral")
)

print("VADER Sentiment Summary by Brand and Content Type:")
sentiment_summary.show(truncate=False)

### 4.2 Transformer-Based
**FinBERT** model is implemented for brand sentiment analysis of UK financial services brands due to its:
- Domain-specialisation:  Explicitly trained on financial texts (10M+ finance docs), including financial news, analyst reports, earnings call transcripts, SEC/FCA filings, and other regulatory documents. It has good understanding of key financial concepts, such as, financial metrics, market movements, and regulatory language.
- Sentiment granularity: 3-class (positive/neutral/negative)
- Numerical sensitivity: Handles earnings and percentages well.
  
The following outputs are computed:
- `finbert_label` – the sentiment class with the highest average probability across all chunks
- `finbert_score` – the sentiment polarity score, calculated as Positive - Negative probability.
- `finbert_confidence`: How confident FinBERT is about its prediction


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# FinBERT tokenizer and model
finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert", use_fast=True)
finbert_model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert").to(device)
finbert_pipeline = pipeline(
    task="sentiment-analysis",
    model=finbert_model,
    tokenizer=finbert_tokenizer,
    device=0 if torch.cuda.is_available() else -1,
    torch_dtype=torch.float16,
    return_all_scores=True,
    truncation=True,
    padding=True,
    max_length=512,
    batch_size=32
)    

In [None]:
def prepare_chunks(text, tokenizer):
    if not isinstance(text, str) or not text.strip():
        return [""]
    tokens = tokenizer.tokenize(text)
    chunks = []
    for i in range(0, len(tokens), 510):  # 510 to allow for special tokens
        chunk_tokens = tokens[i:i + 510]
        chunk_text = tokenizer.convert_tokens_to_string(chunk_tokens)
        chunks.append(chunk_text)
    return chunks if chunks else [""]
    
def analyze_finbert(text):
    try:
        chunks = prepare_chunks(text, finbert_tokenizer)
        if not chunks or all(not c.strip() for c in chunks):
            return "neutral", 0.0, 0.0, {"positive": 0.0, "neutral": 1.0, "negative": 0.0}

        results = finbert_pipeline(chunks)
        cumulative_scores = {"positive": 0.0, "neutral": 0.0, "negative": 0.0}
        confidences = []
        count = 0

        for r in results:
            if isinstance(r, list):
                for entry in r:
                    label = entry["label"].lower()
                    score = entry["score"]
                    cumulative_scores[label] += score
                confidences.append(max(entry["score"] for entry in r))
                count += 1

        if count == 0:
            return "neutral", 0.0, 0.0, {"positive": 0.0, "neutral": 1.0, "negative": 0.0}

        # Normalise all scores
        avg_scores = {k: v / count for k, v in cumulative_scores.items()}
        avg_confidence = sum(confidences) / count

        # Polarity score
        polarity = avg_scores["positive"] - avg_scores["negative"]

        # Final predicted label
        if abs(polarity) < 0.15:
            final_label = "neutral"
        else:
            final_label = "positive" if polarity > 0 else "negative"

        return (
            final_label,
            round(polarity, 4),
            round(avg_confidence, 4),
            {k: round(v, 4) for k, v in avg_scores.items()}
        )

    except torch.cuda.OutOfMemoryError:
        torch.cuda.empty_cache()
        return analyze_finbert_vader_style(text)
    except Exception as e:
        print(f"FinBERT error on text {text[:50]}...: {str(e)}")
        return "neutral", 0.0, 0.0, {"positive": 0.0, "neutral": 1.0, "negative": 0.0}


In [None]:
# Filter and truncate text
df = df.filter(col("clean_text").isNotNull() & col("clean_text").cast("string").isNotNull())
df = df.withColumn("clean_text", substring(col("clean_text"), 1, 512))
df.cache()

# Row index for joining
df = df.withColumn("row_id", monotonically_increasing_id())

# Convert to Pandas for transformer processing
pandas_df = df.select("row_id", "clean_text").toPandas()

In [None]:
# Execute analysis
finbert_results = [analyze_finbert(text) for text in pandas_df["clean_text"]]

# Sentiment label, score and confidence
pandas_df["finbert_label"] = [r[0] for r in finbert_results]
pandas_df["finbert_score"] = [r[1] for r in finbert_results]
pandas_df["finbert_confidence"] = [r[2] for r in finbert_results]

# Individual sentiment scores
pandas_df["finbert_dist_positive"] = [r[3]["positive"] for r in finbert_results]
pandas_df["finbert_dist_neutral"] = [r[3]["neutral"] for r in finbert_results]
pandas_df["finbert_dist_negative"] = [r[3]["negative"] for r in finbert_results]


In [None]:
# Save to CSV
finbert_csv = "data/finbert_results.csv"
pandas_df[[
    "row_id", "finbert_label", "finbert_score", "finbert_confidence",
    "finbert_dist_positive", "finbert_dist_neutral", "finbert_dist_negative"
]].to_csv(finbert_csv, index=False)

# Transform back to Spark
transformer_df = spark.read.csv(finbert_csv, header=True, inferSchema=True)
df = df.join(transformer_df, "row_id").drop("row_id")

print("\nDisplaying DataFrame after transformer processing:")
df.select(
    "brand_name", "mention_count", "content_type", "clean_text", "vader_score", "sentiment_label",
    "finbert_label", "finbert_score", "finbert_confidence"
).show(5, truncate=50)

## 4.3 Hybrid Sentiment Analysis
The following section combines VADER and FinBERT predictions, weighted by `content_type`. VADER is up-weighted for `user_generated` and `customer_review`, and FinBERT for `news_article` and `regulatory_document`. This outputs `hybrid_sentiment`.

In [39]:
@udf(StringType())
def hybrid_sentiment(vader_score, finbert_score, content_type):
    if vader_score is None or finbert_score is None:
        return "Neutral"
    
    # Adjusted weights (sum to 1)
    if content_type in ["user_generated", "customer_review", "forum_post"]:
        weights = {"vader": 0.6, "finbert": 0.4}
    elif content_type in ["news_article", "regulatory_document"]:
        weights = {"vader": 0.3, "finbert": 0.7}
    else:
        weights = {"vader": 0.5, "finbert": 0.5}
    
    combined_score = (
        weights["vader"] * vader_score +
        weights["finbert"] * finbert_score
    )
    
    if combined_score > 0.05:
        return "Positive"
    elif combined_score < -0.05:
        return "Negative"
    return "Neutral"

df = df.withColumn("hybrid_sentiment", hybrid_sentiment(
    col("vader_score"),
    col("finbert_score"),
    col("content_type")
))

In [41]:
df.select(
    "brand_name", "mention_count", "content_type", "clean_text", "hybrid_sentiment", "vader_score", "finbert_score"
).show(10, truncate=50)

+------------+-------------+------------------+--------------------------------------------------+----------------+--------------+-------------+
|  brand_name|mention_count|      content_type|                                        clean_text|hybrid_sentiment|compound_score|finbert_score|
+------------+-------------+------------------+--------------------------------------------------+----------------+--------------+-------------+
|[nationwide]|          [1]|         blog_post|follow slashdot blog updates by subscribing to ...|        Positive|        0.9943|      -0.1004|
|       [tsb]|          [2]|    user_generated|gm ceo mary barra congressional hearings on rec...|        Negative|       -0.9969|      -0.1355|
|  [barclays]|          [4]|   customer_review|the common good former barclays ceo feels respo...|        Positive|        0.9588|      -0.7265|
|  [starling]|          [1]|      news_article|get connected facebook twitter sign in classifi...|        Negative|       -0.9876|

## 4.4 Brand-Specific Analysis

In [45]:
brand_sentiment_df = df.select(
    explode(arrays_zip(col("brand_name"), col("mention_count"))).alias("exploded"),
    col("content_type"),
    col("vader_score"),
    col("sentiment_label"),
    col("finbert_label"),
    col("finbert_score"),
    col("finbert_confidence"),
    col("hybrid_sentiment"),
    col("date"),
    col("month"),
    col("year")
).select(
    col("exploded.brand_name").alias("brand"),
    col("exploded.mention_count").alias("mentions"),
    col("content_type"),
    col("vader_score"),
    col("sentiment_label"),
    col("finbert_label"),
    col("finbert_score"),
    col("finbert_confidence"),
    col("hybrid_sentiment"),
    col("date"),
    col("month"),
    col("year")
)

# Filter for Lloyds and Barclays
brands_of_interest = ["lloyds", "barclays"]
brand_specific_df = brand_sentiment_df.filter(col("brand").isin(brands_of_interest))


### Sentiment by brand and content type

In [47]:
brand_summary = brand_specific_df.groupBy(
    "brand", "content_type", "hybrid_sentiment"
).agg({"mentions": "sum"}).withColumnRenamed("sum(mentions)", "total_mentions")

print("\nBrand Sentiment Summary (Lloyds and Barclays):")
brand_summary.orderBy("brand", "content_type").show(50, truncate=False)


Brand Sentiment Summary (Lloyds and Barclays):
+--------+-------------------+----------------+--------------+
|brand   |content_type       |hybrid_sentiment|total_mentions|
+--------+-------------------+----------------+--------------+
|barclays|advertising_content|Positive        |1             |
|barclays|blog_post          |Positive        |3             |
|barclays|customer_review    |Negative        |4             |
|barclays|customer_review    |Positive        |9             |
|barclays|faq_knowledge_base |Positive        |2             |
|barclays|news_article       |Neutral         |4             |
|barclays|news_article       |Positive        |22            |
|barclays|news_article       |Negative        |12            |
|barclays|other              |Negative        |1             |
|barclays|other              |Positive        |1             |
|lloyds  |customer_review    |Negative        |16            |
|lloyds  |customer_review    |Positive        |3             |
|lloyds

## 4.5 Visualisations

### Sentiment distribution by brand and content type

In [56]:
brand_sentiment_counts = brand_summary.groupBy("brand", "hybrid_sentiment").agg({"total_mentions": "sum"}).collect()
brands = sorted(set(row["brand"] for row in brand_sentiment_counts))
sentiments = ["Positive", "Neutral", "Negative"]
data = {sentiment: [0] * len(brands) for sentiment in sentiments}
for row in brand_sentiment_counts:
    brand_idx = brands.index(row["brand"])
    sentiment = row["hybrid_sentiment"]
    data[sentiment][brand_idx] = row["sum(total_mentions)"]

bar_chart_data = {
    "positive": data["Positive"],
    "neutral": data["Neutral"],
    "negative": data["Negative"],
    "labels": ["Barclays", "Lloyds"]
}

In [57]:
from IPython.display import display, HTML, Javascript

html_setup = """
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<div>
    <canvas id="barChart" width="600" height="300"></canvas>
</div>
"""
display(HTML(html_setup))

# Pass data to JavaScript
bar_chart_js_data = f"""
var barChartData = {{
    labels: {bar_chart_data['labels']},
    datasets: [
        {{
            label: 'Positive',
            data: {bar_chart_data['positive']},
            backgroundColor: 'rgba(75, 192, 192, 0.6)',
            borderColor: 'rgba(75, 192, 192, 1)',
            borderWidth: 1
        }},
        {{
            label: 'Neutral',
            data: {bar_chart_data['neutral']},
            backgroundColor: 'rgba(153, 162, 235, 0.6)',
            borderColor: 'rgba(153, 162, 235, 1)',
            borderWidth: 1
        }},
        {{
            label: 'Negative',
            data: {bar_chart_data['negative']},
            backgroundColor: 'rgba(255, 99, 132, 0.6)',
            borderColor: 'rgba(255, 99, 132, 1)',
            borderWidth: 1
        }}
    ]
}};
"""

render_bar_chart_js = """
var barCtx = document.getElementById('barChart').getContext('2d');
new Chart(barCtx, {
    type: 'bar',
    data: barChartData,
    options: {
        scales: {
            x: { title: { display: true, text: 'Brand' } },
            y: { title: { display: true, text: 'Total Mentions' }, beginAtZero: true }
        },
        plugins: {
            title: { display: true, text: 'Hybrid Sentiment Distribution for Lloyds and Barclays' },
            legend: { display: true, position: 'top' }
        }
    }
});
"""

display(Javascript(bar_chart_js_data + render_bar_chart_js))

<IPython.core.display.Javascript object>

### Temporal sentiment trends for Lloyds and Barclays (Negative sentiment over time)

In [60]:
temporal_data = temporal_summary.filter(col("hybrid_sentiment") == "Negative").groupBy("brand", "year", "month").agg({"total_mentions": "sum"}).collect()

time_labels = sorted(set(f"{row['year']}-{row['month']:02d}" for row in temporal_data))
lloyds_data = [0] * len(time_labels)
barclays_data = [0] * len(time_labels)
for row in temporal_data:
    time_idx = time_labels.index(f"{row['year']}-{row['month']:02d}")
    if row['brand'] == "lloyds":
        lloyds_data[time_idx] = row["sum(total_mentions)"]
    elif row['brand'] == "barclays":
        barclays_data[time_idx] = row["sum(total_mentions)"]

# Prepare line chart data
line_chart_data = {
    "time_labels": time_labels,
    "lloyds_data": lloyds_data,
    "barclays_data": barclays_data
}



Line Chart Data: {'time_labels': ['2014-03', '2014-07', '2014-12'], 'lloyds_data': [3, 16, 27], 'barclays_data': [3, 4, 10]}


                                                                                

In [61]:
# Include Chart.js and create canvas element
html_setup = """
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<div>
    <canvas id="lineChart" width="600" height="300"></canvas>
</div>
"""
display(HTML(html_setup))

# Pass line chart data to JavaScript
line_chart_js_data = f"""
var lineChartData = {{
    labels: {line_chart_data['time_labels']},
    datasets: [
        {{
            label: 'Lloyds Positive Mentions',
            data: {line_chart_data['lloyds_data']},
            borderColor: 'rgba(75, 192, 192, 1)',
            backgroundColor: 'rgba(75, 192, 192, 0.2)',
            fill: true,
            tension: 0.4
        }},
        {{
            label: 'Barclays Positive Mentions',
            data: {line_chart_data['barclays_data']},
            borderColor: 'rgba(255, 99, 132, 1)',
            backgroundColor: 'rgba(255, 99, 132, 0.2)',
            fill: true,
            tension: 0.4
        }}
    ]
}};
"""

# Render the line chart
render_line_chart_js = """
var lineCtx = document.getElementById('lineChart').getContext('2d');
new Chart(lineCtx, {
    type: 'line',
    data: lineChartData,
    options: {
        scales: {
            x: { title: { display: true, text: 'Year-Month' } },
            y: { title: { display: true, text: 'Positive Mentions' }, beginAtZero: true }
        },
        plugins: {
            title: { display: true, text: 'Positive Sentiment Trends for Lloyds and Barclays' },
            legend: { display: true, position: 'top' }
        }
    }
});
"""

# Combine and display
display(Javascript(line_chart_js_data + render_line_chart_js))

<IPython.core.display.Javascript object>

In [None]:
spark.stop()