<a href="https://colab.research.google.com/github/pauloprsdesouza/recsys-twitter-social-capital/blob/dev-colab/RecSysTwitterSocialCapital.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initialize Dependencies

In [None]:
pip install emoji

In [None]:
pip install python-twitter-v2

In [None]:
pip install transformers

In [None]:
pip install pyspellchecker

## Import necessary libraries

In [None]:
import re
import string
import nltk
from nltk.corpus import wordnet
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from nltk.corpus import floresta
import datetime
import emoji
import math
from pytwitter.models import User, Tweet, TweetEntities, TweetEntitiesUrl, TweetPublicMetrics, TweetEntitiesMention, TweetEntitiesHashtag, TweetEntitiesAnnotation
from pytwitter import Api
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from spellchecker import SpellChecker
import pandas as pd
import matplotlib.pyplot as plt

## Nltk Dependencies
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('floresta')
nltk.download('punkt')
nltk.download('omw')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger')

stemmer = SnowballStemmer('portuguese')
stop_words = set(stopwords.words('portuguese'))

## Bert Configuration
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

## Api Bearer Definition

In [6]:
api = Api(bearer_token="AAAAAAAAAAAAAAAAAAAAAMhqlAEAAAAA4Pqzn354Z5nlkP5lKaW98vzlVlA%3D7GIA03xacVKdFYTFg7qmgvWTZThpa2FFd4SNPUqP7uPK7Xjue5")

# Step 1: Collect Data

## Initial parameters

In [168]:
usersById = {}
usersByUserName = {}
usersEngagementData = {}
number_results = 100

## Get user from Twitter

In [169]:
def get_user_mentioned(username):
  try:
    user = api.get_user(username=username) 
  except:
    usersByUserName[username] = 0 
  else:
    usersEngagementData[user.id] = {'tweets':[], 'mentions':[], 'replies':[]}
    user_strength = user_engagement_strength_score(user, usersEngagementData, number_results)
    usersById[user.id] = user_strength
    usersByUserName[user.username] = user_strength

In [170]:
def collect_data(subject):
    public_tweets = api.search_tweets(query=f"{subject} lang:pt has:hashtags -is:retweet has:media", 
                                      expansions=["referenced_tweets.id.author_id","in_reply_to_user_id","attachments.media_keys","author_id","entities.mentions.username"], 
                                      user_fields=["created_at","entities","id","location","name","pinned_tweet_id","profile_image_url","protected","public_metrics","url","username","verified"],
                                      tweet_fields=["attachments","author_id","context_annotations","created_at","entities","geo","in_reply_to_user_id","lang","public_metrics","reply_settings","source"], 
                                      max_results=100)
    
    
    
    for user in public_tweets.includes.users:
       usersEngagementData[user.id] = {'tweets':[], 'mentions':[], 'replies':[]}
       user_strength = user_engagement_strength_score(user, usersEngagementData, number_results)
       usersById[user.id] = user_strength
       usersByUserName[user.username] = user_strength

    for tweet in public_tweets.data:
        user_id = tweet.author_id
        if user_id not in usersById:
            usersEngagementData[user.id] = {'tweets':[], 'mentions':[], 'replies':[]}
            user = api.get_user(user_id)
            user_strength = user_engagement_strength_score(user, usersEngagementData, number_results)
            usersById[user_id] = user_strength
            usersByUserName[user.username] = user_strength 

    return public_tweets, usersById, usersByUserName

# Step 2: Text Analysis

## Get Pos from Wordnet

In [8]:
# Define a function to get the WordNet POS for a given token
def get_wordnet_pos(token):
    tag = nltk.pos_tag([token])[0][1][0].upper()
    if tag == 'N':
        return wordnet.NOUN
    elif tag == 'V':
        return wordnet.VERB
    elif tag == 'R':
        return wordnet.ADV
    else:
        return wordnet.NOUN

## Desanbiguation Process

In [9]:
def disambiguate_word(word, context):
    synset = nltk.wsd.lesk(context, word)
    if synset is not None:
        return synset.name().split('.')[0]
    else:
        return word

## Synonimize Process

In [10]:
def get_synonyms(word):
    synonyms = set()
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():
            synonyms.add(lemma.name())
    return list(synonyms)

## Sentiment Analysis

In [11]:
def get_sentiment_score(text):
    # Use BERT to calculate the sentiment score of the tweet text
    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True)
    inputs.to(device)
    outputs = model(**inputs)
    logits = outputs.logits
    probs = torch.softmax(logits, dim=1)
    sentiment_score = probs[0][1].item() - probs[0][0].item()

    return sentiment_score

## Diversity Score (TTR)

In [105]:
def calculate_diversity_score(words):
    # Use a measure of lexical diversity to calculate the diversity score of the tweet text
    # For example, using the type-token ratio (TTR) metric
    num_words = len(words)
    num_unique_words = len(set(words))
    diversity_score = num_unique_words / num_words if num_words > 0 else 0

    return diversity_score

## Context Analysis

In [112]:
def calculate_context_score(text, topic_keywords):
    # Calculate the context score of the tweet based on its relevance to the given topic keywords
    # For example, using the cosine similarity between the tweet and the topic keywords
    vectorizer = TfidfVectorizer()
    corpus = [text] + topic_keywords
    X = vectorizer.fit_transform(corpus)
    cosine_similarities = np.dot(X[0], X[1:].T).toarray()[0]
    context_score = max(cosine_similarities) if len(cosine_similarities) > 0 else 0

    return context_score

## Preprocessing a text

In [14]:
def pre_processing_text(text):
    # Convert to lowercase
    text = text.lower()
    
    # Remove Emojis
    text = emoji.replace_emoji(text, '')
    
    # Remove URLs
    text = re.sub(r'http\S+', '', text)

    # Remove mentions
    text = re.sub(r'@\S+', '', text)

    # Remove hashtags
    text = re.sub(r'#\S+', '', text)

    # Tokenize
    tokens = word_tokenize(text, language='portuguese')

    # Lemmatize the tokens
    lemmatizer = WordNetLemmatizer()

    # Lemmatize the tokens
    lemmatized_tokens = []
    for token in tokens:
        pos = get_wordnet_pos(token)
        lemma = lemmatizer.lemmatize(token, pos=pos)
        lemmatized_tokens.append(lemma)

    # Join the lemmatized tokens to form the tweet text again
    lemmatized_tweet = ' '.join(lemmatized_tokens)

    # Desanbiguation Process
    disambiguated_tokens = [disambiguate_word(token, text) for token in lemmatized_tokens]

    # Synonimization Process
    synonymized_tokens = []
    for token in disambiguated_tokens:
        synonyms = get_synonyms(token)
        if synonyms:
            synonymized_tokens.append(max(synonyms, key=len))
        else:
            synonymized_tokens.append(token)

    # Remove stop words and punctuation
    cleaned_tokens = [token for token in synonymized_tokens if token not in stop_words and token.isalnum()]

    # Re-tokenize the cleaned tokens
    cleaned_tweet = ' '.join(cleaned_tokens)

    return cleaned_tokens

# Analyze Data

In [None]:
def visualize_data(data):
  num_tweets = len(data)
  num_followers = data['followers'].sum()
  num_likes = data['likes'].sum()
  num_retweets = data['retweets'].sum()

  # Create visualizations
  plt.hist(data['likes'])
  plt.title('Distribution of Likes')
  plt.xlabel('Number of Likes')
  plt.ylabel('Frequency')
  plt.show()

  plt.boxplot(data['followers'])
  plt.title('Distribution of Followers')
  plt.ylabel('Number of Followers')
  plt.show()

  # Identify common words and hashtags
  words = {}
  for tweet in data['text']:
      for word in tweet.split():
          if word.startswith('#'):
              words[word] = words.get(word, 0) + 1
          else:
              words[word.lower()] = words.get(word.lower(), 0) + 1
  word_freq = pd.DataFrame.from_dict(words, orient='index', columns=['frequency'])
  word_freq.sort_values(by='frequency', ascending=False, inplace=True)
  word_freq.head()

  # Analyze temporal patterns
  data['created_at'] = pd.to_datetime(data['created_at'])
  tweets_per_day = data.groupby(pd.Grouper(key='created_at', freq='D')).size()
  plt.plot(tweets_per_day)
  plt.title('Number of Tweets Posted per Day')
  plt.xlabel('Date')
  plt.ylabel('Number of Tweets')
  plt.show()

  # Explore relationships between variables
  plt.scatter(data['followers'], data['likes'])
  plt.title('Correlation between Followers and Likes')
  plt.xlabel('Number of Followers')
  plt.ylabel('Number of Likes')
  plt.show()

# Step 3: User's Engagement Analysis

## User's Strength Score

In [131]:
def user_engagement_strength_score(user: User, usersEngagementData, number_results):
  influence = calculate_influence(user, usersEngagementData, number_results)
  reputation = calculate_reputation(user, usersEngagementData, number_results)

  return influence * reputation

In [132]:
def get_user_egagement_data(user, key, usersEngagementData, number_results):
  if usersEngagementData[user.id] is not None:
    return usersEngagementData[user.id]
  
  if key == 'tweets':
    tweets = api.get_timelines(user.id, max_results=number_results, tweet_fields=["attachments","author_id","context_annotations","created_at","entities","geo","in_reply_to_user_id","lang","public_metrics","reply_settings","source"])
    usersEngagementData[user.id][key] = tweets.data
  elif key == 'replies':
    mentions = api.search_tweets(query=f"@{user.username}", max_results=number_results)
    usersEngagementData[user.id][key] = mentions.data
  else:
    replies = api.search_tweets(query=f"to:{user.username}", max_results=number_results)
    usersEngagementData[user.id][key] = mentions.data

## User's Influence

In [126]:
def calculate_influence(user: User, usersEngagementData, number_results):
    follower_count = user.public_metrics.followers_count

    # Get user's tweet count and average engagement rate
    engagement = get_user_egagement_data(user, 'tweets', usersEngagementData, number_results)
    tweets = engagement['tweets']
    tweet_count = len(tweets)
    total_engagement = 0
    
    for tweet in tweets:
        total_engagement += tweet.public_metrics.like_count + tweet.public_metrics.retweet_count + tweet.public_metrics.quote_count + tweet.public_metrics.reply_count
        
    if tweet_count > 0:
        avg_engagement_rate = total_engagement / (tweet_count * follower_count) if total_engagement > 0 and tweet_count > 0 and follower_count > 0 else 0
    else:
        avg_engagement_rate = 0

    # Calculate influence score
    influence_score = math.log(follower_count + 1, 10) * (avg_engagement_rate + 1)
    
    return influence_score

## User's reputation

In [130]:
def calculate_reputation(user: User, usersEngagementData, number_results):
    # Get user's recent mentions and replies
    engagementMentions = get_user_egagement_data(user, 'mentions', usersEngagementData, number_results)
    engagementReplies = get_user_egagement_data(user, 'replies', usersEngagementData, number_results)

    mentions = engagementMentions['mentions']
    replies = engagementReplies['replies']

    # Calculate reputation score based on sentiment analysis of mentions and replies
    positive_sentiments = 0
    negative_sentiments = 0
    
    for mention in mentions:
        if mention.author_id != user.id:
            sentiment = get_sentiment_score(mention.text)
            if sentiment > 0:
                positive_sentiments += 1
            elif sentiment < 0:
                negative_sentiments += 1
                
    for reply in replies:
        if reply.author_id != user.id:
            sentiment = get_sentiment_score(reply.text)
            if sentiment > 0:
                positive_sentiments += 1
            elif sentiment < 0:
                negative_sentiments += 1
                
    if (positive_sentiments + negative_sentiments) > 0:
        
        reputation_score = positive_sentiments / (positive_sentiments + negative_sentiments)
    else:
        reputation_score = 0
        
    fine_adjustment = 0.01
    normalized_reputation_score = (reputation_score + user.public_metrics.listed_count * fine_adjustment) / (1 + (user.public_metrics.listed_count * fine_adjustment))

    # Return influence and reputation scores
    return normalized_reputation_score

# Step 4: Tweet Metrics

In [18]:
def calculate_recency_score(created_at):
    # Calculate the recency score of the tweet based on its age
    # For example, using a logarithmic decay function

    tweet_date = datetime.datetime.strptime(created_at, '%Y-%m-%dT%H:%M:%S.%fZ')
    now = datetime.datetime.now()

    age_in_seconds = (now - tweet_date).total_seconds()
    
    if age_in_seconds < 0: 
       age_in_seconds = (tweet_date - now).total_seconds()

    # Set the decay factor
    decay_factor = 0.1

    # Calculate the recency score using a logarithmic decay function
    recency_score = 1 / (1 + decay_factor * math.log10(1 + age_in_seconds))

    return recency_score

# Step 5: Social Capital Calculus

In [187]:
def calculate_social_capital_score(tweet: Tweet, usersById, usersByUserName):
    # Extract relevant information from the tweet
    text = tweet.text
    attachments = tweet.attachments
    public_metrics = tweet.public_metrics
    created_at = tweet.created_at

    # Preprocess tweet text
    tokens = pre_processing_text(text)

    # Calculate sentiment score
    sentiment_score = get_sentiment_score(text)

    # Calculate diversity score
    diversity_score = calculate_diversity_score(tokens)

    # Calculate recency score
    recency_score = calculate_recency_score(created_at)

    # Calculate context score
    context_score = calculate_context_score(text, tokens)

    # Calculate social capital score
    likes = public_metrics.like_count
    retweets = public_metrics.retweet_count
    replies = public_metrics.reply_count
    impressions_count = public_metrics.impression_count
    num_hashtags = len(tweet.entities.hashtags)
    num_urls = len(tweet.entities.urls)

    mention_users_strenght_score = 0
    if tweet.entities.mentions is not None:
      for mention in tweet.entities.mentions:
        if mention.username not in usersByUserName:
          get_user_mentioned(mention.username)

        mention_users_strenght_score += usersByUserName[mention.username]

    num_media = 0
    if attachments is not None and attachments.media_keys is not None:
        num_media = len(attachments.media_keys)

    #print(likes, retweets, replies, impressions_count, num_hashtags, num_urls, num_media, diversity_score, (influence_score * reputation_score), mention_users_strenght_score, len(tokens), context_score, recency_score)
    
    social_capital_score = ((usersById[tweet.author_id] + retweets + likes + replies + impressions_count + num_media + num_hashtags + num_urls + diversity_score + mention_users_strenght_score + len(tokens) + context_score) * recency_score)

    return {'tweet': tweet, 'score': social_capital_score }

# Step 6: Build User Profile

# Step 7: Generate Recommendations

In [188]:
def generate_recommendations(subject, topN):
  public_tweets, usersById, usersByUserName = collect_data(subject);
  ranking = {}

  for tweet in public_tweets.data:
      social_capital_score = calculate_social_capital_score(tweet, usersById, usersByUserName)
      ranking[tweet.id] = social_capital_score

  ranked = dict(sorted(ranking.items(), key=lambda item: item[1]['score'], reverse=True)) 

  for tweetId in {k: ranked[k] for k in list(ranked)[:topN]}: 
      print(f"SC: {ranking[tweetId]['score']} - TweetID: {ranking[tweetId]['tweet'].id}")
      print(ranking[tweetId]['tweet'].text)
      print()

# Step 8: Defines the subject

In [192]:
generate_recommendations("Luísa Sonza", 5)

SC: 71840.85671179024 - TweetID: 1634184303625007106
Luísa Sonza é a atração da festa de amanhã! #BBB23 https://t.co/X9fQ5x7nHc

SC: 52157.552392563724 - TweetID: 1634509712027787265
Bom dia minha aldeia, como vocês estão? O Boss acordou todo mundo bem cedinho hoje, ja teve fila para o poder coringa e a prova do anjo acontece essa manhã. Vamos torcer pra Domi ganhar e ver a família, ela merece muito! 
Mood de hoje pois tem fexxtinha com Luísa Sonza! 🥂 #BBB23 https://t.co/sAeV6fSuwi

SC: 36728.08020715328 - TweetID: 1634695238978600960
Luisa Sonza falando com os brothers #BBB23 https://t.co/9yl75bLjEQ

SC: 30086.113136239735 - TweetID: 1634696250875428865
Luisa Sonza apareceu para dar um recado aos brothers e avisar que os figurinos já estão liberados!!!

Reprodução: Globo/Globoplay

#TeamAmanda #BBB23 https://t.co/wOmVeDDB19

SC: 26343.3430293797 - TweetID: 1634738666076622848
Luísa Sonza chegou para comandar a festa do #BBB23 🔥 https://t.co/5mSxZEODxg

