# Notebook Author: 
### Brandon Salter 

In [None]:
# link to data: https://drive.google.com/file/d/1KErBeVoLADCa16ck7ReCMkfiRF4Ly_Lp/view?usp=sharing

# Dependencies

In [61]:
import json 
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from tqdm import tqdm
import os
lines = 101916*2
print(pd.get_option('display.max_columns'))
pd.set_option('display.max_columns', 100)
print(pd.get_option('display.max_columns'))

100
100


# Write Data to MongoDB and BigQuery

In [2]:
path = '/Users/brandonsalter/Documents/TwitterSearchApp_694_Team4_2024/project_key.json'
credentials = service_account.Credentials.from_service_account_file(path)

project_id = 'msds-417117'
client = bigquery.Client(credentials= credentials,project=project_id)

In [3]:
tweets_dataset = "msds-417117.Tweets"
tweets_table = "msds-417117.Tweets.Tweets"
retweets_dataset = "msds-417117.Retweets"
retweets_table = "msds-417117.Retweets.Retweets"

def create_dataset(dataset_id):
    # TODO(developer): Set dataset_id to the ID of the dataset to create.
    # dataset_id = "{}.your_dataset".format(client.project)

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # TODO(developer): Specify the geographic location where the dataset should reside.
    dataset.location = "US"

    # Send the dataset to the API for creation, with an explicit timeout.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

create_dataset(tweets_dataset)
create_dataset(retweets_dataset)

Created dataset msds-417117.Tweets
Created dataset msds-417117.Retweets


In [4]:
# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

tweet_schema = [
    bigquery.SchemaField("created_at", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("hashtags", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_tweet", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("text", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("favorite_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("lang", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("place", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("quote_count", "INTEGER", mode="REQUIRED"),
    # bigquery.SchemaField("reply_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("retweet_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("favourites_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("followers_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("friends_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("id_str_user", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("location_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("screen_name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("statuses_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("verified_user", "STRING", mode="REQUIRED"),
]

table = bigquery.Table(tweets_table, schema=tweet_schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table msds-417117.Tweets.Tweets


In [5]:
retweet_schema = [
    bigquery.SchemaField("created_at", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("hashtags", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_retweet", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("text", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("favorite_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("lang", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("place", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("quote_count", "INTEGER", mode="REQUIRED"),
    # bigquery.SchemaField("reply_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("retweet_count", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("favourites_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("followers_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("friends_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("id_str_retweet_user", "STRING", mode="REQUIRED"),
    # bigquery.SchemaField("location_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("screen_name_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("statuses_count_user", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("verified_user", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("id_str_tweet", "STRING", mode="REQUIRED"), # original tweet id
    bigquery.SchemaField("id_str_tweet_user", "STRING", mode="REQUIRED"), # original tweet user id
]

table = bigquery.Table(retweets_table, schema=retweet_schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table msds-417117.Retweets.Retweets


In [80]:
retweet_keys = ["created_at", "hashtags","id_str_retweet", "text", "lang", "retweet_count", 
                "favourites_count_user", "followers_count_user", "friends_count_user", 
                "id_str_retweet_user", "name_user", "screen_name_user", "statuses_count_user", 
                "verified_user", "id_str_tweet", "id_str_tweet_user"]
retweets_df = pd.DataFrame(columns=retweet_keys)

tweet_keys = ["created_at", "hashtags", "id_str_tweet", "text", "lang", "retweet_count", 
              "favourites_count_user", "followers_count_user", "friends_count_user", 
              "id_str_user", "name_user", "screen_name_user", "statuses_count_user", 
              "verified_user"]
tweets_df = pd.DataFrame(columns=tweet_keys)

In [56]:
tweets_set = () # to know if processed or not
tweets = {} # tweet_id, retweet_count 
users = set()

with open("corona-out-3.json", "r") as f1:
    for line in tqdm(f1, total=lines):
        try:
            data = json.loads(line)

            # Unprocessed tweet
            if data['id_str'] not in tweets_set:
                
                # add tweet/retweet id (distinct and not yet seen)
                tweets[data['id_str']] = 0
                
                # extract hashtag values
                hashtag = data['entities']['hashtags']
                hashtag = ([key['text'] for key in hashtag] if len(hashtag) > 0 else ['NULL'])
                hashtag = ', '.join(hashtag)

                # Unprocessed Retweet
                if (data['text'].startswith('RT')):

                    # NOTE: streaming not allowed in BigQuery free tier
                    
                    #"stream" these keys to the Retweets table in BigQuery
                    retweets_df.loc[len(retweets_df)] = {"created_at": data['created_at'],
                                                        "hashtags": hashtag,
                                                        "id_str_retweet": data['id_str'],
                                                        "text": data['text'],
                                                        # "favorite_count": data['favorite_count'],
                                                        "lang": data['lang'],
                                                        # "place": data['place'],
                                                        # "quote_count": data['quote_count'],
                                                        # "reply_count": data['reply_count'],
                                                        "retweet_count": data['retweet_count'],
                                                        "favourites_count_user": data['user']['favourites_count'],
                                                        "followers_count_user": data['user']['followers_count'],
                                                        "friends_count_user": data['user']['friends_count'],
                                                        "id_str_retweet_user": data['user']['id_str'],
                                                        # "location_user": data['user']['location'],
                                                        "name_user": data['user']['name'],
                                                        "screen_name_user": data['user']['screen_name'],
                                                        "statuses_count_user": data['user']['statuses_count'],
                                                        "verified_user": str(data['user']['verified']),
                                                        "id_str_tweet": data['retweeted_status']['id_str'],
                                                        "id_str_tweet_user": data['retweeted_status']['user']['id_str']}
                    # Original tweet not processed yet
                    if data['retweeted_status']['id_str'] not in tweets:
                        tweets[data['retweeted_status']['id_str']] = 1    
                    # Original tweet has been processed
                    else:
                        tweets[data['retweeted_status']['id_str']] += 1
                
                # Unprocessed Tweet
                else:
                    # "stream" these keys to the Tweets table in BigQuery
                    tweets_df.loc[len(tweets_df)] = {"created_at": data['created_at'],
                                                    "hashtags": hashtag,
                                                    "id_str_tweet": data['id_str'],
                                                    "text": data['text'],
                                                    # "favorite_count": data['favorite_count'],
                                                    "lang": data['lang'],
                                                    # "place": data['place'],
                                                    # "quote_count": data['quote_count'],
                                                    # "reply_count": data['reply_count'],
                                                    "retweet_count": data['retweet_count'],
                                                    "favourites_count_user": data['user']['favourites_count'],
                                                    "followers_count_user": data['user']['followers_count'],
                                                    "friends_count_user": data['user']['friends_count'],
                                                    "id_str_user": data['user']['id_str'],
                                                    # "location_user": data['user']['location'],
                                                    "name_user": data['user']['name'],
                                                    "screen_name_user": data['user']['screen_name'],
                                                    "statuses_count_user": data['user']['statuses_count'],
                                                    "verified_user": str(data['user']['verified'])} 
                
                tweets_set.add(data['id_str'])
                                   
            #user = data['user']
            # psuedocode: 
            # if user has not been seen before
            #            add to user table 
            # update datastores with any metrics that you are tracking 
            
        except:
            continue

print('tweet rows:', len(tweets_df)) # 40793 --> 40804
print('retweet rows:', len(retweets_df)) # 61090 --> 61101

100%|██████████| 203832/203832 [05:29<00:00, 618.19it/s]

tweet rows: 40804
retweet rows: 61101





In [71]:
rtcount = pd.DataFrame(pd.Series(tweets)).rename(columns={0:'retweet_count'})
rt_count_tweets = pd.merge(tweets_df[['id_str_tweet']], rtcount, how='left', left_on='id_str_tweet', right_index=True)
rt_count_retweets = pd.merge(retweets_df[['id_str_retweet']], rtcount, how='left', left_on='id_str_retweet', right_index=True)
print(rt_count_tweets.retweet_count.sum())
print(rt_count_retweets.retweet_count.sum())
tweets_df['retweet_count'] = rt_count_tweets['retweet_count']

22305
0


In [76]:
# https://docs.python.org/3/library/functions.html#open
try:
    retweets_df.to_csv('retweets.csv', mode='x')
except:
    os.remove('retweets.csv')
    retweets_df.to_csv('retweets.csv', mode='x')
try:
    tweets_df.to_csv('tweets.csv', mode='x')
except:
    os.remove('tweets.csv')
    retweets_df.to_csv('tweets.csv', mode='x')

In [77]:
# optional: you can pass in a job_config with a schema if you want to define
# a specific schema
retweet_job_config = bigquery.LoadJobConfig(schema=retweet_schema)
tweet_job_config = bigquery.LoadJobConfig(schema=tweet_schema)

In [78]:
load_job = client.load_table_from_dataframe(
    retweets_df, retweets_table,
    job_config=retweet_job_config
)  # this will execute the load job

result = load_job.result()
result.done() # prints True if done

True

In [79]:
load_job = client.load_table_from_dataframe(
    tweets_df, tweets_table,
    job_config=tweet_job_config
)  # this will execute the load job

result = load_job.result()
result.done() # prints True if done

True