In [1]:
import pymongo
import mysql.connector
import pandas as pd
import json
import tweepy
import sys
from dotenv import dotenv_values

In [2]:
# Load the key/value pairs from the local .env file,
# e.g. config = {"USER": "foo", "EMAIL": "foo@example.org"}.
# Credentials and TWEET_MAX are read from this mapping by the cells below.
config = dotenv_values(".env")


# Step 1: Data Collection

In [3]:
# Running count of statuses received by MyStreamListener.on_status
# (that method updates it through a `global` statement).
tweet_counter = 0
# Maximum number of tweets to write to the output file; read from the
# TWEET_MAX entry of the .env configuration and converted to int.
TWEET_MAX = int(config['TWEET_MAX'])
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that dumps incoming tweets into a file as one JSON array.

    The caller must already have written the opening ``[`` to ``write_file``.
    This listener writes comma-separated tweet objects, and as soon as
    ``TWEET_MAX`` tweets have been written it appends the closing ``]``,
    closes the file and asks tweepy to stop the stream. Progress is tracked
    in the module-level ``tweet_counter``.
    """

    def __init__(self, api, write_file):
        """Store the API client and the output file.

        Args:
            api: authenticated ``tweepy.API`` instance.
            write_file: open, writable text file that receives the tweets.
        """
        # Let the tweepy base class run its own initialisation.
        super().__init__(api)
        self.api = api
        self.me = api.me()
        self.write_file = write_file

    def _finish(self):
        """Terminate the JSON array and close the output file (at most once)."""
        if not getattr(self.write_file, "closed", False):
            self.write_file.write(']')
            self.write_file.close()
        print("Reached max allowed tweets:", TWEET_MAX)

    def on_status(self, tweet):
        """Append one tweet to the output file.

        Args:
            tweet: tweepy status object; its raw ``_json`` payload is written.

        Returns:
            True while more tweets are wanted; False once the limit has been
            reached. In tweepy 3.x a callback returning False stops the
            stream, which replaces the previous ``sys.exit(0)``.
        """
        global tweet_counter
        # Limit already reached (or file already finalised): never write again.
        if tweet_counter >= TWEET_MAX or getattr(self.write_file, "closed", False):
            self._finish()
            return False

        tweet_counter += 1
        print("tweet_counter", tweet_counter)
        json.dump(tweet._json, self.write_file)

        if tweet_counter < TWEET_MAX:
            # More tweets will follow, so separate the array elements.
            self.write_file.write(',')
            return True

        # This was the last wanted tweet: finalise the file right away instead
        # of waiting for one more tweet to arrive.
        self._finish()
        return False

    def on_error(self, status):
        """Report a stream error.

        Args:
            status: HTTP status code reported by the streaming endpoint.

        Returns:
            False for status 420 (rate limited) so the stream disconnects
            instead of retrying; True for any other status, which keeps the
            previous behaviour of letting tweepy continue.
        """
        print("Error detected", status)
        if status == 420:
            return False
        return True

def collect_data(output_path="sample.json", track=None):
    """Stream live tweets matching ``track`` into ``output_path``.

    The opening ``[`` of the JSON array is written here; the tweet objects
    and the closing ``]`` are written by ``MyStreamListener``.

    Args:
        output_path: file to (over)write with the collected tweets.
        track: list of terms/hashtags to filter the stream on. Defaults to
            the three terms this notebook was originally run with.
    """
    global tweet_counter
    if track is None:
        track = ["#sundayvibes", "UFCVegas23", "#WrestleMania"]

    # Start counting from zero so the function can be run more than once in
    # the same kernel session.
    tweet_counter = 0

    auth = tweepy.OAuthHandler(config['CONSUMER_KEY'], config['CONSUMER_SECRET'])
    auth.set_access_token(config['ACCESS_TOKEN'], config['ACCESS_TOKEN_SECRET'])

    api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

    write_file = open(output_path, "w")
    try:
        write_file.write('[')
        tweets_listener = MyStreamListener(api, write_file)
        stream = tweepy.Stream(api.auth, tweets_listener)
        stream.filter(track=track)
    finally:
        # The listener closes the file once the tweet limit is reached. On any
        # other exit path (authentication failure, network error, interrupt)
        # release the handle here; the file is then an unterminated JSON array.
        if not write_file.closed:
            write_file.close()


# Only run once to collect tweets

In [4]:
# Deliberately left commented out: collect_data() opens "sample.json" in write
# mode, so running it again replaces the previously collected tweets.
# collect_data()

# Step 2: Data Storage

**Set up the MySQL and MongoDB connections**

In [5]:
def setup_mysql():
    """Connect to the local MySQL database and make sure the ``user`` table exists.

    Credentials come from the ``USER_SQL`` / ``PASSWORD_SQL`` entries of
    ``config``. The connection runs in autocommit mode with
    ``raise_on_warnings`` enabled.

    Returns:
        tuple: ``(conn, cursor)`` - the open connection and a buffered cursor.
        The caller is responsible for closing both.
    """
    properties = {
        'user': config['USER_SQL'],
        'password': config['PASSWORD_SQL'],
        'host': 'localhost',
        'database': 'tweets_db_sql',
        'raise_on_warnings': True,
    }
    conn = mysql.connector.connect(**properties)
    try:
        conn.autocommit = True
        cursor = conn.cursor(buffered=True)
        # user_name is wider than screen_name because Twitter display names
        # can be considerably longer than 20 characters, which made inserts
        # fail. CREATE TABLE IF NOT EXISTS does not alter a table that already
        # exists, so a table created with the old definition keeps the
        # narrower column.
        query = """
        CREATE TABLE IF NOT EXISTS user
          (
             user_id          VARCHAR(255),
             user_name        VARCHAR(255),
             screen_name      VARCHAR(20),
             followers_count  BIGINT,
             friends_count    BIGINT,
             listed_count     BIGINT,
             favourites_count BIGINT,
             statuses_count   BIGINT,
             PRIMARY KEY(user_id)
          );
        """
        cursor.execute(query)
    except Exception:
        # Do not leak the connection when the setup fails half-way.
        conn.close()
        raise
    return conn, cursor

In [6]:
# Open the MySQL connection and cursor used by the storage cells below;
# both are closed in the last cell of the notebook.
sql_conn, sql_cursor = setup_mysql()

In [7]:
def setup_mongodb():
    """Connect to the MongoDB cluster and return a fresh ``tweets_db_mongo`` database.

    Credentials come from the ``USER_MONGO`` / ``PASSWORD_MONGO`` entries of
    ``config`` and are expected to be the raw (not percent-escaped) values.

    WARNING: this is destructive - an existing ``tweets_db_mongo`` database
    (and its ``tweets_col`` collection) is dropped on every call.

    Returns:
        The ``tweets_db_mongo`` database handle.
    """
    # Pass the credentials as keyword arguments instead of formatting them
    # into the URI: characters such as '@', ':' or '/' in a password would
    # otherwise have to be percent-escaped to yield a valid connection string.
    client = pymongo.MongoClient(
        "mongodb+srv://cluster0.6iqrn.mongodb.net",
        username=config['USER_MONGO'],
        password=config['PASSWORD_MONGO'],
    )
    dbnames = client.list_database_names()
    if "tweets_db_mongo" in dbnames:
        print("db exists. Will be deleted...")
        client.drop_database("tweets_db_mongo")
    tweets_db_mongo = client["tweets_db_mongo"]
    # Defensive second check in case the collection is still listed.
    col_names = tweets_db_mongo.list_collection_names()
    if "tweets_col" in col_names:
        print("Tweets Collection exists. Will be deleted...")
        tweets_db_mongo.tweets_col.drop()
    return tweets_db_mongo

In [8]:
# Connect to MongoDB. Note that setup_mongodb() drops any existing
# "tweets_db_mongo" database, so this cell starts from an empty store.
tweets_db_mongo = setup_mongodb()

**Load the Twitter data collected in the previous step**

In [9]:
def get_json_data(filename):
    """Load and return the parsed contents of a JSON file.

    Args:
        filename: path of the JSON file to read.

    Returns:
        The decoded JSON value (for the files used in this notebook, a list
        of tweet dictionaries).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Read explicitly as UTF-8 so non-ASCII tweet text (accents, emoji) does
    # not depend on the platform's default encoding.
    with open(filename, "r", encoding="utf-8") as read_file:
        return json.load(read_file)

In [10]:
# json_data: the tweets loaded from sample.json (the file collect_data writes).
# json_one_tweet: a separate fixture file, used below to try the storage
# step on a small input.
json_data = get_json_data('sample.json')
json_one_tweet = get_json_data('one_tweet.json')

In [11]:
# Display the fixture to inspect the structure of a tweet payload
# (nested 'user' object, reply fields, counters) used by the storage code.
json_one_tweet

[{'created_at': 'Sat Apr 10 01:48:58 +0000 2021',
  'id': 1380699292269215745,
  'id_str': '1380699292269215745',
  'text': 'RT @ohjimliet: guess what i drink this morningg😋\n\nI vote #BTSARMY for #BestFanArmy at the 2021 #iHeartAwards @BTS_twt https://t.co/scqL74hf…',
  'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
  'truncated': False,
  'in_reply_to_status_id': None,
  'in_reply_to_status_id_str': None,
  'in_reply_to_user_id': None,
  'in_reply_to_user_id_str': None,
  'in_reply_to_screen_name': None,
  'user': {'id': 1315728762563158019,
   'id_str': '1315728762563158019',
   'name': 'gianna',
   'screen_name': 'geeaaanaaa',
   'location': None,
   'url': None,
   'description': None,
   'translator_type': 'none',
   'protected': False,
   'verified': False,
   'followers_count': 0,
   'friends_count': 7,
   'listed_count': 0,
   'favourites_count': 627,
   'statuses_count': 158,
   'created_at': 'Mon Oct 12 19:00:14 +0000 2020',
 

In [12]:
def insert_mysql(record, sql_cursor):
    """Insert (or refresh) one Twitter user row in the MySQL ``user`` table.

    Args:
        record: sequence of eight values, in this order: user_id, user_name,
            screen_name, followers_count, friends_count, listed_count,
            favourites_count, statuses_count.
        sql_cursor: DB-API cursor whose ``execute(operation, params)`` accepts
            ``%(name)s`` placeholders (as mysql.connector cursors do).
    """
    columns = (
        "user_id",
        "user_name",
        "screen_name",
        "followers_count",
        "friends_count",
        "listed_count",
        "favourites_count",
        "statuses_count",
    )
    params = dict(zip(columns, record))

    # The values are bound by the driver instead of being formatted into the
    # SQL text: names containing quotes (e.g. O'Brien) no longer break the
    # statement and cannot inject SQL.
    # user_id is the primary key and the same author can appear many times
    # (several tweets, many retweets of one tweet, or a notebook re-run), so
    # a duplicate refreshes the stored row instead of raising an error.
    insert_query = """
    INSERT INTO user
            (
                        user_id,
                        user_name,
                        screen_name,
                        followers_count,
                        friends_count,
                        listed_count,
                        favourites_count,
                        statuses_count
            )
            VALUES
            (
                        %(user_id)s,
                        %(user_name)s,
                        %(screen_name)s,
                        %(followers_count)s,
                        %(friends_count)s,
                        %(listed_count)s,
                        %(favourites_count)s,
                        %(statuses_count)s
            )
    ON DUPLICATE KEY UPDATE
            user_name = %(user_name)s,
            screen_name = %(screen_name)s,
            followers_count = %(followers_count)s,
            friends_count = %(friends_count)s,
            listed_count = %(listed_count)s,
            favourites_count = %(favourites_count)s,
            statuses_count = %(statuses_count)s;"""
    sql_cursor.execute(insert_query, params)
    

In [13]:
def insert_mongo(document_dict, tweets_db_mongo):
    """Store one tweet document in the ``tweets_col`` collection.

    Args:
        document_dict: the document to insert.
        tweets_db_mongo: database handle exposing the ``tweets_col`` collection.
    """
    collection = tweets_db_mongo.tweets_col
    collection.insert_one(document_dict)
    

In [14]:
def store_data_mongo_mysql(json_data, sql_conn, sql_cursor, tweets_db_mongo):
    """Store every tweet of ``json_data``: its author in MySQL, its content in MongoDB.

    For a retweet, the embedded ``retweeted_status`` (the original tweet and
    its author) is what gets stored, flagged with ``is_retweet=True``; only
    ``timestamp_ms`` is taken from the outer (retweet) object.

    Args:
        json_data: iterable of tweet dictionaries.
        sql_conn: MySQL connection (not used by this function).
        sql_cursor: cursor passed on to ``insert_mysql``.
        tweets_db_mongo: database handle passed on to ``insert_mongo``.
    """
    user_fields = ('id_str', 'name', 'screen_name', 'followers_count',
                   'friends_count', 'listed_count', 'favourites_count',
                   'statuses_count')
    same_name_fields = ('coordinates', 'place', 'quote_count', 'reply_count',
                        'retweet_count', 'favorite_count')

    for raw_tweet in json_data:
        is_retweet = 'retweeted_status' in raw_tweet
        if is_retweet:
            print('==this is a retweet')
            # Only the fields of the original (retweeted) status are examined.
            payload = raw_tweet['retweeted_status']
        else:
            payload = raw_tweet
            print('==this is a tweet')

        # Relational part: one row per author.
        author = payload['user']
        insert_mysql(tuple(author[field] for field in user_fields), sql_cursor)

        # Document part: built in a fixed key order.
        document = {
            'tweet_id': payload['id_str'],
            'user_id': author['id_str'],
            'is_retweet': is_retweet,
            'tweet_text': payload['text'],
            'in_reply_to_status_id': payload['in_reply_to_status_id_str'],
            'in_reply_to_user_id': payload['in_reply_to_user_id_str'],
            'in_reply_to_screen_name': payload['in_reply_to_screen_name'],
        }
        for field in same_name_fields:
            document[field] = payload[field]
        document['hashtags'] = payload['entities']['hashtags']
        document['lang'] = payload['lang']
        # The timestamp always comes from the outer object.
        document['timestamp_ms'] = raw_tweet['timestamp_ms']

        insert_mongo(document, tweets_db_mongo)
        

# Add user information to MySQL

In [15]:
# Run the storage step on the single-tweet fixture: the author goes to the
# MySQL `user` table and the tweet document to the MongoDB `tweets_col` collection.
store_data_mongo_mysql(json_one_tweet, sql_conn, sql_cursor, tweets_db_mongo)

==this is a retweet


In [16]:
# Release the MySQL resources opened by setup_mysql(): cursor first, then the connection.
sql_cursor.close()
sql_conn.close()