In [1]:
import mysql.connector
import sys
sys.path.append('..')
from config import config
from pathlib import Path
import os
import json
import re
import datetime
import logging
import timeit
import csv
import matplotlib.pyplot as plt
from sentiment import sentiment_score

In [2]:
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification  # NOTE(review): unused here — `model` below is the PyTorch class
from transformers import AutoTokenizer, AutoConfig

# Alias the transformers logger so the stdlib `logging` imported in the first
# cell is not silently shadowed for the rest of the notebook.
from transformers import logging as hf_logging
hf_logging.set_verbosity_error()

# Hugging Face Hub id used for the tokenizer, the config and the weights.
MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
tokenizer = AutoTokenizer.from_pretrained(MODEL)
configr = AutoConfig.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Smoke-test the RoBERTa scorer on one sentence; `roberta` yields a (label, score) pair.
label, score = sentiment_score.roberta(
    "I work 40 hours a week " "for me to be this poor",
    tokenizer,
    model,
    configr,
)
print(label, score, sep="\n")

negative
-0.7785103525966406


In [4]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# VADER baseline on the same sentence the RoBERTa cell scores.
analyzer = SentimentIntensityAnalyzer()
analyzer.polarity_scores(
    "I work 40 hours a week " "for me to be this poor"
)

{'neg': 0.248, 'neu': 0.752, 'pos': 0.0, 'compound': -0.561}

In [8]:
# Open the MySQL connection used by the cells below; credentials come from `config`.
# NOTE(review): allow_local_infile lets the server request client-side files via
# LOAD DATA LOCAL INFILE; nothing visible in this notebook uses it — drop if unneeded.
connection = mysql.connector.connect(
    host=config.get('HOST'),
    user=config.get('USERNAME'),
    password=config.get('PASSWORD'),
    database=config.get('DATABASE'),
    allow_local_infile=True,
)

In [None]:
# Load every row of `tweets`, newest first by `timestamp_ms`.
cursor = connection.cursor()
cursor.execute(
    "SELECT * FROM tweets "
    "ORDER BY timestamp_ms DESC"
)
tweets = cursor.fetchall()

In [None]:
def add_sentiment_column(connection):
    """Add a nullable FLOAT ``sentiment`` column to ``tweets`` if it is missing.

    The column is looked up in ``INFORMATION_SCHEMA`` first, so re-running the
    notebook does not fail with MySQL's "Duplicate column name" error (MySQL
    has no ``ADD COLUMN IF NOT EXISTS``).

    Args:
        connection: open ``mysql.connector`` connection to the tweets database.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND COLUMN_NAME = %s",
            ("tweets", "sentiment"),
        )
        rows = cursor.fetchall()
        already_present = bool(rows and rows[0][0])
        if not already_present:
            # MySQL commits DDL implicitly, so no connection.commit() is needed.
            cursor.execute("ALTER TABLE `tweets` ADD COLUMN `sentiment` FLOAT NULL")
    finally:
        cursor.close()

In [None]:
# Ensure the `sentiment` column exists before any scores are written.
add_sentiment_column(connection=connection)

In [None]:
def process_tweet(tweet, scorer=None):
    """Score one ``(id, text)`` row.

    Args:
        tweet: ``(id, text)`` pair, as produced by ``SELECT id, text FROM tweets``.
        scorer: optional callable ``text -> (label, score)``. Defaults to
            ``sentiment_score.roberta`` called with the notebook's ``tokenizer``,
            ``model`` and ``configr`` (the same call as the smoke-test cell).

    Returns:
        ``(id, score)`` holding only the numeric score, or ``None`` when scoring
        raised (the error is reported on stderr).
    """
    tweet_id, text = tweet
    if scorer is None:
        def scorer(t):
            return sentiment_score.roberta(t, tokenizer, model, configr)
    try:
        # roberta returns (label, score); keep only the number for the FLOAT column.
        _label, score = scorer(text)
    except Exception as e:
        print(f"❌ Error: {e}, Tweet: {tweet_id}", file=sys.stderr)
        return None
    return (tweet_id, score)

In [None]:
import os
import timeit
import datetime

# Disable Hugging Face tokenizers' internal parallelism — presumably to avoid its
# warning/deadlock when the process is forked for worker pools.
os.environ.update(TOKENIZERS_PARALLELISM="false")

from multiprocessing import Pool

def update_sentiment_scores(connection, process=None):
    """Score every tweet and store the scores in ``tweets.sentiment``.

    Tweets are scored sequentially in this process. Successful ``(id, score)``
    pairs are collected, then written with one parameterized ``executemany``
    followed by a single commit.

    NOTE(review): a second ``update_sentiment_scores`` is defined further down
    this notebook and shadows this one once that cell runs.

    Args:
        connection: open ``mysql.connector`` connection to the tweets database.
        process: optional callable ``(id, text) -> (id, score) | None``;
            defaults to ``process_tweet``. ``None`` results count as failures.
    """
    if process is None:
        process = process_tweet

    cursor = connection.cursor()
    try:
        cursor.execute("SELECT `id`, `text` FROM `tweets`")
        tweets = cursor.fetchall()
        total_tweets = len(tweets)

        results = []
        errors = 0
        elapsed = 0.0
        for done, tweet in enumerate(tweets, start=1):
            start = timeit.default_timer()
            res = process(tweet)
            elapsed += timeit.default_timer() - start
            if res is None:
                errors += 1
            else:
                results.append(res)
            # `done` tweets took `elapsed` seconds; extrapolate to the remainder.
            time_remaining = (total_tweets - done) * (elapsed / done)
            print(f"Processed {done} out of {total_tweets} tweets. Estimated time left: {datetime.timedelta(seconds=time_remaining)}")

        # Nothing to write when every tweet failed (an empty IN/CASE list is invalid SQL).
        if results:
            cursor.executemany(
                "UPDATE `tweets` SET `sentiment` = %s WHERE `id` = %s",
                [(float(score), tweet_id) for tweet_id, score in results],
            )
            connection.commit()
    finally:
        cursor.close()

    print("Database update completed")
    if errors:
        print(f"❌ {errors} of {total_tweets} tweets could not be scored", file=sys.stderr)

# Run the version of update_sentiment_scores defined in this cell.
update_sentiment_scores(connection=connection)

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def update_sentiment_scores_spark(jar_path="/path/to/mysql-connector-java-version-bin.jar",
                                  staging_table="tweets_sentiment"):
    """Score all tweets with Spark and write the scores into ``tweets.sentiment``.

    Scores go to a separate staging table and are merged into ``tweets`` with
    ``UPDATE ... JOIN``. Writing straight back to ``tweets`` with
    ``mode="overwrite"`` would drop/recreate the very table the lazy DataFrame
    is still reading from (risking data loss) and discard its keys/indexes.

    Args:
        jar_path: path to the MySQL Connector/J jar (the default is a placeholder).
        staging_table: scratch table receiving ``(id, sentiment)``; it is overwritten
            on every run.
    """
    # Create a SparkSession with the MySQL JDBC driver on the classpath.
    spark = (
        SparkSession.builder
        .config("spark.jars", jar_path)
        .getOrCreate()
    )

    # Same credential source as the mysql.connector connection cell
    # (os.getenv('USERNAME') is also the OS login name on Windows).
    host = config.get('HOST')
    username = config.get('USERNAME')
    password = config.get('PASSWORD')
    database = config.get('DATABASE')

    # Credentials travel as connection properties, not inside the URL.
    url = f"jdbc:mysql://{host}:3306/{database}"
    props = {"user": username, "password": password}

    tweets_df = spark.read.jdbc(url=url, table="tweets", properties=props)

    def _score(text):
        if text is None:
            return None
        # roberta returns (label, score); the FloatType column keeps only the score.
        # NOTE(review): this closure ships tokenizer/model/configr to the executors.
        _label, score = sentiment_score.roberta(text, tokenizer, model, configr)
        return float(score)

    sentiment_score_udf = udf(_score, FloatType())

    scored_df = tweets_df.select("id", sentiment_score_udf("text").alias("sentiment"))
    scored_df.write.jdbc(url=url, table=staging_table, mode="overwrite", properties=props)

    # Merge the staged scores into the real table without touching its schema.
    merge_conn = mysql.connector.connect(host=host, user=username, password=password, database=database)
    try:
        merge_cursor = merge_conn.cursor()
        try:
            merge_cursor.execute(
                f"UPDATE `tweets` t JOIN `{staging_table}` s ON t.`id` = s.`id` "
                "SET t.`sentiment` = s.`sentiment`"
            )
        finally:
            merge_cursor.close()
        merge_conn.commit()
    finally:
        merge_conn.close()

# NOTE(review): runs a full Spark scoring pass over `tweets`; the Connector/J jar path
# configured in update_sentiment_scores_spark looks like a placeholder — set it first.
update_sentiment_scores_spark()

In [None]:
def update_sentiment_scores(connection, commit_every=None, scorer=None):
    """Score each tweet and write its ``sentiment`` row by row, committing periodically.

    NOTE(review): this redefines ``update_sentiment_scores`` and shadows the
    batch version defined earlier in the notebook.

    Args:
        connection: open ``mysql.connector`` connection to the tweets database.
        commit_every: commit after this many successful updates. Defaults to
            roughly 10% of the table, never less than 1.
        scorer: optional callable ``text -> (label, score)``. Defaults to
            ``sentiment_score.roberta`` with the notebook's ``tokenizer``,
            ``model`` and ``configr``.
    """
    if scorer is None:
        def scorer(t):
            return sentiment_score.roberta(t, tokenizer, model, configr)

    cursor = connection.cursor()
    try:
        cursor.execute("SELECT `id`, `text` FROM `tweets`")
        tweets = cursor.fetchall()
        n = len(tweets)
        if commit_every is None:
            # ~every 10%, clamped to 1: `x % round(n/10)` divided by zero for n < 5.
            commit_every = max(1, round(n / 10))

        elapsed = 0.0
        errors = 0
        pending = 0
        for counter, (tweet_id, text) in enumerate(tweets, start=1):
            start = timeit.default_timer()
            try:
                # roberta returns (label, score); only the number goes into the FLOAT column.
                _label, score = scorer(text)
                print(f"📊 Sentiment score: {score}")
                cursor.execute(
                    "UPDATE `tweets` SET `sentiment` = %s WHERE `id` = %s",
                    (float(score), tweet_id),
                )
                pending += 1
                if pending >= commit_every:
                    connection.commit()
                    pending = 0
                print(f"✅ Tweet {tweet_id} updated.")
            except Exception as e:
                print(f"❌ Error: {e}, Tweet: {tweet_id}", file=sys.stderr)
                errors += 1

            elapsed += timeit.default_timer() - start
            time_remaining = (n - counter) * (elapsed / counter)
            print(f"⏯️ Process: {(counter/n)*100:.2f}% - #️⃣ {counter}/{n} tweets updated - ⏳ Time remaining : {str(datetime.timedelta(seconds=time_remaining))}")
            print("-----------------------------------")

        # Commit the rows updated since the last periodic commit.
        connection.commit()
    finally:
        cursor.close()

    if errors:
        print(f"❌ {errors} tweet(s) failed", file=sys.stderr)

# Run the row-by-row version of update_sentiment_scores defined in this cell.
update_sentiment_scores(connection=connection)

In [9]:
from transformers import pipeline

# Reuse the tokenizer and model already loaded from MODEL in the model cell
# instead of downloading/loading a second copy of the same checkpoint.
sentiment_task = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

  from .autonotebook import tqdm as notebook_tqdm
Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [46]:
# Ad-hoc sample sentence for the pipeline cell below.
text = "I have an iphone"

In [47]:
# Pipeline output is a list of {'label', 'score'} dicts (see the cell output).
sentiment_task(text)

[{'label': 'neutral', 'score': 0.6373922228813171}]