In [13]:
import logging
import os
import re
import time
from datetime import datetime, timedelta
from datetime import timezone

import nltk
import schedule
import tweepy
from google.cloud import bigquery
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from google.oauth2 import service_account
from nltk.corpus import stopwords
from nltk.tokenize import WordPunctTokenizer

In [23]:
# Words ignored during keyword extraction: NLTK's English stop words plus the
# tracked brand name, kept as a set for fast membership checks.
stop_words = set(stopwords.words('english')) | {'walmart'}

In [2]:
# Route INFO-and-above records to a log file; filemode='w' truncates the file
# every time this cell is executed.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filemode='w',
    filename='main_twitter_dash.log',
)

In [3]:
def authentication(cons_key, cons_secret, acc_token, acc_secret):
    """Build a tweepy API client from OAuth consumer and access credentials.

    Args:
        cons_key: consumer key passed to ``tweepy.OAuthHandler``.
        cons_secret: consumer secret passed to ``tweepy.OAuthHandler``.
        acc_token: access token set on the handler.
        acc_secret: access token secret set on the handler.

    Returns:
        A ``tweepy.API`` instance bound to the configured handler.
    """
    oauth_handler = tweepy.OAuthHandler(cons_key, cons_secret)
    oauth_handler.set_access_token(acc_token, acc_secret)
    return tweepy.API(oauth_handler)

In [4]:
#Twitter API Setup
# Credentials are read from environment variables when present so real secrets
# never have to be saved inside the notebook; the placeholders remain as the
# fallback for anyone who still fills them in by hand.
ACC_TOKEN = os.environ.get('TWITTER_ACC_TOKEN', 'YOUR INFO HERE')
ACC_SECRET = os.environ.get('TWITTER_ACC_SECRET', 'YOUR INFO HERE')
CONS_KEY = os.environ.get('TWITTER_CONS_KEY', 'YOUR INFO HERE')
CONS_SECRET = os.environ.get('TWITTER_CONS_SECRET', 'YOUR INFO HERE')

api = authentication(CONS_KEY, CONS_SECRET, ACC_TOKEN, ACC_SECRET)

In [5]:
#Google Service Account Credentials
# Prefer the standard GOOGLE_APPLICATION_CREDENTIALS variable so the key-file
# path does not have to be hard-coded; the placeholder remains as the fallback.
credentials = service_account.Credentials.from_service_account_file(
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', 'PATH TO GOOGLE CREDENTIALS')
)

#DBCLIENT INITIALIZATION
# Table identifiers passed to db_client.get_table() by the storage methods.
tweets_table_id = "TABLE NAME"
keywords_table_id = "TABLE NAME"
hashtags_table_id = "TABLE NAME"
company_tweets_table_id = "TABLE NAME"
replies_table_id = "TABLE NAME"
followers_table_id = "TABLE NAME"
project_id = "PROJECT NAME"
db_client = bigquery.Client(project=project_id, credentials=credentials)


In [16]:
class tweet:
    """Wrap one tweepy status object and clean, enrich and store it.

    Typical use is ``tweet(status)`` followed by one ``prepare_*`` method and
    the matching ``store_*`` method. The class relies on the module-level
    ``credentials``, ``db_client``, ``stop_words`` and ``*_table_id`` names.
    """

    # Natural Language client shared by every instance; created on first use so
    # a new client is not built for every tweet.
    _language_client = None

    def __init__(self, tweet_json):
        """Keep a reference to the raw status object for the prepare_* methods."""
        self.json = tweet_json

    @staticmethod
    def _get_language_client():
        """Return the shared LanguageServiceClient, creating it on first use."""
        if tweet._language_client is None:
            tweet._language_client = language.LanguageServiceClient(credentials=credentials)
        return tweet._language_client

    @staticmethod
    def _utc_timestamp(created_at):
        """Convert a tweet's creation datetime to a POSIX timestamp.

        ``datetime.timestamp()`` interprets a naive datetime in the machine's
        local time zone. Naive values are therefore treated as UTC here
        (assumes the status carries UTC, as Twitter reports it); aware values
        are converted unchanged.
        """
        if created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)
        return created_at.timestamp()

    @staticmethod
    def _log_insert_errors(label, results):
        """Log the non-empty results returned by ``db_client.insert_rows``.

        Args:
            label: prefix for the log message, e.g. "Tweet Storage Error".
            results: list holding one ``insert_rows`` return value per insert;
                an empty value means that insert reported no errors.
        """
        failures = [result for result in results if result]
        if failures:
            logging.error(label + str(failures))

    def clean_tweet(self, tweet_json):
        """Normalise raw tweet text for sentiment and keyword analysis.

        Removes @mentions and http(s) links, replaces every character that is
        not an ASCII letter or digit with a space, lower-cases the result and
        collapses runs of whitespace.

        Args:
            tweet_json: the tweet text, either UTF-8 encoded bytes or str.

        Returns:
            The cleaned text as a single space-separated string.
        """
        if isinstance(tweet_json, bytes):
            raw_text = tweet_json.decode('utf-8')
        else:
            raw_text = tweet_json
        # Handles may contain underscores; without "_" here "@wal_mart" would
        # leave "mart" behind in the cleaned text.
        user_removed = re.sub(r'@[A-Za-z0-9_]+', '', raw_text)
        link_removed = re.sub(r'https?://[A-Za-z0-9./]+', '', user_removed)
        alnum_only = re.sub(r'[^a-zA-Z0-9]', ' ', link_removed)
        # Only letters, digits and spaces remain at this point, so splitting on
        # whitespace yields the same tokens as a word/punctuation tokenizer.
        return ' '.join(alnum_only.lower().split())

    def get_hashtags(self, tweet_json):
        """Return the hashtag texts listed in the status entities.

        Args:
            tweet_json: status object exposing an ``entities`` mapping.

        Returns:
            A list of hashtag strings; empty when the status has none.
        """
        entities = getattr(tweet_json, 'entities', None) or {}
        return [hashtag["text"] for hashtag in entities.get('hashtags') or []]

    def get_sentiment(self, text):
        """Return the document sentiment score reported by the Natural Language API."""
        document = types.Document(content=text,
                                  type=enums.Document.Type.PLAIN_TEXT)
        response = tweet._get_language_client().analyze_sentiment(document=document)
        return response.document_sentiment.score

    def get_keywords(self, text):
        """Return the adjectives, nouns and verbs of ``text`` that are not stop words.

        Tokens come from the Natural Language API syntax analysis; tokens found
        in the module-level ``stop_words`` set and the retweet marker "rt" are
        skipped.
        """
        # Adjectives, nouns and verbs (tag values 1, 6 and 11).
        parts_of_speech = (
            enums.PartOfSpeech.Tag.ADJ,
            enums.PartOfSpeech.Tag.NOUN,
            enums.PartOfSpeech.Tag.VERB,
        )

        document = types.Document(content=text,
                                  type=enums.Document.Type.PLAIN_TEXT)
        syntax = tweet._get_language_client().analyze_syntax(document=document)

        keywords = []
        for token in syntax.tokens:
            word = token.text.content
            if (token.part_of_speech.tag in parts_of_speech
                    and word not in stop_words
                    and word != "rt"):
                keywords.append(word)
        return keywords

    def get_last_company_tweet_primary_id(self):
        """Return the highest primary_id stored so far, or 1 when there is none.

        NOTE(review): the table name in the query is a placeholder that has to
        be filled in separately from ``company_tweets_table_id``.
        """
        query = ("""
            SELECT MAX(primary_id)
            FROM `TABLE NAME`
            """
            )
        results = db_client.query(query).result()

        for row in results:
            # MAX() over an empty table yields NULL, which arrives as None.
            return 1 if row[0] is None else row[0]
        # No row at all: fall back to the same default instead of returning None.
        return 1

    def prepare_tweet(self):
        """Populate the fields needed by ``store_tweet`` from the raw status."""
        self.text = self.clean_tweet(self.json.text)
        self.created_at = self._utc_timestamp(self.json.created_at)
        self.hashtags = self.get_hashtags(self.json)
        self.sentiment = self.get_sentiment(self.text)
        self.keywords = self.get_keywords(self.text)
        self.primary_id = self.json.id

    def prepare_company_tweet(self):
        """Populate the fields needed by ``store_company_tweet``.

        The text is stored uncleaned and ``primary_id`` is one more than the
        value returned by ``get_last_company_tweet_primary_id``.
        """
        self.text = self.json.text
        self.created_at = self._utc_timestamp(self.json.created_at)
        self.tweet_id = self.json.id
        self.primary_id = (self.get_last_company_tweet_primary_id() + 1)
        self.favorites = self.json.favorite_count
        self.retweets = self.json.retweet_count

    def prepare_reply(self):
        """Populate the fields needed by ``store_reply`` from the raw status."""
        self.text = self.clean_tweet(self.json.text)
        self.created_at = self._utc_timestamp(self.json.created_at)
        self.sentiment = self.get_sentiment(self.text)
        self.primary_id = self.json.id
        self.response_id = self.json.in_reply_to_status_id

    def store_tweet(self):
        """Insert the tweet plus its keywords and hashtags.

        Returns:
            A list with one ``insert_rows`` result per insert performed (tweet
            row first, then keywords and hashtags when present). Non-empty
            results are logged as errors.
        """
        errors = []

        tweet_rows = [[self.primary_id, self.created_at, self.text, self.sentiment]]
        table = db_client.get_table(tweets_table_id)
        errors.append(db_client.insert_rows(table, tweet_rows))

        if self.keywords:
            keyword_rows = [[self.created_at, keyword, self.primary_id] for keyword in self.keywords]
            table = db_client.get_table(keywords_table_id)
            errors.append(db_client.insert_rows(table, keyword_rows))

        if self.hashtags:
            hashtag_rows = [[self.created_at, hashtag, self.primary_id] for hashtag in self.hashtags]
            table = db_client.get_table(hashtags_table_id)
            errors.append(db_client.insert_rows(table, hashtag_rows))

        self._log_insert_errors("Tweet Storage Error", errors)
        return errors

    def store_company_tweet(self):
        """Insert the company tweet row.

        Returns:
            A one-element list holding the ``insert_rows`` result; a non-empty
            result is logged as an error.
        """
        company_tweets_rows = [[self.primary_id, self.tweet_id, self.created_at, self.text, self.favorites, self.retweets]]
        table = db_client.get_table(company_tweets_table_id)
        errors = [db_client.insert_rows(table, company_tweets_rows)]

        self._log_insert_errors("Company Tweet Storage Error", errors)
        return errors

    def store_reply(self):
        """Insert the reply row.

        Returns:
            A one-element list holding the ``insert_rows`` result; a non-empty
            result is logged as an error.
        """
        reply_rows = [[self.primary_id, self.response_id, self.created_at, self.text, self.sentiment]]
        table = db_client.get_table(replies_table_id)
        errors = [db_client.insert_rows(table, reply_rows)]

        self._log_insert_errors("Reply Storage Error", errors)
        return errors

    def get_data(self):
        """Print the prepared fields for debugging.

        Fields that the chosen ``prepare_*`` method did not set are printed as
        None instead of raising AttributeError.
        """
        for attribute in ('text', 'created_at', 'hashtags', 'sentiment', 'keywords'):
            print(getattr(self, attribute, None))
     

In [17]:
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that prepares and stores every status it receives."""

    def on_status(self, status):
        """Clean, enrich and store one streamed status.

        The stream is started with ``is_async=True``, so this runs outside the
        ``try`` block in ``run_stream``. Failures are logged here so that one
        bad tweet does not end the stream without leaving a trace in the log.
        """
        try:
            tweet_handler = tweet(status)
            tweet_handler.prepare_tweet()
            tweet_handler.store_tweet()
        except Exception:
            logging.exception("Tweet Processing Error")
        


In [18]:
def run_stream(duration=20, track=None):
    """Stream matching English tweets for a fixed time window, then disconnect.

    Args:
        duration: seconds to keep the asynchronous stream open (default 20).
        track: iterable of keywords to follow; defaults to ``['walmart']``.

    Errors are logged rather than raised so the scheduler loop keeps running.
    """
    keywords = ['walmart'] if track is None else list(track)
    logging.info("Tweet Stream Started")

    myStream = None
    try:
        myStreamListener = MyStreamListener()
        myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
        myStream.filter(track=keywords, languages=['en'], is_async=True)
        time.sleep(duration)

    except Exception as e:
        logging.error("Streaming Error" + str(e))

    finally:
        # Always disconnect, including when the sleep is interrupted; otherwise
        # the background stream thread would keep running.
        if myStream is not None:
            myStream.disconnect()
            logging.info("Tweet Stream Closed")



In [None]:
# Tag the job and clear that tag first so re-running this cell replaces the
# job instead of stacking a duplicate on the shared default scheduler.
schedule.clear('tweet-stream')
schedule.every(30).minutes.do(run_stream).tag('tweet-stream')

while True:
    try:
        schedule.run_pending()

    except Exception as e:
        logging.error("Scheduler Failure" + str(e))

    # Sleep outside the try block: a job that keeps failing must not turn this
    # loop into a busy loop that floods the log.
    time.sleep(1)