# Import Statements


In [1]:
import requests
import time
import pandas as pd
import csv
from transformers import AutoModelForSequenceClassification, pipeline, AutoTokenizer
import preprocessor as p
from geopy.geocoders import Nominatim
from os import listdir
import os

  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


# Data Collection


In [18]:
APP_KEY = "xxxxxxxxxxxxxxxxxxxxxxxx"
APP_SECRET = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
SAMPLES_DIR = "./data"
MAX_SAMPLES = 2 # Limit of the top n lines to read in each file sample

In [5]:
trust_website = pd.read_csv("websites.csv")
trust_websites_map: map = {}
sites = list(trust_website["site"])
site_scores = list(trust_website["trust"])
for i in range(0, len(sites)):
    trust_websites_map.setdefault(sites[i], site_scores[i])

In [6]:
def get_tweet_trust_score(external_url: str):
    if external_url:
        return trust_websites_map.get(external_url, 0)
    else:
        return 0

In [7]:
def get_bearer_token():
    url = 'https://api.twitter.com/oauth2/token'
    auth = (APP_KEY, APP_SECRET)
    data = {'grant_type': 'client_credentials'}

    response = requests.post(url, auth=auth, data=data)

    if response.status_code == 200:
        return response.json()["access_token"]

In [8]:
def get_external_url_from_tweet(tweet)->set:
    external_urls_objects = tweet['entities'].get("urls", None)
    if external_urls_objects:
        for url_object in external_urls_objects:
            try:
                url = url_object['display_url'].split('/', 1)[0]
                if "twitter.com" not in url:
                    return url
            except Exception as ex:
                continue
        return None;
    else:
        return None

In [9]:
def get_metrics_from_user(user):
    followers_count = user["public_metrics"].get("followers_count", None)
    tweet_count = user["public_metrics"].get("tweet_count", None)
    
    return (followers_count, tweet_count)

In [10]:
def get_hashtags_texts(hashtags: list):
    hashtags_list = []
    for hashtag in hashtags:
        hashtags_list.append(hashtag.get("tag", None))
        
    return hashtags_list

In [11]:
def get_location_from_user(user):
    
    location = user.get("location", None)
    
    if location:
        geolocator = Nominatim(user_agent="geoapiExercises")
        parsed_location = geolocator.geocode(location, addressdetails=True)
        if parsed_location:
            address = parsed_location.raw["address"]
            country = address.get("country", None)
            state = address.get("state", None)  
            
            return (location, country, state)
    else:
        return (None, None, None)

In [12]:
def get_user(user_id: str):
    url = 'https://api.twitter.com/2/users'
    params = {
        'ids': user_id,
        'user.fields': 'location,public_metrics'
    }
    headers = {
        'Authorization': f'Bearer {get_bearer_token()}'
    }

    response = requests.get(url, params=params, headers=headers)  

    # Check if the request was successful
    if response.status_code == 200 and "errors" not in response.json():
        data = response.json()["data"][0]
        return data
    elif response.status_code == 429:
        print("Falling asleep for 15 minutes...")
        time.sleep(960)
        return get_user(user_id)
    else:
        print(f"Request failed with status code {response.status_code}")
        print(response.text)

In [13]:
def get_tweet(tweet_id: str):
    url = 'https://api.twitter.com/2/tweets'
    params = {
        'ids': tweet_id,
        'tweet.fields': 'referenced_tweets,author_id,created_at,public_metrics,entities'
    }
    headers = {
        'Authorization': f'Bearer {get_bearer_token()}'
    }

    response = requests.get(url, params=params, headers=headers)

    # Check if the request was successful
    if response.status_code == 200 and "errors" not in response.json():
        data = response.json()["data"][0]
        if "referenced_tweets" in data:
            return get_tweet(data["referenced_tweets"][0]["id"])
        else:
            return data
    elif response.status_code == 429:
        print("Falling asleep for 15 minutes...")
        time.sleep(960)
        return get_tweet(tweet_id)
    else:
        print(f"Request failed with status code {response.status_code}")
        print(response.text)


In [22]:
def get_sample_row(data: map):
    try:
        created_at = data["created_at"]
        tweet_id = data["id"]
        user_id =data["author_id"]
        text = data["text"]
        retweets_count = data["public_metrics"]["retweet_count"]
        likes_count = data["public_metrics"]["like_count"]
        replies_count = data["public_metrics"]["reply_count"]
        hashtags = data["entities"].get("hashtags", None)
        hashtags_text = None
        if hashtags:
            hashtags_text = get_hashtags_texts(hashtags)
        url = get_external_url_from_tweet(data)
        tweet_trust_score = get_tweet_trust_score(url)
        user = get_user(user_id)
        location, country, state = get_location_from_user(user)
        followers_count, tweet_count = get_metrics_from_user(user)

        row = {
            "createdAt": [created_at],
            "tweetId": [tweet_id],
            "userId": user_id,
            "user_followers_count": [followers_count],
            "user_tweet_count": [tweet_count],
            "location": [location],
            "country": [country],
            "state": [state],
            "text": [text],
            "retweetsCount": [retweets_count],
            "likesCount": [likes_count],
            "repliesCount": [replies_count],
            "hashtags": [hashtags_text],
            "url": [url],
            "tweetTrustScore": [tweet_trust_score]
        }
        
        return row
    except Exception as ex:
        print(ex)
        return None  

In [15]:
def get_samples_files():
    return listdir(SAMPLES_DIR)

In [20]:
def collect_rows_from_tweetIds(df, tweetIds: list):
    for i, tweet_id in enumerate(lines):
        tweet_id = tweet_id.strip()
        data = get_tweet(tweet_id)
        if data:
            row = get_sample_row(data)
            if row:
                new_df = pd.DataFrame(row)
                df = pd.concat([df, new_df], ignore_index=True)
        if i == MAX_SAMPLES: break
    return df

In [23]:
samples_files = get_samples_files()
df = pd.DataFrame()

for file_name in samples_files:
    sample_file_path = os.path.join(SAMPLES_DIR, file_name)
    print("File "+file_name+" is being processed...")
    with open(sample_file_path) as sample_file:
        lines = sample_file.readlines()
        df = collect_rows_from_tweetIds(df, lines)

File 2023-02_tweet_ids.txt is being processed...
'entities'
File 2023-01_tweet_ids.txt is being processed...


In [24]:
df.to_csv("collected_samples.csv", quoting=csv.QUOTE_ALL)

# Sentiment Analysis


In [None]:
def preprocess_tweet(tweet: str):
    tweet = p.clean(tweet)
    tweet = tweet.replace('\d+', '')
    tweet = tweet.lower()
    
    return tweet

In [None]:
labels = []
texts = list(df["text"])
model = AutoModelForSequenceClassification.from_pretrained("/media/vangelis/vag/sentiment_amalysis_final")

tokenizer = AutoTokenizer.from_pretrained("roberta-large")

In [None]:
for text in texts:
    classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
    text = preprocess_tweet(text)
    label = 1 if classifier(text)=='LABEL_1' else 0
    labels.append()
