# Notebook to compute content-based user similarity from data fetched from Twitter

### Handle imports

In [None]:
import os

import pandas as pd
import numpy as np
from pymongo import MongoClient
import re
import tweepy
import datetime
from matplotlib import pyplot as plt
import spacy
import networkx as nx
from pyvis.network import Network
import matplotlib.pyplot as plt
from joblib import Parallel, delayed, cpu_count

from tqdm.notebook import tqdm
# Register tqdm's pandas integration (adds progress_apply / progress_map).
# NOTE(review): none of the cells shown here appear to call those helpers.
tqdm.pandas()

### Load Twitter API secrets

In [None]:
# Load the python-dotenv IPython extension and load the environment file
# (presumably `.env` in the working directory) into os.environ, so that the
# os.getenv(...) lookups for the Twitter credentials can find the secrets.
%load_ext dotenv
%dotenv

In [None]:
# Twitter API credentials keyed by the names used in this notebook, each read
# from the matching environment variable (None when the variable is unset).
twitter_api = {
    credential: os.getenv(env_var)
    for credential, env_var in (
        ("bearer_token", "BEARER_TOKEN"),
        ("api_key", "API_KEY"),
        ("api_secret", "API_SECRET"),
        ("access_token", "ACCESS_TOKEN"),
        ("access_secret", "ACCESS_SECRET"),
    )
}

In [None]:
# Display full cell contents (no truncation of long strings) in DataFrame output.
pd.options.display.max_colwidth = None

### Function to extract the relevant users of a cluster from the textClust MongoDB database

In [None]:
def extract_relevant_users_from_clusters(source_uuid, cluster_id, timestamp, limit=5000):
    """Return the distinct authors of the texts in a textClust micro-cluster.

    Reads the most recent document (highest ``timestamp``) for ``cluster_id``
    from the ``mc_<source_uuid>`` collection of the local ``textclustDB``
    database. It then loads the ``specific.user`` objects of the referenced
    texts from ``texts_<source_uuid>`` whose ``general.time`` is at or before
    ``timestamp``, newest first.

    Args:
        source_uuid: Suffix of the textClust collections to query.
        cluster_id: Value of the cluster document's ``id`` field.
        timestamp: Cut-off time formatted as ``"%Y-%m-%dT%H:%M:%S"``.
        limit: Maximum number of texts to read (default 5000; in pymongo a
            limit of 0 means "no limit").

    Returns:
        DataFrame with one row per unique ``id_str``. It is empty when the
        cluster is not found or none of its texts match.

    Raises:
        ValueError: If ``timestamp`` does not match the expected format.
    """
    # `datetime` is the module in this notebook, so the parser lives on the
    # `datetime.datetime` class.
    cutoff = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")

    # The context manager closes the client (and its connections) when done.
    with MongoClient("mongodb://localhost:27017/") as connection:
        db = connection.textclustDB

        # Extract the text ids of the cluster from its most recent document.
        cluster = db[f"mc_{source_uuid}"].find_one(
            {"id": cluster_id},
            sort=[("timestamp", -1)],
            projection={"_id": 0, "textids": 1},
        )
        if not cluster or not cluster.get("textids"):
            return pd.DataFrame()

        # Extract the relevant users.
        cursor = db[f"texts_{source_uuid}"].find(
            {
                "$and": [
                    {"general.text_id": {"$in": cluster["textids"]}},
                    # Compare against both a datetime and a "YYYY-MM-DD HH:MM:SS"
                    # string; presumably `general.time` is stored in either form.
                    {"$or": [
                        {"general.time": {"$lte": cutoff}},
                        {"general.time": {"$lte": timestamp.replace("T", " ")}},
                    ]},
                ]
            },
            sort=[("general.time", -1)],
            # Lift `specific.user` to a top-level `user` field of each result.
            projection={"_id": 0, "user": "$specific.user"},
        ).limit(limit)
        # Consume the cursor while the client is still open. Skip documents
        # without a user object instead of failing with KeyError.
        users = [doc["user"] for doc in cursor if "user" in doc]

    if not users:
        return pd.DataFrame()
    return pd.DataFrame(users).drop_duplicates(["id_str"], ignore_index=True)


In [None]:
# Example run: authors of cluster 52525 of one textClust source, using the
# texts up to 2022-02-25 22:41:49.
users = extract_relevant_users_from_clusters(
    source_uuid="8273444c-abdd-4410-829a-970846ebd00e",
    cluster_id=52525,
    timestamp="2022-02-25T22:41:49",
)

## Approach to load the required tweets from the Twitter API

### Function that handles loading the tweets

In [None]:
def _to_twitter_time(moment):
    """Format ``moment`` as ``YYYY-MM-DDTHH:MM:SSZ`` (UTC) for the Twitter API.

    Naive datetimes are assumed to be in UTC already; aware ones are converted
    to UTC first. Sub-second precision is dropped.
    """
    if moment.tzinfo is not None:
        moment = moment.astimezone(datetime.timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


def extract_last_tweets_per_user_from_timestamp(user, timestamp, days=1):
    """Fetch the tweets one user posted in the ``days`` days before ``timestamp``.

    Args:
        user: Row/mapping providing ``id_str`` and ``screen_name`` (e.g. a row
            yielded by ``users.iterrows()``).
        timestamp: ``datetime.datetime`` marking the end of the time window.
            Naive values are treated as UTC.
        days: Length of the look-back window in days (default 1).

    Returns:
        DataFrame with one row per tweet and the columns listed below, or
        ``None`` when the API returned no tweets for the window.
    """
    columns = ["user_screen_name", "user_id", "id", "text", "created_at",
               "attachments", "public_metrics", "referenced_tweets", "source"]
    end_time = _to_twitter_time(timestamp)
    start_time = _to_twitter_time(timestamp - datetime.timedelta(days=days))

    user_tweets = __fetch_all_tweets(user['id_str'], end_time, start_time)
    if user_tweets is None:
        return None

    records = []
    for tweet in user_tweets:
        referenced_tweets = tweet.referenced_tweets
        if referenced_tweets is not None:
            # Flatten the tweepy reference objects into plain dicts.
            referenced_tweets = [{'id': ref.id, 'type': ref.type} for ref in referenced_tweets]
        records.append({"user_screen_name": user['screen_name'],
                        "user_id": user['id_str'],
                        "id": tweet.id,
                        "text": tweet.text,
                        "created_at": tweet.created_at,
                        "attachments": tweet.attachments,
                        "public_metrics": tweet.public_metrics,
                        "referenced_tweets": referenced_tweets,
                        "source": tweet.source})
    # Build the frame directly with the fixed column layout; no empty-frame concat needed.
    return pd.DataFrame.from_records(records, columns=columns)

def __fetch_all_tweets(user_id: str, end_time: str, start_time: str, pagination_token: str = None, max_pages: int = 1):
    """Fetch a user's tweets between ``start_time`` and ``end_time`` (Twitter API v2).

    Requests pages of up to 100 tweets. Fetching starts at ``pagination_token``
    (the first page when ``None``) and follows ``meta["next_token"]`` for at
    most ``max_pages`` pages. The default of one page keeps one API call per
    user, which limits rate-limit pressure when many users are fetched in
    parallel.

    Args:
        user_id: Numeric user id as a string.
        end_time: End of the window, ``YYYY-MM-DDTHH:MM:SSZ``.
        start_time: Start of the window, same format.
        pagination_token: Token of the page to start from, or ``None``.
        max_pages: Maximum number of pages to request (values < 1 act as 1).

    Returns:
        List of tweepy tweet objects, or ``None`` if no tweets were returned.
    """
    # wait_on_rate_limit=True makes tweepy sleep until the rate-limit window resets.
    client = tweepy.Client(bearer_token=twitter_api["bearer_token"], consumer_key=twitter_api["api_key"],
                           consumer_secret=twitter_api["api_secret"], access_token=twitter_api["access_token"],
                           access_token_secret=twitter_api["access_secret"], wait_on_rate_limit=True)
    collected = []
    token = pagination_token
    for _ in range(max(1, max_pages)):
        request = {
            "id": user_id,
            "end_time": end_time,
            "start_time": start_time,
            "max_results": 100,
            "tweet_fields": ["created_at", "attachments", "public_metrics", "referenced_tweets", "source"],
        }
        # Only send a pagination token when there is one to continue from.
        if token is not None:
            request["pagination_token"] = token
        response = client.get_users_tweets(**request)
        if response.data:
            collected.extend(response.data)
        token = (response.meta or {}).get("next_token")
        if token is None:
            break
    return collected or None
            

### Parallelization of twitter data fetching

During the initial development of this thesis, joblib was used to parallelize the tweet fetching. However, joblib did not work with Celery in the final implementation, so it was replaced with billiard, a fork of the standard-library multiprocessing module that works inside a Celery worker.

In [None]:
# Fetch the tweets of (at most) the first `max_users` users in parallel, one
# joblib task per user; each task returns a DataFrame or None.
time = datetime.datetime.strptime("2022-02-25T22:41:49", "%Y-%m-%dT%H:%M:%S")
max_users = 3000
selected_users = users[0:max_users]
responses = Parallel(n_jobs=cpu_count())(
    delayed(extract_last_tweets_per_user_from_timestamp)(user, time)
    for _, user in tqdm(selected_users.iterrows(), total=len(selected_users))
)

# Concatenate once (linear) instead of re-concatenating inside a loop
# (quadratic). The leading empty frame fixes the column layout even when
# every response is None; pd.concat silently drops None entries.
tweet = pd.concat(
    [pd.DataFrame(columns=["user_screen_name", "user_id", "id", "text", "created_at",
                           "attachments", "public_metrics", "referenced_tweets", "source"]),
     *responses],
    ignore_index=True,
)
del responses

Parallelization is limited by Twitter's rate limit: if a cluster contains more than 1,500 users, the API will eventually block access for 15 minutes.
Before parallelization the operation took about 3 minutes 30 seconds; now it takes only about 25 seconds.

### Store the tweets to a parquet file

This avoids having to re-run the whole tweet fetch if the kernel crashes for any reason.

In [None]:
# Persist the fetched tweets so that a kernel crash does not force a re-fetch.
# Create the target directory first; writing into a missing directory fails.
os.makedirs('./parquet_saves', exist_ok=True)
tweet.to_parquet('./parquet_saves/extracted_tweets_xxxxxx.snappy', compression='snappy')

### Load tweets stored to a parquet file

In [None]:
# Restore the tweets saved by the parquet-save cell (skips the API fetch).
tweets = pd.read_parquet(path='./parquet_saves/extracted_tweets_xxxxxx.snappy')

### Filter users with too few tweets in the dataset

In [None]:
# Keep only users with more than five tweets in the dataset (presumably
# because fewer tweets give too little text for a meaningful profile).
v = tweets["user_screen_name"].value_counts()
frequent_users = v[v > 5].index
tweets = tweets.loc[tweets["user_screen_name"].isin(frequent_users)].reset_index(drop=True)

### Adapt timestamp types

In [None]:

# Truncate tweet creation times to minute precision. `.values` yields a numpy
# datetime64 array (for tz-aware columns pandas converts to UTC and drops the
# zone), which is then cast to whole minutes.
tweets['created_at'] = tweets['created_at'].values.astype('datetime64[m]')
# The next cell groups `tweet` (singular), so bind that name to the filtered,
# converted frame. A second minute-unit `astype` here would only repeat the
# conversion above, and newer pandas (which supports s/ms/us/ns units only)
# may reject it, so take a plain copy instead.
tweet = tweets.copy()

### Concatenate all tweets of a user

In [None]:
# One document per user: all of the user's tweet texts joined by single
# spaces, indexed by screen name.
grouped_texts = tweet.groupby("user_screen_name").agg(text=("text", " ".join))

### Function to apply the preprocessing

The code is taken from the original textClust implementation with slight adaptations.

In [None]:
import string
import emoji

# Patterns are compiled once. Raw strings are used because the non-raw
# literals with "\." / "\s" escapes trigger SyntaxWarnings on Python 3.12+.
_URL_RE = re.compile(r'((www\.[^\s]+)|(https?://[^\s]+)|(http?://[^\s]+))')
_HTTP_RE = re.compile(r'http\S+')
_USERNAME_RE = re.compile(r'@[^\s]+')
_HASHTAG_RE = re.compile(r'#([^\s]+)')
_MULTI_EXCLAMATION_RE = re.compile(r"(\!)\1+")
# Deletes ASCII punctuation plus the typographic ’ and “ characters in one pass.
_PUNCTUATION_TABLE = str.maketrans('', '', '’“' + string.punctuation)


def _remove_emojis(text):
    """Strip emoji using whichever API the installed ``emoji`` package provides."""
    # emoji>=1.7 offers replace_emoji; get_emoji_regexp was removed in 2.0.
    if hasattr(emoji, "replace_emoji"):
        return emoji.replace_emoji(text, replace='')
    return emoji.get_emoji_regexp().sub(r'', text)


def create_preprocessed_text(text):
    """Normalize a tweet text before tokenization.

    Steps, in order:
    1. Lower-case the text and replace newlines with spaces.
    2. Remove URLs and @usernames.
    3. Strip the '#' from hashtags and replace runs of '!' with
       ' multiExclamation '.
    4. Delete punctuation, then remove emoji.

    Args:
        text: Raw text (here: all tweets of one user joined together).

    Returns:
        The cleaned text. Removed spans may leave runs of spaces behind.
    """
    text = text.lower().replace("\n", " ")

    # Remove URLs
    text = _URL_RE.sub('', text)
    text = _HTTP_RE.sub('', text)

    # Remove usernames
    text = _USERNAME_RE.sub('', text)

    # Remove the # in #hashtag
    text = _HASHTAG_RE.sub(r'\1', text)

    # Replace repeated exclamation marks with a marker token
    text = _MULTI_EXCLAMATION_RE.sub(' multiExclamation ', text)

    # Delete punctuation characters
    text = text.translate(_PUNCTUATION_TABLE)

    return _remove_emojis(text)

### Create lemmatizer based on spacy

In [None]:
# Small English pipeline; named-entity recognition and the dependency parser
# are disabled (presumably for speed, since only lemmas are used below).
nlp = spacy.load('en_core_web_sm', disable=['ner', 'parser'])


def spacy_tokenizer(doc):
    """Tokenize ``doc`` with spaCy and return the lemma of every non-whitespace token.

    Preprocessing removes URLs, user names and punctuation, which leaves runs
    of spaces behind. spaCy emits extra whitespace as tokens of its own, and
    unfiltered they would become TF-IDF features (including bigrams with them).
    """
    return [token.lemma_ for token in nlp(doc) if not token.is_space]

### Create the TF-IDF vectors for the users

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
# TF-IDF vectors (unigrams and bigrams, at most 20,000 features) over each
# user's concatenated tweets, using the custom preprocessor and spaCy lemmas.
# token_pattern=None: the pattern is unused when a custom tokenizer is given,
# and setting it to None explicitly silences scikit-learn's warning about that.
tf = TfidfVectorizer(
    preprocessor=create_preprocessed_text,
    tokenizer=spacy_tokenizer,
    token_pattern=None,
    ngram_range=(1, 2),
    max_features=20000,
)
tf_matrix = tf.fit_transform(grouped_texts['text'])

### Transform the sparse TF-IDF matrix into a pandas DataFrame

In [None]:
# Densify the TF-IDF matrix into a DataFrame with one row per user screen name.
tf_matrix = pd.DataFrame(data=tf_matrix.toarray(), index=grouped_texts.index.values)
# Keep only users with at least one non-zero weight; all-zero vectors would
# give NaN cosine similarities.
tf_matrix = tf_matrix[(tf_matrix != 0).any(axis=1)]

### Create similarity matrix for users with TF-IDF

This computation was later adapted in the code to use tqdm so that progress can be followed in the console.

In [None]:
from scipy.spatial.distance import pdist, squareform
# Pairwise cosine *distances* between all users as a square user x user DataFrame.
user_labels = tf_matrix.index.values
result = pd.DataFrame(
    squareform(pdist(tf_matrix, metric='cosine')),
    index=user_labels,
    columns=user_labels,
)

### Set the diagonal scores to zero, because a user should not count as similar to itself

In [None]:
# `result` holds cosine distances. Setting the diagonal to distance 1.0 makes
# every user's similarity to itself 0 after the 1 - d conversion.
# Fill an explicit copy: under pandas Copy-on-Write, `.values` can be a
# read-only view, and np.fill_diagonal on it would raise.
distances = result.to_numpy(copy=True)
np.fill_diagonal(distances, 1.0)
result = pd.DataFrame(distances, index=result.index, columns=result.columns)
del distances
similarity = 1 - result

### Create similarity graph

In [None]:
# Weighted user graph built from the similarity matrix (networkx only
# creates edges for non-zero entries, so the zeroed diagonal adds no self-loops).
G = nx.from_pandas_adjacency(similarity)

# Similarity below which an edge is cut.
threshold = 0.9
F = G.copy()
weak_edges = [(source, target) for source, target, weight in F.edges(data="weight") if weight < threshold]
F.remove_edges_from(weak_edges)
# Drop users left without any sufficiently similar neighbour.
F.remove_nodes_from(list(nx.isolates(F)))

fig = plt.figure(1, figsize=(30, 20), dpi=60)
nx.draw(F, with_labels=True, node_size=1000, font_size=24)
plt.show()