[Tutorial](https://www.analyticsvidhya.com/blog/2020/08/analysing-streaming-tweets-with-python-and-postgresql/)

In [1]:
import tweepy
import os
import psycopg2
import time
from dotenv import load_dotenv

load_dotenv()

api_key = os.getenv("API_KEY")
api_secret_key = os.getenv("API_SECRET")
access_token = os.getenv("ACCESS_TOKEN")
access_token_secret = os.getenv("ACCESS_TOKEN_SECRET")
pg_user = os.getenv("PG_USER")
pg_password = os.getenv("PG_PASSWORD")
pg_host = os.getenv("PG_HOST")
pg_port = os.getenv("PG_PORT")
pg_db = os.getenv("PG_DB")

auth = tweepy.OAuthHandler(api_key, api_secret_key)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

conn = psycopg2.connect(host=pg_host, database=pg_db, port=pg_port, user=pg_user, password=pg_password)

In [2]:
public_tweet = api.home_timeline(count=5)

for tweet in public_tweet:
    print("-->", tweet.text)

--> I’m gonna miss these two savages when I flee this burning state.  They’re hilarious, brutal, smart as fuck and they… https://t.co/9dfPrsvaih
--> Thank you! We were all going into the unknown so I was thrilled to accept the award ❤️❤️❤️ https://t.co/b4JCSRSVvZ
--> More than 771k acres have burned in just the last 6 days in California. Look at the satellite image on the right —… https://t.co/fhhwxre0rA
--> Have a question for our speaker about GPU computing in Python? 🙋‍♀️ Leave it here in the comments.


In [3]:
result = api.search(['covid', 'Covid-19'], lang='en', count=10)

print(result[0]._json.keys())

dict_keys(['created_at', 'id', 'id_str', 'text', 'truncated', 'entities', 'metadata', 'source', 'in_reply_to_status_id', 'in_reply_to_status_id_str', 'in_reply_to_user_id', 'in_reply_to_user_id_str', 'in_reply_to_screen_name', 'user', 'geo', 'coordinates', 'place', 'contributors', 'retweeted_status', 'is_quote_status', 'retweet_count', 'favorite_count', 'favorited', 'retweeted', 'lang'])


In [4]:
print(result[0].user._json)

{'id': 950480871131291648, 'id_str': '950480871131291648', 'name': 'Therisa', 'screen_name': 'justCDNTherisa', 'location': 'Toronto , Ontario', 'description': 'A great leader must first be a great human being who will only lead with love but never ignore the hate! #BLACKLIVESMATTER #BREONNATAYLOR #WEARADAMNMASK', 'url': None, 'entities': {'description': {'urls': []}}, 'protected': False, 'followers_count': 4860, 'friends_count': 3184, 'listed_count': 3, 'created_at': 'Mon Jan 08 21:34:33 +0000 2018', 'favourites_count': 12652, 'utc_offset': None, 'time_zone': None, 'geo_enabled': False, 'verified': False, 'statuses_count': 9373, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'is_translation_enabled': False, 'profile_background_color': 'F5F8FA', 'profile_background_image_url': None, 'profile_background_image_url_https': None, 'profile_background_tile': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/1034209403413483522/hR3XwiHq_normal.jpg', 'profil

In [5]:
print(result[4].entities['hashtags'])

[]


In [6]:
class MyStreamListener(tweepy.StreamListener):
    
    def __init__(self, time_limit=300):
        self.start_time = time.time()
        self.limit = time_limit
        super(MyStreamListener, self).__init__()
    
    def on_connect(self):
        print("Connected to Twitter API.")
        
    def on_status(self, status):
        
        
        # Tweet ID
        tweet_id = status.id
        
        # User ID
        user_id = status.user.id
        # Username
        username = status.user.name
        
        
        # Tweet
        if status.truncated == True:
            tweet = status.extended_tweet['full_text']
            hashtags = status.extended_tweet['entities']['hashtags']
        else:
            tweet = status.text
            hashtags = status.entities['hashtags']
        
        # Read hastags
        hashtags = read_hashtags(hashtags)            
        
        # Retweet count
        retweet_count = status.retweet_count
        # Language
        lang = status.lang
        
        
        # If tweet is not a retweet and tweet is in English
        if not hasattr(status, "retweeted_status") and lang=="en":
            # Connect to database
            dbConnect(user_id, username, tweet_id, tweet, retweet_count, hashtags)
            
        if (time.time() - self.start_time) > self.limit:
            
            print(time.time(), self.start_time, self.limit)
            return False
            
    def on_error(self, status_code):
        if status_code == 420:
            # Returning False in on_data disconnects the stream
            return False

In [7]:
def read_hashtags(tag_list):
    hashtags = []
    for tag in tag_list:
        hashtags.append(tag['text'])
    return hashtags

In [8]:
# # Table creation
# commands = (# Table 1
#             '''Create Table TwitterUser(User_Id BIGINT PRIMARY KEY, User_Name TEXT);''',
#             # Table 2
#             '''Create Table TwitterTweet(Tweet_Id BIGINT PRIMARY KEY,
#                                          User_Id BIGINT,
#                                          Tweet TEXT,
#                                          Retweet_Count INT,
#                                          CONSTRAINT fk_user
#                                              FOREIGN KEY(User_Id)
#                                                  REFERENCES TwitterUser(User_Id));''',
#             # Table 3
#             '''Create Table TwitterEntity(Id SERIAL PRIMARY KEY,
#                                          Tweet_Id BIGINT,
#                                          Hashtag TEXT,
#                                          CONSTRAINT fk_user
#                                              FOREIGN KEY(Tweet_Id)
#                                                  REFERENCES TwitterTweet(Tweet_Id));''')


In [9]:
# cur = conn.cursor()

# for command in commands:
#     cur.execute(command)
    
# conn.commit()
# cur.close()
# conn.close()

DuplicateTable: relation "twitteruser" already exists


In [10]:
def dbConnect(user_id, user_name, tweet_id, tweet, retweet_count, hashtags):
    
    conn = psycopg2.connect(host=pg_host, database=pg_db ,port=pg_port, user=pg_user, password=pg_password)
    
    cur = conn.cursor()

    # insert user information
    command = '''INSERT INTO TwitterUser (user_id, user_name) VALUES (%s,%s) ON CONFLICT
                 (User_Id) DO NOTHING;'''
    cur.execute(command,(user_id,user_name))

    # insert tweet information
    command = '''INSERT INTO TwitterTweet (tweet_id, user_id, tweet, retweet_count) VALUES (%s,%s,%s,%s);'''
    cur.execute(command,(tweet_id, user_id, tweet, retweet_count))
    
    # insert entity information
    for i in range(len(hashtags)):
        hashtag = hashtags[i]
        command = '''INSERT INTO TwitterEntity (tweet_id, hashtag) VALUES (%s,%s);'''
        cur.execute(command,(tweet_id, hashtag))
    
    # Commit changes
    conn.commit()
    
    # Disconnect
    cur.close()
    conn.close()

In [None]:
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")
myStream.filter(track=['Covid', 'covid-19'])

Connected to Twitter API.
