# Twitter Analysis Pipeline

![](https://www.evernote.com/l/AAFtiWq4lUNNiqyswF72dUV3f_7Geq_TcykB/image.png)

In [1]:
! pip install boto3 pymongo twitter --quiet

In [2]:
import lib 

## Write Tweets to S3

### Create a Tweet Iterator

In [3]:
# Twitter API credentials are left as None placeholders in this cell;
# real string values have to be supplied before the iterator can be created.
consumer_key    = None
consumer_secret = None
token           = None
token_secret    = None
# Comma-separated coordinates handed to the iterator helper as one string.
# NOTE(review): presumably west,south,east,north in lon/lat order - confirm against lib.
bounding_box    = "-118.5137323688,34.0001996344,-118.4702449172,34.0331651696"

# Argument order is token, token_secret, consumer_key, consumer_secret, bounding_box.
tweet_iterator  = lib.create_tweet_iterator(token, 
                                            token_secret,
                                            consumer_key,
                                            consumer_secret,
                                            bounding_box)

MissingCredentialsError: You must supply strings for token_secret and consumer_secret, not None.

In [None]:
# Pull the next 50 items from tweet_iterator using a list comprehension.
tweets = [next(tweet_iterator) for _ in range(50)]

In [None]:
# Pull the next 50 items from tweet_iterator with an explicit loop,
# appending each one to tweets_2.
tweets_2 = []
for _ in range(50):
    tweets_2.append(next(tweet_iterator))

### Write list of tweets to JSON file on disk

In [None]:
import json

username = 'joshua'

# Dump each batch to its own freshly named JSON file, first `tweets`, then
# `tweets_2`. A new name is requested from lib right before each write.
for batch in (tweets, tweets_2):
    filename = lib.create_timestamped_filename(username)
    with open(filename, 'w') as outfile:
        json.dump(batch, outfile)

### Write Tweet Files to S3, Step 1 - Create a Boto Client to S3

Boto3 documentation: https://boto3.readthedocs.io

In [None]:
# Build the S3 client through the project helper; the lib S3 helpers used
# later in this notebook take it as their first argument.
s3_client = lib.create_boto_client()

In [None]:
from os import listdir

In [None]:
# List this user's tweet files in the working directory that still need to be
# uploaded. Files whose name already contains 'processed' are skipped so that
# re-running the notebook does not upload and rename them a second time.
current_directory = listdir()
current_directory = [file for file in current_directory
                     if ('tweets-joshua' in file) and ('processed' not in file)]
current_directory

In [None]:
from os import rename

### Write Tweet Files to S3, Step 2 - Write a file to S3

In [None]:
S3_BUCKET = 'uclax-data-science'

# Upload every listed file, then tag the local copy as processed by swapping
# the '.json' text in its name for '-processed.json'.
for old_filename in current_directory:
    lib.write_file_to_S3(s3_client, old_filename, S3_BUCKET)
    filename = old_filename.replace('.json', '') + '-processed.json'
    rename(old_filename, filename)

In [None]:
# Re-list the working directory, keeping only this user's tweet files.
current_directory = [name for name in listdir() if 'tweets-joshua' in name]
current_directory

In [4]:
# Small illustration of filtering with a list comprehension:
# keep the even numbers from 1 through 10.
my_list = [val for val in range(1, 11) if not val % 2]
my_list

[2, 4, 6, 8, 10]

### List Files on S3

In [None]:
# Ask lib for the bucket listing and keep only the entries whose name
# contains 'tweets-joshua'.
s3_files = [
    name
    for name in lib.list_files_in_S3_bucket(s3_client, S3_BUCKET)
    if 'tweets-joshua' in name
]
s3_files

## Write Tweets to Mongo

### Read an object from S3

In [None]:
S3_BUCKET = 'uclax-data-science'
# Use the second entry of the filtered listing as the key to read.
# NOTE(review): s3_files[0] is never read anywhere in this notebook - confirm
# that skipping it is intended.
key = s3_files[1]

# Fetch the object for `key` from the bucket through the lib helper.
tweets_from_s3 = lib.read_object_from_S3(s3_client, key, S3_BUCKET)

### Write Tweets to Mongo

In [None]:
from pymongo.errors import DuplicateKeyError

In [None]:
# Connect to the 'tweets' collection of the 'twitter' database and insert the
# tweets one document at a time. A duplicate key is reported and skipped, so
# one clash does not stop the rest of the batch.
collection_client = lib.create_mongo_client_to_database_collection('twitter', 'tweets')
for tweet in tweets_from_s3:
    try:
        collection_client.insert_one(tweet)
    except DuplicateKeyError as duplicate:
        print(duplicate)

In [None]:
# Open the collection again and attempt the same per-document inserts;
# any DuplicateKeyError raised by insert_one is printed rather than propagated.
# NOTE(review): presumably repeated to show the duplicate-key messages - confirm.
collection_client = lib.create_mongo_client_to_database_collection('twitter', 'tweets')
for tweet in tweets_from_s3:
    try:
        collection_client.insert_one(tweet)
    except DuplicateKeyError as duplicate:
        print(duplicate)

In [None]:
# Load every remaining S3 object (third listing entry onward) and insert its
# tweets one at a time.
for key in s3_files[2:]:
    # Read the object for THIS key; without this read the loop would keep
    # re-inserting whatever tweets_from_s3 held before the loop started.
    tweets_from_s3 = lib.read_object_from_S3(s3_client, key, S3_BUCKET)
    for tweet in tweets_from_s3:
        try:
            collection_client.insert_one(tweet)
        except DuplicateKeyError as e:
            # Report the clash and carry on with the next tweet.
            print(e)

## One More Time

In [None]:
# Collect another 50 items from the iterator ...
tweets = []
for _ in range(50):
    tweets.append(next(tweet_iterator))

# ... and dump them to a newly named JSON file.
filename = lib.create_timestamped_filename(username)
with open(filename, 'w') as outfile:
    json.dump(tweets, outfile)

In [None]:
# Collect this user's tweet files that have not been marked as processed yet.
current_directory = []
for name in listdir():
    if 'tweets-joshua' in name and 'processed' not in name:
        current_directory.append(name)
current_directory

In [None]:
# Upload each pending file, then rename the local copy so that its name
# carries the '-processed.json' ending.
for old_filename in current_directory:
    lib.write_file_to_S3(s3_client, old_filename, S3_BUCKET)
    filename = '{}-processed.json'.format(old_filename.replace('.json', ''))
    rename(old_filename, filename)

In [None]:
# Refresh the bucket listing, restricted to names containing 'tweets-joshua'.
s3_files = [
    name
    for name in lib.list_files_in_S3_bucket(s3_client, S3_BUCKET)
    if 'tweets-joshua' in name
]
s3_files

In [None]:
# Display the last entry of the filtered bucket listing.
s3_files[-1]

In [None]:
# Use the last entry of the filtered bucket listing as the key to read next.
key = s3_files[-1]

In [None]:
# Fetch the object for `key` from S3_BUCKET through the lib helper.
tweets_from_s3 = lib.read_object_from_S3(s3_client, key, S3_BUCKET)

In [None]:
# Insert the whole batch with a single insert_many call.
collection_client.insert_many(tweets_from_s3)

## Automate It

In [None]:
from datetime import datetime

In [None]:
import os

# Twitter API credentials are read from environment variables so that secrets
# never live in the notebook source. Set TWITTER_TOKEN, TWITTER_TOKEN_SECRET,
# TWITTER_CONSUMER_KEY and TWITTER_CONSUMER_SECRET before starting the kernel;
# a variable that is not set yields None here.
# NOTE(review): any credential literals that were ever committed to this
# notebook should be treated as compromised and revoked.
TOKEN           = os.environ.get("TWITTER_TOKEN")
TOKEN_SECRET    = os.environ.get("TWITTER_TOKEN_SECRET")
CONSUMER_KEY    = os.environ.get("TWITTER_CONSUMER_KEY")
CONSUMER_SECRET = os.environ.get("TWITTER_CONSUMER_SECRET")
# Comma-separated coordinates passed to the tweet iterator helper as one string.
BOUNDING_BOX    = "-118.5137323688,34.0001996344,-118.4702449172,34.0331651696"

# Mongo database and collection names, and the username used in file names.
DATABASE        = 'twitter'
COLLECTION      = 'tweets'
USERNAME        = 'joshua'

S3_BUCKET       = 'uclax-data-science'

In [None]:
# Build the three long-lived handles the pipeline functions rely on:
# the tweet source, the S3 client and the Mongo collection client.
tweet_iterator = lib.create_tweet_iterator(
    TOKEN,
    TOKEN_SECRET,
    CONSUMER_KEY,
    CONSUMER_SECRET,
    BOUNDING_BOX,
)
s3_client = lib.create_boto_client()
collection_client = lib.create_mongo_client_to_database_collection(
    DATABASE,
    COLLECTION,
)

In [None]:
def timestamp():
    """Print the current local time followed by a 'Collecting Tweets' marker.

    Both pieces are written without a trailing newline, each followed by
    ' | ', so further status text can continue on the same output line.
    Returns None.
    """
    # '%m/%d/%y' is the portable spelling of what '%D' expands to; '%D' is a
    # platform-specific directive that is not available everywhere.
    now = datetime.now().strftime('%m/%d/%y %H:%M:%S')
    print(now, end=' | ')
    print('Collecting Tweets', end=' | ')
    print('Collecting Tweets', end=' | ')
    
def collect_tweets(n=50):
    """Take the next `n` items from the global `tweet_iterator`.

    Prints '<n> Tweets | ' (no newline) once the items have been gathered
    and returns them as a list in the order they were produced.
    """
    batch = []
    for _ in range(n):
        batch.append(next(tweet_iterator))
    print('{} Tweets'.format(n), end=' | ')
    return batch
    
def write_to_disk(tweets):
    """Serialize `tweets` as JSON into a newly named local file.

    The file name comes from lib.create_timestamped_filename(USERNAME).
    Prints 'Written to Disk | ' (no newline) and returns the file name.
    """
    path = lib.create_timestamped_filename(USERNAME)
    with open(path, 'w') as handle:
        json.dump(tweets, handle)
    print('Written to Disk', end=' | ')
    return path
    
def write_to_S3(filename):
    """Upload the local file `filename` to S3_BUCKET and mark it as processed.

    After the upload, the local file is renamed: the '.json' text is removed
    from its name and '-processed.json' is appended. Prints 'Written to S3 | '
    (no newline). Returns None.
    """
    lib.write_file_to_S3(s3_client, filename, S3_BUCKET)
    processed_name = '{}-processed.json'.format(filename.replace('.json', ''))
    rename(filename, processed_name)
    print('Written to S3', end=' | ')
    
def insert_to_mongo(key):
    """Read the object for `key` from S3_BUCKET and insert it into Mongo.

    The data returned by lib.read_object_from_S3 is handed to
    collection_client.insert_many in one call. Prints 'Inserted to Mongo',
    which ends the current status line. Returns None.
    """
    documents = lib.read_object_from_S3(s3_client, key, S3_BUCKET)
    collection_client.insert_many(documents)
    print('Inserted to Mongo')

In [None]:
# Run the pipeline over and over: collect a batch, write it to disk, upload
# it to S3, then load it from S3 into Mongo. The loop has no exit condition
# of its own, so it runs until the kernel is interrupted.
try:
    while True:
        timestamp()
        tweets = collect_tweets()
        filename = write_to_disk(tweets)
        write_to_S3(filename)
        # `filename` still holds the name the file had when it was uploaded
        # (write_to_S3 renames only the local copy); it is passed on as the key.
        insert_to_mongo(filename)
except KeyboardInterrupt:
    # Interrupting is the expected way to stop; finish the partially printed
    # status line and end without a traceback.
    print('\nStopped collecting tweets.')