In [1]:
#This file performs the following functions:
#1. Extracts streaming data from Twitter
#2. Pre-processes it
#3. Loads it into MySQL

import credentials #Import the keys from credentials.py
import settings #Import the related setting constants from settings.py 

import re
import tweepy
import mysql.connector
import pandas as pd
from textblob import TextBlob
from dateutil import parser
import json

In [2]:
#Override the tweepy.StreamingClient class to add logic to on_data
class MyStream(tweepy.StreamingClient):
    '''
    Filtered-stream client that pre-processes each incoming Tweet, scores its
    sentiment with TextBlob and inserts one row per Tweet into MySQL.

    Uses names defined in other cells of this notebook: `remove_emojis`
    (text pre-processing), the module-level `database` connection and
    `settings.TABLE_NAME`.
    '''

    #Format of every timestamp written to MySQL
    TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    #Column order of the INSERT statement built in _store_row
    COLUMNS = ('tweet_id', 'created_at', 'text', 'polarity', 'subjectivity',
               'user_created_at', 'user_location', 'user_description',
               'user_followers_count', 'longitude', 'latitude',
               'retweet_count', 'like_count')

    #This is called when raw data is received from the stream. This method handles sending the data to other methods.
    def on_data(self, raw_data):
        '''
        Extract info from one raw stream message, print it and store it in MySQL.

        raw_data: bytes of a single JSON message from the filtered stream.
        Messages without a Tweet ('data') object are not stored; if they carry
        an 'errors' list, that list is passed to on_errors.
        '''
        data = json.loads(raw_data.decode('utf-8'))
        if 'data' not in data:
            #Error-only messages have no Tweet to extract; report and skip them
            if 'errors' in data:
                self.on_errors(data['errors'])
            return

        row = self._extract_row(data)
        print(data['data']['text'])
        print(f"Longitude: {row['longitude']}, Latitude: {row['latitude']}")
        self._store_row(row)

    def _extract_row(self, data):
        '''Return a {column: value} dict for the Tweet in `data` and its author.'''
        tweet = data['data']
        tweet_metrics = tweet.get('public_metrics', {})

        #'geo' is absent on most Tweets and may lack 'coordinates' even when present,
        #so both levels are looked up defensively; the point is [longitude, latitude]
        longitude = latitude = None
        point = tweet.get('geo', {}).get('coordinates')
        if point:
            longitude, latitude = point['coordinates'][0], point['coordinates'][1]

        text = remove_emojis(tweet['text']) #Content of the tweet, pre-processing it
        #TextBlob needs a string; remove_emojis returns None for empty input
        sentiment = TextBlob(text or '').sentiment

        user = self._find_author(data)
        user_metrics = user.get('public_metrics', {})

        return {
            'tweet_id': tweet['id'],
            'created_at': self._format_timestamp(tweet.get('created_at')), #Creation time of the Tweet
            'text': text,
            'polarity': sentiment.polarity, #Polarity of the tweet
            'subjectivity': sentiment.subjectivity, #Subjectivity of the tweet
            'user_created_at': self._format_timestamp(user.get('created_at')),
            #remove_emojis(None) returns None, so missing user fields are stored as NULL
            'user_location': remove_emojis(user.get('location')),
            'user_description': remove_emojis(user.get('description')),
            'user_followers_count': user_metrics.get('followers_count'),
            'longitude': longitude,
            'latitude': latitude,
            'retweet_count': tweet_metrics.get('retweet_count'),
            'like_count': tweet_metrics.get('like_count'),
        }

    @staticmethod
    def _find_author(data):
        '''Return the expanded user object of the Tweet's author, or {} if no user was included.'''
        users = data.get('includes', {}).get('users', [])
        author_id = data['data'].get('author_id')
        for user in users:
            if user.get('id') == author_id:
                return user
        #Fall back to the first expanded user when author_id is not in the payload
        return users[0] if users else {}

    @classmethod
    def _format_timestamp(cls, value):
        '''Parse a timestamp string and re-format it as TIMESTAMP_FORMAT; falsy input yields None.'''
        if not value:
            return None
        return parser.parse(value).strftime(cls.TIMESTAMP_FORMAT)

    def _store_row(self, row):
        '''Insert one row into settings.TABLE_NAME through the module-level `database` connection.'''
        if not database.is_connected():
            print(f"Database connection is not open; tweet {row['tweet_id']} was not stored")
            return
        placeholders = ', '.join(['%s'] * len(self.COLUMNS))
        query = f"INSERT INTO {settings.TABLE_NAME} ({', '.join(self.COLUMNS)}) VALUES ({placeholders})"
        cursor = database.cursor()
        try:
            cursor.execute(query, tuple(row[column] for column in self.COLUMNS))
            database.commit()
        finally:
            #Close the cursor even when the INSERT fails so cursors do not leak
            cursor.close()

    def on_errors(self, errors):
        '''
        Report the 'errors' list of a stream message that carried no Tweet.

        Because on_data is overridden, this is only invoked from on_data above.
        The errors are printed and the stream keeps running.
        '''
        print(f'Stream errors: {errors}')

    def on_request_error(self, status_code):
        '''
        Called by tweepy when the stream endpoint answers with a non-200 HTTP status.

        Since the Twitter API has rate limits, we must stop scraping data once the limit
        has been crossed: 429 (and the legacy v1.1 code 420) disconnect the stream.
        Other codes keep tweepy's default handling (log, then retry with backoff).
        '''
        super().on_request_error(status_code)
        if status_code in (420, 429):
            #Returning False has no effect in tweepy v4; disconnect() stops the stream
            self.disconnect()

In [3]:
#Functions used to pre-process the tweet text
def clean_tweet_text(self, tweet_text=None):
    '''
    Removing links, @mentions and special characters using Regex.

    Deletes @mentions (letters, digits, underscore), URLs of the form
    scheme://..., and every character that is not an ASCII letter, digit,
    space or tab, then collapses whitespace runs into single spaces.

    The function is module-level but keeps its leading `self` parameter so
    two-argument calls (clean_tweet_text(obj, text)) keep working; with a
    single argument, that argument is the text. Returns None when no text
    is given.
    '''
    if tweet_text is None:
        tweet_text = self
    if tweet_text is None:
        return None
    #Raw string so \w, \S and \/ reach the regex engine instead of being
    #treated as (invalid) string escapes
    cleaned = re.sub(r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", tweet_text)
    return ' '.join(cleaned.split())
def remove_emojis(tweet_text):
    '''
    Drop every non-ASCII character (emojis included) from tweet_text.

    Falsy input (None or an empty string) yields None.
    '''
    if not tweet_text:
        return None
    ascii_bytes = tweet_text.encode('ascii', errors='ignore')
    return ascii_bytes.decode('ascii')

In [4]:
#Connecting to the MySQL database and making sure the tweets table exists.
#Host and user can be overridden in credentials.py; they default to localhost / root.
database = mysql.connector.connect(
    host=getattr(credentials, 'DATABASE_HOST', 'localhost'),
    user=getattr(credentials, 'DATABASE_USER', 'root'),
    password=credentials.DATABASE_PASSWORD,
    database="TwitterDatabase",
    charset='utf8'
)
if database.is_connected():
    #Create the table on the first run only. IF NOT EXISTS only looks at the
    #connected database and makes re-running this cell safe.
    cursor = database.cursor()
    try:
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {settings.TABLE_NAME} ({settings.TABLE_ATTRIBUTES})")
        database.commit()
    finally:
        cursor.close()

ProgrammingError: 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)

In [5]:
#Authentication: app-only client for the Twitter API v2, authorised with the bearer token
client = tweepy.Client(bearer_token=credentials.BEARER_TOKEN)

In [6]:
#Streaming tweets from the Twitter API v2 filtered stream
myStream = MyStream(credentials.BEARER_TOKEN)

#Delete rules left over from a previous run so only the current track words are matched.
#get_rules() returns a tweepy Response whose .data is None when no rules exist.
existing_rules = myStream.get_rules().data
if existing_rules:
    myStream.delete_rules([rule.id for rule in existing_rules])

#settings.TRACK_WORDS may be a query string or a list of terms (TODO confirm in settings.py);
#a list is OR-ed together so the rule never contains a Python list repr such as "['word']".
if isinstance(settings.TRACK_WORDS, (list, tuple)):
    track_terms = [str(term) for term in settings.TRACK_WORDS]
    if not track_terms:
        raise ValueError("settings.TRACK_WORDS must contain at least one term")
    track_query = track_terms[0] if len(track_terms) == 1 else f"({' OR '.join(track_terms)})"
else:
    track_query = str(settings.TRACK_WORDS)

#English-only, original Tweets (no retweets) matching the track words
myStream.add_rules(tweepy.StreamRule(f"{track_query} lang:en -is:retweet"))
try:
    #filter() blocks until the stream disconnects (e.g. on rate limiting)
    #or the cell is interrupted with the STOP button.
    myStream.filter(expansions=['author_id'],
                    user_fields=['created_at', 'location', 'description', 'public_metrics'],
                    tweet_fields=['created_at', 'geo', 'public_metrics', 'author_id'])
finally:
    #Closing the database even when the stream is stopped manually (KeyboardInterrupt)
    database.close()

Unauthorized: 401 Unauthorized
Unauthorized