# Country-wise Data Scraping

In [None]:
import tweepy
import json
import time
import pandas as pd
import pickle
# import threading
import numpy as np
import re

# Connecting with Google Sheets
import gspread
from df2gspread import df2gspread as d2g
from df2gspread import gspread2df as g2d

from oauth2client.service_account import ServiceAccountCredentials

# Geo-Parsing the text data

import geopandas as gpd
from geotext import GeoText

# import matplotlib.pyplot as plt
# %matplotlib inline
# from geopy.geocoders import Nominatim
# from geopy.exc import GeocoderTimedOut

# Requires these NLTK data packages (install with nltk.download): punkt, stopwords, averaged_perceptron_tagger, maxent_ne_chunker, words
import nltk
from nltk import load_parser
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk import word_tokenize, pos_tag, ne_chunk
from nltk import Tree

In [None]:

# OAuth scopes needed to read/write Google Sheets and reach Google Drive.
scope = [
    'https://spreadsheets.google.com/feeds',
    'https://www.googleapis.com/auth/drive',
]

# Service-account key used to authorise both gspread and df2gspread.
google_key_file = '../../TwitterAppData/twitterscrapeddata-73e2d5298a3a.json'
credentials = ServiceAccountCredentials.from_json_keyfile_name(google_key_file, scope)
gc = gspread.authorize(credentials)

# Key of the "CountryScrape" spreadsheet; uploads go to the worksheet named after each country.
spreadsheet_key = '1692ntzc3LTMeEqHYSgV3KGbSGSNEiycqPAl03ZNPMiQ'

In [None]:
# Load the Twitter app credentials; the `with` block closes the file as soon
# as it has been parsed.
with open("../../TwitterAppData/twiterAppData_CountryScrape.json", "r") as twitter_key_file:
    twitterAppCredentials = json.load(twitter_key_file)

consumer_key = twitterAppCredentials["consumer_key"]
consumer_secret = twitterAppCredentials["consumer_secret"]
access_token = twitterAppCredentials["access_token"]
access_token_secret = twitterAppCredentials["access_token_secret"]

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True: tweepy waits for the rate limit to replenish instead of failing.
api = tweepy.API(auth, wait_on_rate_limit=True)

In [None]:
# Column order of every structured-tweet row (see GetStructuredTweet) and of
# the TweetFilter upload buffer.
col_indexes = [
    "created_at",
    "id_str",
    "user_id",
    "is_quote_status",
    "text",
    "verified",
    "likes",
    "retweet_count",
    "location",
    "json",
]

In [None]:
def get_continuous_chunks(text, label):
    """Return the distinct named entities of type *label* found in *text*.

    The text is tokenised, POS-tagged and NE-chunked with NLTK. Every chunk
    whose label equals *label* (e.g. ``'GPE'``) is turned back into a string
    by joining its tokens with single spaces. Duplicates are dropped and the
    order of first appearance is kept.

    Args:
        text: Raw text to analyse.
        label: NLTK named-entity label to keep, e.g. ``'GPE'``.

    Returns:
        list[str]: The matching entity strings (possibly empty).
    """
    chunked = ne_chunk(pos_tag(word_tokenize(text)))
    entities = []
    for subtree in chunked:
        if not (isinstance(subtree, Tree) and subtree.label() == label):
            continue
        entity = " ".join(token for token, _pos in subtree.leaves())
        # Each matching chunk stands on its own: a repeated entity is simply
        # skipped rather than carried over and glued onto the next entity.
        if entity not in entities:
            entities.append(entity)
    return entities

In [None]:
def getUnTaggedLocation(userLoc, text):
    """Infer the country codes mentioned by a tweet that has no geo-tag.

    Candidate place strings come from two sources:
      * the user's free-text profile location, split on ``,``, ``;``, ``/``
        and newlines (skipped when it is NaN/None, empty or not a string);
      * the GPE named entities extracted from the tweet text.
    Each distinct candidate is passed to GeoText, and every country code it
    mentions is collected.

    Args:
        userLoc: The user's profile location string, or NaN/None when absent.
        text: The tweet text.

    Returns:
        set[str] | None: The country codes found, or None when no candidate
        maps to any country.
    """
    candidates = []
    # Type check instead of `is np.nan`: works for any NaN object and for None.
    if isinstance(userLoc, str) and userLoc:
        candidates.extend(re.split(r"[,;/\n]", userLoc))
    candidates.extend(get_continuous_chunks(text, 'GPE'))

    countries = set()
    for place in set(candidates):
        # One place string can mention several countries; keep each code as a
        # separate element instead of concatenating them (e.g. "USGB").
        countries.update(GeoText(place).country_mentions.keys())
    return countries or None

In [None]:
def getTaggedLocation(tweetPlace):
    """Return the place name and country code of a geo-tagged tweet.

    Args:
        tweetPlace: The tweet's ``place`` mapping (read via its
            ``full_name`` and ``country_code`` keys), or None.

    Returns:
        set | None: ``{full_name, country_code}``, or None when
        *tweetPlace* is None.
    """
    if tweetPlace is None:
        return None
    return {tweetPlace["full_name"], tweetPlace["country_code"]}

In [None]:
def getEnsembleLoc(geoTaggedLoc, userLoc, text):
    """Pick the best available location for a tweet.

    If the tweet carries a geo-tag (its ``place`` object), that is used via
    getTaggedLocation. Otherwise the location is inferred from the user's
    profile location and the tweet text via getUnTaggedLocation.

    Args:
        geoTaggedLoc: The tweet's ``place`` object, or None/NaN when absent.
        userLoc: The user's profile location, or NaN when absent.
        text: The tweet text.

    Returns:
        set | None: Location strings, or None when nothing is found.
    """
    # A missing place arrives as None from the raw tweet JSON; NaN is also
    # accepted as "missing" for callers that pass pandas-style values.
    tag_missing = geoTaggedLoc is None or (
        isinstance(geoTaggedLoc, float) and np.isnan(geoTaggedLoc)
    )
    if tag_missing:
        return getUnTaggedLocation(userLoc, text)
    return getTaggedLocation(geoTaggedLoc)

In [None]:
def GetStructuredTweet(inp):
    """Flatten a raw tweet dict into a one-row DataFrame with ``col_indexes`` columns.

    Args:
        inp: Tweet payload dict. It is read via the ``created_at``, ``id_str``,
            ``user``, ``is_quote_status``, ``text`` / ``extended_tweet``,
            ``favorite_count``, ``retweet_count`` and ``place`` keys.

    Returns:
        pandas.DataFrame: A single row (index 0) on success. On any failure,
        an empty frame with the same columns, after printing the tweet id
        (when available) and the error.
    """
    try:
        user = inp["user"]
        created_at = inp["created_at"]
        id_str = inp["id_str"]
        user_id = user["id_str"]
        is_quote_status = inp["is_quote_status"]

        # Prefer the untruncated text when the payload has an extended_tweet section.
        if "extended_tweet" in inp:
            text = inp["extended_tweet"]["full_text"]
        else:
            text = inp["text"]

        likes = inp["favorite_count"]
        retweet_count = inp["retweet_count"]
        verified = user["verified"]
        json_body = json.dumps(inp)

        # Empty/None profile locations are normalised to NaN before location inference.
        user_loc = user["location"] or np.nan
        location = getEnsembleLoc(inp["place"], user_loc, text)
        if location is not None:
            # Sorted so the stored string does not depend on set iteration order.
            location = ','.join(sorted(location))

        row = [created_at,
               id_str, user_id,
               is_quote_status,
               text, verified,
               likes, retweet_count,
               location, json_body]
        return pd.Series(row, index=col_indexes).to_frame().T
    except Exception as ex:
        # Look the id up defensively: the failure may be *because* id_str is missing.
        tweet_id = inp.get("id_str") if isinstance(inp, dict) else None
        print('failed in Structuring the tweet: ', tweet_id, ex)
        return pd.DataFrame(columns=col_indexes)

In [None]:

class Manage_ID:
    """Persistent set of already-seen tweet IDs for one location.

    The set is loaded from the pickle ``../data/ID/idSet_<location>`` on
    construction. It is written back by :meth:`Save`, which also runs when
    the object is destroyed.
    """

    def __init__(self, location):
        self.filePath = "../data/ID/idSet_" + location
        print(self.filePath)
        try:
            # NOTE(security): pickle.load can execute code embedded in the file;
            # only point this at files this scraper wrote itself.
            with open(self.filePath, "rb") as id_file:
                self.id_set = set(pickle.load(id_file))
        except Exception as ex:
            # Best effort: a missing, empty or unreadable file starts a fresh set.
            self.id_set = set()
            print("Input File Empty", ex)
        print("begin->", len(self.id_set))

    def __del__(self):
        # __del__ may run on a partially initialised object; only persist
        # when both the set and its path exist.
        if hasattr(self, "id_set") and hasattr(self, "filePath"):
            print("end->", len(self.id_set))
            self.Save()

    def Save(self):
        """Pickle the ID set to ``self.filePath``, closing the file afterwards."""
        with open(self.filePath, "wb") as id_file:
            pickle.dump(self.id_set, id_file)

    def CheckID(self, inputID):
        """Return True if *inputID* has already been seen."""
        return inputID in self.id_set

    def UpdateID(self, inputID):
        """Mark *inputID* as seen."""
        self.id_set.add(inputID)

    def GetLen(self):
        """Return the number of IDs seen so far."""
        return len(self.id_set)

    def OutputID(self):
        """Print every stored ID, one per line."""
        for i in self.id_set:
            print(i)

In [None]:
class TweetFilter():
    """Filter, de-duplicate, buffer and upload tweets for one location.

    A streamed status goes through PreProcessTweet -> ProcessTweet ->
    StoreTweet. Stored rows are buffered in ``tweetsList``. Once
    ``bufferSize`` rows are buffered, they are appended to the worksheet
    named after ``location`` in the ``spreadsheet_key`` spreadsheet. Stored
    tweet IDs are tracked in a Manage_ID set so a tweet is not stored twice.
    """

    def __init__(self, location, bufferSize):
        self.status = ''                 # raw tweet dict currently being processed
        self.structuredTweet = ''
        self.idSet = Manage_ID(location)

        # Next worksheet row to write. It is derived from the number of known IDs
        # so a restart appends after existing data instead of overwriting it
        # (assumes one sheet row per known ID).
        self.rowCount = self.idSet.GetLen() + 1
        # True until the first upload of this session. When rows already exist
        # (rowCount > 1), that first upload skips one extra row for the header.
        self.isFirstUpload = True

        self.location = location
        print("Uploading to Worksheet: " + location)

        self.failed_tweets = []
        self.buffer_size = bufferSize

        self.tweetsList = pd.DataFrame(columns=col_indexes)

    def __del__(self):
        # Flush whatever is still buffered, then release the ID set (which
        # persists itself). Guarded because __del__ also runs when __init__
        # failed part-way.
        location = getattr(self, "location", None)
        if location is None:
            return
        print("Closing Uploads, Clearing Buffer for " + location)
        self.UploadTweet()
        if hasattr(self, "idSet"):
            del self.idSet

    def _record_failure(self):
        """Remember the id of the tweet being processed when a step fails."""
        # status is '' until the first tweet arrives; only dicts carry an id.
        if isinstance(self.status, dict):
            self.failed_tweets.append(self.status.get("id_str"))

    def FailPreFilters(self, data):
        """Return True if the raw tweet dict *data* should be discarded early.

        The only check is de-duplication: a tweet whose ``id_str`` is already
        in the ID set fails.
        """
        return self.idSet.CheckID(data["id_str"])

    def FailPostFilters(self, data):
        """Return True if the structured tweet frame *data* should be discarded.

        The tweet fails when the first row's ``location`` string does not
        contain ``self.location`` (substring test). It also fails when the
        location is missing or the frame is empty.
        """
        try:
            return self.location not in data["location"][0]
        except Exception:
            # None/NaN location (TypeError) or an empty frame (KeyError/IndexError).
            return True

    def UploadTweet(self):
        """Append the buffered rows to the location's worksheet and clear the buffer.

        Returns:
            bool: True on success or when nothing is buffered. False if the
            upload raised; the buffer is then kept for the next attempt.
        """
        try:
            if self.tweetsList.empty:
                return True

            recordCols = False
            if self.rowCount == 1:
                # Brand-new sheet: write the header row together with the data.
                self.isFirstUpload = False
                recordCols = True

            if self.isFirstUpload:
                self.rowCount = self.rowCount + 1  # To cater for the Header
                self.isFirstUpload = False

            start_cell = 'A' + str(self.rowCount)
            d2g.upload(self.tweetsList,
                       spreadsheet_key,
                       self.location,                 # Country Name = WorksheetName
                       credentials=credentials,
                       col_names=recordCols,
                       row_names=True,
                       start_cell=start_cell,
                       clean=False)

            n_rows = len(self.tweetsList)
            print("Uploading to WorkSheet: " + self.location, end=" : ")
            print(str(n_rows) + " observations starting from cell: " + start_cell)

            # Advance by the rows actually written: the final flush is usually
            # smaller than buffer_size, and a buffer retried after a failed
            # upload can be larger.
            self.rowCount += n_rows + (1 if recordCols else 0)
            self.tweetsList = self.tweetsList.iloc[0:0]
            return True
        except Exception as ex:
            print('failed while Uploading to Google Sheets: ', str(ex))
            self._record_failure()
            return False

    def StoreTweet(self, data):
        """Buffer a structured tweet and upload once the buffer is full.

        Returns:
            bool: True if the tweet was buffered. False if it is a duplicate
            or buffering failed.
        """
        try:
            tweet_id = data["id_str"][0]
            if self.idSet.CheckID(tweet_id):
                return False
            self.idSet.UpdateID(tweet_id)

            self.tweetsList = pd.concat([self.tweetsList, data], ignore_index=True)
            if self.tweetsList.shape[0] >= self.buffer_size:
                self.UploadTweet()
            return True
        except Exception as ex:
            print('failed in UploadTweet: ', str(ex))
            self._record_failure()
            return False

    def ProcessTweet(self):
        """Structure ``self.status``, apply the post-filters and store it.

        Returns:
            bool: True if the tweet was stored, False otherwise.
        """
        try:
            structuredTweet = GetStructuredTweet(self.status)

            # Discard the tweet if the post-filter criteria are not met.
            if self.FailPostFilters(structuredTweet):
                return False

            return self.StoreTweet(structuredTweet)
        except Exception as ex:
            print('failed: ', str(ex))
            self._record_failure()
            return False

    def PreProcessTweet(self, tweet):
        """Choose which tweet body to keep from a streamed status, then process it.

        Quotes keep the quoted tweet (or the tweet it retweets). Retweets keep
        the retweeted tweet (or the tweet it quotes). Anything else keeps the
        status itself. Returns False when the body fails the pre-filters,
        otherwise None after ProcessTweet has run.
        """
        raw = tweet._json
        if "quoted_status" in raw:
            quoted = raw["quoted_status"]
            if "retweeted_status" in quoted:
                body = quoted["retweeted_status"]
            else:
                body = quoted
        elif "retweeted_status" in raw:
            retweeted = raw["retweeted_status"]
            if "quoted_status" in retweeted:
                body = retweeted["quoted_status"]
            else:
                body = retweeted
        else:
            body = raw

        # Discard the tweet if the pre-filter criteria are not met.
        if self.FailPreFilters(body):
            return False
        self.status = body
        self.ProcessTweet()
        return

In [None]:

#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that hands every received status to its own TweetFilter."""

    def __init__(self, location, bufferSize):
        super().__init__()
        # One filter (hence one ID set and one worksheet) per listener.
        self.dataFilter = TweetFilter(location, bufferSize)

    def on_status(self, status):
        # The filter's outcome is not passed back; this callback returns None.
        self.dataFilter.PreProcessTweet(status)

    def on_error(self, status_code):
        print(status_code)
        if status_code == 406:
            print("on_error code: 406")
        # For 420 and 406, return False, which disconnects the stream;
        # every other code falls through and returns None.
        if status_code in (420, 406):
            return False
        return None

    def on_exception(self, exception):
        print(exception)

In [None]:
class CountryScraper:
    """Own one tweepy stream and its MyStreamListener for a single location."""

    def __init__(self, topicList, location, buffer_size):
        # Keep the topics so BeginStreaming tracks what this instance was built
        # with, not whatever a module-level ``topicList`` happens to hold.
        self.topicList = topicList
        self.countryListener = MyStreamListener(location, buffer_size)
        self.countryStream = tweepy.Stream(auth=api.auth, listener=self.countryListener)

    def BeginStreaming(self):
        """Start filtering English tweets on ``self.topicList`` (non-blocking: is_async=True)."""
        self.countryStream.filter(track=self.topicList, languages=["en"], is_async=True)

    def EndStreaming(self):
        """Disconnect the stream."""
        self.countryStream.disconnect()

    def GetDetails(self):
        """Print the buffer shape and failed tweet ids; return the in-memory buffer."""
        print(self.countryListener.dataFilter.tweetsList.shape)
        print("Failed Tweets:\n", self.countryListener.dataFilter.failed_tweets)
        return self.countryListener.dataFilter.tweetsList

    def __del__(self):
        # Drop the listener first, then the stream. The stream also references
        # the listener, so the listener's cleanup runs once the stream is gone.
        # Guarded in case __init__ failed before both attributes existed.
        for attr in ("countryListener", "countryStream"):
            if hasattr(self, attr):
                delattr(self, attr)
        

In [None]:
buffer_size = 200  # rows buffered per Google Sheets upload

if __name__ == "__main__":
    topicList = ['blackLivesMatter']
    countryList = ["US"]
    countryScrape = {}

    for country in countryList:
        try:
            countryScrape[country] = CountryScraper(topicList, country, buffer_size)
            print("\n")
            countryScrape[country].BeginStreaming()
        except Exception as ex:
            # If creating or starting this scraper fails, drop it and keep
            # going with the remaining countries.
            print("Discarding Instance for " + country, ex)
            countryScrape.pop(country, None)

In [None]:
# Show the last 7 rows of a country's in-memory upload buffer (GetDetails also
# prints the buffer shape and the ids of failed tweets). Swap in another
# country code to inspect a different scraper.
# countryScrape["GB"].GetDetails().tail(7)
# countryScrape["IN"].GetDetails().tail(7)
countryScrape["US"].GetDetails().tail(7)

In [None]:
# Disconnect every running stream. Only the key is bound here on purpose: a loop
# variable holding a CountryScraper would keep it alive and stop the later
# `del countryScrape` from triggering its __del__ cleanup.
for location in countryScrape.keys():
    print("Stopping Scraping for: ",location)
    countryScrape[location].EndStreaming()

In [None]:
# Dropping the dict lets each CountryScraper's __del__ run. That releases its
# stream and listener, whose TweetFilter.__del__ uploads any buffered rows.
# Manage_ID.__del__ then pickles the ID set.
# NOTE(review): this only happens once no other references remain (e.g. a
# still-running stream thread); run the stop cell above first.
del countryScrape

In [None]:
# #Reconciling IDSets
# wks_name = "US"
# df2 = g2d.download(spreadsheet_key, wks_name, credentials=credentials, col_names = True, row_names = True)
# allIDs = set(df2["id_str"])
# print(len(allIDs))
# pickle.dump(allIDs, open("../data/ID/idSet_"+wks_name, "wb"))