In [None]:
import tweepy
import pandas as pd
from datetime import datetime, timedelta, timezone
import logging
import os
import re
import time
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from datasets import Dataset
import numpy as np
from transformers import Trainer, TrainingArguments
from torch.utils.data import Subset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
import torch
import numpy as np
import pandas as pd
from torch.nn.functional import softmax

In [None]:
# Emit timestamped INFO-level messages so each completed pipeline step is reported
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

#Function to update masterlist
def update_masterlist(new_data, master_file='masterlist.csv'):
    """
    Merge new rows into the master list CSV and return the merged table.

    Loads ``master_file`` when it exists (otherwise starts from an empty
    DataFrame), appends ``new_data``, removes duplicate ``tweet_id`` rows while
    keeping the most recently appended one, and writes the result back to
    ``master_file``.

    Args:
        new_data (pd.DataFrame): New data to add to the master list. May be empty.
        master_file (str): Path to the master list CSV file (created if missing).

    Returns:
        pd.DataFrame: Updated master list DataFrame. When there is neither stored
        nor new data, an empty DataFrame is returned and no file is written.

    Raises:
        KeyError: If the combined data has columns but no ``tweet_id`` column.
    """
    if os.path.exists(master_file):
        try:
            master_df = pd.read_csv(master_file)
        except pd.errors.EmptyDataError:
            # A zero-byte / header-less file holds no rows; treat it like a fresh list.
            master_df = pd.DataFrame()
        logging.info(f"Loaded existing master list with {len(master_df)} entries.") #logging updates on completed steps
    else:
        master_df = pd.DataFrame()
        logging.info("No master list found. Creating a new one.")

    #Updating existing masterlist with new data
    updated_master = pd.concat([master_df, new_data], ignore_index=True)

    if updated_master.columns.empty:
        # Nothing stored and nothing new: there is no 'tweet_id' column to de-duplicate on,
        # and writing a column-less CSV would produce a file pd.read_csv cannot parse later.
        logging.info("No data available for the master list; nothing was saved.")
        return updated_master

    # keep='last' lets a re-fetched tweet replace its older stored row
    updated_master = updated_master.drop_duplicates(subset='tweet_id', keep='last')
    updated_master.to_csv(master_file, index=False)
    logging.info(f"Master list updated and saved to {master_file}. Total entries: {len(updated_master)}") #logging updates on completed steps
    return updated_master

#Function to fetch tweets
def get_recent_tweets_with_context(handle, bearer_token, days_back=6, master_file='masterlist.csv'):
    """
    Fetch recent English, non-retweet tweets mentioning ``handle`` and merge them into the master list.

    Args:
        handle (str): Twitter handle to fetch tweets for, with or without a leading "@".
        bearer_token (str): Twitter API bearer token.
        days_back (int): Number of days to look back for tweets (the recent-search
            endpoint only reaches back about a week; see the API documentation).
        master_file (str): Path to the master list CSV file.

    Returns:
        pd.DataFrame: Updated master list DataFrame with recent tweets. If no tweets
        are found and no master list exists yet, an empty DataFrame with the tweet
        columns is returned. On any error an empty DataFrame is returned and the
        error is logged.
    """
    client = tweepy.Client(bearer_token=bearer_token, wait_on_rate_limit=True)

    # Columns of one stored tweet record, in output order
    tweet_columns = ['tweet_id', 'date', 'texts', 'likes', 'retweets', 'replies', 'views']

    #Specify how far back function can retrieve tweets
    # NOTE(review): end_time is kept 30 s behind "now", presumably because the search
    # endpoint rejects an end_time too close to the request time - confirm against API docs.
    end_time = datetime.now(timezone.utc).replace(microsecond=0) - timedelta(seconds=30)
    start_time = end_time - timedelta(days=days_back)

    start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
    end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')

    # Accept the handle with or without its "@"; the query must contain exactly one,
    # otherwise "@BANK_NAME" would be searched as "@@BANK_NAME".
    screen_name = handle.strip().lstrip('@')
    query = f"@{screen_name} -is:retweet lang:en"

    #Retrieving posts
    try:
        paginator = tweepy.Paginator(
            client.search_recent_tweets,
            query=query,
            start_time=start_time_str,
            end_time=end_time_str,
            max_results=100,
            tweet_fields=["id", "created_at", "text", "public_metrics"]
        )

        tweets_data = []
        for response in paginator:
            # A page without matches carries no data
            for tweet in response.data or []:
                public_metrics = tweet["public_metrics"] or {}

                tweets_data.append({
                    'tweet_id': tweet.id,
                    'date': tweet["created_at"],
                    'texts': tweet["text"],
                    'likes': public_metrics.get('like_count', 0),
                    'retweets': public_metrics.get('retweet_count', 0),
                    'replies': public_metrics.get('reply_count', 0),
                    'views': public_metrics.get('impression_count', 0),
                })

        if not tweets_data and not os.path.exists(master_file):
            # Nothing fetched and nothing stored: return an empty frame that still has the
            # expected columns instead of failing on the missing 'tweet_id' column.
            logging.info(f"No tweets found for query '{query}' and no master list exists yet.")
            return pd.DataFrame(columns=tweet_columns)

        df = pd.DataFrame(tweets_data)
        updated_master_df = update_masterlist(df, master_file)
        return updated_master_df

    except Exception as e:
        # Best-effort by design: log (with traceback) and hand back an empty frame
        logging.exception(f"Error retrieving tweets: {e}")
        return pd.DataFrame()  # Return empty DataFrame on failure

# Read the Twitter API bearer token from the TWITTER_BEARER_TOKEN environment variable so the
# secret is never stored in the notebook; 'API_KEY' is only a placeholder fallback.
BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN', 'API_KEY')
# Handle WITHOUT the leading "@": get_recent_tweets_with_context prepends "@" when it builds the query
handle = "BANK_NAME"  # Replace with the desired Twitter handle

masterlist_df = get_recent_tweets_with_context(handle, BEARER_TOKEN)

In [None]:
# Preview the first rows of the master list returned by the fetch step
masterlist_df.head()

In [None]:
#Load model
# Directory that holds the saved classifier and its tokenizer
load_directory = "Replace with path of model"

# Restore the tokenizer and the sequence-classification model from that directory;
# .eval() switches the model to evaluation mode and returns the same model object
tokenizer = AutoTokenizer.from_pretrained(load_directory)
model = AutoModelForSequenceClassification.from_pretrained(load_directory).eval()

# Wrap the model in a Trainer (default arguments); it is used later only for prediction
trainer = Trainer(model=model)

print("Model and tokenizer loaded successfully!")


In [None]:
# Create tokenization function
def tokenize_function(examples):
    """
    Tokenize the "texts" entry of a batch with the notebook-level ``tokenizer``.

    Every text is truncated and padded to exactly 128 tokens.

    Args:
        examples: Mapping with a "texts" key holding the raw tweet texts.

    Returns:
        Whatever ``tokenizer`` returns for those texts.
    """
    batch_texts = examples["texts"]
    return tokenizer(
        batch_texts,
        max_length=128,
        padding="max_length",
        truncation=True,
    )

In [None]:
#Convert masterlist to test dataframe
# Select the text column explicitly so the test frame holds only "texts", whatever other
# columns masterlist_df carries (e.g. the label columns a later cell adds to it, which would
# otherwise leak in when this cell is re-run).
test_df = masterlist_df[["texts"]].copy()

# preserve_index=False keeps the pandas index out of the dataset as an extra column
test_dataset = Dataset.from_pandas(test_df, preserve_index=False)
test_dataset = test_dataset.map(tokenize_function, batched=True)

In [None]:
# Expose only the tensors the model consumes, in PyTorch format
test_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])

# Run inference over the whole test dataset
predictions = trainer.predict(test_dataset)
logits = predictions.predictions

# Predicted class = index of the largest logit in each row
preds = np.argmax(logits, axis=1)

# Softmax across the class axis turns logits into per-class probabilities
probs = softmax(torch.tensor(logits), dim=1).numpy()

# Store the predicted class and the winning class's probability (2 decimals) per text
test_df["predicted_labels"] = preds
test_df["max_probability"] = probs.max(axis=1).round(2)

test_df.head()

In [None]:
#Adding predicted labels and probabilities to masterlist
# test_df is derived row-for-row from masterlist_df, so a length mismatch means the notebook
# state is stale. Stop here instead of continuing with missing or outdated label columns.
if len(masterlist_df) != len(test_df):
    raise ValueError(
        "Error: DataFrames have different lengths! "
        f"masterlist_df has {len(masterlist_df)} rows, test_df has {len(test_df)} rows."
    )

# .values assigns by position, so differing indexes between the two frames do not matter
masterlist_df["predicted_labels"] = test_df["predicted_labels"].values
masterlist_df["max_probability"] = test_df["max_probability"].values

#Converting numerical predicted labels back to word labels
# Define mapping dictionary
label_mapping = {
    0: "Complaint",
    1: "Enquiry",
    2: "Other",
    3: "Praise",
    4: "Promo",
    5: "Reaction",
    6: "Recommendation",
    7: "Response"
}

# Apply mapping to convert numerical labels to word labels
masterlist_df["word_labels"] = masterlist_df["predicted_labels"].map(label_mapping)

#Filtering for labels with probabilities greater than or equal to 0.50
# .copy() makes final_df an independent frame, so later edits to it neither touch
# masterlist_df nor raise pandas' SettingWithCopyWarning
final_df = masterlist_df[masterlist_df["max_probability"] >= 0.50].copy()

final_df.head()

In [None]:
#Drop numerical label and keep word label
# Reassign rather than drop in place: final_df was produced by filtering masterlist_df, and an
# in-place drop on such a selection triggers pandas' SettingWithCopyWarning.
# errors='ignore' keeps the cell safe to re-run once the column is already gone.
final_df = final_df.drop(columns='predicted_labels', errors='ignore')

In [None]:
# Preview the filtered, word-labelled rows before they are saved
final_df.head()

In [None]:
# Write the labelled, probability-filtered tweets to disk, replacing any earlier export
final_df.to_csv("sentiments.csv", mode='w', index=False)