In [5]:
import json
import os
from datetime import datetime

import mysql.connector
import pymongo
import pytz
from tqdm import tqdm

In [6]:
def read_json_file(filename):
    """
    Read a file containing JSON objects separated by blank lines.

    An object may span several lines; a blank (or whitespace-only) line ends
    the object being accumulated. A final object that is not followed by a
    blank line is still parsed.

    Args:
        filename: Path of the file to read.

    Returns:
        list: The decoded JSON values, in file order (empty for an empty file).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the text between two blank lines is not valid JSON.
    """
    json_list = []
    pending_lines = []
    # Decode explicitly as UTF-8 so the result does not depend on the platform's
    # default encoding (tweet text routinely contains non-ASCII characters).
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                pending_lines.append(line)
            elif pending_lines:
                json_list.append(json.loads(''.join(pending_lines)))
                pending_lines = []
    # Flush the last object when the file does not end with a blank line
    if pending_lines:
        json_list.append(json.loads(''.join(pending_lines)))
    return json_list

In [7]:
def extract_hashtags(tweet_obj):
    """Extract hashtag texts from a tweet's entities.

    When the tweet carries an ``extended_tweet`` with its own ``entities``,
    those are used: per Twitter's extended-tweet format the root-level
    ``entities`` only cover the truncated text, while the caller takes the
    tweet text from ``extended_tweet['full_text']``. Otherwise the root-level
    ``entities`` are used.

    Args:
        tweet_obj: Tweet payload (dict).

    Returns:
        list: The ``text`` of each hashtag entity, in payload order; an empty
        list when the tweet has no entities or no hashtags.
    """
    extended_entities = (tweet_obj.get('extended_tweet') or {}).get('entities')
    entities = extended_entities or tweet_obj.get('entities') or {}
    return [tag['text'] for tag in entities.get('hashtags') or []]

In [8]:
def get_tweet_source(tweet_obj):
    """Classify the client a tweet was posted from.

    The tweet's ``source`` string is lower-cased and searched for a platform
    name; 'android' is checked before 'iphone', and anything else is reported
    as 'web'.

    Args:
        tweet_obj: Tweet payload (dict) with a string ``source`` entry.

    Returns:
        str: 'android', 'iphone' or 'web'.
    """
    client = tweet_obj['source'].lower()
    for platform in ('android', 'iphone'):
        if platform in client:
            return platform
    return 'web'

In [9]:
def process_user_data(user_obj, user_list, user_dict):
    """Record a user once in user_list, keyed by id in user_dict.

    Only the selected profile fields are kept (missing ones become None).
    The first record seen for an id wins, with one exception: if the stored
    record is a stub without ``created_at`` (as built for reply targets, where
    only id and screen name are known) and the new record has ``created_at``,
    the stub is upgraded in place so the full profile is not lost.

    Args:
        user_obj: Source user dict.
        user_list: List that receives one record per distinct user id (mutated).
        user_dict: Index of already-seen ids (mutated). New entries map the id
            to the stored record; pre-existing non-dict values such as ``True``
            are treated as "seen" and left untouched.

    Returns:
        None
    """
    selected_keys = ['id', 'screen_name', 'description', 'followers_count', 'created_at']
    user_data = {key: user_obj.get(key) for key in selected_keys}
    user_id = user_data['id']

    if user_id not in user_dict:
        user_list.append(user_data)
        # Keep a reference to the stored record so a stub can be upgraded later
        user_dict[user_id] = user_data
        return

    stored = user_dict[user_id]
    if (
        isinstance(stored, dict)
        and stored.get('created_at') is None
        and user_data['created_at'] is not None
    ):
        # Same dict object that sits in user_list, so the list sees the update
        stored.update(user_data)

In [10]:
def process_tweet_data(tweet_obj, tweet_list, user_list, user_dict, tweet_dict, additional_keys=None):
    """Flatten one tweet into a record and append it to tweet_list if unseen.

    Side effects on ``tweet_obj``: ``text`` is replaced by the extended full
    text when available, and ``hashtags``, ``source`` (normalized) and
    ``user_id`` are written onto it. The author, and the replied-to user when
    both its id and screen name are present, are registered through
    process_user_data.

    Args:
        tweet_obj: Tweet payload (dict); mutated as described above.
        tweet_list: Destination list for the flattened record (mutated).
        user_list: Destination list for user records (mutated).
        user_dict: Seen-user index shared with process_user_data (mutated).
        tweet_dict: Seen-tweet index keyed by tweet id (mutated).
        additional_keys: Optional extra keys to copy from tweet_obj.

    Returns:
        None
    """
    author = tweet_obj['user']
    process_user_data(author, user_list, user_dict)

    # Prefer the untruncated text when the payload carries an extended tweet
    if 'extended_tweet' in tweet_obj:
        tweet_obj['text'] = tweet_obj['extended_tweet']['full_text']

    tweet_obj['hashtags'] = extract_hashtags(tweet_obj)
    tweet_obj['source'] = get_tweet_source(tweet_obj)
    tweet_obj['user_id'] = tweet_obj['user']['id']

    # Only id and screen name of a replied-to user are known: register a stub
    reply_user_id = tweet_obj.get('in_reply_to_user_id')
    reply_screen_name = tweet_obj.get('in_reply_to_screen_name')
    if reply_user_id is not None and reply_screen_name is not None:
        process_user_data(
            {
                'id': reply_user_id,
                'screen_name': reply_screen_name,
                'description': None,
                'followers_count': None,
                'created_at': None,
            },
            user_list,
            user_dict,
        )

    wanted_keys = [
        'id', 'text', 'created_at', 'source', 'retweet_count',
        'favorite_count', 'lang', 'reply_count', 'user_id', 'hashtags',
    ]
    if additional_keys:
        wanted_keys.extend(additional_keys)

    record = {key: tweet_obj.get(key) for key in wanted_keys}
    tweet_id = record['id']

    # A tweet id is stored at most once across all destination lists
    if tweet_id in tweet_dict:
        return
    tweet_list.append(record)
    tweet_dict[tweet_id] = True

In [11]:
def flatten_tweets(tweets):
    """Flatten tweet payloads into per-kind record lists plus a user list.

    Each payload is routed through process_tweet_data:
      * replies (``in_reply_to_status_id`` present and not None) go to the
        replies list;
      * payloads with ``retweeted_status`` / ``quoted_status`` go to the
        retweets / quotes list, and the embedded original goes to the plain
        tweets list;
      * everything else goes to the plain tweets list.
    All lists share one seen-tweet index, so a tweet id is stored only in the
    first list it reaches. The input dicts are mutated (helper keys such as
    ``retweeted_status_id`` are written onto them).

    Args:
        tweets: Iterable of tweet payload dicts.

    Returns:
        tuple: (tweets_list, retweets_list, quotes_list, replies_list, user_list)
    """
    tweets_list, retweets_list, quotes_list, replies_list, user_list = [], [], [], [], []
    user_dict = {}
    tweet_dict = {}

    # (payload key holding the embedded tweet, list receiving the wrapping tweet);
    # retweets are handled before quotes, as before
    embedded_kinds = (
        ('retweeted_status', retweets_list),
        ('quoted_status', quotes_list),
    )

    for tweet_obj in tqdm(tweets, desc="Processing tweets"):
        # Single reply predicate, reused below; .get() tolerates payloads
        # that omit the key entirely
        is_reply = tweet_obj.get('in_reply_to_status_id') is not None
        if is_reply:
            process_tweet_data(
                tweet_obj, replies_list, user_list, user_dict, tweet_dict,
                additional_keys=['in_reply_to_status_id', 'in_reply_to_user_id', 'in_reply_to_screen_name'],
            )

        has_embedded = False
        for status_key, wrapper_list in embedded_kinds:
            if status_key not in tweet_obj:
                continue
            has_embedded = True
            embedded = tweet_obj[status_key]

            # Lift the embedded tweet's identifiers to top-level keys so they
            # survive the key selection in process_tweet_data
            id_key = f'{status_key}_id'
            user_id_key = f'{status_key}_user_id'
            screen_name_key = f'{status_key}_screen_name'
            tweet_obj[id_key] = embedded['id']
            tweet_obj[user_id_key] = embedded['user']['id']
            tweet_obj[screen_name_key] = embedded['user']['screen_name']

            process_tweet_data(
                tweet_obj, wrapper_list, user_list, user_dict, tweet_dict,
                additional_keys=[id_key, user_id_key, screen_name_key],
            )
            # The embedded original is itself a plain tweet
            process_tweet_data(embedded, tweets_list, user_list, user_dict, tweet_dict)

        if not (is_reply or has_embedded):
            process_tweet_data(tweet_obj, tweets_list, user_list, user_dict, tweet_dict)

    return tweets_list, retweets_list, quotes_list, replies_list, user_list

In [12]:
def convert_to_edt_and_epoch(tweets_list):
    """Add US/Eastern and epoch timestamps to each record, in place.

    For every dict in ``tweets_list`` the ``created_at`` string (format
    ``'%a %b %d %H:%M:%S %z %Y'``) is parsed and two keys are written:
    ``created_at_edt`` (the same instant rendered in the "US/Eastern" zone,
    as a string) and ``created_at_epoch`` (whole seconds since the epoch).
    Records whose ``created_at`` is missing or None get None for both keys.

    Args:
        tweets_list: List of dicts to annotate (mutated).

    Returns:
        None
    """
    eastern_tz = pytz.timezone("US/Eastern")

    for record in tweets_list:
        raw_created_at = record.get('created_at')

        if raw_created_at is None:
            record['created_at_edt'] = None
            record['created_at_epoch'] = None
            continue

        parsed = datetime.strptime(raw_created_at, '%a %b %d %H:%M:%S %z %Y')
        eastern_text = parsed.astimezone(eastern_tz).strftime('%a %b %d %H:%M:%S %Z %Y')
        record['created_at_edt'] = eastern_text
        record['created_at_epoch'] = int(parsed.timestamp())

In [13]:
# Load both raw dumps and treat them as one stream of tweet payloads
tweets_data_1, tweets_data_2 = (
    read_json_file(dump_path)
    for dump_path in ("../../data/corona-out-2", "../../data/corona-out-3")
)
tweets_data = [*tweets_data_1, *tweets_data_2]

# Split the payloads into per-kind record lists and the user list
tweets_list, retweets_list, quotes_list, replies_list, user_list = flatten_tweets(tweets_data)

# Annotate every record (tweets of all kinds, then users) with Eastern-time and epoch timestamps
for records in (tweets_list, retweets_list, quotes_list, replies_list, user_list):
    convert_to_edt_and_epoch(records)

# Report how many records ended up in each list versus the raw payload count
print(f"tweets_list: {len(tweets_list)}, retweets_list: {len(retweets_list)}, quotes_list: {len(quotes_list)}, replies_list: {len(replies_list)}, user_list: {len(user_list)}")
print(f"tweets_data: {len(tweets_data)}")

Processing tweets: 100%|██████████| 120434/120434 [00:03<00:00, 30643.12it/s]


tweets_list: 41529, retweets_list: 72244, quotes_list: 8696, replies_list: 16132, user_list: 117795
tweets_data: 120434


In [14]:
# Connect to the local MongoDB server and select the working database
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["twitter_database"]

# One collection per tweet kind
collections = ["tweets_collection", "retweets_collection", "replies_collection", "quotes_collection"]

# Start from a clean slate: drop whichever of these collections already exist
for collection_name in collections:
    if collection_name not in db.list_collection_names():
        print(f"Collection not found: {collection_name}")
        continue
    db[collection_name].drop()
    print(f"Deleted collection: {collection_name}")

# Obtain a handle for each collection, creating it when it is absent
collections_dict = {}
for collection_name in collections:
    if collection_name not in db.list_collection_names():
        collection = db.create_collection(collection_name)
        print(f"Created collection: {collection_name}")
    else:
        collection = db.get_collection(collection_name)
        print(f"Loaded collection: {collection_name}")

    collections_dict[collection_name] = collection

# Module-level handles used by the later cells (same order as `collections`)
tweets_collection, retweets_collection, replies_collection, quotes_collection = (
    collections_dict[name] for name in collections
)

Deleted collection: tweets_collection
Deleted collection: retweets_collection
Deleted collection: replies_collection
Deleted collection: quotes_collection
Created collection: tweets_collection
Created collection: retweets_collection
Created collection: replies_collection
Created collection: quotes_collection


In [15]:
# Insert the flattened records into their MongoDB collections one document at a
# time, with one progress bar per collection.

# Shared bar layout so all four progress bars render with the same fixed width
insert_bar_format = "{l_bar}{bar:10}| {n_fmt}/{total_fmt} [{elapsed}]"

# (progress-bar label, records to insert, destination collection)
insert_jobs = [
    ("tweets", tweets_list, tweets_collection),
    ("retweets", retweets_list, retweets_collection),
    ("quotes", quotes_list, quotes_collection),
    ("replies", replies_list, replies_collection),
]

for label, documents, target_collection in insert_jobs:
    for document in tqdm(documents, desc=f"Inserting {label}", colour="green", bar_format=insert_bar_format):
        target_collection.insert_one(document)

print("Data insertion completed.")

Inserting tweets: 100%|[32m██████████[0m| 41529/41529 [00:17]
Inserting retweets: 100%|[32m██████████[0m| 72244/72244 [00:30]
Inserting quotes: 100%|[32m██████████[0m| 8696/8696 [00:03]
Inserting replies: 100%|[32m██████████[0m| 16132/16132 [00:06]

Data insertion completed.





In [16]:
# Connect to the MySQL server without selecting a database, so the database
# itself can be created on a fresh server.
# Connection settings can be overridden through MYSQL_HOST / MYSQL_USER /
# MYSQL_PASSWORD; the fallbacks keep the notebook's previous behavior.
# NOTE(review): prefer setting MYSQL_PASSWORD over relying on the in-file default.
mysql_connection = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "team6")
)

# Create twitter_database if it does not exist yet; always release the cursor
# and the connection, even when the statement fails
try:
    cursor = mysql_connection.cursor()
    try:
        cursor.execute("CREATE DATABASE IF NOT EXISTS twitter_database;")
        mysql_connection.commit()
    finally:
        cursor.close()
finally:
    mysql_connection.close()

In [17]:
# Connect to MySQL again, this time selecting twitter_database.
# Connection settings can be overridden through MYSQL_HOST / MYSQL_USER /
# MYSQL_PASSWORD; the fallbacks keep the notebook's previous behavior.
# NOTE(review): prefer setting MYSQL_PASSWORD over relying on the in-file default.
mysql_connection = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "team6"),
    database="twitter_database"
)

# Schema of the users table (one row per distinct user id)
create_users_table_query = """
CREATE TABLE IF NOT EXISTS users (
    id BIGINT PRIMARY KEY,
    screen_name VARCHAR(255) NOT NULL,
    description TEXT,
    followers_count INT,
    created_at VARCHAR(255),
    created_at_edt VARCHAR(255),
    created_at_epoch INT
);
"""

# The connection and cursor stay open for the insert step that follows, so they
# are only closed here when table creation fails.
cursor = mysql_connection.cursor()
try:
    # Rebuild the table from scratch on every run
    cursor.execute("DROP TABLE IF EXISTS users;")
    cursor.execute(create_users_table_query)
    mysql_connection.commit()
except Exception:
    cursor.close()
    mysql_connection.close()
    raise

# Function to insert (or update) a single user row in the MySQL database
def insert_user_data(user, commit=True):
    """Upsert one user record into the ``users`` table.

    Uses the module-level ``cursor`` and ``mysql_connection``.

    Args:
        user: Dict providing the named parameters of the statement below
            (id, screen_name, description, followers_count, created_at,
            created_at_edt, created_at_epoch).
        commit: When True (default, the original behavior) the transaction is
            committed right after the statement. Pass False when inserting
            many rows and commit once afterwards.

    Returns:
        None
    """
    insert_user_query = """
    INSERT INTO users (id, screen_name, description, followers_count, created_at, created_at_edt, created_at_epoch)
    VALUES (%(id)s, %(screen_name)s, %(description)s, %(followers_count)s, %(created_at)s, %(created_at_edt)s, %(created_at_epoch)s)
    ON DUPLICATE KEY UPDATE
        screen_name = %(screen_name)s,
        description = %(description)s,
        followers_count = %(followers_count)s,
        created_at = %(created_at)s,
        created_at_edt = %(created_at_edt)s,
        created_at_epoch = %(created_at_epoch)s;
    """

    cursor.execute(insert_user_query, user)
    if commit:
        mysql_connection.commit()

# Insert the users row by row with a progress bar, committing once at the end
# instead of once per row; the cursor and connection are released even if an
# insert fails.
try:
    for user in tqdm(user_list, desc="Inserting users", colour="green", bar_format="{l_bar}{bar:10}| {n_fmt}/{total_fmt} [{elapsed}]"):
        insert_user_data(user, commit=False)
    mysql_connection.commit()
finally:
    cursor.close()
    mysql_connection.close()

print("User data insertion completed.")

Inserting users: 100%|[32m██████████[0m| 117795/117795 [01:47]

User data insertion completed.





# Creating Indexes

## MySQL Indexes:

1.  For `get_user_info(user_id)` query:

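*   Index on `id` field to quickly search users by their ID (note: `id` is already declared as the table's `PRIMARY KEY`, which is itself an index, so this additional index is redundant):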

`CREATE INDEX idx_users_id ON users (id);`

========================================================================================================================================================================


2.  For `get_all_users(search_string, date_range)` query:

*   Index on `screen_name`, `created_at_epoch`, and `followers_count` fields to speed up search, date range filtering, and sorting by followers count:

`CREATE INDEX idx_users_screen_name_created_at_epoch_followers_count ON users (screen_name, created_at_epoch, followers_count);`

========================================================================================================================================================================


3.  For `get_top_users_by_followers(limit=10)` query:

*   Index on `followers_count` field to quickly sort users by followers count:

`CREATE INDEX idx_users_followers_count ON users (followers_count);`


## MongoDB Indexes:

1.  For `get_user_tweets(user_id, collection)` query:

*   Index on `user_id` and `created_at_edt` fields to efficiently filter and sort tweets by user ID and creation date:

`db.{collection_name}.createIndex({user_id: 1, created_at_edt: -1})`

========================================================================================================================================================================

2.  For `display_collection(collection, search_string, date_range)` query:

*   Index on `text`, `created_at_epoch`, and `retweet_count` fields for efficient text search, date range filtering, and sorting by retweet count:

`db.{collection_name}.createIndex({text: "text", created_at_epoch: 1, retweet_count: -1})`

========================================================================================================================================================================


3.  For `get_top_tweets_by_metric(metric, limit=10)` query:

*   Index on the metric field (e.g., `retweet_count`) to quickly sort tweets by the metric:

`db.{collection_name}.createIndex({<metric>: -1})`, for example `db.{collection_name}.createIndex({retweet_count: -1})`

========================================================================================================================================================================


4.  For `display_top_hashtags(limit=10)` query:

*   Index on `hashtags` field to speed up aggregation of hashtags:

`db.{collection_name}.createIndex({hashtags: 1})`

========================================================================================================================================================================


5.  For `display_top_sources(limit=10)` query:

*   Index on `source` field to speed up aggregation of sources:

`db.{collection_name}.createIndex({source: 1})`

========================================================================================================================================================================


6.  For `display_hashtags(collection, search_string, date_range)` query:

*   Index on `hashtags`, `created_at_epoch`, and `retweet_count` fields for efficient hashtag search, date range filtering, and sorting by retweet count:

`db.{collection_name}.createIndex({hashtags: 1, created_at_epoch: 1, retweet_count: -1})`

In [18]:
# Create MySQL indexes on the users table.
# Connection settings can be overridden through MYSQL_HOST / MYSQL_USER /
# MYSQL_PASSWORD; the fallbacks keep the notebook's previous behavior.
# NOTE(review): prefer setting MYSQL_PASSWORD over relying on the in-file default.
mysql_connection = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "team6"),
    database="twitter_database"
)

# (index name, comma-separated column list)
# NOTE(review): idx_users_id duplicates the PRIMARY KEY on users.id; kept
# because the index notes above list it — consider dropping it.
mysql_indexes = [
    ("idx_users_id", "id"),
    ("idx_users_screen_name_created_at_epoch_followers_count", "screen_name, created_at_epoch, followers_count"),
    ("idx_users_followers_count", "followers_count")
]

# MySQL error number for "Duplicate key name" (ER_DUP_KEYNAME)
duplicate_index_errno = 1061

try:
    cursor = mysql_connection.cursor()
    try:
        for index_name, fields in mysql_indexes:
            query = f"CREATE INDEX {index_name} ON users ({fields})"
            try:
                cursor.execute(query)
            except mysql.connector.Error as err:
                # Re-running this cell must not fail on indexes that already
                # exist; any other error is still raised
                if err.errno != duplicate_index_errno:
                    raise
                print(f"Index already exists, skipping: {index_name}")

        mysql_connection.commit()
    finally:
        cursor.close()
finally:
    # Close the MySQL connection even when index creation fails
    mysql_connection.close()

In [19]:
# Indexes created on all four collections, in this collection order
indexed_collections = (tweets_collection, quotes_collection, replies_collection, retweets_collection)

# Per-user lookup sorted by created_at_edt.
# NOTE(review): created_at_edt is stored as a formatted string, so this key
# orders lexicographically rather than chronologically — confirm that is intended.
for target_collection in indexed_collections:
    target_collection.create_index([("user_id", 1), ("created_at_edt", -1)])

# Text search on the tweet text combined with epoch and retweet-count keys
for target_collection in indexed_collections:
    target_collection.create_index([("text", "text"), ("created_at_epoch", 1), ("retweet_count", -1)])

# Indexes created on the plain tweets collection only: metric sorts, hashtag and
# source lookups, and the hashtag + date + retweet-count combination
tweets_only_indexes = (
    [("retweet_count", -1)],
    [("favorite_count", -1)],
    [("hashtags", 1)],
    [("source", 1)],
    [("hashtags", 1), ("created_at_epoch", 1), ("retweet_count", -1)],
)
for index_keys in tweets_only_indexes:
    tweets_collection.create_index(index_keys)

print("Indexes created successfully.")

Indexes created successfully.
