In [None]:
from pymongo import MongoClient
import pandas as pd
from tqdm.notebook import tqdm
from collections import deque, defaultdict
import pickle
import os

In [None]:
# Connection settings for the MongoDB instance that stores the collected data.
# Point the URI at your own server (local or remote) if it differs.
mongo_uri = 'mongodb://localhost:27017/'
client = MongoClient(mongo_uri)

# Database handle used by every extraction cell below
# (replace the name with your actual database name).
db = client['SocialCovid19']

In [None]:
from datetime import datetime

# Extraction window: tweets created before first_date or after last_date are skipped.
# NOTE(review): last_date is midnight at the *start* of 2021-12-31, so tweets posted
# later that day fall outside the window — confirm this is intended.
first_date = datetime(year=2020, month=1, day=1)
last_date = datetime(year=2021, month=12, day=31)

In [None]:
# Working directory: input pickles are read from it and every output file is written to it.
# Paths are built by plain string concatenation, so keep the trailing slash.
dir_outs = "./"

In [None]:
# Index of the tweet ids contained in each previously written stats file, cached on disk
# so that a re-run only has to open the stats files it has not indexed yet.
cache_path = dir_outs + '__already_processed_filtered.pickle'
already_processed_files = pd.read_pickle(cache_path) if os.path.exists(cache_path) else {}

added = False
for file in tqdm(os.listdir(dir_outs)):
    is_new_stats_file = file.startswith('tweets_filtered_stats') and file not in already_processed_files
    if is_new_stats_file:
        already_processed_files[file] = pd.read_pickle(dir_outs + file)['tweetId'].values
        added = True

# Rewrite the cache only when at least one new stats file was indexed.
if added:
    with open(cache_path, 'wb') as ff:
        pickle.dump(already_processed_files, ff)

# Flatten the per-file id arrays into one set for fast membership tests.
already_processed = set()
for v in tqdm(already_processed_files.values()):
    already_processed.update(v)

print('Already processed:', len(already_processed))

In [None]:
db_tweets = db['tweets']

# Fields fetched for every tweet document.
projection = {'contributors': 1, 'created': 1, 'favoriteCount': 1, 'hastTags': 1, 'inReplyToScreenName': 1, 'inReplyToStatusId': 1, 'inReplyToUserId': 1,
'lang': 1, 'mediaEntity': 1, 'quotedStatusId': 1, 'retweet': 1, 'retweetCount': 1, 'retweetId': 1, 'text': 1, 'tweetId': 1, 'urlEntities': 1,
'userId': 1, 'userMentions': 1, 'place':1}

# Buffers filled while reading one chunk and written to disk (then emptied) after each chunk.
tweets_stats = deque()               # one metadata record per accepted tweet (retweets included)
tweets_per_day = defaultdict(deque)  # 'YYYY-MM-DD' -> text records of accepted non-retweets

chunk_size = 10_000  # number of tweet ids sent in a single $in query
for extract_file in tqdm(os.listdir(dir_outs)):
    # Each '_tweets_to_extract*' file is a pickled dict {date: set(tweet_id)} with dates
    # formatted as 'YYYY-M' (per the original author's note, built from the
    # 01_Tweets_user_createdAt_place_ files). It lists the tweets to pull out of an
    # already populated database that may hold a larger tweet set.
    if not extract_file.startswith('_tweets_to_extract'):
        continue

    # Distinct name from the per-document variable `tt` below: the original reused `tt`
    # for both, rebinding the dict's name while it was still being iterated.
    tweets_to_extract = pd.read_pickle(dir_outs + extract_file)

    for tweets in tweets_to_extract.values():
        pending_ids = [x for x in tweets if x not in already_processed]

        for i in range(0, len(pending_ids), chunk_size):
            ids = pending_ids[i:i + chunk_size]

            with client.start_session() as session:
                # Create the cursor before entering `try`: if find() itself fails there is
                # nothing to close, and `finally` must not touch an unbound (or stale,
                # left over from an earlier chunk/cell) `cursor` name.
                cursor = db_tweets.find({'tweetId':{'$in':ids}}, projection=projection, session=session, no_cursor_timeout=True)
                try:
                    for tt in tqdm(cursor):

                        if tt['tweetId'] in already_processed: # should not be needed
                            continue

                        if tt['lang'] != 'es':
                            continue

                        if tt['created'] < first_date or (last_date is not None and tt['created'] > last_date):
                            continue

                        tweets_stats.append({
                            'tweetId': tt['tweetId'],
                            'userId': tt['userId'],
                            'created': tt['created'],
                            'contributors': tt['contributors'],
                            'favoriteCount': tt['favoriteCount'],
                            'hastTags': tt['hastTags'],
                            'inReplyToScreenName': tt.get('inReplyToScreenName'),
                            'inReplyToStatusId': tt['inReplyToStatusId'],
                            'inReplyToUserId': tt['inReplyToUserId'],
                            'mediaEntity': tt['mediaEntity'],
                            'quotedStatusId': tt.get('quotedStatusId',-1),
                            'retweet': tt['retweet'],
                            'retweetCount': tt['retweetCount'],
                            'retweetId': tt.get('retweetId',-1),
                            'urlEntities': tt['urlEntities'],
                            'userMentions':tt['userMentions'],
                            'placeId': tt.get('place')})

                        if tt['retweet'] or tt.get('retweetId',-1) != -1: # we don't count retweets here
                            continue

                        day = tt['created'].strftime('%Y-%m-%d')
                        tweets_per_day[day].append({'tweetId':tt['tweetId'],'userId':tt['userId'],'text':tt['text'].replace('\n',' ')})

                        # depending on the number of tweets in the cursor, an additional save step could be added here

                    # One microsecond-resolution stamp per flush, shared by all files of the
                    # flush. The original used whole seconds and recomputed the stamp for
                    # every file, so two chunks finishing within the same second could
                    # overwrite each other's output, and the printed path could differ
                    # from the path actually written.
                    stamp = int(datetime.now().timestamp() * 1_000_000)

                    for day, day_tweets in tweets_per_day.items():
                        if len(day_tweets) > 0:
                            day_path = dir_outs + f'day_{day}_{stamp}.pickle'
                            print(day_path)
                            pd.DataFrame(day_tweets).to_pickle(day_path)
                            day_tweets.clear()

                    if len(tweets_stats) > 0:
                        pd.DataFrame(tweets_stats).to_pickle(dir_outs + f'tweets_filtered_stats{stamp}.pickle')
                        tweets_stats.clear()

                finally:
                    cursor.close()  # Ensure the cursor is closed after use

#### Creation of the different files in Mendeley from the extracted db snapshot

In [None]:
def get_processed_ids(outs_dir,prefix,col='tweet_id',convert=True):
    """Collect the ids already stored in previously written output files.

    Parameters
    ----------
    outs_dir : str
        Directory to scan; a trailing path separator is optional.
    prefix : str
        Only files whose name starts with this prefix are read.
    col : str
        Column holding the ids.
    convert : bool
        When True, ids are parsed as base-32 strings (the encoding produced by
        `int2base`) and returned as integers; when False they are returned as stored.

    Returns
    -------
    set
        The ids found across all matching files (empty when no file matches).
    """
    processed = set()
    for ff in os.listdir(outs_dir):
        if not ff.startswith(prefix):
            continue

        path = os.path.join(outs_dir, ff)
        # The table cells write their chunks with `to_parquet`, so those files must be
        # read back as parquet; anything else is treated as a pickled DataFrame, which
        # is what the original code assumed for every file.
        if ff.endswith('.parquet'):
            frame = pd.read_parquet(path, columns=[col])
        else:
            frame = pd.read_pickle(path)
        processed.update(frame[col].values)

    if convert:
        processed = {int(x, 32) for x in processed}

    return processed

In [None]:
import string

# Digit alphabet: '0'-'9', then 'a'-'z', then 'A'-'Z' (62 symbols in total).
digs = string.digits + string.ascii_letters

def int2base(x, base=32):
    """Return the representation of integer `x` in the given base as a string.

    Digits are taken from `digs`; negative numbers get a leading '-'. For bases up to
    36 the result can be parsed back with the built-in `int(text, base)`.

    Parameters
    ----------
    x : int
        Value to convert.
    base : int
        Target base, between 2 and len(digs) (62) inclusive.

    Raises
    ------
    ValueError
        If `base` is outside the supported range. Without this check base 1 would
        loop forever, base 0 would divide by zero and bases above 62 would run off
        the end of the digit alphabet.
    """
    if not 2 <= base <= len(digs):
        raise ValueError(f'base must be between 2 and {len(digs)}, got {base}')

    if x == 0:
        return digs[0]

    negative = x < 0
    remaining = -x if negative else x

    # Digits come out least-significant first and are reversed at the end.
    digits = []
    while remaining:
        remaining, remainder = divmod(remaining, base)
        digits.append(digs[remainder])

    if negative:
        digits.append('-')

    return ''.join(reversed(digits))

In [None]:
# In-memory lookup of every place document, keyed by its placeId.
places = {}
db_places = db['places']
with client.start_session() as session:
    # The cursor is created before `try` so that `finally` only ever closes the cursor
    # opened here. In the original, a failing find() left `cursor` unbound (NameError
    # on a fresh kernel) or still pointing at a cursor from an earlier cell.
    cursor = db_places.find({}, projection={'fullName':1, 'country':1, 'placeId':1}, session=session, no_cursor_timeout=True)
    try:
        for pp in tqdm(cursor):
            places[pp['placeId']] = pp

    finally:
        cursor.close()  # Ensure the cursor is closed after use

In [None]:
# Prefix of the per-chunk stats pickles read by the table cells below.
# The extraction step names them 'tweets_filtered_stats<timestamp>.pickle', with no
# underscore before the timestamp, so a prefix ending in '_' would match no file.
file_prefix = 'tweets_filtered_stats' # alternative input set: 'all_tweets_stats'

In [None]:
# 01 .Tweets_user_createdAt_place
# tweet_id: the id of each Twitter post in Long format.
# user_id: the id of the user that shared the post
# created_at: date and time of posting in Long format.
# place_fullName: full name of the included place if available.
# place_country: country of the included place if available.
#
# All ids and the creation time (epoch seconds) are written base-32 encoded via int2base.

table_01_columns = ['tweet_id','user_id','created_at','place_fullName','place_country']

def _place_field(place_id, field):
    """Return `field` of the place with id `place_id`, or None when the tweet has no
    place or the place is not present in `places`."""
    if place_id is None or place_id not in places:
        return None
    return places[place_id].get(field)

def _save_table_01(frames):
    """Concatenate the pending frames and write them as one timestamped parquet file."""
    chunk = pd.concat(frames, ignore_index=True)
    chunk.to_parquet(dir_outs + f'01_Tweets_user_createdAt_place_{str(int(datetime.now().timestamp()))}.parquet',index=False)

already_processed = get_processed_ids(dir_outs,'01_Tweets_user_createdAt_place_','tweet_id')
print('Already processed:',len(already_processed))

# Frames are collected in a list and concatenated once per flush; concatenating onto a
# growing DataFrame for every input file copies all accumulated rows each time.
pending_01 = []
pending_01_rows = 0

for file in tqdm(os.listdir(dir_outs)):

    if not file.startswith(file_prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)[['tweetId','userId','created','placeId']]

    # Same resume rule as the other table cells: skip a stats file as soon as one of
    # its tweets is already present in a previously written output chunk.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Both place columns are looked up through the tweet's placeId (the original read
    # a nonexistent 'country' column and returned the full name for the country too).
    df_tweets['place_fullName'] = [_place_field(x,'fullName') for x in df_tweets['placeId']]
    df_tweets['place_country'] = [_place_field(x,'country') for x in df_tweets['placeId']]

    df_tweets['tweetId'] = [int2base(x,32) for x in df_tweets['tweetId']]
    df_tweets['userId'] = [int2base(x,32) for x in df_tweets['userId']]
    # Epoch seconds of the creation time. pandas interprets timezone-naive timestamps
    # as UTC here — NOTE(review): confirm the stored datetimes are UTC.
    df_tweets['created'] = [int2base(int(pd.Timestamp(x).timestamp()),32) for x in df_tweets['created']]

    df_tweets = df_tweets.rename(columns={'tweetId':'tweet_id','userId':'user_id','created':'created_at'})

    # Keep only the documented columns (drops the helper column placeId).
    df_tweets = df_tweets[table_01_columns]

    pending_01.append(df_tweets)
    pending_01_rows += len(df_tweets)

    if pending_01_rows >= 15_000_000: # saving in chunks to avoid losing computations due to unforeseen events. Stored as parquet to save space
        _save_table_01(pending_01)
        pending_01 = []
        pending_01_rows = 0

if pending_01:
    _save_table_01(pending_01)
    pending_01 = []
    pending_01_rows = 0

# Leave an empty frame behind, as the original intended (its reset assigned to the
# misspelled name `tabl_01`, so the buffer was never actually cleared).
table_01 = pd.DataFrame()

In [None]:
# 02. Tweets_type
# tweet_id.
# original: 1 if the tweet is an original post, 0 otherwise.
# retweet: the retweeted tweet_id if a retweet, 0 otherwise.
# reply: the replied tweet id, 0 otherwise.
# quote: the quoted tweet_id, 0 otherwise. This can only be combined with Original or Reply.

table_02 = pd.DataFrame()

already_processed = get_processed_ids(dir_outs,'02_Tweets_type','tweet_id')
print('Already processed:',len(already_processed))

# Columns holding ids of related tweets; -1 marks "no related tweet" in the stats files.
related_id_columns = ['retweetId','inReplyToStatusId','quotedStatusId']

for file in tqdm(os.listdir(dir_outs)):

    if not file.startswith(file_prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)[['tweetId','retweetId','quotedStatusId','inReplyToStatusId']]

    # Skip the whole stats file when any of its tweets was exported in a previous run.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Normalise the "missing" marker from -1 to 0.
    for id_column in related_id_columns:
        df_tweets[id_column] = df_tweets[id_column].replace(-1,0)

    # NOTE(review): `original` is derived from the retweet id only, so replies are also
    # flagged 1 — confirm this matches the column description above.
    df_tweets['original'] = [1 if x == 0 else 0 for x in df_tweets['retweetId']]

    # Base-32 encode the tweet id and the three related ids.
    for id_column in ['tweetId'] + related_id_columns:
        df_tweets[id_column] = [int2base(x) for x in df_tweets[id_column]]

    column_names = {'tweetId':'tweet_id','retweetId':'retweet','quotedStatusId':'quote','inReplyToStatusId':'reply'}
    df_tweets = df_tweets.rename(columns=column_names)[['tweet_id','original','retweet','reply','quote']]

    table_02 = pd.concat([table_02,df_tweets])

    # Write a chunk once enough rows have accumulated, then start a new buffer.
    if len(table_02) >= 15_000_000:
        table_02.to_parquet(dir_outs + f'02_Tweets_type{int(datetime.now().timestamp())}.parquet',index=False)
        table_02 = pd.DataFrame()

# Write whatever is left after the last input file.
if len(table_02) > 0:
    table_02.to_parquet(dir_outs + f'02_Tweets_type{int(datetime.now().timestamp())}.parquet',index=False)
    table_02 = pd.DataFrame()

In [None]:
# 03. Tweets_media_url_contributors_mentions
# tweet_id.
# media: number of media elements in the tweet.
# url: number of urls in the tweet.
# contributors: number of contributors in the tweet.
# mentions: number of mentions of other users in the tweet.

table_03 = pd.DataFrame()

already_processed = get_processed_ids(dir_outs,'03_Tweets_media_url_contributors_mentions','tweet_id')
print('Already processed:',len(already_processed))

for file in tqdm(os.listdir(dir_outs)):

    # The original line had a stray quote after `file_prefix`, a syntax error that
    # prevented the whole cell from running.
    if not file.startswith(file_prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)[['tweetId','mediaEntity','urlEntities','contributors','userMentions']]

    # Skip the whole stats file when any of its tweets was exported in a previous run.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Replace each entity list by its length; a missing (None) list counts as 0.
    for entity_column in ['mediaEntity','urlEntities','contributors','userMentions']:
        df_tweets[entity_column] = [0 if x is None else len(x) for x in df_tweets[entity_column]]

    df_tweets['tweetId'] = [int2base(x) for x in df_tweets['tweetId']]

    df_tweets = df_tweets.rename(columns={'tweetId':'tweet_id',
                                          'mediaEntity':'media','urlEntities':'url','userMentions':'mentions'})

    table_03 = pd.concat([table_03,df_tweets])

    # Write a chunk once enough rows have accumulated, then start a new buffer.
    if len(table_03) >= 15_000_000:
        table_03.to_parquet(dir_outs + f'03_Tweets_media_url_contributors_mentions{str(int(datetime.now().timestamp()))}.parquet',index=False)
        table_03 = pd.DataFrame()

# Write whatever is left after the last input file.
if len(table_03) > 0:
    table_03.to_parquet(dir_outs + f'03_Tweets_media_url_contributors_mentions{str(int(datetime.now().timestamp()))}.parquet',index=False)
    table_03 = pd.DataFrame()

In [None]:
# 04. Tweets_hashtags
# tweet_id.
# hashtags: a list with the hashtags included in the tweet.
import re

# A hashtag is '#' followed by ASCII letters, Spanish accented vowels, ü/ñ, digits or
# underscores, and must end right before whitespace, a non-word symbol or the end of text.
# Raw string: '\s' and '\w' are regex classes, not Python string escapes (in a normal
# string literal they are invalid escape sequences and trigger a warning).
hashtag_re = re.compile(r'#[a-zA-ZáéíóúÁÉÍÓÚüÜñÑ0-9_]+(?=\s|[^\w\s]|$)')

table_04 = pd.DataFrame()

# Hashtags are extracted from the tweet text stored in the per-day pickles.
prefix = "day_"

already_processed = get_processed_ids(dir_outs,'04_Tweets_hashtags','tweet_id')
print('Already processed:',len(already_processed))

for file in tqdm(os.listdir(dir_outs)):

    if not file.startswith(prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)

    if len(df_tweets) == 0:
        continue

    # Skip the whole day file when any of its tweets was exported in a previous run.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Tab-separated hashtags found in each text (empty string when there are none).
    df_tweets['hashtags'] = ['\t'.join(hashtag_re.findall(text)) for text in df_tweets['text'].values]

    # Keep only tweets with at least one hashtag, and only the two output columns.
    has_hashtags = df_tweets['hashtags'].map(len) > 0
    df_tweets = df_tweets.loc[has_hashtags, ['tweetId','hashtags']]

    df_tweets['tweetId'] = [int2base(x) for x in df_tweets['tweetId']]
    df_tweets = df_tweets.rename(columns={'tweetId':'tweet_id'})

    table_04 = pd.concat([table_04,df_tweets])

    # Write a chunk once enough rows have accumulated, then start a new buffer.
    if len(table_04) >= 1_000_000:
        table_04.to_parquet(dir_outs + f'04_Tweets_hashtags{int(datetime.now().timestamp())}.parquet',index=False)
        table_04 = pd.DataFrame()

# Write whatever is left after the last input file.
if len(table_04) > 0:
    table_04.to_parquet(dir_outs + f'04_Tweets_hashtags{int(datetime.now().timestamp())}.parquet',index=False)
    table_04 = pd.DataFrame()

In [None]:
# 05. Tweets_urls
# tweet_id.
# urls: a tab separated list with the urls included in the tweet.

table_05 = pd.DataFrame()

already_processed = get_processed_ids(dir_outs,'05_Tweets_urls','tweet_id')
print('Already processed:',len(already_processed))

for file in tqdm(os.listdir(dir_outs)):

    if not file.startswith(file_prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)[['tweetId','urlEntities']]

    # Skip the whole stats file when any of its tweets was exported in a previous run.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Keep only tweets with at least one URL. A missing (None) entity list counts as
    # empty, the same rule the table 03 cell applies; calling len() on it would raise.
    url_counts = df_tweets['urlEntities'].map(lambda x: 0 if x is None else len(x))
    df_tweets = df_tweets[url_counts > 0]

    # Every remaining row has a non-empty list, so no empty-list special case is needed.
    df_tweets['urlEntities'] = ['\t'.join([y['expandedURL'] for y in x]) for x in df_tweets['urlEntities']]

    df_tweets['tweetId'] = [int2base(x) for x in df_tweets['tweetId']]

    df_tweets = df_tweets.rename(columns={'tweetId':'tweet_id',
                                          'urlEntities':'urls'})

    table_05 = pd.concat([table_05,df_tweets])

    # Write a chunk once enough rows have accumulated, then start a new buffer.
    if len(table_05) >= 15_000_000:
        table_05.to_parquet(dir_outs + f'05_Tweets_urls{str(int(datetime.now().timestamp()))}.parquet',index=False)
        table_05 = pd.DataFrame()

# Write whatever is left after the last input file.
if len(table_05) > 0:
    table_05.to_parquet(dir_outs + f'05_Tweets_urls{str(int(datetime.now().timestamp()))}.parquet',index=False)
    table_05 = pd.DataFrame()

In [None]:
# 06. Tweets_mentions
# tweet_id.
# mentions: a tab separated list with the user_id mentions included in the tweet.

table_06 = pd.DataFrame()

already_processed = get_processed_ids(dir_outs,'06_Tweets_mentions','tweet_id')
print('Already processed:',len(already_processed))

for file in tqdm(os.listdir(dir_outs)):

    if not file.startswith(file_prefix):
        continue

    df_tweets = pd.read_pickle(dir_outs + file)[['tweetId','userMentions']]

    # Skip the whole stats file when any of its tweets was exported in a previous run.
    if any(x in already_processed for x in df_tweets['tweetId'].values):
        continue

    # Keep only tweets that mention at least one user. A missing (None) mention list
    # counts as empty, the same rule the table 03 cell applies; len() on it would raise.
    mention_counts = df_tweets['userMentions'].map(lambda x: 0 if x is None else len(x))
    df_tweets = df_tweets[mention_counts > 0]

    # Mentioned user ids are base-32 encoded like every other id in the dataset.
    df_tweets['userMentions'] = ['\t'.join([int2base(y['userId']) for y in x]) for x in df_tweets['userMentions']]

    df_tweets['tweetId'] = [int2base(x) for x in df_tweets['tweetId']]

    df_tweets = df_tweets.rename(columns={'tweetId':'tweet_id',
                                          'userMentions':'mentions'})

    table_06 = pd.concat([table_06,df_tweets])

    # Write a chunk once enough rows have accumulated, then start a new buffer.
    if len(table_06) >= 15_000_000:
        table_06.to_parquet(dir_outs + f'06_Tweets_mentions{str(int(datetime.now().timestamp()))}.parquet',index=False)
        table_06 = pd.DataFrame()

# Write whatever is left after the last input file.
if len(table_06) > 0:
    table_06.to_parquet(dir_outs + f'06_Tweets_mentions{str(int(datetime.now().timestamp()))}.parquet',index=False)
    table_06 = pd.DataFrame()

In [None]:
# 07. Tweets_replies
# tweet_id
# replies_ids: a tab separated list with the tweet_id of replies.
# (the column is written under the name 'replies', which is what the graph step reads)

table_07 = deque()

with client.start_session() as session:
    # Created before `try` so that `finally` only closes the cursor opened here and
    # never an unbound or stale `cursor` when find() itself fails.
    cursor = db['tweetReplies'].find({}, projection={'tweetId':1,'replies':1}, session=session, no_cursor_timeout=True)
    try:
        for tt in tqdm(cursor):
            # -1 entries are placeholders, not real reply ids.
            rr = [int2base(x) for x in tt['replies'] if x != -1]
            if len(rr) > 0:
                # tweet_id is base-32 encoded like the reply ids and like tweet_id in
                # every other table; the original stored the raw integer here, which
                # cannot be matched against the encoded ids of the other tables.
                table_07.append({'tweet_id': int2base(tt['tweetId']), 'replies': '\t'.join(rr)})

    finally:
        cursor.close()  # Ensure the cursor is closed after use

table_07 = pd.DataFrame(table_07)
table_07.to_csv(dir_outs + '07_Tweets_replies.csv',index=False)
table_07.to_parquet(dir_outs + '07_Tweets_replies.parquet',index=False)

In [None]:
# 08. Users
# user_id.
# location: the user defined location for the account. It might not be a real location nor machine parseable.
# created_at: date and time of account creation in Long format.
# hasProfileImage: 1 if the user customized the profile account, 0 if the user kept the default profile image.
# follower_count: number of followers of the user.
# followee_count: number of followees of the user.
# statuses_count: number of published tweets, regardless of their type.
# is_verified: whether the account is verified.
# favorites_count: the number of tweets this user has liked in the account’s lifetime.
# listed_count: the number of public lists that this user is a member of.

# Note: this retrieves all users in the db. It could be restricted to get only the users in the file 01_Tweets_user_createdAt_place_

table_08 = deque()

projection = {"defaultProfileImage" : 1, "created": 1, "listedCount" : 1,  "location" : 1,  "favoritesCount" : 1,
              "followerCount" : 1, "friendsCount" : 1,  "statusesCount" : 1, "userId" : 1,  "verified" : 1 }

def _save_users(rows):
    """Write the buffered user records as one timestamped parquet file."""
    pd.DataFrame(rows).to_parquet(dir_outs + f'08_Users{str(int(datetime.now().timestamp()))}.parquet',index=False)

with client.start_session() as session:
    # Created before `try` so that `finally` only closes the cursor opened here and
    # never an unbound or stale `cursor` when find() itself fails.
    cursor = db['users'].find({}, projection=projection, session=session, no_cursor_timeout=True)
    try:
        for uu in tqdm(cursor):
            table_08.append({
                # The projection fetches 'userId' (the original read 'tweetId', which is
                # not part of it). Base-32 encoded so it matches user_id in table 01.
                'user_id': int2base(uu['userId']),
                # .get(): yields None when the document has no location field.
                'location': uu.get('location'),
                # NOTE(review): kept as stored; the header describes a Long format —
                # confirm whether this should be encoded like created_at in table 01.
                'created_at': uu['created'],
                # Inverted flag: defaultProfileImage is true when the user kept the
                # default picture, while hasProfileImage is 1 for a customized one.
                'hasProfileImage': 0 if uu['defaultProfileImage'] else 1,
                # The original indexed uu[''], an empty key that is never present.
                'follower_count': uu['followerCount'],
                'followee_count': uu['friendsCount'],
                'statuses_count': uu['statusesCount'],
                'is_verified': uu['verified'],
                'favorites_count': uu['favoritesCount'],
                'listed_count': uu['listedCount']})

            # Flush inside the loop; placed after it (as in the original) the check
            # could never bound memory while the cursor was still being consumed.
            if len(table_08) >= 15_000_000:
                _save_users(table_08)
                table_08.clear()

    finally:
        cursor.close()  # Ensure the cursor is closed after use

# Write whatever is left after the cursor is exhausted.
if len(table_08) > 0:
    _save_users(table_08)
    table_08.clear()

In [None]:
# 09-User_graph.csv
from collections import defaultdict, Counter

# user_tweets: number of tweets per user; tweet_user_map: tweet_id -> id of its author.
user_tweets = Counter()
tweet_user_map = {}

# NOTE(review): the hyphenated CSV names read in the graph cells are not written by the
# table cells of this notebook (those write underscore-named parquet chunks) —
# presumably they are the consolidated files; confirm how they are produced.
# Ids are base-32 text: force string columns and disable NA detection so that values
# such as 'nan' or 'null' (valid base-32 strings) or all-digit ids are not reinterpreted.
df_1 = pd.read_csv(dir_outs + '01-Tweets_user_createdAt_place.csv',
                   usecols=['tweet_id','user_id'], dtype=str, na_filter=False)

for tweet_id, user_id in tqdm(zip(df_1['tweet_id'].values, df_1['user_id'].values), total=len(df_1)):
    user_tweets[user_id] += 1
    tweet_user_map[tweet_id] = user_id

del df_1

In [None]:
# Ids are base-32 text: force string columns and disable NA detection so that values
# such as 'nan' or 'null' (valid base-32 strings) or all-digit ids are not reinterpreted.
df_2 = pd.read_csv(dir_outs + '02-Tweets_type.csv', dtype=str, na_filter=False)

# user_relations[user][relation] -> set of users that `user` retweeted / replied to /
# quoted. An entry is only created for tweets whose author AND target tweet are both
# present in tweet_user_map.
user_relations = defaultdict(lambda: defaultdict(set))

relation_columns = ('retweet','reply','quote')
for row in tqdm(df_2[['tweet_id','retweet','reply','quote']].itertuples(index=False), total=len(df_2)):

    if row.tweet_id not in tweet_user_map:
        continue

    author = tweet_user_map[row.tweet_id]
    for relation in relation_columns:
        # A value without a known author (including the "no related tweet" marker)
        # is simply not found in the map and therefore ignored.
        target = getattr(row, relation)
        if target in tweet_user_map:
            user_relations[author][relation].add(tweet_user_map[target])

del df_2

In [None]:
# NOTE(review): this reads '07-Tweets_replies.csv' (hyphen) while the table 07 cell
# writes '07_Tweets_replies.csv' (underscore) — confirm which file is intended.
# Lookups below only succeed when tweet_id in this file uses the same base-32 encoding
# as the keys of tweet_user_map.
# String columns without NA detection keep base-32 ids such as 'nan'/'null' intact.
df_7 = pd.read_csv(dir_outs + '07-Tweets_replies.csv', dtype=str, na_filter=False)

for parent_id, replies in tqdm(zip(df_7['tweet_id'].values, df_7['replies'].values), total=len(df_7)):

    if parent_id not in tweet_user_map:
        continue

    parent_user = tweet_user_map[parent_id]
    # Each known reply adds an edge from the replying user to the author of the parent.
    for rep in replies.split('\t'):
        if rep in tweet_user_map:
            user_relations[tweet_user_map[rep]]['reply'].add(parent_user)

del df_7

In [None]:
# String columns without NA detection: ids are base-32 text, and a single all-digit
# mention must stay a string for the split below to work.
df_6 = pd.read_csv(dir_outs + '06-Tweets_mentions.csv', dtype=str, na_filter=False)

for tweet_id, mentions in tqdm(zip(df_6['tweet_id'].values, df_6['mentions'].values), total=len(df_6)):

    if tweet_id not in tweet_user_map:
        continue

    # Mentions already hold user ids, so they are added as they are (no tweet lookup).
    author = tweet_user_map[tweet_id]
    for rep in mentions.split('\t'):
        user_relations[author]['mention'].add(rep)

del df_6

In [None]:
# Numeric code of each relation type, plus the reverse lookup (name -> code)
# used when writing the graph file.
map_names = dict(enumerate(("retweet", "reply", "quote", "mention"), start=1))
inverse_map = dict(zip(map_names.values(), map_names.keys()))

In [None]:
from collections import deque

# One row per (user, relation type): the relation's numeric code and the
# tab-separated ids of the users on the other end of that relation.
ll = deque(
    {'user_id': user, 'relations': inverse_map[rt], 'ids': '\t'.join(targets)}
    for user, rels in tqdm(user_relations.items())
    for rt, targets in rels.items()
)

df_9 = pd.DataFrame(ll)

df_9.to_csv(dir_outs + '09-Users_graph.csv',index=False)