## Configuration

### Imports

In [3]:
# Utilities
from IPython.display import display
from fastprogress import master_bar, progress_bar
import os
import ntpath
import numpy as np
import pandas as pd
from collections.abc import MutableMapping

# MongoDB functionality
from pymongo.errors import BulkWriteError
from pymongo import MongoClient, InsertOne
from pymongo.bulk import BulkOperationBuilder

# Indexes
import uuid
from bson import ObjectId

### Variables

In [4]:
# Directories where CSV data is stored
ROOT_DIR = "/home/mattia/javier/botbusters-spanish-general-elections-network-analysis/"
DATA_DIR = ROOT_DIR + "data/"

# Change path to root
os.chdir(ROOT_DIR)


# MongoDB parameters
mongoclient = MongoClient('localhost', 27017)
db = mongoclient.influence

### Support Functions

In [5]:
def make_uuid(uuid_str):
    """Makes an UUID from string
    
    Keyword arguments:
    uuid_str -- uuid string to be converted into UUID
    """
    uuid_str = str(uuid_str)
    if not uuid_str.strip():
        return None
    try:
        return uuid.UUID(uuid_str)
    except Exception as ex:
        print(uuid_str, ex)
        return None
    
def make_objid(text):
    """Makes an ObjectId of 4 bytes
    
    Keyword arguments:
    text -- string to be converted into Object ID
    """
    text = str(text)
    if not text.strip():
        return None
    try:
        return ObjectId(text.rjust(24,"0"))
    except Exception as ex:
        print(text, ex)
        return None
    
def df_to_mongodb(df, collection):
    """Saves the dataframe in a MongoDB collection

    Keyword arguments:
    df -- dataframe to dump
    collection -- MongoDB collection to fulfill
    """
    try:
 
        if df is None:
            return

        print("Preparing DB operations...", end=" ")
        records = df.to_dict('records')
        
        operations = []
        for record in progress_bar(records):
            operations.append(InsertOne(record))  

            if len(operations) > 20000:
                results = collection.bulk_write(operations)
                print("M:", str(results.matched_count).rjust(8, " "),
                      " I:", str(results.inserted_count).rjust(8, " "),
                      " U:", str(results.upserted_count).rjust(8, " "))
                operations = []

        if len(operations) > 0: 
            results = collection.bulk_write(operations)
            print("M:", str(results.matched_count).rjust(8, " "),
                  " I:", str(results.inserted_count).rjust(8, " "),
                  " U:", str(results.upserted_count).rjust(8, " "))

    except Exception as e:
        print("Exception. Message:", e)

## Load CSV in MongoDB

### Tweets collection

In [4]:
tweets_columns = {
    # tweets
    0: str,
    1: str,
    2: str,
    3: str,
    4: 'Int64',
    5: str,                     
    6: str
}

def read_tweets(filename):
    """Parses the tweets CSV returning a DataFrame.
    
    Keyword arguments:
    filename -- name of the CSV
    """
    print("Processing", ntpath.basename(filename), end="\t")

    df = pd.read_csv(filename, 
                     low_memory=False, 
                     keep_default_na=True, 
                     dtype=tweets_columns, 
                     usecols=[0,1,2,3,4,5,6],
                     names=['_id', 'user_id', 'source', 'date', 'retweets', 'tweet_id', 'url'])

    print("CSV", end=" ")

    
    # Discard URL entries
    df = df[df.source=='twitter']
    df.drop(columns=['source'],inplace=True)
    
    # Make index as UUID
    df['_id'] = df['_id'].apply(make_uuid)
    
    # Create ObjectIDs to avoid any potential issue
    df['user_id'] = df['user_id'].apply(make_objid)
    df['tweet_id'] = df['tweet_id'].apply(make_objid)

    # Make datetime objects
    df['date'] = pd.to_datetime(df['date'], infer_datetime_format=False, format="%Y-%m-%d %H:%M:%S")
    
    # Force integer
    df['retweets'] = df['retweets'].astype('int')

    print("OK", end="; ")

    print("#:", len(df), end=" entries; ")

    return df

In [5]:
%%time
df_tweets = read_tweets(DATA_DIR+'dataset/tweets.csv')
display(df_tweets.head())
df_to_mongodb(df_tweets, db.tweets)

Processing tweets.csv	CSV OK; #: 1875457 entries; 

Unnamed: 0,_id,user_id,date,retweets,tweet_id,url
36751,a86e778c-46a3-11ea-9505-02420a0000af,276977398,2019-11-04 21:07:45,0,1191462058602192907,https://twitter.com/nenamoni92/status/11914620...
36787,44a195f2-468c-11ea-9505-02420a0000af,2906096735,2019-11-04 22:25:58,0,1191481739606183937,https://twitter.com/Fenix_Nebeda/status/119148...
37086,44a1996c-468c-11ea-9505-02420a0000af,1620996282,2019-11-04 22:54:41,0,1191488966639443968,https://twitter.com/marisa0687/status/11914889...
37151,75dba318-46a7-11ea-9505-02420a0000af,1163542858604916739,2019-11-04 21:17:16,0,1191464452987772928,https://twitter.com/Hispanica16/status/1191464...
37282,77c4e81a-46a7-11ea-9505-02420a0000af,279465279,2019-11-04 21:15:49,1,1191464087064109058,https://twitter.com/SergiJioKun/status/1191464...


Preparing DB operations... 

M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0 

### Users collection

In [6]:
users_columns = {
    # users
    0: str,
    1: str
}

def read_users(filename):
    """Parses the users CSV returning a DataFrame.
    
    Keyword arguments:
    filename -- name of the CSV
    """
    print("Processing", ntpath.basename(filename), end="\t")
    df = pd.read_csv(filename, 
                     low_memory=False, 
                     keep_default_na=True, 
                     dtype=users_columns, 
                     names=['_id', 'username'])
    print("CSV", end=" ")

    # Make index as UUID
    df['_id'] = df['_id'].apply(make_objid)
    print("OK", end="; ")
    print("#:", len(df), end=" entries; ")
    return df

In [7]:
%%time
df_users = read_users(DATA_DIR+'dataset/users.csv')
display(df_users.head())
df_to_mongodb(df_users, db.users)

Processing users.csv	CSV OK; #: 2802467 entries; 

Unnamed: 0,_id,username
0,2266588688,PoderMigrante_N
1,471028961,mabelsaroman
2,2792368467,Javi_Diaz2000
3,180918124,RoblexxBlog
4,1184444845047386112,ArturoFlexible


Preparing DB operations... 

M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0  I:    20001  U:        0
M:        0 

### Hashtags collection

In [8]:
hashtags_columns = {
    # hashtags
    0: str,
    1: str
}

def read_hashtags(filename):
    """Parses the hashtags CSV returning a DataFrame.
    
    Keyword arguments:
    filename -- name of the CSV
    """
    print("Processing", ntpath.basename(filename), end="\t")
    df = pd.read_csv(filename, 
                     low_memory=False, 
                     keep_default_na=True, 
                     dtype=hashtags_columns, 
                     names=['tweet_id', 'hashtag'])
    print("CSV", end=" ")

    # Make index
    df['tweet_id'] = df['tweet_id'].apply(make_uuid)
    print("OK", end="; ")
    print("#:", len(df), end=" entries; ")
    return df

In [None]:
%%time
df_hashtags = read_hashtags(DATA_DIR+'dataset/hashtags.csv')
display(df_hashtags.head())
df_to_mongodb(df_hashtags, db.hashtags)

### Mentions collection

In [10]:
mentions_columns = {
    # hashtags
    0: str,
    1: str
}

def read_mentions(filename):
    """Parses the mentions CSV returning a DataFrame.
    
    Keyword arguments:
    filename -- name of the CSV
    """
    print("Processing", ntpath.basename(filename), end="\t")
    df = pd.read_csv(filename, 
                     low_memory=False, 
                     keep_default_na=True, 
                     dtype=mentions_columns, 
                     names=['user_id','tweet_id'])
    print("CSV", end=" ")

    # Make indexes
    df['tweet_id'] = df['tweet_id'].apply(make_uuid)
    df['user_id'] = df['user_id'].apply(make_objid)

    print("OK", end="; ")
    print("#:", len(df), end=" entries; ")
    return df

In [None]:
%%time
df_mentions = read_mentions(DATA_DIR+'dataset/mentions.csv')
display(df_mentions.head())
df_to_mongodb(df_mentions, db.mentions)

### Retweets collection

In [12]:
retweets_columns = {
    0: str,
    1: str,
    2: str,
    3: str
}

def read_retweets(filename):
    """Parses the retweets CSV returning a DataFrame.
    
    Keyword arguments:
    filename -- name of the CSV
    """
    print("Processing", ntpath.basename(filename), end="\t")
    df = pd.read_csv(filename, 
                     low_memory=False, 
                     keep_default_na=True, 
                     dtype=retweets_columns, 
                     names=['_id', 'tweet_id', 'user_id', 'date']
                    )
    
    print("CSV", end=" ")

    # Make index
    df['_id'] = df['_id'].apply(make_objid)
    df['tweet_id'] = df['tweet_id'].apply(make_objid)
    df['user_id'] = df['user_id'].apply(make_objid)
    
    df['date'] = pd.to_datetime(df['date'], infer_datetime_format=False, format="%Y-%m-%d %H:%M:%S")


    print("OK", end="; ")
    print("#:", len(df), end=" entries; ")
    return df

In [None]:
%%time
df_retweets = read_retweets(DATA_DIR+'dataset/retweets.csv')
display(df_retweets.head())
df_to_mongodb(df_retweets, db.retweets)

## Check data coherence

In [6]:
def flatten(d, parent_key='', sep='_'):
    """Formats MongoDB results
    
    Keyword arguments:
    d -- dictionary with key and uncleaned values
    parent_key --
    sep --
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

## Load CSV in MongoDB

### Tweets

In [7]:
def get_tweets(collection):
    """
    Gets tweets
    
    collection - Tweets MongoDB collection
    """
    tweets = list(collection.find({},
                                 {'_id' : True, 'user_id' : True, 'tweet_id' : True}))
    
    print("Number of tweets in DB:", len(tweets))
    tweets = [flatten(t) for t in tweets]
    df_tweets = pd.DataFrame(tweets)
    return df_tweets

In [8]:
%%time
df_tweets = get_tweets(db.tweets)
display(df_tweets.head(5))

Number of tweets in DB: 1875457


Unnamed: 0,_id,user_id,tweet_id
0,a86e778c-46a3-11ea-9505-02420a0000af,276977398,1191462058602192907
1,44a195f2-468c-11ea-9505-02420a0000af,2906096735,1191481739606183937
2,44a1996c-468c-11ea-9505-02420a0000af,1620996282,1191488966639443968
3,75dba318-46a7-11ea-9505-02420a0000af,1163542858604916739,1191464452987772928
4,77c4e81a-46a7-11ea-9505-02420a0000af,279465279,1191464087064109058


CPU times: user 26.2 s, sys: 1.89 s, total: 28.1 s
Wall time: 29.6 s


### Users

In [9]:
def get_users(collection):
    """
    Gets users
    
    collection - Users MongoDB collection
    """
    users = list(collection.find({},
                                 {'_id' : True}))
    
    print("Number of users in DB:", len(users))
    users = [flatten(u) for u in users]
    df_users = pd.DataFrame(users)
    return df_users

In [10]:
%%time
df_users = get_users(db.users)
display(df_users.head(5))

Number of users in DB: 2802467


Unnamed: 0,_id
0,2266588688
1,471028961
2,2792368467
3,180918124
4,1184444845047386112


CPU times: user 20.5 s, sys: 1.9 s, total: 22.4 s
Wall time: 24.1 s


#### Checking tweets without users

In [14]:
%%time
tweets_without_users =  df_tweets[~df_tweets.user_id.isin(df_users._id)]
print(len(tweets_without_users))

0
CPU times: user 4.89 s, sys: 0 ns, total: 4.89 s
Wall time: 4.89 s


#### Checking users without tweets

In [11]:
%%time
users_without_tweets =  df_users[~df_users._id.isin(df_tweets.user_id)]
print(len(users_without_tweets))

2426541
CPU times: user 3.76 s, sys: 0 ns, total: 3.76 s
Wall time: 3.76 s


### Retweets

In [14]:
def get_retweets(collection):
    """
    Gets retweets
    
    collection - Retweets MongoDB collection
    """
    retweets = list(collection.find({},
                                 {'_id': False, 'tweet_id' : True, 'user_id' : True}))
    
    print("Number of retweets in DB:", len(retweets))
    retweets = [flatten(r) for r in retweets]
    df_retweets = pd.DataFrame(retweets)
    return df_retweets

In [15]:
%%time
df_retweets = get_retweets(db.retweets)
display(df_retweets.head(5))

Number of retweets in DB: 39344305


Unnamed: 0,_id,tweet_id,user_id
0,1193668589628383234,1193667913368121351,4460826197
1,1192582587753140230,1192581634291355649,1075150211725619200
2,1185317085502738433,1185312751301906433,3251522811
3,1190660503070236672,1190659978471911424,904385876
4,1190660746096644099,1190659978471911424,952575694021758976


CPU times: user 8min 22s, sys: 44.3 s, total: 9min 7s
Wall time: 9min 33s


#### Checking retweets without referenced tweet

In [16]:
%%time
retweets_without_tweet =  df_retweets[~df_retweets.tweet_id.isin(df_tweets.tweet_id)]
print(len(retweets_without_tweet))

0
CPU times: user 34.2 s, sys: 419 ms, total: 34.6 s
Wall time: 34.6 s


#### Checking retweets without user

In [17]:
%%time
retweets_without_user =  df_retweets[~df_retweets.user_id.isin(df_users._id)]
print(len(retweets_without_user))

0
CPU times: user 55.7 s, sys: 62.4 ms, total: 55.8 s
Wall time: 55.8 s


### Mentions

In [18]:
def get_mentions(collection):
    """
    Gets mentions
    
    collection - Mentions MongoDB collection
    """
    mentions = list(collection.find({},
                                 {'_id': False, 'tweet_id' : True, 'user_id' : True}))
    
    print("Number of mentions in DB:", len(mentions))
    mentions = [flatten(m) for m in mentions]
    df_mentions = pd.DataFrame(mentions)
    return df_mentions

In [19]:
%%time
df_mentions = get_mentions(db.mentions)
display(df_mentions.head(5))

Number of mentions in DB: 2047447


Unnamed: 0,user_id,tweet_id
0,405499878,c677460e-468b-11ea-9505-02420a0000af
1,850752506972045314,c677460e-468b-11ea-9505-02420a0000af
2,81553608,c677460e-468b-11ea-9505-02420a0000af
3,68740712,c677460e-468b-11ea-9505-02420a0000af
4,13623532,1bc32100-46cd-11ea-9505-02420a0000af


CPU times: user 18.6 s, sys: 51.7 ms, total: 18.6 s
Wall time: 20 s


#### Checking mentions without referenced tweet

In [20]:
%%time
mentions_without_tweet =  df_mentions[~df_mentions.tweet_id.isin(df_tweets._id)]
print(len(mentions_without_tweet))

0
CPU times: user 9min 58s, sys: 215 ms, total: 9min 58s
Wall time: 9min 58s


#### Checking mentions without user

In [21]:
%%time
mentions_without_user =  df_mentions[~df_mentions.user_id.isin(df_users._id)]
print(len(mentions_without_user))

124762
CPU times: user 5.16 s, sys: 7.68 ms, total: 5.16 s
Wall time: 5.16 s


### Mentions

In [24]:
def get_hashtags(collection):
    """
    Gets mentions
    
    collection - Hashtags MongoDB collection
    """
    hashtags = list(collection.find({},
                                 {'_id': False, 'tweet_id' : True}))
    
    print("Number of hashtags in DB:", len(hashtags))
    hashtags = [flatten(h) for h in hashtags]
    df_hashtags = pd.DataFrame(hashtags)
    return df_hashtags

In [26]:
%%time
df_hashtags = get_hashtags(db.hashtags)
display(df_hashtags.head(5))

Number of hashtags in DB: 2603272


Unnamed: 0,tweet_id
0,f7c581e4-4681-11ea-a6d9-02420a000681
1,f7c581e4-4681-11ea-a6d9-02420a000681
2,f7c581e4-4681-11ea-a6d9-02420a000681
3,c677460e-468b-11ea-9505-02420a0000af
4,c677460e-468b-11ea-9505-02420a0000af


CPU times: user 22.8 s, sys: 83.8 ms, total: 22.9 s
Wall time: 24.4 s


#### Checking hashtags without tweet

In [27]:
%%time
hashtags_without_tweet =  df_hashtags[~df_hashtags.tweet_id.isin(df_tweets._id)]
print(len(hashtags_without_tweet))

0
CPU times: user 12min 42s, sys: 312 ms, total: 12min 42s
Wall time: 12min 42s


## Other checkings

#### Freq of each user

In [28]:
freq = df_tweets.user_id.value_counts()

In [31]:
freq2 = df_retweets.user_id.value_counts()

In [32]:
freq2.head(10)

000000000000002479879152    15063
000000000000003064247597    11116
000000000000003387713985     9539
000001061332198266204164     9475
000000000000001723241016     8468
000000830507792998027266     8062
000000000000000128534745     7859
000000825432095577296896     7838
000000000000002420883178     7763
000000000000000216711237     7700
Name: user_id, dtype: int64

In [33]:
fsum = freq.add(freq2, fill_value=0)
display(fsum.head(10))

000000000000000000001054     1.0
000000000000000000001059     1.0
000000000000000000002862     1.0
000000000000000000003065    13.0
000000000000000000003911     1.0
000000000000000000003968     5.0
000000000000000000003971    60.0
000000000000000000005748     1.0
000000000000000000005803     3.0
000000000000000000006490     4.0
Name: user_id, dtype: float64

In [50]:
fsum.to_pickle(path=DATA_DIR+'users_freq.pickle')