![Twitter](https://upload.wikimedia.org/wikipedia/commons/thumb/5/51/Twitter_logo.svg/469px-Twitter_logo.svg.png)

In [None]:
import twitter
import nltk

from pymongo import MongoClient

# Notebook/module metadata.
__version__ = '1.0'
# Empty list: a star-import of this module exports no names.
__all__ = []
__author__ = 'Axel Oehmichen - ao1011@imperial.ac.uk'

In [None]:
import time

def prepare_tweet(tweet_json):
    """Return a shallow copy of a tweet whose ``user`` field is just the user id.

    The nested ``user`` document is replaced by ``str(user["id"])`` so the
    stored tweet only references its author. The mapping passed in is left
    untouched.
    """
    flattened = dict(tweet_json)
    flattened["user"] = str(flattened["user"]["id"])
    return flattened

def insert_timeline_into_mongo(twitter_user_id, api, MONGO_URL):
    """Download a user's timeline from Twitter and store it in MongoDB.

    Tweets are written to ``twitter.tweets`` (one document per tweet, flattened
    with ``prepare_tweet``) and the author's profile is merged into the user's
    document in ``twitter.twitterUsers``. A user that already has a document in
    ``twitterUsers`` is skipped.

    Args:
        twitter_user_id: id of the account, passed as-is to
            ``api.GetUserTimeline``.
        api: Twitter client exposing ``GetUserTimeline``.
        MONGO_URL: MongoDB connection string.

    Returns:
        The string ``"OK"``, whether the user was fetched or skipped.
    """
    page_size = 10    # tweets requested per API call
    max_count = 1500  # stop paging once more than this many tweets are stored

    mongo_client = MongoClient(MONGO_URL)
    try:
        users_collection = mongo_client.twitter.twitterUsers
        tweets_collection = mongo_client.twitter.tweets

        # The same Spark task may run more than once, so skip users that were
        # already claimed. The placeholder inserted below stores the id under
        # "id" (the previous lookup on "user.id" could never match it).
        # NOTE(review): once the profile is merged in, "id" holds whatever the
        # API returned, which may no longer equal the caller's value; the
        # "id_str" clause assumes the profile carries that field -- confirm.
        # The check-then-insert is still not atomic across concurrent tasks.
        already_claimed = users_collection.find_one(
            {"$or": [{"id": twitter_user_id},
                     {"id_str": str(twitter_user_id)}]}
        ) is not None
        if already_claimed:
            return "OK"

        users_collection.insert_one({"id": twitter_user_id})

        max_id = None        # upper bound (inclusive) on the ids of the next page
        fetched = 0
        newest_status = None  # first status of the first page; carries the profile
        while fetched <= max_count:
            chunk = api.GetUserTimeline(twitter_user_id, max_id=max_id, count=page_size)
            if not chunk:
                # Nothing older is left (or the account has no visible tweets).
                break
            if newest_status is None:
                newest_status = chunk[0]

            tweets_collection.insert_many(
                [prepare_tweet(status._json) for status in chunk]
            )
            fetched += len(chunk)

            # max_id is inclusive: step just below the oldest tweet of this page
            # so it is not returned (and stored) a second time.
            max_id = chunk[-1].id - 1
            time.sleep(1)

        # Merge the profile into the placeholder; skipped when no tweet (and
        # therefore no profile) was returned.
        if newest_status is not None:
            users_collection.update_one(
                {"id": twitter_user_id},
                {"$set": dict(newest_status.user._json)},
                upsert=False
            )
        return "OK"
    finally:
        # Always release the connection, including when the API or Mongo raises.
        mongo_client.close()

In [None]:
def process_user(twitter_user_id, api, MONGO_URL):
    """Store one user's timeline and report the outcome.

    Returns a ``(twitter_user_id, status)`` tuple, where ``status`` is whatever
    ``insert_timeline_into_mongo`` returned for that user.
    """
    return (twitter_user_id, insert_timeline_into_mongo(twitter_user_id, api, MONGO_URL))

# Main Program

We will now set all the parameters required to access Twitter and the MongoDB database.

In [None]:
# Twitter key and secret for OAuth.
# The values below are placeholders and must be replaced before running.
consumer_key = "XXX"
consumer_secret = "YYY"

# Access token pair for OAuth (placeholders as well).
access_token = "AAA"
access_token_secret = "BBB"

# Client used by the timeline-insertion step to call the Twitter API.
api = twitter.Api(consumer_key=consumer_key,
                  consumer_secret=consumer_secret,
                  access_token_key=access_token,
                  access_token_secret=access_token_secret)

# Ids of the Twitter users whose timelines are retrieved (kept as strings).
user_ids = ["25073877", "813286", "1339835893", "52544275", "409486555", "759251", "3235334092"]

# Address of the mongo cluster (placeholder: host and port still have to be filled in).
MONGO_URL = "mongodb://"

We retrieve the timelines for the specified users and print out "OK" when the task is completed by the worker.

In [None]:
# Distribute the user ids as an RDD. ``sc`` is not defined in this file;
# presumably it is the SparkContext provided by the notebook environment.
users_ids_rdd = sc.parallelize(user_ids)
# Declare one process_user call per user id. Under Spark's lazy-evaluation
# model nothing is fetched until an action is invoked on ``insertion``.
insertion = users_ids_rdd.map(lambda user_id : process_user(user_id, api, MONGO_URL))

In [None]:
# Action that runs the mapped process_user calls and returns their
# (user_id, status) tuples to the driver.
insertion.collect()

# Natural Language Processing

### We run some light language processing on the tweets and insert the results into a new collection.
If you would like to explore the nltk library further: http://www.nltk.org/

In [None]:
def process_tweets_for_user(twitter_user_id, MONGO_URL):
    """Tokenize and POS-tag every stored tweet of one user.

    Reads the user's tweets from ``twitter.tweets`` and writes one document per
    tweet, with keys ``text``, ``tokens`` and ``tagged``, to
    ``twitter.processedTweets``.

    Args:
        twitter_user_id: id of the user whose tweets are processed.
        MONGO_URL: MongoDB connection string.

    Returns:
        The string ``"Processed"``.
    """
    mongo_client = MongoClient(MONGO_URL)
    try:
        tweets_collection = mongo_client.twitter.tweets
        tweets_processed = mongo_client.twitter.processedTweets

        # prepare_tweet stores the author as str(user id), so normalise the
        # lookup value the same way.
        for tweet in tweets_collection.find({"user": str(twitter_user_id)}):
            text = tweet["text"]
            tokens = nltk.word_tokenize(text)
            tweets_processed.insert_one({
                "text": text,
                "tokens": tokens,
                "tagged": nltk.pos_tag(tokens)
            })
    finally:
        # Release the connection even if tokenizing or inserting fails.
        mongo_client.close()
    return "Processed"

In [None]:
# Declare the language-processing step for every user id, reusing the RDD of
# user ids built for the insertion step.
process_status = users_ids_rdd.map(lambda user_id : process_tweets_for_user(user_id, MONGO_URL))

In [None]:
# Run the processing step; process_tweets_for_user returns "Processed" for each user id.
process_status.collect()

# Exercises

**The reference documentation for pymongo is available at this address:** https://api.mongodb.com/python/current/ 

Queries:
* Count the number of tweets and users
* Print out the name of all the users inserted
* Find the most retweeted tweet
* Find the shortest tweet
* Count all the words used in the tweets and find the top 5 most used



In [None]:
# Connection reused by the exercise cells that follow.
mongo_client = MongoClient(MONGO_URL)

# Count the number of tweets
tweet_count = mongo_client.twitter.tweets.count()

# Count the number of users
user_count = mongo_client.twitter.twitterUsers.count()

# Print both results explicitly: as bare expressions only the last one of the
# cell would be displayed and the tweet count would be lost.
print("Number of tweets: %d" % tweet_count)
print("Number of users: %d" % user_count)

In [None]:
# Print out the name of all the users inserted.
# A user document is first inserted with only its id and receives the profile
# (including the name) later, so documents without a "name" are filtered out
# instead of raising KeyError.
cursor = mongo_client.twitter.twitterUsers.find({"name": {"$exists": True}}, {"name": 1})

for document in cursor:
    # Call form of print: same output on Python 2, valid syntax on Python 3.
    print(document["name"])

In [None]:
from pymongo import DESCENDING
# Find the most retweeted tweet: sort by retweet_count, highest first, and keep
# only the first document.
most_retweeted = mongo_client.twitter.tweets.find().sort("retweet_count", DESCENDING).limit(1)

# The cursor yields at most one document (none if the collection is empty).
# Call form of print: same output on Python 2, valid syntax on Python 3.
for t in most_retweeted:
    print(t["retweet_count"])
    print(t)

In [None]:
# Find the shortest tweet
def text_length(tweet_id, MONGO_URL):
    """Fetch one tweet by ``_id`` and return its text with its length.

    Args:
        tweet_id: document holding the tweet's ``_id`` (as produced by a
            ``find`` restricted to ``_id``).
        MONGO_URL: MongoDB connection string.

    Returns:
        A ``(text, len(text))`` tuple.
    """
    mongo_client = MongoClient(MONGO_URL)
    try:
        tweet = mongo_client.twitter.tweets.find_one({"_id": tweet_id["_id"]})
    finally:
        # This runs once per tweet: close the connection so the clients do not
        # pile up on the workers.
        mongo_client.close()
    text = tweet["text"]
    return (text, len(text))

def compare_length(tweet1, tweet2):
    """Return whichever ``(text, length)`` pair has the smaller length.

    On a tie the second argument is returned.
    """
    return tweet1 if tweet1[1] < tweet2[1] else tweet2

# Only the _id of every tweet is distributed; text_length looks up the text of
# each one on the workers.
tweets_ids = sc.parallelize(list(mongo_client.twitter.tweets.find({},{"_id": 1})))

# Map every id to (text, length), then keep the pair with the smallest length.
shortest_tweet = tweets_ids.map(lambda tweet_id: text_length(tweet_id, MONGO_URL)).reduce(compare_length)

# Format as unicode instead of calling str() on the text: on Python 2, str()
# raises UnicodeEncodeError as soon as the tweet contains a non-ASCII character.
print(u"The shortest tweet is {0}\nAnd the length is {1}".format(shortest_tweet[0], shortest_tweet[1]))

In [None]:
# Count all the words used in the tweets and find the top 5 most used
from bson.code import Code

# Map step (JavaScript, run by MongoDB): emit (token, 1) for every token of a
# processed tweet.
mapper = Code("""
               function () {
                 this.tokens.forEach(function(z) {
                   emit(z, 1);
                 });
               }
               """)

# Reduce step: sum the values emitted for one token.
reducer = Code("""
                function (key, values) {
                  var total = 0;
                  for (var i = 0; i < values.length; i++) {
                    total += values[i];
                  }
                  return total;
                }
                """)

# The per-token totals are written to the "myresults" collection.
result =  mongo_client.twitter.processedTweets.map_reduce(mapper, reducer, "myresults")

# Five highest totals first. DESCENDING comes from the pymongo import done in
# the "most retweeted" cell. Call form of print: same output on Python 2,
# valid syntax on Python 3.
for doc in result.find().sort("value", DESCENDING).limit(5):
    print(doc)