In [1]:
from datasets import load_dataset
import random
import pandas as pd
from tabulate import tabulate
from datetime import datetime
import torch
from transformers import (
    AutoModelForSequenceClassification, 
    AutoTokenizer, 
    AutoConfig,
    pipeline
)
from scipy.special import softmax
import numpy as np
from tqdm import tqdm
from langdetect import detect
import pandas as pd
import re
import textstat 
import emoji


# Turn on tqdm's pandas integration (progress_apply is used further down).
tqdm.pandas()

# Use the first CUDA device when torch reports one, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
print(f"Using device: {device}")

# Try to update/fix transformers installation (optional)
# Uncomment these lines if you want to try reinstalling transformers
# import sys
# !{sys.executable} -m pip install --upgrade transformers accelerate

# Set up topic classification model with error handling.
# Loading is attempted in up to three ways; the first one that succeeds wins.
print("Setting up topic classification model...")
# Model identifier passed to from_pretrained for both the tokenizer and the model.
TOPIC_MODEL = "cardiffnlp/tweet-topic-base-multilingual"
try:
    topic_tokenizer = AutoTokenizer.from_pretrained(TOPIC_MODEL)
    try:
        # Attempt 1: let from_pretrained place the weights via device_map="auto".
        # NOTE(review): presumably this needs the `accelerate` package — confirm.
        topic_model = AutoModelForSequenceClassification.from_pretrained(
            TOPIC_MODEL, device_map="auto"
        )
    except (TypeError, ValueError):
        # Attempt 2: the call above raised TypeError/ValueError, so load without
        # device_map and move the model to `device` explicitly.
        topic_model = AutoModelForSequenceClassification.from_pretrained(TOPIC_MODEL)
        topic_model = topic_model.to(device)
except Exception as e:
    # Attempt 3: any other exception from the block above (tokenizer loading
    # included) ends up here. Both objects are loaded again, this time passing
    # ignore_mismatched_sizes=True to the model loader.
    print(f"Error loading topic model: {e}")
    print("Trying alternative loading method...")
    topic_tokenizer = AutoTokenizer.from_pretrained(TOPIC_MODEL)
    topic_model = AutoModelForSequenceClassification.from_pretrained(
        TOPIC_MODEL, ignore_mismatched_sizes=True
    )
    topic_model = topic_model.to(device)

# Create the topic classifier pipeline.
# top_k=1 makes the pipeline return only the best label per input.
pipeline_kwargs = dict(model=topic_model, tokenizer=topic_tokenizer, top_k=1)
try:
    # First try with an explicit device index (0 for CUDA, -1 for CPU).
    topic_classifier = pipeline(
        "text-classification",
        device=0 if device.startswith("cuda") else -1,
        **pipeline_kwargs,
    )
except Exception as e:
    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt/SystemExit still propagate, and report why the first
    # attempt failed instead of hiding it.
    # NOTE(review): the recorded run hit this branch on CPU; presumably the
    # pipeline rejects an explicit device for a model loaded with
    # device_map="auto" — confirm against the installed transformers version.
    print("Using alternative pipeline configuration")
    print(f"  (explicit device was rejected: {e})")
    topic_classifier = pipeline("text-classification", **pipeline_kwargs)

def preprocess(text):
    """Mask user handles and links in ``text``.

    The input is coerced with ``str()`` and split on single spaces. A token
    that starts with ``@`` and is longer than one character becomes ``@user``;
    a token that starts with ``http`` becomes ``http``. All other tokens are
    kept unchanged, and the tokens are joined back with single spaces.
    """
    def _mask(token):
        if token.startswith('@') and len(token) > 1:
            return '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in str(text).split(" "))

def get_tweet_topic_with_score(text):
    """Classify one tweet and return its top topic with the model score.

    Args:
        text: Tweet text passed to the module-level ``topic_classifier``.

    Returns:
        A ``(label, score)`` tuple read from ``result[0][0]`` of the classifier
        output. If anything raises, the error is printed and
        ``("error", 0.0)`` is returned instead, so one bad row does not abort
        a long apply.
    """
    try:
        result = topic_classifier(text)
        top_result = result[0][0]
        return (top_result["label"], top_result["score"])
    except Exception as e:
        # str() first: slicing a non-string input (e.g. a float NaN) would
        # raise inside the handler and defeat the fallback.
        print(f"Error processing text: {str(text)[:50]}... Error: {e}")
        return ("error", 0.0)

def textstat_readability(text):
    """Return ``textstat.flesch_kincaid_grade(text)`` for the given text."""
    grade_level = textstat.flesch_kincaid_grade(text)
    return grade_level

Using device: cpu
Setting up topic classification model...


Device set to use cpu


Using alternative pipeline configuration


In [2]:
# Tweets exported by an earlier run; their texts are excluded from the new sample.
existing_tweets = pd.read_csv("Filtered_Tweets_english_1.csv")
# Compare on stripped strings, the same form the sampling loop uses for candidates.
stripped_existing = existing_tweets["tweet"].astype(str).str.strip()
existing_texts = set(stripped_existing)

In [5]:
# First, stream the users dataset and keep every account whose follower count
# exceeds MIN_FOLLOWERS. Only the follower count is checked here; verification
# status is not part of the filter.
MIN_FOLLOWERS = 100
print(f"Loading users dataset to identify users with more than {MIN_FOLLOWERS} followers...")
users_dataset = load_dataset("enryu43/twitter100m_users", split="train", streaming=True)

# Maps eligible user -> follower count (reused later for the follower_count column).
eligible_users = {}

for row in users_dataset:
    # `or 0` treats a missing or None follower count as zero instead of raising
    # a TypeError on the comparison.
    followers = row.get("followers") or 0
    if followers > MIN_FOLLOWERS:
        eligible_users[row["user"]] = followers

print(f"Found {len(eligible_users)} users with over {MIN_FOLLOWERS} followers")

# Now load and filter tweets dataset
print("Now filtering tweets dataset...")
tweets_stream = load_dataset("enryu43/twitter100m_tweets", split="train", streaming=True)

# Sampling configuration
sample_size = 500000        # reservoir capacity
PROGRESS_EVERY = 5000000    # rows between progress messages
RANDOM_SEED = 42            # fixed so a re-run draws the same sample
ENGAGEMENT_FIELDS = ("likes", "replies", "retweets", "quotes")

random.seed(RANDOM_SEED)

# Reservoir sampling state
reservoir = []
count = 0           # eligible tweets seen so far
processed = 0       # all rows read from the stream
seen_tweets = set()  # texts currently held in the reservoir (kept in sync below)

# Stream and filter on-the-fly
for row in tweets_stream:
    processed += 1
    if processed % PROGRESS_EVERY == 0:
        print(f"Processed {processed} total tweets, found {count} eligible tweets")

    # Skip rows with a missing/blank tweet or without user/date.
    raw_tweet = row.get("tweet")
    if not raw_tweet:
        continue
    tweet_text = str(raw_tweet).strip()
    if tweet_text == "":
        continue
    if row.get("user") is None or row.get("date") is None:
        continue

    # Eligibility filters, cheapest first so most rows are rejected before any
    # copying or summing happens:
    #   1. dated March 2023
    #   2. no "http" in the tweet
    #   3. author is in eligible_users (follower threshold applied earlier)
    if not row["date"].startswith("2023-03"):
        continue
    if "http" in row["tweet"]:
        continue
    if row["user"] not in eligible_users:
        continue

    # Skip texts already in the reservoir or already exported by an earlier run.
    if tweet_text in seen_tweets or tweet_text in existing_texts:
        continue

    #   4. engagement score > 1 (a missing/None counter counts as 0)
    engagement_score = sum((row.get(field) or 0) for field in ENGAGEMENT_FIELDS)
    if engagement_score <= 1:
        continue

    count += 1

    # Copy only rows that passed every filter, and attach the score.
    row_copy = dict(row)
    row_copy["engagement_score"] = engagement_score

    if len(reservoir) < sample_size:
        # Fill up the reservoir first
        reservoir.append(row_copy)
        seen_tweets.add(tweet_text)
    else:
        # count already includes this row, so the index is drawn from
        # [0, count - 1] and the row is kept with probability sample_size / count.
        replace_idx = random.randint(0, count - 1)
        if replace_idx < sample_size:
            # The evicted text leaves seen_tweets so it mirrors the reservoir.
            old_tweet_text = str(reservoir[replace_idx].get("tweet")).strip()
            seen_tweets.discard(old_tweet_text)

            reservoir[replace_idx] = row_copy
            seen_tweets.add(tweet_text)

# Materialise the sampled rows as a DataFrame.
tweets = pd.DataFrame(reservoir)

# Safety net: drop rows that lack any of the critical fields.
tweets = tweets.dropna(subset=["tweet", "user", "date"])

# Attach each author's follower count from the eligible_users lookup.
tweets["follower_count"] = tweets["user"].map(eligible_users)

# Mean engagement per user across the sampled tweets, as a two-column frame
# (user, avg_engagement).
user_avg_engagement = (
    tweets.groupby("user")["engagement_score"]
    .mean()
    .reset_index()
    .rename(columns={"engagement_score": "avg_engagement"})
)

# Left-join the per-user average onto every tweet row.
tweets = tweets.merge(user_avg_engagement, on="user", how="left")

# Checkpoint the sample before the feature-engineering steps.
tweets.to_csv('checkpoint1_test.csv', index=False)


Loading users dataset to identify verified users with high follower counts...
Found 106898 users with over 100 followers
Now filtering tweets dataset...


Resolving data files:   0%|          | 0/41 [00:00<?, ?it/s]

Processed 5000000 total tweets, found 127402 eligible tweets
Processed 10000000 total tweets, found 245914 eligible tweets
Processed 15000000 total tweets, found 375055 eligible tweets
Processed 20000000 total tweets, found 505844 eligible tweets
Processed 25000000 total tweets, found 631348 eligible tweets
Processed 30000000 total tweets, found 762494 eligible tweets
Processed 35000000 total tweets, found 889592 eligible tweets
Processed 40000000 total tweets, found 1021738 eligible tweets
Processed 45000000 total tweets, found 1136438 eligible tweets
Processed 50000000 total tweets, found 1252888 eligible tweets
Processed 55000000 total tweets, found 1380107 eligible tweets
Processed 60000000 total tweets, found 1501550 eligible tweets
Processed 65000000 total tweets, found 1626450 eligible tweets
Processed 70000000 total tweets, found 1751556 eligible tweets
Processed 75000000 total tweets, found 1879238 eligible tweets
Processed 80000000 total tweets, found 2010218 eligible tweets


In [6]:
from tqdm import tqdm
import emoji

tqdm.pandas()  # Enable the progress_apply method


def _char_count(value):
    """Length of the stringified tweet, or 0 when the value is null."""
    if pd.notnull(value):
        return len(str(value))
    return 0


def _word_count(value):
    """Number of whitespace-separated tokens in the stringified tweet."""
    return len(str(value).split())


def _hashtag_count(value):
    """Number of '#' characters in the stringified tweet."""
    return str(value).count("#")


def _mention_count(value):
    """Number of '@' characters in the stringified tweet."""
    return str(value).count("@")


def _has_url(value):
    """True when the stringified tweet contains the substring 'http'."""
    return "http" in str(value)


def _emoji_count(value):
    """Number of single characters that are keys of emoji.EMOJI_DATA.

    Counting is per character, so an emoji made of several code points is not
    counted as one unit.
    """
    return sum(1 for char in str(value) if char in emoji.EMOJI_DATA)


# Per-tweet surface features, added in the same column order as before.
tweets["characters"] = tweets["tweet"].progress_apply(_char_count)
tweets["word_count"] = tweets["tweet"].progress_apply(_word_count)
tweets["hashtag_count"] = tweets["tweet"].progress_apply(_hashtag_count)
tweets["mention_count"] = tweets["tweet"].progress_apply(_mention_count)
# URL flag (can also be used to filter such tweets out later)
tweets["has_url"] = tweets["tweet"].progress_apply(_has_url)
# A tweet whose text starts with '@' is flagged as a reply; nulls count as False
tweets['is_reply'] = tweets['tweet'].str.startswith('@', na=False)
tweets["emoji_count"] = tweets["tweet"].progress_apply(_emoji_count)
# Function to remove hashtags and mentions before we detect the language 
def clean_text(text):
    """Strip mentions and hashtags from ``text``.

    Removes every ``@`` or ``#`` that is followed by one or more word
    characters, together with those characters. Mentions are removed first,
    then hashtags. Surrounding whitespace is left untouched.
    """
    for pattern in (r'@\w+', r'#\w+'):
        text = re.sub(pattern, '', text)
    return text
# Function to detect language
def detect_language(text):
    """Detect the language of a tweet after removing mentions and hashtags.

    Args:
        text: Raw tweet text.

    Returns:
        Whatever ``detect`` returns for the cleaned text, or ``"unknown"`` when
        cleaning or detection raises an ``Exception``.

    NOTE(review): langdetect's README says results can differ between runs
    unless ``DetectorFactory.seed`` is set — confirm whether that matters here.
    """
    try:
        cleaned_text = clean_text(text)  # Clean the tweet text
        return detect(cleaned_text)  # Detect language from cleaned text
    except Exception:
        # Not a bare `except:`: this runs once per row of a long apply, and a
        # bare except would swallow KeyboardInterrupt and keep going.
        return "unknown"
# Apply language detection to each tweet.
# detect_language returns "unknown" when detection fails, so every row gets a value.
tweets["language"] = tweets["tweet"].progress_apply(detect_language)
# Lookup table from the language code stored in tweets["language"] to a
# human-readable name. "unknown" is included so failed detections map cleanly.
lang_map = {
    "af": "Afrikaans",
    "ar": "Arabic",
    "bg": "Bulgarian",
    "bn": "Bengali",
    "ca": "Catalan",
    "cs": "Czech",
    "cy": "Welsh",
    "da": "Danish",
    "de": "German",
    "el": "Greek",
    "en": "English",
    "es": "Spanish",
    "et": "Estonian",
    "fa": "Persian (Farsi)",
    "fi": "Finnish",
    "fr": "French",
    "gu": "Gujarati",
    "he": "Hebrew",
    "hi": "Hindi",
    "hr": "Croatian",
    "hu": "Hungarian",
    "id": "Indonesian",
    "it": "Italian",
    "ja": "Japanese",
    "kn": "Kannada",
    "ko": "Korean",
    "lt": "Lithuanian",
    "lv": "Latvian",
    "mk": "Macedonian",
    "ml": "Malayalam",
    "mr": "Marathi",
    "ne": "Nepali",
    "nl": "Dutch",
    "no": "Norwegian",
    "pa": "Punjabi",
    "pl": "Polish",
    "pt": "Portuguese",
    "ro": "Romanian",
    "ru": "Russian",
    "sk": "Slovak",
    "sl": "Slovenian",
    "so": "Somali",
    "sq": "Albanian",
    "sv": "Swedish",
    "sw": "Swahili",
    "ta": "Tamil",
    "te": "Telugu",
    "th": "Thai",
    "tl": "Tagalog",
    "tr": "Turkish",
    "uk": "Ukrainian",
    "ur": "Urdu",
    "vi": "Vietnamese",
    "zh-cn": "Chinese (Simplified)",
    "zh-tw": "Chinese (Traditional)",
    "unknown": "Unknown"
}
# Map the codes to full language names; codes missing from lang_map become "No_map".
tweets["language_full"] = tweets["language"].map(lang_map).fillna("No_map")
# Filter to only keep English tweets
tweets = tweets[tweets["language"] == "en"].reset_index(drop=True)
# Readability score per tweet (Flesch-Kincaid grade via textstat_readability)
tweets['textstat_readability'] = tweets['tweet'].progress_apply(textstat_readability)
# At this point the frame holds engagement_score, follower_count, avg_engagement,
# the text features, language and readability. Topic columns are added in a later cell.
print(f"Final sample contains {len(tweets)} tweets from verified and unverified users with >100 followers")
tweets.to_csv('checkpoint2_test.csv', index=False)
# Kept as the last expression so the notebook actually renders the preview;
# in the middle of the cell its result was silently discarded.
tweets.head()

100%|██████████| 500000/500000 [00:00<00:00, 976216.40it/s] 
100%|██████████| 500000/500000 [00:00<00:00, 585749.04it/s]
100%|██████████| 500000/500000 [00:00<00:00, 1155219.51it/s]
100%|██████████| 500000/500000 [00:00<00:00, 1150179.40it/s]
100%|██████████| 500000/500000 [00:00<00:00, 1424647.26it/s]
100%|██████████| 500000/500000 [00:02<00:00, 171100.10it/s]
100%|██████████| 500000/500000 [18:37<00:00, 447.53it/s]
100%|██████████| 243399/243399 [00:20<00:00, 11676.25it/s]


Final sample contains 243399 tweets from verified and unverified users with >100 followers


In [7]:
# Run topic classification on all tweets
print("Running topic classification on all tweets...")
# progress_apply shows a progress bar; every call yields a (label, score) tuple,
# including ("error", 0.0) for rows the classifier could not handle.
topic_results = tweets["tweet"].progress_apply(get_tweet_topic_with_score)
# Build both columns from the list of tuples in one step. This avoids
# .apply(pd.Series), which constructs a Series object for every row, and names
# the columns explicitly so an empty frame still gets both of them.
tweets[["topic", "topic_score"]] = pd.DataFrame(
    topic_results.tolist(),
    columns=["topic", "topic_score"],
    index=tweets.index,
)



Running topic classification on all tweets...


100%|██████████| 243399/243399 [2:50:44<00:00, 23.76it/s]  


In [8]:
# Write the full tweets frame (all columns) to CSV without the index column.
tweets.to_csv('Filtered_Tweets_Test.csv', index=False)

In [9]:
# Write a slim export with only the "id" and "tweet" columns, without the index.
# NOTE(review): assumes the source dataset rows carry an "id" field — confirm.
tweets[["id", "tweet"]].to_csv("tweet_only_Test.csv", index=False)