<a href="https://colab.research.google.com/github/s2t2/tweet-analysis-2021/blob/main/notebooks/User_Lookups.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

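Looking up the current Twitter profile of each user in a given dataset (follower, friend, listed, and status counts, plus the ID of their latest status), and storing the results in BigQuery.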

# Setup

## Mounting the Drive

In [135]:
import os

from google.colab import drive

# mount the shared Google Drive (a no-op with a notice if it is already mounted)
drive.mount('/content/drive')
# list the working directory; the `drive` mount should show up alongside Colab's defaults
print(os.listdir(os.path.abspath(os.curdir)))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
['.config', 'number_decorators.py', '__pycache__', 'drive', 'sample_data']


In [136]:
# root folder of this project inside the mounted Google Drive
DIRPATH = os.path.join('/content/drive', 'My Drive', 'Research', 'Tweet Analysis 2021')
print(DIRPATH)
# displayed as the cell output: confirms the project folder is reachable
os.path.isdir(DIRPATH)

/content/drive/My Drive/Research/Tweet Analysis 2021


True

## Configuring Credentials 


In [137]:
# google.cloud implicitly reads the credentials file whose path is stored in the
# GOOGLE_APPLICATION_CREDENTIALS env var, so we point that var at the shared
# credentials JSON file kept in our shared google drive folder
os.environ.update({
    "GOOGLE_APPLICATION_CREDENTIALS": os.path.join(DIRPATH, "credentials", "tweet-research-shared-268bbccc0aac.json"),
})
# read the var back (the same lookup google.cloud performs) and confirm the file exists
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
print(GOOGLE_APPLICATION_CREDENTIALS, os.path.isfile(GOOGLE_APPLICATION_CREDENTIALS), sep="\n")

/content/drive/My Drive/Research/Tweet Analysis 2021/credentials/tweet-research-shared-268bbccc0aac.json
True


In [138]:
!pip install python-dotenv



In [139]:
# shared .env file of environment variables (loaded into os.environ by load_dotenv)
DOTENV_FILEPATH = os.path.join(DIRPATH, "credentials", "tweet-research-shared.env")
# show the path, then whether the file is actually there
print(DOTENV_FILEPATH, os.path.isfile(DOTENV_FILEPATH), sep="\n")

/content/drive/My Drive/Research/Tweet Analysis 2021/credentials/tweet-research-shared.env
True


In [140]:
# copy the variables defined in the shared .env file (stored on drive) into os.environ;
# the bool returned by load_dotenv is displayed as the cell output
from dotenv import load_dotenv

load_dotenv(dotenv_path=DOTENV_FILEPATH)

True

# Helpers

## Number Decorators

In [141]:
# Fetch the project's `number_decorators.py` helper module (it provides `fmt_n`, used later to
# format counts) from the repo's main branch, but only when no copy is in the working directory yet.
# `!wget` is an IPython shell escape, so this cell only runs inside Jupyter/Colab.
if not os.path.isfile("number_decorators.py"):
    !wget -q https://raw.githubusercontent.com/s2t2/tweet-analysis-2021/main/helpers/number_decorators.py

from number_decorators import fmt_n

## BigQuery Service

In [142]:
#if not os.path.isfile("bq_service.py"):
#    !wget -q https://raw.githubusercontent.com/s2t2/tweet-analysis-2021/main/helpers/bq_service.py
#
#from bq_service import BigQueryService

from google.cloud import bigquery
from pandas import DataFrame


def split_into_batches(my_list, batch_size=10_000):
    """Yields successive slices of ``my_list``, each at most ``batch_size`` items long.

    Every batch is full-sized except possibly the last one, which holds the remainder.
    Batches are produced by slicing, so they share the input's sequence type (list in, lists out).
    """
    # h/t: https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
    batch_starts = range(0, len(my_list), batch_size)
    yield from (my_list[start:start + batch_size] for start in batch_starts)

class BigQueryService():
    """Thin wrapper around ``bigquery.Client`` for running queries and streaming row inserts."""

    def __init__(self):
        # credentials are resolved implicitly by google.cloud (via GOOGLE_APPLICATION_CREDENTIALS)
        self.client = bigquery.Client()

    def execute_query(self, sql):
        """Submits ``sql`` as a query job and returns the job's result rows."""
        job = self.client.query(sql)
        return job.result()

    def query_to_df(self, sql):
        """Runs ``sql`` and returns the result rows as a pandas DataFrame (one column per selected field)."""
        records = [dict(row) for row in self.execute_query(sql)]
        return DataFrame(records)

    def insert_records_in_batches(self, table, records, batch_size=5_000):
        """
        Inserts records in batches because attempting to insert too many rows at once 
            may result in google.api_core.exceptions.BadRequest: 400

        Params:
            table (table ID string, TableReference, or Table)
                Anything other than a Table object is fetched with client.get_table() first,
                because insert_rows() needs the table's schema.
            records (list of dictionaries)
                Rows are sent positionally (dict keys are ignored), so each dict's values
                must be in the same order as the table's schema columns.
            batch_size (int) maximum number of rows sent per insert request.

        Returns:
            list of the insert errors reported across all batches.
            NOTE(review): each batch is a separate request, so any row index an error reports
            is presumably relative to its batch, not to `records` — confirm before relying on it.
        """
        if not records:
            # nothing to insert: skip the table lookup and any API calls entirely
            return []

        if not isinstance(table, bigquery.Table):
            # a bare ID string / TableReference carries no schema, which insert_rows() requires
            table = self.client.get_table(table)

        rows_to_insert = [list(record.values()) for record in records]
        errors = []
        for batch in split_into_batches(rows_to_insert, batch_size=batch_size):
            errors += self.client.insert_rows(table, batch)
        return errors



In [143]:

# shared BigQuery helper; its client picks up the credentials configured via GOOGLE_APPLICATION_CREDENTIALS
bq_service = BigQueryService()
print(repr(bq_service))

<__main__.BigQueryService object at 0x7fd3580667b8>


## Twitter Service

In [144]:

import tweepy
#from tweepy import OAuthHandler, API, Cursor, TweepError
#from tweepy.error import TweepError

# Twitter API credentials read from the environment; any that are unset fall back to the
# placeholder string "OOPS" so the missing value is easy to spot.
TWITTER_API_KEY, TWITTER_API_KEY_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET = [
    os.getenv(env_var_name, default="OOPS")
    for env_var_name in (
        "TWITTER_API_KEY",
        "TWITTER_API_KEY_SECRET",
        "TWITTER_ACCESS_TOKEN",
        "TWITTER_ACCESS_TOKEN_SECRET",
    )
]

class TwitterService:
    """Thin wrapper around an authenticated tweepy ``API`` client (tweepy v3 interface)."""

    def __init__(self, api_key=TWITTER_API_KEY, api_key_secret=TWITTER_API_KEY_SECRET, 
                        access_token=TWITTER_ACCESS_TOKEN, access_token_secret=TWITTER_ACCESS_TOKEN_SECRET):
        """Authenticates via tweepy.OAuthHandler with the given app key/secret and access token/secret.

        Defaults are the TWITTER_* module-level values read from the environment.

        Docs: 
            https://docs.tweepy.org/en/latest/getting_started.html
            https://docs.tweepy.org/en/latest/api.html
        """
        auth = tweepy.OAuthHandler(api_key, api_key_secret)
        auth.set_access_token(access_token, access_token_secret)
        # on hitting a rate limit, sleep until it resets (and print a notice) instead of raising
        self.api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

    def get_user(self, screen_name=None, user_id=None):
        """Fetches a single user's profile.

        Params:
            screen_name (str) the user's handle. Passed to tweepy's generic positional `id`
                argument, exactly as before.
            user_id (int or str, optional) the user's numeric ID. When given it takes precedence
                and is sent explicitly as `user_id`, so an ID can never be confused with an
                all-digit screen name.
        """
        if user_id is not None:
            return self.api.get_user(user_id=user_id)
        return self.api.get_user(screen_name)

    def get_tweets(self, screen_name):
        """Fetches a single page of the user's own tweets (no replies, no retweets) in extended text mode."""
        return self.api.user_timeline(screen_name, 
            tweet_mode="extended", 
            #count=150, 
            exclude_replies=True, 
            include_rts=False
        )

    def get_statuses(self, screen_name=None, user_id=None, limit=2000):
        """Lazily iterates over up to `limit` of a user's most recent statuses, replies and retweets included.

        `user_id` takes precedence over `screen_name`; with neither, the authenticated user's timeline is used.

        See: 
            https://docs.tweepy.org/en/latest/api.html#timeline-methods 
            https://docs.tweepy.org/en/v3.10.0/cursor_tutorial.html
        """
        # user_timeline is paginated by max_id (tweepy's Cursor handles that), not by a "cursor"
        # parameter, so none is sent. count=200 is the per-request maximum, which minimizes the
        # number of paged API calls (and rate-limit usage) needed to reach `limit`.
        request_params = {"count": 200, "exclude_replies": False, "include_rts": True}
        if user_id: 
            request_params["user_id"] = user_id
        elif screen_name: 
            request_params["screen_name"] = screen_name

        cursor = tweepy.Cursor(self.api.user_timeline, **request_params)
        return cursor.items(limit)


In [145]:
# shared Twitter client, authenticated with the TWITTER_* credentials from the environment
twitter_service = TwitterService()
print(repr(twitter_service))

<__main__.TwitterService object at 0x7fd3580c7438>


In [None]:
from pprint import pprint

screen_name = "s2t2"

# smoke-test the Twitter service by printing one user's profile stats
user = twitter_service.get_user(screen_name)
#pprint(user._json)
print("USER:", screen_name.upper())
print("FOLLOWERS:", fmt_n(user.followers_count))
print("FRIENDS:", fmt_n(user.friends_count))
print("STATUSES:", fmt_n(user.statuses_count))
print("LISTED:", fmt_n(user.listed_count))
# some User objects come back without a `status` attribute, so read it defensively
# instead of letting an AttributeError abort the cell
latest_status = getattr(user, "status", None)
if latest_status is not None:
    print("LATEST_STATUS:", latest_status.id, latest_status.text)
else:
    print("LATEST_STATUS:", None)

#counter = 1
#for status in twitter_service.get_statuses(screen_name):
#    print("----------------")
#    print(counter, status)
#    counter+=1
#    #if counter >= 100: break

#print("GET STATUSES:", len(list(twitter_service.get_statuses(screen_name))))

Rate limit reached. Sleeping for: 465


# Fetching Users from BigQuery

In [None]:
# Overall dataset size: number of distinct tweets and distinct authors in tweets_view.
sql = """
    SELECT 
        count(distinct status_id) as status_count 
        ,count(distinct user_id) as user_count
    FROM `tweet-research-shared.disinfo_2021.tweets_view` t
"""
# NOTE(review): the commented-out lines below pass `verbose=True`, which
# BigQueryService.execute_query() does not accept, so they would raise TypeError if re-enabled.
#results = list(bq_service.execute_query(sql, verbose=True))
#results = dict(results[0])
#print("TWEETS:", fmt_n(results["status_count"]))
#print("USERS:", fmt_n(results["user_count"]))
# displayed as a one-row DataFrame
bq_service.query_to_df(sql)

In [None]:
# How many distinct users / tweets mention the hashtag. Matching is case-insensitive
# because the tweet text is upper-cased before the (already upper-case) regex is applied.
sql = """
    SELECT
       count(distinct user_id) as user_count
       ,count(distinct status_id) as status_count
    FROM `tweet-research-shared.disinfo_2021.tweets_view`
    WHERE REGEXP_CONTAINS(upper(status_text), '#WWG1WGA')
"""
bq_service.query_to_df(sql)

In [None]:
limit = None  # set to an int to cap how many users are fetched (e.g. while testing)
term = "#WWG1WGA"  # regex matched against the upper-cased tweet text

# One row per user whose tweets match `term`, most prolific first, EXCLUDING users that
# already have a row in user_lookups (LEFT JOIN anti-join).
# The `ul.user_id is null` filter must live in the WHERE clause: inside the LEFT JOIN's ON
# clause it can never be satisfied together with `ul.user_id = u.user_id`, so no user would
# be skipped and previously looked-up users would be fetched and inserted again.
sql = f"""
    WITH users as (
        SELECT 
            t.user_id
            ,string_agg(distinct upper(t.user_screen_name), ', ') as screen_names
            ,any_value(t.user_verified) as user_verified
            ,extract(date from any_value(t.user_created_at)) as user_created_on
            ,count(distinct t.status_id) as status_count
        FROM `tweet-research-shared.disinfo_2021.tweets_view` t
        WHERE REGEXP_CONTAINS(upper(status_text), '{term.upper()}')
        GROUP BY 1
    )

    SELECT 
        u.*
        --,ul.user_id as lookup_user_id
    FROM users u
    LEFT JOIN `tweet-research-shared.disinfo_2021.user_lookups` ul ON ul.user_id = u.user_id
    WHERE ul.user_id is null -- skip previously looked up users
    ORDER BY status_count desc
"""
if limit:
    sql += f" LIMIT {limit} "

users_df = bq_service.query_to_df(sql)
users_df

# Fetching Users from Twitter

In [None]:
users_df.iloc[:5]  # first five users (same rows as .head())

In [None]:
# Look up each user's current profile stats. Users that can't be fetched (suspended, deleted, ...)
# still get a record, with only `error_code` set. Key order matters: rows are later inserted into
# BigQuery positionally, so it must match the user_lookups table's column order.
records = []

for index, row in users_df.iterrows():
    #if index >=5: break  # uncomment to process only a few users while testing

    record = {
        "user_id": row["user_id"],
        "error_code": None,
        "follower_count": None,
        "friend_count": None,
        "listed_count": None,
        "status_count": None,
        "latest_status_id": None
    }
    try:
        # pass the ID explicitly as `user_id` so it is never mistaken for an all-digit screen name
        user = twitter_service.api.get_user(user_id=row["user_id"])
        record["follower_count"] = user.followers_count
        record["friend_count"] = user.friends_count
        record["listed_count"] = user.listed_count
        record["status_count"] = user.statuses_count
        # some User objects have no `status` attribute ("'User' object has no attribute 'status'"),
        # so read it defensively instead of swallowing every exception with a bare `except`
        latest_status = getattr(user, "status", None)
        if latest_status is not None:
            record["latest_status_id"] = latest_status.id
    except tweepy.error.TweepError as err:
        # error codes are listed at https://developer.twitter.com/ja/docs/basics/response-codes
        # e.g. 63 means the user has been suspended
        record["error_code"] = err.api_code

    print(index, record)
    records.append(record)


In [None]:
#zzz
#pprint(dir(zzz))
#zzz.reason
#zzz.api_code

## Storing User Lookups in BigQuery

In [None]:
# stream the lookup records into the user_lookups table; the cell displays the list of
# errors returned by insert_records_in_batches
table = bq_service.client.get_table(
    "tweet-research-shared.disinfo_2021.user_lookups"
)
bq_service.insert_records_in_batches(table, records)

In [None]:
# Check the insert: number of distinct users that now have a row in user_lookups.
bq_service.query_to_df("""
    SELECT count(distinct user_id) as user_count FROM `tweet-research-shared.disinfo_2021.user_lookups` 
""")