In [1]:
import json
from datetime import datetime
import time
from pymongo import MongoClient

In [2]:
def connect_to_mongo_database():
    try:
        client = MongoClient("localhost", 27017)
        print("Connected to MongoDB database")
        print(f"Databases available: {client.list_database_names()}")
        return client
    except Exception as e:
        print(f"Error occurred while connecting to MongoDB: {e}")

In [3]:
def create_database(client, db_name):
    try:
        db = client[db_name]
        print(f"Created the database {db_name} successfully")
        return db
    except Exception as e:
        print(f"Error occurred while creating database in mongo: {e}")

In [4]:
def create_collection(db, collection_name):
    try:
        collection = db[collection_name]
        print(f"Created the collection {collection_name} successfully")
        return collection
    except Exception as e:
        print(f"Error occurred while creating collection inside mongo database: {e}")
    

In [5]:
client = connect_to_mongo_database()

Connected to MongoDB database
Databases available: ['admin', 'bikedb', 'config', 'local', 'twitter-database']


In [6]:
db = create_database(client, "twitter-database")

Created the database twitter-database-new successfully


In [7]:
collection = create_collection(db, "tweets")

Created the collection tweets successfully


In [19]:
class Tweet(object):
    
    def __init__(self, tweet, retweet_count=1, source_tweet_id=0):
        self.tweet_id = tweet['id_str']
        self.text =  tweet['text']
        self.hashtag = list(map(lambda x: x["text"], tweet['entities']['hashtags']))
        self.user_id = tweet['user']['id_str']
        self.user_name = tweet['user']['name']
        self.user_screen_name = tweet['user']['screen_name']
        self.likes_count = tweet['favorite_count']
        self.retweet_count = retweet_count
        self.source_tweet_id = source_tweet_id
        self.tweet_score = 0
        self.created_at = self.get_created_date(tweet['created_at'])
        
    @staticmethod
    def get_created_date(created_at):
        created_at_date = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
        created_at_date = created_at_date.strftime("%Y-%m-%d %H:%M:%S")
        return created_at_date
    
    def get_tweet(self):
        return vars(self)

In [20]:
def insert_tweet(collection, tweet):
    try:
        collection.insert_one(tweet)
    except Exception as e:
        print(f"Error occurred while inserting tweet: {e}")

In [21]:
def update_tweet(collection, tweet_id):
    try:
        collection.update_one({'tweet_id': tweet_id}, {"$inc": {'retweet_count': 1}})
    except Exception as e:
        print(f"Error updating tweet {tweet_id}: {e}")

In [22]:
def tweet_exists(collection, tweet_id):
    tweet = collection.find_one({"tweet_id": tweet_id})
    return True if tweet else False

In [23]:
def get_tweets_count(collection):
    return collection.count_documents({})

In [24]:
def load_tweet_data_to_database(collection, file_path):
    
    start_time = time.time()
    
    with open(file_path, "r") as read_file:
        for line in read_file:
            try:
                data = json.loads(line)

                if tweet_exists(collection, data['id_str']):
                    continue

                if data['text'].startswith('RT'):
                    if data.get('retweeted_status'):
                        source_tweet_id = data.get('retweeted_status').get('id_str')
                        if tweet_exists(collection, source_tweet_id):
                            update_tweet(collection, source_tweet_id)
                        else:
                            tweet = data.get('retweeted_status')
                            tweet_object = Tweet(tweet)
                            insert_tweet(collection, tweet_object.get_tweet())
                else:
                    source_tweet_id = 0

                retweet_object = Tweet(data, 0, source_tweet_id) 
              
                insert_tweet(collection, retweet_object.get_tweet())

            except:
                continue
        
    print(f"Successfully inserted {get_tweets_count(collection)} tweets in {time.time() - start_time} seconds")

In [25]:
start_time = time.time()
load_tweet_data_to_database(collection, "../data/corona-out-2")
print(f"Successfully loaded collection in {time.time() - start_time} seconds")

Successfully inserted 22144 tweets in 76.57179021835327 seconds
Successfully loaded collection in 76.57342886924744 seconds


In [26]:
start_time = time.time()
load_tweet_data_to_database(collection, "../data/corona-out-3")
print(f"Successfully loaded collection in {time.time() - start_time} seconds")

Successfully inserted 134139 tweets in 2585.4248919487 seconds
Successfully loaded collection in 2585.431185245514 seconds


In [27]:
client.list_database_names()

['admin',
 'bikedb',
 'config',
 'local',
 'twitter-database',
 'twitter-database-new']

In [28]:
collection.count_documents({})

134139

In [30]:
# Update all documents
collection.update_many(
    {},  # Filter to select all documents
    [
        {'$set': {'tweet_score': {'$add': [{'$multiply': [0.6, '$retweet_count']}, {'$multiply': [0.4, '$likes_count']}]} } }
    ]
)


UpdateResult({'n': 134139, 'nModified': 134139, 'ok': 1.0, 'updatedExisting': True}, acknowledged=True)

In [31]:
collection.create_index([("text", "text")])

'text_text'

In [37]:
collection.create_index([("user_screen_name", 1), ("tweet_score", -1)])

'user_screen_name_1_tweet_score_-1'