# Tweet Producer

@author: @raymondmarfurt

Streaming with Spark<br>
ZHAW CAS Machine Intelligence<br>
Big Data Project<br>

Uses the Twitter real-time (streaming) API to collect tweets by filtering for specific hashtags, consolidates the hashtags of each tweet, and stores the tweets as raw JSON files.

In [2]:
import tweepy
import json
import time
import traceback

# Twitter API credentials. The values below are placeholders; supply real
# credentials at runtime and never commit them to source control.
CONSUMER_KEY = 'xxx'
CONSUMER_SECRET = 'yyy'
OAUTH_TOKEN = 'aaa'
OAUTH_TOKEN_SECRET = 'bbb'

# Hashtags passed to the streaming filter (including the leading '#').
HASHTAGS = ['#tesla',
            '#apple',
            '#Microsoft',
            '#mcdonalds',
            '#nike',
            '#pfizer',
            '#facebook',
            '#alphabet',
            '#goldmansachs',
            '#lockheedmartin']
# Same tags without the leading '#' and lower-cased, for case-insensitive
# comparison against the hashtag text found in tweet entities.
HASHTAGS2 = [h[1:].lower() for h in HASHTAGS]

# Output directory for the raw tweet JSON files (used both as-is with
# dbutils.fs and prefixed with "/dbfs" for local file access).
TWEET_DIR = "/tmp/tweet_small/"



In [3]:
# Start from a clean slate: remove the tweet output directory; recurse=True
# asks for its contents to be removed as well.
# NOTE(review): dbutils is not defined in this file - presumably the
# Databricks notebook utilities object; confirm the runtime provides it.
dbutils.fs.rm(TWEET_DIR, recurse=True)

In [4]:
# Create the tweet output directory that the stream listener writes into.
dbutils.fs.mkdirs(TWEET_DIR)

In [5]:
# List the contents of the tweet output directory; as the last expression of
# the cell, the result is presumably displayed by the notebook.
dbutils.fs.ls(TWEET_DIR)

In [6]:
import os
# Dry run of the output-folder retention policy: report how many files are
# present and which ones would be removed. Nothing is deleted in this cell.
FILE_MAX_THRESHOLD = 40
FILE_MIN_THRESHOLD = 30

file_list = os.listdir("/dbfs" + TWEET_DIR)
number_of_files = len(file_list)
print("Number of files: " + str(number_of_files))

# Once the folder has reached the upper threshold, enough files are selected
# to bring it back down to the lower threshold; otherwise none are selected.
if number_of_files >= FILE_MAX_THRESHOLD:
    diff = number_of_files - FILE_MIN_THRESHOLD
else:
    diff = 0
    print("Nothing to cleanup")

# Candidates are the first `diff` names in sorted (lexicographic) order.
for f in sorted(file_list)[:diff]:
    print("deleting file " + f)

In [7]:
import os
import json
import sys
import traceback

class MyStreamListener(tweepy.StreamListener):
    """Stream listener that stores tweets with a tracked hashtag as JSON files.

    For every incoming status the hashtag entities are reduced to the single
    tracked hashtag (lower-cased) found in the tweet, its extended form, or
    its retweeted/quoted status. Tweets with a tracked hashtag are written to
    ``"/dbfs" + TWEET_DIR + <tweet id>``; the others are skipped. After each
    status the output folder is trimmed if it has grown too large.
    """

    # Cleanup starts once the output folder holds more files than this ...
    FILE_MAX_THRESHOLD = 40
    # ... and removes files until only this many are left.
    FILE_MIN_THRESHOLD = 30

    def on_status(self, status):
        """Persist ``status`` as JSON if it carries a tracked hashtag.

        Args:
            status: tweepy status object; its ``_json`` dict is modified in
                place by ``_transform_ht`` before being written.
        """
        tweet_id = status.id
        print("Processing tweet" + str(tweet_id))
        file_name = "/dbfs" + TWEET_DIR + str(tweet_id)
        new_status = self._transform_ht(status)
        # _transform_ht leaves an empty list when no tracked hashtag was
        # found, so a truthiness check is what decides whether to skip.
        if new_status._json['entities']['hashtags']:
            with open(file_name, 'w') as f:
                json.dump(new_status._json, f)
        else:
            print("skipping empty hashtag")
        self._cleanup()

    def on_error(self, status_code):
        """Log the error code and stop the stream.

        Returns:
            Always ``False``, which tells tweepy to disconnect instead of
            reconnecting on its own.
        """
        if status_code == 420:
            print("!!! encountered 420. Aborting.")
        else:
            print("!!! UNKNOWN ERROR " + str(status_code))
        return False

    def _cleanup(self):
        """Trim the output folder back to ``FILE_MIN_THRESHOLD`` files.

        Runs only when the folder holds more than ``FILE_MAX_THRESHOLD``
        files. Files are removed in sorted-name order.
        NOTE(review): names are the tweet ids written by ``on_status``, so
        this presumably removes the oldest tweets first - confirm.
        """
        file_list = os.listdir("/dbfs" + TWEET_DIR)
        number_of_files = len(file_list)
        print("Number of files in output folder: " + str(number_of_files))

        if number_of_files > self.FILE_MAX_THRESHOLD:
            diff = number_of_files - self.FILE_MIN_THRESHOLD
            print("Need to clean up. Deleting " + str(diff) + " files...")
            for f in sorted(file_list)[:diff]:
                dbutils.fs.rm(TWEET_DIR + f)

    def _transform_ht(self, status):
        """Replace the tweet's hashtag entities with the one tracked hashtag.

        The hashtag is searched, in this order, in the tweet itself, its
        ``extended_tweet``, its ``retweeted_status`` (and that status'
        ``extended_tweet``) and its ``quoted_status`` (and that status'
        ``extended_tweet``). The first tracked hashtag found wins.

        Args:
            status: tweepy status object whose ``_json`` dict is updated in
                place.

        Returns:
            The same ``status`` object. ``_json['entities']['hashtags']`` is
            a one-element list holding the matched hashtag entity with its
            text lower-cased, or an empty list when nothing matched.
        """
        tweet = status._json
        retweet = tweet.get('retweeted_status') or {}
        quote = tweet.get('quoted_status') or {}
        # Search order matters: the first source with a tracked tag wins.
        sources = (
            tweet,
            tweet.get('extended_tweet'),
            retweet,
            retweet.get('extended_tweet'),
            quote,
            quote.get('extended_tweet'),
        )

        ht = None
        for source in sources:
            ht = self._check_for_hashtag(self._hashtags_of(source))
            if ht is not None:
                break

        if ht is not None:
            # The entity dict is lower-cased in place, so a match taken from
            # a nested status is lower-cased there as well.
            ht['text'] = ht['text'].lower()
            print("setting hashtag to " + ht['text'])
        tweet.setdefault('entities', {})['hashtags'] = [] if ht is None else [ht]
        return status

    @staticmethod
    def _hashtags_of(node):
        """Return the hashtag entity list of a tweet-like dict, or ``[]``.

        Tolerates ``None``/empty nodes and missing ``entities``/``hashtags``
        keys so that partially populated payloads do not raise ``KeyError``.
        """
        if not node:
            return []
        return (node.get('entities') or {}).get('hashtags') or []

    def _check_for_hashtag(self, ht_list):
        """Return the first entity in ``ht_list`` whose text is tracked.

        The comparison is case-insensitive against ``HASHTAGS2``.

        Returns:
            The matching hashtag entity dict, or ``None`` if there is none.
        """
        for ht in ht_list or []:
            if ht['text'].lower() in HASHTAGS2:
                return ht
        return None


In [8]:
# Authenticate against the Twitter API with the configured credentials.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

myStreamListener = MyStreamListener()
# Start time of the previous connection attempt (0 = no attempt yet).
last_try_time = 0

# Keep the stream alive forever: filter() blocks until the stream ends (for
# example when the listener's on_error returns False), then we reconnect.
while True:
    print("---- start fetching ----")
    current_time = time.time()
    # Back off when the previous attempt started less than 15 minutes ago,
    # to avoid hammering the API with rapid reconnects.
    if current_time-last_try_time < 60*15:
        print("retrying too fast. Sleeping for 15min.")
        time.sleep(60*15)
    # Record this attempt so that the back-off check above can take effect.
    last_try_time = time.time()
    try:
        myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
        # Blocking is filter()'s default, so no async flag is passed; the
        # original `async=False` is a SyntaxError on Python 3.7+, where
        # `async` is a reserved keyword.
        myStream.filter(track=HASHTAGS)
    except IOError as ex:
        print('I just caught the exception: %s' % ex)
        # Print the traceback of the caught exception, not the current stack.
        traceback.print_exc()
        time.sleep(60)



In [9]:
#dbutils.fs.rm("/tmp/tweet_data", recurse=True)