# Influential Tweets Analysis

Welcome to the influential Tweets analysis notebook! The analysis can be configured using the variables under the Configuration heading. The output for the project will be found under the Example heading at the bottom.

## Imports

In [1]:
import tweepy
from dotenv import load_dotenv
import os
from enum import Enum
import json
import time
import datetime

def importUsers(users_file):
    """Read the screen names listed in a users JSON file.

    The file is expected to hold an object with a 'users' list whose
    entries each carry a 'screen_name' key.

    Args:
        users_file: Path of the JSON file to read.

    Returns:
        A list with the 'screen_name' of every entry, in file order.
    """
    with open(users_file) as handle:
        payload = json.load(handle)

    return [entry['screen_name'] for entry in payload['users']]

## Configuration
If both `start_date` and `end_date` are specified, the example selects tweets by that date range and ignores `num_tweets`. Set `start_date` and `end_date` to `None` to select the `num_tweets` most recent tweets instead.

In [2]:
# Path of the JSON file listing the accounts to analyze
# (other options include 'data/space-100.json' and 'data/finance-100.json')
users_file = 'data/top-100.json'

# Screen names to analyze, read from users_file
users = importUsers(users_file)

# First day (inclusive) of the date range; here: 3 days before today
# start_date = datetime.date(2020, 6, 15) # (year, month, day)
start_date = datetime.date.today() - datetime.timedelta(days=3)

# Last day (inclusive) of the date range; here: today
# end_date = datetime.date(2020, 6, 15) # (year, month, day)
end_date = datetime.date.today()

# How many of each user's most recent tweets to inspect (Max = ~3200);
# only used when start_date / end_date are None
num_tweets = 20

# Minimum interaction count a tweet needs for its URL to be printed in the example
threshold = 4

## Tweepy Configuration

In [3]:
# Load env variables, then read the Twitter app credentials from the environment
# (each value is None when the corresponding variable is not set)
load_dotenv()
consumer_key = os.environ.get('CONSUMER_KEY')
consumer_secret = os.environ.get('CONSUMER_SECRET')

# Set up tweepy with the Twitter API; `api` is the client the tracker class uses
# NOTE(review): wait_on_rate_limit_notify looks specific to older tweepy
# releases -- confirm against the pinned tweepy version.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
api = tweepy.API(
    auth,
    wait_on_rate_limit = True,
    wait_on_rate_limit_notify = True,
)

## Influential Tweets Class Definition

The code block below contains the InfluentialTweetTracker class, which holds the logic for finding the tweets that the configured users most commonly interact with. It also defines two helper Enum classes.

In [4]:
class TweetType(Enum):
    """Kinds of timeline entries the tracker distinguishes."""
    TWEET = 1
    RETWEET = 2
    REPLY_TWEET = 3
    QUOTE_TWEET = 4


class TweetSelectionType(Enum):
    """How the tracker picks which of a user's tweets to inspect."""
    BY_NUMBER = 1   # the N most recent tweets
    BY_DATES = 2    # every tweet inside an inclusive date range


class InfluentialTweetTracker:
    """Counts how often tweets are interacted with across a list of users.

    After findInfluentialTweets() runs, `tweet_counts` maps a tweet id to the
    number of times that tweet was posted, retweeted, quoted or replied to in
    the inspected timelines.
    """

    # Number of tweets requested per timeline call
    # (200 is the per-request maximum documented for Twitter's user_timeline endpoint)
    PAGE_SIZE = 200

    # Constructor for class
    # If start_date and end_date are specified, will use that when determining range
    # of tweets to consider. Otherwise will use the count parameter.
    def __init__(self, users, count = 10, start_date = None, end_date = None, api = None):
        """
        Args:
            users: Iterable of screen names to analyze.
            count: Number of most recent tweets per user (BY_NUMBER mode).
            start_date: First date (inclusive) to consider, or None.
            end_date: Last date (inclusive) to consider, or None.
            api: Optional client exposing user_timeline(); when None, the
                module-level `api` object is looked up at request time.
        """
        self.users = users
        self.api = api

        # Dictionary such that key = tweet id and value = # of times the tweet
        # has been interacted with. Created per instance so that trackers
        # never share (and clobber) each other's results.
        self.tweet_counts = {}

        if start_date is not None and end_date is not None:
            self.start_date = start_date
            self.end_date = end_date
            self.selection_type = TweetSelectionType.BY_DATES
        else:
            self.count = count
            self.selection_type = TweetSelectionType.BY_NUMBER

    # Classifies type of tweet (returns TweetType enum)
    # Precedence: retweet, then quote, then reply, then plain tweet.
    def __classifyTweet(self, tweet):
        if hasattr(tweet, 'retweeted_status'):
            return TweetType.RETWEET
        elif hasattr(tweet, 'quoted_status'):
            return TweetType.QUOTE_TWEET
        elif tweet.in_reply_to_status_id is not None:
            return TweetType.REPLY_TWEET
        else:
            return TweetType.TWEET

    # Returns the ids that a single timeline entry contributes to tweet_counts
    def __interactionIds(self, tweet):
        tweet_type = self.__classifyTweet(tweet)

        if tweet_type == TweetType.RETWEET:
            # A retweet only counts towards the tweet that was retweeted
            return [tweet.retweeted_status.id]
        elif tweet_type == TweetType.QUOTE_TWEET:
            return [tweet.id, tweet.quoted_status.id]
        elif tweet_type == TweetType.REPLY_TWEET:
            return [tweet.id, tweet.in_reply_to_status_id]
        else:
            return [tweet.id]

    # If tweet at tweet_id does not exist in dictionary, creates a new entry, otherwise adds 1
    def __addTweetToMap(self, tweet_id):
        self.tweet_counts[tweet_id] = self.tweet_counts.get(tweet_id, 0) + 1

    # Fetches one page (at most `count` tweets) of a user's timeline,
    # optionally restricted to tweets whose id is <= max_id
    def __getUserTimeline(self, user, count, max_id = None):
        # Fall back to the notebook-level `api` when no client was injected
        client = self.api if self.api is not None else api

        params = {'screen_name': user, 'count': count, 'tweet_mode': 'extended'}
        if max_id is not None:
            params['max_id'] = max_id

        return list(client.user_timeline(**params))

    # Gets number of recent tweets specified by count for user
    # using custom paging because tweepy cursor.items(count) seemed
    # to be getting tweets one at a time
    def __getRecentTweetsByCount(self, user, count):
        tweets = []
        max_id = None

        while len(tweets) < count:
            page_size = min(self.PAGE_SIZE, count - len(tweets))
            page = self.__getUserTimeline(user, page_size, max_id)
            if not page:
                # Timeline exhausted (or user has no tweets): nothing more to fetch
                break

            tweets.extend(page)
            # max_id is inclusive, so step just below the oldest tweet seen;
            # otherwise that tweet is returned (and counted) again on the next page
            max_id = page[-1].id - 1

        return tweets

    # Gets all of a user's tweets whose creation date lies in [start_date, end_date]
    # Assumes the timeline is returned newest first.
    # NOTE(review): dates are compared against created_at.date() as delivered by
    # the client -- confirm its timezone matches the configured dates.
    def __getTweetsByDates(self, user, start_date, end_date):
        tweets = []
        max_id = None

        # Go far enough back into history to capture relevant tweets
        while True:
            page = self.__getUserTimeline(user, self.PAGE_SIZE, max_id)
            if not page:
                # Timeline exhausted before reaching start_date: stop instead of
                # requesting the same (empty) page forever
                break

            tweets.extend(page)
            if page[-1].created_at.date() < start_date:
                # Everything further back is older than the requested range.
                # (Paging continues while the oldest tweet is still ON start_date,
                # since more tweets from that inclusive day may follow.)
                break

            max_id = page[-1].id - 1

        # Remove unwanted tweets from both ends of the fetched history
        return [
            tweet for tweet in tweets
            if start_date <= tweet.created_at.date() <= end_date
        ]

    # Main method that runs the analysis and returns the tweet_counts dictionary
    def findInfluentialTweets(self):
        """Inspect every user's selected tweets and tally interactions.

        Returns:
            dict mapping tweet id -> number of interactions found. The same
            dict is stored on `self.tweet_counts`; it is rebuilt on each call.
        """
        self.tweet_counts = {}

        for user in self.users:
            if self.selection_type == TweetSelectionType.BY_DATES:
                recent_tweets = self.__getTweetsByDates(user, self.start_date, self.end_date)
            else:
                recent_tweets = self.__getRecentTweetsByCount(user, self.count)

            for tweet in recent_tweets:
                for tweet_id in self.__interactionIds(tweet):
                    self.__addTweetToMap(tweet_id)

        return self.tweet_counts

## Example

The cell below prints links to the tweets whose interaction count is at or above the threshold, using the settings specified under the Configuration heading.

In [5]:
# Build the tracker from the notebook configuration and time one full analysis run
tracker = InfluentialTweetTracker(users, count = num_tweets, start_date = start_date, end_date = end_date)
start_time = time.time()
influential_tweets = tracker.findInfluentialTweets()
elapsed_time = time.time() - start_time

print('Elapsed time: %.2f s\n' % elapsed_time)

# Print a link plus the interaction count for every tweet that reached the threshold
for tweet_id, interactions in influential_tweets.items():
    if interactions < threshold:
        continue
    print('https://twitter.com/any/status/%s - %s' % (tweet_id, interactions))

Elapsed time: 207.68 s

https://twitter.com/any/status/1278324680311681024 - 4
https://twitter.com/any/status/1278176059876401152 - 4
https://twitter.com/any/status/1278284552679624705 - 4
https://twitter.com/any/status/1278040506908446721 - 4
https://twitter.com/any/status/1277212069868318720 - 4
https://twitter.com/any/status/1277215720418484224 - 4
