Reference for extracting all labels:
https://datascience.stackexchange.com/questions/112438/how-to-get-all-3-labels-sentiment-from-finbert-instead-of-the-most-likely-label

Essentially, use `AutoModelForSequenceClassification` to get the raw logits for every class and then apply the softmax ourselves.

Normally the pipeline applies the softmax itself and returns ONLY the highest-scoring label.

In [1]:
import multiprocessing

# Ask the OS how many CPUs it reports; used as a guide when sizing the Dask cluster.
num_cores = multiprocessing.cpu_count()
print("Total CPU cores available: {}".format(num_cores))

Total CPU cores available: 16


In [2]:
# !pip install bokeh

# !pip install pyarrow==10.0.1

In [3]:
import pandas as pd
import dask
import pyarrow
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import duckdb
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
from dask.distributed import Client, LocalCluster
import bokeh

# Record the versions of the data libraries this run used, one line per library.
for _label, _module in (("Pandas", pd), ("Dask", dask), ("PyArrow", pyarrow)):
    print(f"{_label} version:", _module.__version__)

Pandas version: 2.2.3
Dask version: 2025.2.0
PyArrow version: 19.0.1


In [4]:
# FinBERT checkpoint used for every sentiment score in this notebook.
model_name = "yiyanghkust/finbert-tone"

# Tokenizer and classification head both come from the same checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# .eval() switches the module to evaluation mode and returns the module itself,
# so chaining it still binds the loaded model to `model`.
model = AutoModelForSequenceClassification.from_pretrained(model_name).eval()

def classify_sentiment(text):
    """Score one piece of text with FinBERT and return every class probability.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        text: Text to classify. Anything that is not a non-blank string
            (e.g. ``None``, NaN, ``""``, whitespace only) is treated as
            carrying no sentiment.

    Returns:
        dict with the keys
            ``"label"``    - upper-case name of the most probable class,
            ``"score"``    - probability of that class,
            ``"positive"``, ``"neutral"``, ``"negative"`` - per-class probabilities.
        All probabilities are plain Python floats.

    Raises:
        KeyError: if the loaded checkpoint does not expose classes named
            positive / neutral / negative in ``model.config.id2label``.
    """
    if not isinstance(text, str) or text.strip() == "":
        return {"label": "NEUTRAL", "score": 1.0, "positive": 0.0, "neutral": 1.0, "negative": 0.0}

    # Truncate at the token level (not by characters) so long texts fit the
    # 512-token limit without triggering the tokenizer's truncation warning.
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

    with torch.no_grad():  # inference only, no gradients needed
        logits = model(**inputs).logits

    # Softmax over the class dimension; [0] selects the single item in the batch.
    probs = F.softmax(logits, dim=1)[0].tolist()

    # Take the class order from the checkpoint instead of hard-coding it.
    # The yiyanghkust/finbert-tone model card lists 0 = neutral, 1 = positive,
    # 2 = negative, so a fixed NEGATIVE/NEUTRAL/POSITIVE order mislabels every score.
    id2label = model.config.id2label
    labels = [str(id2label[idx]).upper() for idx in range(len(probs))]
    sentiment_dict = dict(zip(labels, probs))

    # Index of the most probable class (first one wins on an exact tie).
    best_idx = max(range(len(probs)), key=probs.__getitem__)

    return {
        "label": labels[best_idx],
        "score": probs[best_idx],
        "positive": sentiment_dict["POSITIVE"],
        "neutral": sentiment_dict["NEUTRAL"],
        "negative": sentiment_dict["NEGATIVE"],
    }


**CAN'T START A DASK CLIENT AND CLUSTER BEFORE LOADING FINBERT — load the model first, then start the cluster.**

In [14]:
# Start a local Dask cluster + client for the parallel scoring cells below.

# Try to avoid PyArrow-backed strings: keep pandas string storage on plain Python objects.
pd.options.mode.string_storage = "python"

# Earlier sizing, kept for reference (half of the detected cores):
# cluster = LocalCluster(n_workers=num_cores//2, threads_per_worker=1)
# NOTE(review): the worker count is hard-coded to 10 rather than derived from `num_cores` — confirm this suits the target machine.
cluster = LocalCluster(n_workers=10, threads_per_worker=1) # upping to full CPU cores when not using my laptop

# Let the cluster scale between 1 and 10 workers.
cluster.adapt(minimum=1, maximum=10)
client = Client(cluster)

# Register a progress bar globally.
# NOTE(review): dask.diagnostics.ProgressBar is documented under the local-scheduler
# diagnostics; with a distributed Client it may not report anything — confirm, and
# consider dask.distributed.progress or the dashboard instead.
pbar = ProgressBar()
pbar.register()
# Dashboard URL and the per-worker core counts, for a quick sanity check of the cluster.
print(client.dashboard_link)
print(client.ncores())

http://127.0.0.1:8787/status
{'tcp://127.0.0.1:55584': 1, 'tcp://127.0.0.1:55585': 1, 'tcp://127.0.0.1:55586': 1, 'tcp://127.0.0.1:55587': 1, 'tcp://127.0.0.1:55588': 1, 'tcp://127.0.0.1:55589': 1, 'tcp://127.0.0.1:55590': 1, 'tcp://127.0.0.1:55591': 1, 'tcp://127.0.0.1:55592': 1, 'tcp://127.0.0.1:55611': 1}


In [None]:
import os 
from dotenv import load_dotenv
from pathlib import Path
# Resolve the DuckDB file location from the environment.
# load_dotenv() is called first so that a local .env file can supply the values.
load_dotenv()

# Fail early with a readable message: without this check an unset variable
# surfaces as an opaque TypeError from Path(None) / `Path / None`.
_missing_env = [name for name in ("DB_PATH", "DB_FILE") if not os.getenv(name)]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env)
        + ". Set them in the environment or in a .env file."
    )

DB_PATH = Path(os.environ["DB_PATH"])  # directory containing the database file
DB_FILE = os.environ["DB_FILE"]        # database file name
duckdb_path = DB_PATH / DB_FILE

In [None]:
# Export the article columns needed for scoring from DuckDB to a Parquet dataset
# partitioned by publication year and month, so Dask can read it directly.
#
# History: fetching into pandas (fetchdf) and converting to a Dask DataFrame was a
# bottleneck, so the data was written to disk instead — first as CSV, then as a
# single Parquet file, and now as a date-partitioned Parquet directory.
#
# NOTE(review): on a re-run DuckDB may refuse to write into an already populated
# output directory unless an overwrite option is added to the COPY — confirm.
output_dir = "articles_partitioned/"

export_query = f'''
    COPY (
        SELECT 
            guid, 
            description, 
            article_title, 
            ticker, 
            article_pubdate,
            YEAR(article_pubdate) AS year, 
            MONTH(article_pubdate) AS month
        FROM headlines.articles
    ) 
    TO '{output_dir}' 
    (FORMAT 'parquet', PARTITION_BY (year, month));
'''

# Read-only connection: this cell only reads from the database.
con = duckdb.connect(duckdb_path, read_only=True)
try:
    con.execute(export_query)
finally:
    # Always release the database file, even if the export fails.
    con.close()

In [8]:
# Open the Parquet dataset written to `output_dir` as a Dask DataFrame.
# (Earlier experiments read a 1000-row pandas sample, a CSV export, and a
# single Parquet file; the partitioned directory replaced them.)
ddf = dd.read_parquet(output_dir, engine="pyarrow")

# Show how many partitions Dask created for the dataset.
print(ddf.npartitions)


23


In [None]:
# Derive FinBERT sentiment columns for the title and the description, then write
# everything to CSV. The same six columns are produced for each text column, so
# the work is driven by two small tables instead of being spelled out twelve times.

# (text column to score, name/prefix of the columns derived from it)
_SENTIMENT_TARGETS = (
    ("article_title", "finbert_title"),
    ("description", "finbert_description"),
)

# (key in the dict returned by classify_sentiment, dtype declared in the Dask meta)
_SENTIMENT_FIELDS = (
    ("label", "str"),
    ("score", "float"),
    ("positive", "float"),
    ("neutral", "float"),
    ("negative", "float"),
)


def _score_partition(df, text_column):
    """Run classify_sentiment on every value of ``df[text_column]``."""
    return df[text_column].apply(classify_sentiment)


def _extract_field(df, result_column, field):
    """Pull one entry out of each sentiment dict stored in ``df[result_column]``."""
    return df[result_column].apply(lambda result: result[field])


with ProgressBar():
    for _text_column, _prefix in _SENTIMENT_TARGETS:
        # Full sentiment dict per row, kept as an object column.
        ddf[_prefix] = ddf.map_partitions(
            _score_partition, _text_column, meta=("x", "object")
        )
        # One flat column per dict entry: <prefix>_label, <prefix>_score, ...
        for _field, _dtype in _SENTIMENT_FIELDS:
            ddf[f"{_prefix}_{_field}"] = ddf.map_partitions(
                _extract_field, _prefix, _field, meta=("x", _dtype)
            )

    # Writing the CSV is what triggers the actual computation.
    ddf.to_csv("articles_with_finbert_scores.csv")

# Convert back to Pandas
# df_final = ddf.compute()

# Save results
# df_final.to_csv("articles_with_all_finbert_scores.csv", index=False)


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.




In [None]:
# # Convert Pandas DataFrame to Dask DataFrame
# # ddf = dd.from_pandas(filtered_df, npartitions=8)  # Adjust partitions based on CPU cores
# # ddf = dd.from_pandas(df, npartitions=8)   # gonna set it to 4 cause 400k rows / 4 = 100k rows per partition 
# # raise ValueError("stop here")
# # Apply FinBERT sentiment analysis in parallel
# with ProgressBar():
#     ddf['finbert_title'] = ddf['article_title'].map(classify_sentiment, meta=("x", "object"))
#     ddf['finbert_title_label'] = ddf['finbert_title'].map(lambda x: x['label'], meta=("x", "str"))
#     ddf['finbert_title_score'] = ddf['finbert_title'].map(lambda x: x['score'], meta=("x", "float"))
#     ddf['finbert_title_positive'] = ddf['finbert_title'].map(lambda x: x['positive'], meta=("x", "float"))
#     ddf['finbert_title_neutral'] = ddf['finbert_title'].map(lambda x: x['neutral'], meta=("x", "float"))
#     ddf['finbert_title_negative'] = ddf['finbert_title'].map(lambda x: x['negative'], meta=("x", "float"))

#     ddf['finbert_description'] = ddf['description'].map(classify_sentiment, meta=("x", "object"))
#     ddf['finbert_description_label'] = ddf['finbert_description'].map(lambda x: x['label'], meta=("x", "str"))
#     ddf['finbert_description_score'] = ddf['finbert_description'].map(lambda x: x['score'], meta=("x", "float"))
#     ddf['finbert_description_positive'] = ddf['finbert_description'].map(lambda x: x['positive'], meta=("x", "float"))
#     ddf['finbert_description_neutral'] = ddf['finbert_description'].map(lambda x: x['neutral'], meta=("x", "float"))
#     ddf['finbert_description_negative'] = ddf['finbert_description'].map(lambda x: x['negative'], meta=("x", "float"))

# # print("before compute")
# # Convert back to Pandas
# df_final = ddf.compute()
# # print("after compute")

# # Save results
# df_final.to_csv("articles_with_all_finbert_scores.csv", index=False)


In [16]:
# con.close()

# Tear down Dask: disconnect the client first, then stop the cluster it was attached to.
for _dask_resource in (client, cluster):
    _dask_resource.close()