# Part 1 -- Setup and Collection of Tweets

Pull Tweets from the Twitter API with Tweepy, collecting **Tweets from 30 tech thought leaders and news outlets**, with MongoDB acting as a task manager (a hand-rolled form of distributed processing).

**Load library code**

In [1]:
# Show the notebook's current working directory (shell escape)
!pwd

/home/jovyan/work/Portfolio/Analyzing_Unstructured_Data_for_Finance/ipynb


In [2]:
from os import chdir
# Switch to the project root before importing from the local `lib` package
# (presumably so that `lib` resolves from there -- confirm).
chdir('/home/jovyan/work/Portfolio/Analyzing_Unstructured_Data_for_Finance/')

# NOTE(review): the star import hides where later names come from. `pd` and `sys`
# are used in later cells without a visible import, so they presumably arrive here -- confirm.
from lib import *
# suppress_warnings()
# Twitter API credentials; read below as my_keys['CONSUMER_KEY'] / my_keys['CONSUMER_SECRET']
from lib.twitter_keys import my_keys

In [3]:
# Install the third-party packages, then import the two that the cells below use.
# NOTE(review): versions are unpinned, and pyquery is installed but never imported
# in the visible cells -- confirm whether it is still needed.
!pip install pymongo pyquery tweepy
import pymongo
import tweepy



**Use application-only authentication (AppAuth) instead of user-context OAuth to get the Twitter API's higher rate limits**

In [4]:
# Application-only auth needs just the app's consumer key and secret.
# Replace the values behind my_keys with your own application's key and secret.
auth = tweepy.AppAuthHandler(my_keys['CONSUMER_KEY'], my_keys['CONSUMER_SECRET'])

# Build the API client. With these flags tweepy waits for the rate-limit window
# to reset (printing a notice while it waits) instead of raising an error.
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

# NOTE(review): a constructed tweepy.API object is presumably always truthy,
# so this guard likely never fires -- confirm before relying on it.
if not api:
    print("Can't Authenticate")
    # SystemExit is exactly what sys.exit() raises; raising it directly avoids
    # depending on `sys`, which no visible cell imports.
    raise SystemExit(-1)

In [5]:
# Connect to the remote MongoDB server that acts as the shared task manager.
# The port is given explicitly (27016 instead of MongoDB's default 27017),
# which the author chose for better security.
# NOTE(review): host and port are hard-coded here -- consider moving them to configuration.
cli = pymongo.MongoClient(host='52.27.11.214', port=27016)

In [6]:
# Handles to the two collections in `twitter_db`. MongoDB creates a collection
# lazily, i.e. only once the first document is written to it.
task_collection = cli['twitter_db']['task_collection']
completed_collection = cli['twitter_db']['completed_collection']

# List the collections that actually exist on the server right now
cli['twitter_db'].collection_names()

[u'task_collection']

In [7]:
# Document counts: queued tasks first, then stored tweets
for collection in (task_collection, completed_collection):
    print(collection.count())

30
0


In [8]:
# Reset helpers (destructive): uncomment to wipe both collections before a fresh run.
# task_collection.drop()
# completed_collection.drop()

**Implementing distributed processing from scratch**<br>
We use MongoDB as a task manager so that multiple AWS instances can work on the same job at once. Instead of waiting for a single notebook to process every handle, several notebooks run the same code in parallel, each claiming a different pending task, so the whole collection finishes faster.

First, I create a "lookup_list" that will be stored in a MongoDB collection called task_collection. My query will pull any item with a pending status, get those Tweets, and update the status from <u>pending</u> to <u>complete</u>.

In [9]:
# Load the roster of accounts and pull the 'Handle' column out as a plain list
twitter_30_df = pd.read_csv(
    '/home/jovyan/work/Portfolio/Analyzing_Unstructured_Data_for_Finance/data/twitter_users_30.csv'
)
handle = twitter_30_df['Handle'].tolist()

In [10]:
# One task document per handle, each starting out with status 'pending'
lookup_list = [{'handle': h, 'status': 'pending'} for h in handle]

In [11]:
# Inspect the task documents before they are written to MongoDB
lookup_list

[{'handle': 'BarackObama', 'status': 'pending'},
 {'handle': 'cnnbrk', 'status': 'pending'},
 {'handle': 'BillGates', 'status': 'pending'},
 {'handle': 'realDonaldTrump', 'status': 'pending'},
 {'handle': 'TheEconomist', 'status': 'pending'},
 {'handle': 'jimcramer', 'status': 'pending'},
 {'handle': 'HillaryClinton', 'status': 'pending'},
 {'handle': 'TechCrunch', 'status': 'pending'},
 {'handle': 'elonmusk', 'status': 'pending'},
 {'handle': 'NewYorker', 'status': 'pending'},
 {'handle': 'mcuban', 'status': 'pending'},
 {'handle': 'SeekingAlpha', 'status': 'pending'},
 {'handle': 'MarketWatch', 'status': 'pending'},
 {'handle': 'CNBC', 'status': 'pending'},
 {'handle': 'Forbes', 'status': 'pending'},
 {'handle': 'sacca', 'status': 'pending'},
 {'handle': 'MktsInsider', 'status': 'pending'},
 {'handle': 'ForbesInvestor', 'status': 'pending'},
 {'handle': 'themotleyfool', 'status': 'pending'},
 {'handle': 'ReformedBroker', 'status': 'pending'},
 {'handle': 'StockTwits', 'status': 'pend

In [12]:
# Queue every task in one batch write instead of one network round trip per document.
# insert_many rejects an empty list, so skip the call when there is nothing to queue.
if lookup_list:
    task_collection.insert_many(lookup_list)

In [13]:
# Confirm how many task documents are now queued
task_collection.count()

30

In [14]:
def get_tweets_to_mongo(lookup):
    """Download one user's timeline page by page and store the tweets in MongoDB.

    Pages are requested newest-first through the module-level ``api`` client.
    After each page, ``max_id`` is moved just below the oldest tweet received,
    so the next request continues further back in time. Every tweet is written
    to ``completed_collection`` as a ``{'username', 'timestamp', 'text'}``
    document.

    Parameters
    ----------
    lookup : str
        Twitter screen name whose timeline should be downloaded.

    Returns
    -------
    None
        Progress is reported with ``print``; results live in MongoDB.

    Notes
    -----
    A ``tweepy.TweepError`` stops the download for this user. The error is
    printed and a ``{'handle': lookup, 'status': 'failed'}`` document is added
    to ``task_collection``. Tweets stored before the error are kept.
    """
    maxTweets = 10000000  # Some arbitrary large number
    tweetsPerQry = 200    # this is the max the API permits

    # If results from a specific ID onwards are required, set sinceId to that ID.
    # Otherwise there is no lower limit: go as far back as the API allows.
    sinceId = None

    # If only results below a specific ID are required, set max_id to that ID.
    # None means no upper limit: start from the user's most recent tweet.
    max_id = None

    tweetCount = 0
    while tweetCount < maxTweets:
        try:
            # Build the request once; the optional bounds are added only when set.
            query = {'screen_name': lookup, 'count': tweetsPerQry}
            if max_id is not None:
                # max_id is inclusive, so step one below the oldest tweet already
                # stored to avoid fetching it again.
                query['max_id'] = str(max_id - 1)
            if sinceId:
                query['since_id'] = sinceId

            new_tweets = api.user_timeline(**query)
            if not new_tweets:
                print("No more tweets found")
                break

            # Keep the text as a text string: encoding it to bytes first would be
            # stored as binary data (not a string) under Python 3.
            tweet_docs = [{'username': tweet.user.screen_name,
                           'timestamp': tweet.created_at,
                           'text': tweet.text}
                          for tweet in new_tweets]
            # One batch write per page instead of one write per tweet.
            completed_collection.insert_many(tweet_docs)

            tweetCount += len(new_tweets)
            print("Downloaded {0} tweets for user: {1}".format(tweetCount, lookup))
            max_id = new_tweets[-1].id

        except tweepy.TweepError as e:
            # Just exit if any error, leaving a 'failed' marker for this handle
            task_collection.insert_one({'handle': lookup, 'status': 'failed'})
            print("some error : " + str(e))
            break

    print("Downloaded {0} tweets for user: {1} & saved to Mongo\n".format(tweetCount, lookup))

In [15]:
from datetime import datetime
start = datetime.now()

while True:
    # Claim one pending task. find_one_and_delete removes and returns the
    # document in a single server-side operation, so two notebooks running
    # this loop cannot pick up the same handle.
    task = task_collection.find_one_and_delete({'status': 'pending'})

    # None means no pending task is left
    if task is None:
        print('Done!')
        break

    task_handle = task['handle']

    # Clear any 'failed' marker left by an earlier attempt at this handle, so
    # the check below reflects only the attempt that is about to run.
    task_collection.delete_many({'handle': task_handle, 'status': 'failed'})

    get_tweets_to_mongo(task_handle)

    # get_tweets_to_mongo records a 'failed' document when the API errors out.
    # Only mark the handle complete when no such marker was written; otherwise
    # the handle would end up as both 'failed' and 'complete'.
    if task_collection.find_one({'handle': task_handle, 'status': 'failed'}) is None:
        task_collection.insert_one({'handle': task_handle, 'status': 'complete'})

end = datetime.now()
print(end - start)

Downloaded 200 tweets for user: BarackObama
Downloaded 400 tweets for user: BarackObama
Downloaded 600 tweets for user: BarackObama
Downloaded 800 tweets for user: BarackObama
Downloaded 1000 tweets for user: BarackObama
Downloaded 1200 tweets for user: BarackObama
Downloaded 1400 tweets for user: BarackObama
Downloaded 1600 tweets for user: BarackObama
Downloaded 1800 tweets for user: BarackObama
Downloaded 2000 tweets for user: BarackObama
Downloaded 2200 tweets for user: BarackObama
Downloaded 2399 tweets for user: BarackObama
Downloaded 2599 tweets for user: BarackObama
Downloaded 2796 tweets for user: BarackObama
Downloaded 2996 tweets for user: BarackObama
Downloaded 3196 tweets for user: BarackObama
Downloaded 3234 tweets for user: BarackObama
No more tweets found
Downloaded 3234 tweets for user: BarackObama & saved to Mongo

Downloaded 200 tweets for user: cnnbrk
Downloaded 400 tweets for user: cnnbrk
Downloaded 600 tweets for user: cnnbrk
Downloaded 800 tweets for user: cnnbrk

Downloaded 1000 tweets for user: mcuban
Downloaded 1199 tweets for user: mcuban
Downloaded 1398 tweets for user: mcuban
Downloaded 1597 tweets for user: mcuban
Downloaded 1796 tweets for user: mcuban
Downloaded 1986 tweets for user: mcuban
Downloaded 2161 tweets for user: mcuban
Downloaded 2236 tweets for user: mcuban
No more tweets found
Downloaded 2236 tweets for user: mcuban & saved to Mongo

Downloaded 200 tweets for user: SeekingAlpha
Downloaded 400 tweets for user: SeekingAlpha
Downloaded 600 tweets for user: SeekingAlpha
Downloaded 800 tweets for user: SeekingAlpha
Downloaded 1000 tweets for user: SeekingAlpha
Downloaded 1200 tweets for user: SeekingAlpha
Downloaded 1400 tweets for user: SeekingAlpha
Downloaded 1600 tweets for user: SeekingAlpha
Downloaded 1800 tweets for user: SeekingAlpha
Downloaded 2000 tweets for user: SeekingAlpha
Downloaded 2200 tweets for user: SeekingAlpha
Downloaded 2400 tweets for user: SeekingAlpha
Downloaded 2600 tweets for user: SeekingAlpha
Downloa

Downloaded 1800 tweets for user: StockTwits
Downloaded 2000 tweets for user: StockTwits
Downloaded 2200 tweets for user: StockTwits
Downloaded 2400 tweets for user: StockTwits
Downloaded 2600 tweets for user: StockTwits
Downloaded 2800 tweets for user: StockTwits
Downloaded 3000 tweets for user: StockTwits
Downloaded 3200 tweets for user: StockTwits
Downloaded 3215 tweets for user: StockTwits
No more tweets found
Downloaded 3215 tweets for user: StockTwits & saved to Mongo

Downloaded 200 tweets for user: steve_hanke
Downloaded 400 tweets for user: steve_hanke
Downloaded 600 tweets for user: steve_hanke
Downloaded 800 tweets for user: steve_hanke
Downloaded 1000 tweets for user: steve_hanke
Downloaded 1200 tweets for user: steve_hanke
Downloaded 1400 tweets for user: steve_hanke
Downloaded 1600 tweets for user: steve_hanke
Downloaded 1800 tweets for user: steve_hanke
Downloaded 2000 tweets for user: steve_hanke
Downloaded 2200 tweets for user: steve_hanke
Downloaded 2400 tweets for use

In [16]:
# Total number of tweet documents stored across all handles
completed_collection.count()

94856

In [17]:
# Look at the statuses in task_collection. Everything should be "complete."
curs = task_collection.find()
# Drain the cursor once and build the frame a single time. Rebuilding the
# DataFrame inside a loop redoes the work for every document and leaves `df`
# undefined when the collection is empty.
list_of_docs = list(curs)
df = pd.DataFrame(list_of_docs)
df

Unnamed: 0,_id,handle,status
0,593deb7857bbd40476642ee0,BarackObama,complete
1,593deb8257bbd40476643b78,cnnbrk,complete
2,593deb8857bbd404766444cd,BillGates,complete
3,593deb9257bbd4047664516a,realDonaldTrump,complete
4,593deb9a57bbd40476645e1d,TheEconomist,complete
5,593deba857bbd40476646aca,jimcramer,complete
6,593debb157bbd4047664776f,HillaryClinton,complete
7,593debbb57bbd404766483fe,TechCrunch,complete
8,593debc357bbd40476649019,elonmusk,complete
9,593debcc57bbd40476649c9d,NewYorker,complete


In [18]:
# A quicker way to get data out: turn the cursor into a list directly, then frame it
tweets_list = list(completed_collection.find())
tweets_df = pd.DataFrame(tweets_list)
tweets_df.head()

Unnamed: 0,_id,text,timestamp,username
0,593deb7057bbd4047664223e,"On this National Gun Violence Awareness Day, l...",2017-06-02 17:35:54,BarackObama
1,593deb7057bbd4047664223f,Forever grateful for the service and sacrifice...,2017-05-29 13:09:16,BarackObama
2,593deb7057bbd40476642240,Good to see my friend Prince Harry in London t...,2017-05-27 13:15:25,BarackObama
3,593deb7057bbd40476642241,"Through faith, love, and resolve the character...",2017-05-25 14:13:35,BarackObama
4,593deb7057bbd40476642242,Our hearts go out to those killed and wounded ...,2017-05-23 16:56:14,BarackObama


In [19]:
# Looks like we successfully got all our data from MongoDB: the row count
# should match the completed_collection.count() result shown earlier.
tweets_df.shape

(94856, 4)