This notebook is distributed to the remote servers used for data collection. On each server it runs as a local notebook and stores the updated user metadata on that same server.

In [None]:


import pandas as pd
import re
import numpy as np
import os
from tqdm import tqdm


#Installing required library for getting location:
!pip install GeoText
from geotext import GeoText

#Installing required library for language detection:
#Need >= 3.0.0
!pip install googletrans==3.1.0a0
from googletrans import Translator

#Installing the required libraries for extracting twitter data
!pip install gender-guesser
#Need >= 4.8.0
!pip install tweepy==4.8.0

#Setting up email notification for when errors are triggered
import smtplib 
import socket

import tweepy as tw
from tweepy.errors import TooManyRequests
from tweepy.errors import NotFound
from tweepy.errors import Forbidden
#NotFound: 404 Not Found

#from datetime import datetime, timedelta
import gender_guesser.detector
import copy
from datetime import date, timedelta, datetime
import warnings
import time

#Library needed for language detection
! pip install langdetect
from langdetect import detect, detect_langs, LangDetectException

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting GeoText
  Downloading geotext-0.4.0-py2.py3-none-any.whl (2.0 MB)
[K     |████████████████████████████████| 2.0 MB 7.1 MB/s 
[?25hInstalling collected packages: GeoText
Successfully installed GeoText-0.4.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting googletrans==3.1.0a0
  Downloading googletrans-3.1.0a0.tar.gz (19 kB)
Collecting httpx==0.13.3
  Downloading httpx-0.13.3-py3-none-any.whl (55 kB)
[K     |████████████████████████████████| 55 kB 2.6 MB/s 
[?25hCollecting hstspreload
  Downloading hstspreload-2022.11.1-py3-none-any.whl (1.4 MB)
[K     |████████████████████████████████| 1.4 MB 17.9 MB/s 
[?25hCollecting rfc3986<2,>=1.3
  Downloading rfc3986-1.5.0-py2.py3-none-any.whl (31 kB)
Collecting sniffio
  Downloading sniffio-1.3.0-py3-none-any.whl (10 kB)
Collecting httpcore==0.9.*

# Running the user metadata update - settings to change on each VM:
1. The default API keys in update_df_user_meta
2. input_df_path and output_folder_path in the call to updating_and_storing_user_meta_data

In [None]:
"""
Keys used in the different VMs - #.XX is used to reference which VM is used

#.84
def update_df_user_meta(input_df,
                        input_consumer_key= CONSUMER_KEY,
                        input_consumer_secret= CONSUMER_KEY_SECRET,
                        input_access_token= ACCESS_TOKEN,
                        input_access_token_secret= ACCESS_TOKEN_SECRET,
                        input_bearer_token = BEARER_TOKEN
                        ):

#.94
def update_df_user_meta(input_df,
                        input_consumer_key= CONSUMER_KEY,
                        input_consumer_secret= CONSUMER_KEY_SECRET,
                        input_access_token= ACCESS_TOKEN,
                        input_access_token_secret= ACCESS_TOKEN_SECRET,
                        input_bearer_token = BEARER_TOKEN
                        ):

#.95
def update_df_user_meta(input_df,
                        input_consumer_key= CONSUMER_KEY,
                        input_consumer_secret= CONSUMER_KEY_SECRET,
                        input_access_token= ACCESS_TOKEN,
                        input_access_token_secret= ACCESS_TOKEN_SECRET,
                        input_bearer_token = BEARER_TOKEN
                        ):


The function takes in a subsetted df, queries the meta data of the users and populates the subsetted df.

It takes in a df as its only non default argument. 
The Twitter API credentials need to be set in the default arguments of the function

The function returns the updated df, a dictionary of user ids that failed some sort of validation criteria, and the user id of the last user to be populated.


"""  
# Ordered table driving the per-user refresh done by update_df_user_meta.
# Each entry is (attribute on the API user object, df column, exact type required, converter).
#   * A value is only written when its type is exactly the listed one, so missing
#     values (None) never overwrite what is already stored in the df.
#   * The converter is applied before writing; None means "store the value as is".
#     Booleans and counts are stored as floats, as in the original dataset.
# NOTE(review): utc_offset is checked against str exactly like the original code did;
# confirm which type the API actually delivers for this field.
_USER_FIELD_UPDATES = (
  ("profile_location", "user_location", dict, lambda location: location.get('name')),
  ("description", "user_description", str, None),
  ("url", "user_url", str, None),
  ("protected", "protected", bool, float),
  ("followers_count", "followers_count", int, float),
  ("friends_count", "friends_count", int, float),
  ("listed_count", "listed_count", int, float),
  ("favourites_count", "favourites_count", int, float),
  ("utc_offset", "utc_offset", str, None),
  ("time_zone", "time_zone", str, None),
  ("geo_enabled", "geo_enabled", bool, float),
  ("verified", "verified", bool, float),
  ("statuses_count", "statuses_count", int, float),
  ("lang", "lang", str, None),
  ("contributors_enabled", "contributors_enabled", bool, float),
  ("is_translator", "is_translator", bool, float),
  ("is_translation_enabled", "is_translation_enabled", bool, float),
  ("profile_background_color", "profile_background_color", str, None),
  ("profile_background_image_url", "profile_background_image_url", str, None),
  ("profile_background_image_url_https", "profile_background_image_url_https", str, None),
  ("profile_background_tile", "profile_background_tile", bool, float),
  ("profile_image_url", "profile_image_url", str, None),
  ("profile_image_url_https", "profile_image_url_https", str, None),
  ("profile_link_color", "profile_link_color", str, None),
  ("profile_sidebar_border_color", "profile_sidebar_border_color", str, None),
  ("profile_sidebar_fill_color", "profile_sidebar_fill_color", str, None),
  ("profile_text_color", "profile_text_color", str, None),
  ("profile_use_background_image", "profile_use_background_image", bool, float),
  ("translator_type", "translator_type", str, None),
  ("withheld_in_countries", "withheld", list, None),
  ("has_extended_profile", "has_extended_profile", bool, float),
  ("default_profile", "default_profile", bool, float),
  ("default_profile_image", "default_profile_image", bool, float),
)


def _apply_user_fields(df, row_label, user):
  """Write every well-typed field of `user` listed in _USER_FIELD_UPDATES into row `row_label` of `df`."""
  for attribute, column, expected_type, convert in _USER_FIELD_UPDATES:
    #getattr with a default: a field that is absent from the API payload is skipped
    #instead of raising AttributeError and aborting the whole chunk
    value = getattr(user, attribute, None)
    if type(value) is not expected_type:
      continue
    df.at[row_label, column] = convert(value) if convert is not None else value


def update_df_user_meta(input_df,
                        input_consumer_key= CONSUMER_KEY,
                        input_consumer_secret= CONSUMER_KEY_SECRET,
                        input_access_token= ACCESS_TOKEN,
                        input_access_token_secret= ACCESS_TOKEN_SECRET,
                        input_bearer_token = BEARER_TOKEN
                        ):
  """
  Query the Twitter API for every user in input_df and refresh that user's meta data.

  Parameters:
    input_df: df with (at least) the columns user_id, created_at_1 (unix timestamp in ms),
      user_name and user_screen_name. The input is not modified; a deep copy is updated.
    input_consumer_key, input_consumer_secret: API credentials used for OAuthHandler.
    input_access_token, input_access_token_secret: only set on the handler when
      input_access_token_secret is not the empty string.
    input_bearer_token: accepted for interface compatibility, currently not used.

  Returns a tuple (df, bad_user_ids, last_user_id):
    df: the updated copy. Rows that were skipped or never reached keep their input values.
    bad_user_ids: dict of lists with the user ids that could not be fetched or that
      differ from the API on created_at, user_id, name or screen_name.
    last_user_id: user_id of the last row that was handled. If the loop is aborted by an
      unexpected error this is the row before the failing one, or None when the very
      first row failed (or input_df is empty).
  """
  #Local import: the notebook's import cell only pulls date, timedelta and datetime
  from datetime import timezone

  ##Connecting to the api, setting the keys
  auth = tw.OAuthHandler(input_consumer_key, input_consumer_secret)
  if input_access_token_secret != "":
    auth.set_access_token(input_access_token, input_access_token_secret)

  api = tw.API(auth, wait_on_rate_limit=True)

  #Creating a copy of the inputted df
  df = copy.deepcopy(input_df)

  #Creating a dictionary used for keeping track of faulty user_ids
  bad_user_ids = {"get_user_from_api":[],"user_created_at_match":[],"user_id_match":[], "user_name_matc":[], "user_screen_name_match":[]}

  #Looping through the df
  for i in tqdm(range(len(df))):

    #Rows are read by position, so they are also written by the label at that position
    #(writing with .at[i] is only correct when the index happens to be 0..n-1)
    row_label = df.index[i]
    user_id = df.user_id.iloc[i]

    #iloc[i-1] would wrap around to the LAST row for i == 0 -> report None instead
    last_populated_user_id = df.user_id.iloc[i-1] if i > 0 else None

    #Ran into some issues with too short time intervals between queries
    time.sleep(1)

    try:
      #ilocing the user_id sometimes returns a format that is non compatible with api requirements -> calling int()
      user = api.get_user(user_id = int(user_id))

    #Error handling if the api cannot retrieve the user from the API - either because it is private or blocked
    except NotFound:
      bad_user_ids["get_user_from_api"].append(user_id)
      warnings.warn("Twitter API could not retrive info for user: "+ str(user_id), DeprecationWarning)
      continue

    #Error handling in case the user is suspended -> the row is skipped without being recorded
    except Forbidden as forbidden_error:
      if "User has been suspended" in repr(forbidden_error):
        continue
      print("except triggered, last user_id = " +str(last_populated_user_id)+"\nForbidden exception triggered: "+str(forbidden_error))
      return df, bad_user_ids, last_populated_user_id

    #Error handling in case there is something faulty with the run -> the script will return the current state of df, bad_user_id, last_index_to_be_populated
    except Exception as e:
      print("except triggered, last user_id = " +str(last_populated_user_id)+"\nException triggered: "+str(e))
      return df, bad_user_ids, last_populated_user_id

    #Validating user data that should not change when comparing the API input to the dataset:
      #Need to convert df date back from unix timestamp and compare on date, due to rounding
      #The conversion is done in UTC so the result does not depend on the timezone of the VM
      #If found, the system stores the user id in bad_user_ids and skips the iteration of the df loop
    created_at_in_data = datetime.fromtimestamp(df.created_at_1.iloc[i]/1000, tz=timezone.utc).date()
    if created_at_in_data != user.created_at.date():
      warnings.warn("User retrived from Twitter API has a different value for created_at than in the data, this is true for: "+ str(user_id), DeprecationWarning)
      bad_user_ids["user_created_at_match"].append(user_id)
      continue

    #Same for user_id -> this is a non-mutable field
    if user_id != user.id:
      warnings.warn("User retrived from Twitter API has a different value for user_id than in the data, this is true for: "+ str(user_id), DeprecationWarning)
      bad_user_ids["user_id_match"].append(user_id)
      continue

    #Allowing the user to change name and screen_name as this is possible:
    #https://help.twitter.com/en/managing-your-account/change-twitter-handle
    #The change is recorded in bad_user_ids and the new value is stored
    if df.user_name.iloc[i] != user.name:
      bad_user_ids["user_name_matc"].append(user_id)
      df.at[row_label, 'user_name'] = user.name

    if df.user_screen_name.iloc[i] != user.screen_name:
      bad_user_ids["user_screen_name_match"].append(user_id)
      df.at[row_label, 'user_screen_name'] = user.screen_name

    #Refreshing all remaining meta data columns from the API response
    _apply_user_fields(df, row_label, user)

  #The whole df was handled -> the last row is the last populated one (None for an empty df)
  return df, bad_user_ids, (df.user_id.iloc[-1] if len(df) > 0 else None)


In [None]:

def _bad_user_ids_to_frame(bad_user_ids):
  """Turn the bad_user_ids dict of lists into a df; shorter lists are padded with missing values."""
  return pd.DataFrame({key: pd.Series(ids) for key, ids in bad_user_ids.items()})


def _send_failure_email(failure_message):
  """Mail failure_message, prefixed with the local IP of this machine, to the recipient."""
  #Getting local IP: connecting a UDP socket is enough to learn the outgoing address
  #The context manager closes the socket even if connect fails
  with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
    probe.connect(("8.8.8.8", 80))
    local_ip = probe.getsockname()[0]

  #Setting up connection to sender email - setup is used for gmail sender - RECIPRICANT can be any mail
  #NOTE(review): the address and password below are placeholders; real credentials should not be committed
  with smtplib.SMTP_SSL('smtp.gmail.com', 465) as connection:
    email_address = 'SENDER@gmail.com'
    email_password = 'TWO_FACTOR_AUTHENTICATION_PASSWORD'
    connection.login(email_address, email_password )
    connection.sendmail(from_addr=email_address, to_addrs='RECIPRICANT@cbs.dk',
                        msg="IP: "+str(local_ip)+"\n"+failure_message)


def updating_and_storing_user_meta_data(input_df_path,
                                        output_folder_path,
                                        increment_intervall = 300
                                        ):

  """
  Update the user meta data of the df stored at input_df_path chunk by chunk and store the result.

  The function takes in an input file path and an output folder path as its only non-default arguments.
    output_folder_path is to be specified without the final /

  The increment interval controls how often the function stores updated data
  and at which intervals it subsets the df and sends it to update_df_user_meta.
  It is defaulted to 300 as this is the rate limit of the elevated API.

  After every chunk the rows populated so far and the collected bad user ids are written to
  output_folder_path; a FINAL_ copy of both is written after the last chunk.
  The frequent storage is needed as the function is intended to be used without supervision on a virtual machine.

  If a chunk fails, its user ids are added to bad_user_ids["get_user_from_api"], the bad user ids
  are stored, an email notification is attempted, and the unprocessed rows are retried together
  with the next chunk. Only rows that update_df_user_meta reports as populated are stored.

  user_id values are assumed to be unique within the input df.

  The function has no return
  """

  #Reading in the df from input_df_path
  #Reseting index as the main df has been split when creating the distributed datasets
  df = pd.read_json(input_df_path).reset_index(drop=True)

  if df.empty:
    print("Input df is empty, nothing to update: " + str(input_df_path))
    return

  #Instantiating variables updated with returns from update_df_user_meta
    #df2 starts out empty (same columns as df) and is rebuilt from populated_chunks after every chunk
  populated_chunks = []
  df2 = df.iloc[:0].copy()
  bad_user_ids = {"get_user_from_api":[],"user_created_at_match":[],"user_id_match":[], "user_name_matc":[], "user_screen_name_match":[]}
  index_of_last_user_to_be_populated = -1
  #Defined up front as the except branch reads it even when the very first call fails
  last_user_to_be_populated = None

  start_user_id = str(df.user_id.iloc[0])

  #Looping through df from input_df_path
  for i in range(0,len(df),increment_intervall):

    #The chunk starts after the last populated row, so rows of a failed chunk are retried
    chunk_start = index_of_last_user_to_be_populated+1
    chunk_end = i+increment_intervall

    try:

      #Subsetting df and feeding into update_df_user_meta
      d = df.iloc[chunk_start:chunk_end].reset_index(drop=True)
      df3, bad_user_ids2, last_user_to_be_populated = update_df_user_meta(d)

      #extending the bad_user_ids dictionary
      for key, ids in bad_user_ids2.items():
        bad_user_ids.setdefault(key, []).extend(ids)

      #Locating the last populated row inside the chunk (d has a 0..n-1 index, so label == position)
      matches = d.index[d.user_id == last_user_to_be_populated]
      if len(matches) == 0:
        raise RuntimeError("update_df_user_meta did not populate any row of the current chunk")
      populated_rows = int(matches[0]) + 1

      #Extending df2 with the populated rows only - the remaining rows are picked up by the next chunk
      populated_chunks.append(df3.iloc[:populated_rows])
      df2 = pd.concat(populated_chunks, ignore_index=True)

      #Updating last index to be populated
      index_of_last_user_to_be_populated = chunk_start + populated_rows - 1

      #Storing relevant data:
      end_user_id = str(df2.user_id.iloc[-1])
      df2.to_json(output_folder_path+'/start_user_id_'+start_user_id+'_end_user_id'+end_user_id+'.json')

      #Creating a df from bad_user_ids with None to make sure all values are the same length
      _bad_user_ids_to_frame(bad_user_ids).to_json(output_folder_path+'/bad_user_ids_start_user_id_'+start_user_id+'_end_user_id'+end_user_id+'.json')

      print('\n\nSuccessfully stored start_user_id_'+start_user_id+'_end_user_id'+end_user_id+'\n\n')

    #Error handling - sending an email to the recipient from the sender
    except Exception as e:

      #extending the get_user_from_api from this iteration
      bad_user_ids['get_user_from_api'].extend(df.user_id.iloc[chunk_start:chunk_end].tolist())

      #Storing bad_user_ids
      _bad_user_ids_to_frame(bad_user_ids).to_json(output_folder_path+'/bad_user_ids_start_user_id_'+start_user_id+'_end_user_id'+str(last_user_to_be_populated)+'.json')

      failure_message = "Main for loop failed, Error: "+ str(e) + "i: "+str(i) + "increment_intervall: " + str(increment_intervall)
      print(failure_message)

      #Sending RECIPRICANT@cbs.dk an email if exception is triggered
      #A failing notification must not stop the unattended run, so it is only reported
      try:
        _send_failure_email(failure_message)
      except Exception as notification_error:
        print("Could not send failure notification, Error: " + str(notification_error))


  #Storing one final copy after exiting the for loop
  final_end_user_id = str(df2.user_id.iloc[-1]) if len(df2) > 0 else str(None)
  final_df_path = output_folder_path+'/FINAL_start_user_id_'+start_user_id+'_end_user_id'+final_end_user_id+'.json'
  df2.to_json(final_df_path)

  #Creating a df from bad_user_ids with None to make sure all values are the same length
  _bad_user_ids_to_frame(bad_user_ids).to_json(output_folder_path+'/FINAL_bad_user_ids_start_user_id_'+start_user_id+'_end_user_id'+final_end_user_id+'.json')


  print("File stored at: "+ str(final_df_path))





In [None]:
#Running the update - replace both placeholder paths with the real ones for this VM
#output_folder_path is given without the final /
updating_and_storing_user_meta_data(input_df_path = 'input_df_path',
                                    output_folder_path = 'output_folder_path',
                                    increment_intervall = 300
                                        )