In [1]:
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

import mysql
import mysql.connector
import pandas as pd
import tensorflow as tf
from mysql.connector import Error
from scipy.special import softmax
from sqlalchemy import create_engine
from tqdm import tqdm
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification


In [2]:
def fetch_data(query: str, user, password, host, database) -> pd.DataFrame:
    engine = create_engine(f"mysql://{user}:{password}@{host}:3306/{database}")
    return pd.read_sql_query(query, engine, index_col="tweet_id")


def check_given_var(env_var_str: str) -> str:
    """
    Check if the given environment variable is set and return its value.

    Args:
        env_var_str (str): The name of the environment variable to check.

    Returns:
        str: The value of the environment variable.

    Raises:
        AssertionError: If the environment variable is not found.
    """

    env_var = os.getenv(env_var_str)
    assert (
        env_var is not None
    ), f"{env_var_str} is required but not found in environment variables"
    return env_var


def check_env_vars() -> (str, str, str, str):  # type: ignore
    user = check_given_var("DBL_USER")
    database = check_given_var("DBL_DATABASE")
    password = check_given_var("DBL_PASSWORD")
    host = check_given_var("DBL_HOST")
    return user, database, password, host


def process_batch(texts):
    """
    Apply sentiment analysis to a batch of texts using a pre-trained transformer model.

    Parameters:
    texts (list): A list of texts to analyze.

    Returns:
    list: A list of ranked sentiment labels for the input texts.

    This function uses a pre-trained transformer model for sequence classification to analyze the sentiment of the texts in the given batch. It returns a list of ranked sentiment labels for the input texts, where the first label is the most likely sentiment and the subsequent labels are less likely sentiments.
    """
    # Get the maximum sequence length for the model

    encoded_input = tokenizer(texts, return_tensors="tf", padding=True, truncation=True)
    with tf.device(device):
        output = model(encoded_input)

    scores = output[0].numpy()
    scores = softmax(scores, axis=1)

    sentiment_scores = scores[:, 2] - scores[:, 0]
    return sentiment_scores.tolist()


def apply_sentiment_analysis(df, text_column, batch_size=64, max_workers=14):
    """
    Apply sentiment analysis to the given DataFrame using a pre-trained transformer model.

    Parameters:
    df (pd.DataFrame): The input DataFrame containing the text column to analyze.
    text_column (str): The name of the column in the DataFrame containing the text to analyze.
    batch_size (int): The number of texts to process in each batch. Default is 128.
    max_workers (int): The maximum number of worker threads to use for parallel processing. Default is 4.

    Returns:
    pd.DataFrame: The input DataFrame with an additional 'sentiment' column containing the sentiment analysis results.

    This function uses a pre-trained transformer model for sequence classification to analyze the sentiment of the texts in the given DataFrame. It applies the sentiment analysis in parallel using multiple worker threads to improve performance. The results are then added to the input DataFrame as a new 'sentiment' column.
    """
    texts = df[text_column].tolist()
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i in range(0, len(texts), batch_size):
            batch = texts[i : i + batch_size]
            results.extend(executor.submit(process_batch, batch).result())

    df["sentiment"] = results
    return df


def connect_to_database(user: str, database: str, password: str, host: str):
    """
    Establish a connection to the database.

    Args:
        user: The database user.
        database: The name of the database.
        password: The password for the database.
        host: The database host.

    Returns:
        A connection object to the MySQL database.
    """
    try:
        connection = mysql.connector.connect(
            user=user, password=password, host=host, database=database
        )
        if connection.is_connected():
            return connection
    except Error as e:
        print(f"Error while connecting to MySQL: {e}")
    return None


def get_batches(df: pd.DataFrame, batch_size: int = 1000) -> List[pd.DataFrame]:
    """
    Split DataFrame into batches of DataFrames with specified batch size.

    Args:
        df: The DataFrame containing tweet data.
        batch_size: The size of each batch.

    Returns:
        A list of DataFrames, each containing a batch of rows.
    """
    return [df.iloc[i : i + batch_size] for i in range(0, len(df), batch_size)]


def upload_sentiment(
    batch: Tuple[Tuple[float, str]], user: str, database: str, password: str, host: str
) -> None:
    """
    Create and insert batches of tweets into the database in parallel.

    Args:
        batches_list: Tuple of (sentiment, tweet_id) pairs.
        user: The database user.
        database: The name of the database.
        password: The password for the database.
        host: The database host.
    """
    connection = connect_to_database(user, database, password, host)
    if connection is None:
        return
    cursor = connection.cursor()
    update_query = "UPDATE Tweets SET sentiment_score = %s WHERE tweet_id = %s"
    cursor.executemany(update_query, batch)
    connection.commit()
    cursor.close()
    connection.close()

    print("Sentiment values updated successfully")


def convert_to_list(df: pd.DataFrame) -> List[List]:
    """
    Convert DataFrame with tweet_id as index to a list of lists containing sentiment and tweet_id.

    Args:
        df: The DataFrame with tweet_id as index and sentiment as a column.

    Returns:
        A list of lists containing tweet_id and sentiment.
    """
    tweet_ids = df.index.to_numpy()
    sentiments = df["sentiment"].to_numpy()
    return tuple(zip(sentiments, tweet_ids))


In [3]:
print("Start the upload")
user, database, password, host = check_env_vars()
path = os.path.join("data_processed", "local_backup.db")
query = """
SELECT tweet_id, full_text
FROM Tweets
WHERE sentiment_score IS NULL;
"""
batch_size = 10_000
# Load the tokenizer and model
model_name = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = TFAutoModelForSequenceClassification.from_pretrained(
    "cardiffnlp/twitter-roberta-base-sentiment"
)
# Check if GPU is available
device = "/GPU:0" if tf.test.is_gpu_available() else "/CPU:0"
print("All required constants defined sucessfully")


Start the upload








All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.


Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
All required constants defined sucessfully


In [4]:
test_data = fetch_data(query, user, password, host, database)
print("Data extracted successfully")

Data extracted successfully


In [5]:
data_batches = get_batches(test_data[["full_text"]], batch_size)
print("Batches created successfully")

Batches created successfully


In [6]:
for batch in tqdm(data_batches, desc="Uploading batches: "):
    sentiment_with_score = apply_sentiment_analysis(batch, "full_text")
    upload_sentiment(
        convert_to_list(sentiment_with_score), user, database, password, host
    )
print("Finished uploading semantic analysis")

Uploading batches:   0%|          | 0/615 [00:00<?, ?it/s]Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Uploading batches:   0%|          | 0/615 [02:00<?, ?it/s]


InvalidArgumentError: Exception encountered when calling layer 'embeddings' (type TFRobertaEmbeddings).

{{function_node __wrapped__ResourceGather_device_/job:localhost/replica:0/task:0/device:CPU:0}} indices[35,512] = 514 is not in [0, 514) [Op:ResourceGather] name: 

Call arguments received by layer 'embeddings' (type TFRobertaEmbeddings):
  • input_ids=tf.Tensor(shape=(64, 548), dtype=int32)
  • position_ids=None
  • token_type_ids=tf.Tensor(shape=(64, 548), dtype=int32)
  • inputs_embeds=None
  • past_key_values_length=0
  • training=False

In [None]:
tf.config.list_physical_devices('GPU')

In [None]:
tf.test.is_gpu_available()

In [None]:
data_batches