# Data collection - Social Web 2022 group 26

This notebook contains the code used to create the dataset(s) for the Social Web (2022) final paper of group 26 (A. Anthony Joseph; M.B. Trans; Y. Fan).

The code used here does not rely on any third-party Twitter client library; it interacts directly with Twitter's API endpoints, using only the `requests` HTTP library. This was a deliberate design choice: it gives us better control over the formatting of the output data, and it allowed us to learn how to work with the API directly.

In [None]:
import requests
import os
import json
from time import sleep

# Store the Twitter API bearer token in the process environment so that auth() can read it.
# NOTE(review): the token is set from a literal in this cell (the value shown here appears
# to be redacted) -- keep the real token out of version control.
os.environ['TOKEN'] = '***REMOVED***'


def auth():
    """Look up the API bearer token stored in the ``TOKEN`` environment variable.

    Returns ``None`` when the variable is not set.
    """
    return os.environ.get('TOKEN')


def create_headers(bearer_token):
    """Build the ``Authorization`` header dict sent with every Twitter API request.

    The token is embedded as ``Bearer <token>``.
    """
    return {"Authorization": f"Bearer {bearer_token}"}



def create_count_url_v2(keyword, start_date, end_date):
    """Return ``(url, query_params)`` for the full-archive tweet-count endpoint.

    The parameters ask for counts of tweets matching ``keyword`` between
    ``start_date`` and ``end_date`` at day granularity, which gives a rough idea
    of how many tweets a search with the same query will return.
    """
    endpoint = "https://api.twitter.com/2/tweets/counts/all"
    params = dict(
        query=keyword,
        start_time=start_date,
        end_time=end_date,
        granularity='day',
    )
    return endpoint, params


def create_search_url_v2(keyword, start_date, end_date, max_results=10):
    """Return ``(url, query_params)`` for the full-archive tweet-search endpoint.

    ``keyword`` is the search query, ``start_date``/``end_date`` bound the time
    window and ``max_results`` is the page size. The parameters also carry fixed
    lists of expansions and media/place/poll/tweet/user fields, plus a
    ``next_token`` entry that starts out as ``None``.
    """
    endpoint = "https://api.twitter.com/2/tweets/search/all"

    # Comma-separated field selections, kept as named pieces for readability.
    expansions = ('attachments.poll_ids,attachments.media_keys,author_id,entities.mentions.username,geo.place_id,'
                  'in_reply_to_user_id,referenced_tweets.id,referenced_tweets.id.author_id,edit_history_tweet_ids')
    media_fields = 'duration_ms,height,media_key,preview_image_url,type,url,width,public_metrics,alt_text,variants'
    place_fields = 'contained_within,country,country_code,full_name,geo,id,name,place_type'
    poll_fields = 'duration_minutes,end_datetime,id,options,voting_status'
    tweet_fields = ('attachments,author_id,conversation_id,created_at,entities,geo,id,'
                    'in_reply_to_user_id,lang,public_metrics,possibly_sensitive,referenced_tweets,reply_settings,'
                    'source,text,withheld,edit_controls')
    user_fields = ('created_at,description,entities,id,location,name,pinned_tweet_id,profile_image_url,protected,'
                   'public_metrics,url,username,verified,withheld')

    params = {
        'query': keyword,
        'start_time': start_date,
        'end_time': end_date,
        'max_results': max_results,
        'expansions': expansions,
        'media.fields': media_fields,
        'place.fields': place_fields,
        'poll.fields': poll_fields,
        'tweet.fields': tweet_fields,
        'user.fields': user_fields,
        'next_token': None,  # pagination slot, empty for the first page
    }
    return endpoint, params


def connect_to_endpoint(url, headers, params, next_token=None):
    """Send a GET request to the given endpoint and return the decoded JSON body.

    Args:
        url: Endpoint URL to query.
        headers: HTTP headers for the request (see ``create_headers``).
        params: Query parameters as produced by one of the ``create_*_url_v2``
            functions. The dict is not modified.
        next_token: Pagination token to send with the request, or ``None`` for
            the first page.

    Returns:
        The response body parsed from JSON.

    Raises:
        ConnectionError: The built-in exception, raised with the status code and
            response text whenever the status code is not 200.
    """
    # Work on a copy so the caller's dict is not silently changed from call to call.
    request_params = dict(params)
    request_params['next_token'] = next_token
    response = requests.request("GET", url, headers=headers, params=request_params)
    print("Endpoint Response Code: " + str(response.status_code))
    if response.status_code != 200:
        raise ConnectionError(response.status_code, response.text)
    return response.json()



With the functions in place, we define our query using the variables below and generate the required URL and query parameters:

In [None]:
# Request configuration: credentials first, then the query itself.
bearer_token = auth()
headers = create_headers(bearer_token)

# Retweets are excluded: they make up around 85% of the total query and add little to our graphs.
keyword = "#COP27 -is:retweet"
# Collection window: start date of COP27 up to its end date after the one-day extension.
start_time = "2022-11-06T00:00:00.000Z"
end_time = "2022-11-20T00:00:00.000Z"
# 500 is the maximum number of tweets allowed per request.
max_results = 500

url_search = create_search_url_v2(
    keyword=keyword,
    start_date=start_time,
    end_date=end_time,
    max_results=max_results,
)

Making sure the requests are working as intended:

In [None]:
# Smoke test: a single search request without a pagination token, printed as-is.
test_response = connect_to_endpoint(url=url_search[0], headers=headers, params=url_search[1])
print(test_response)

Retrieve an estimate of the number of tweets that match our query:

In [None]:
# Build the count request for the same query and time window, run it, and show the result.
url_count = create_count_url_v2(keyword=keyword, start_date=start_time, end_date=end_time)
count_response = connect_to_endpoint(url=url_count[0], headers=headers, params=url_count[1])
print(count_response)

We define empty placeholders and default values:


In [None]:
# WARNING: this cell resets all collection state -- DO NOT RUN UNLESS STARTING A NEW COLLECTION.

# Progress counters.
tweets_downloaded = requests_made = 0

# Accumulators for the different parts of the responses.
data, includes, metas, errors = [], [], [], []

# Request budget: estimated tweet total divided by 500 per request, plus 10% as a
# buffer in case the estimate is wrong.
max_requests = (count_response['meta']['total_tweet_count'] / 500) * 1.1

# No response and no pagination token yet.
json_response = next_token = None

We start getting the tweets:

In [None]:
# Page through the search results until either the request budget is used up or
# the API stops handing out a next_token.
while requests_made < max_requests:
    try:
        if requests_made > 0:  # the very first request must go out without a next_token
            # The last batch of tweets in a query does not return a next_token.
            if "next_token" not in json_response['meta']:
                print("All available tweets retrieved!")
                break
            # Feed the token from the previous response into the next request.
            next_token = json_response['meta']['next_token']

        # Request a set of up to 500 tweets.
        json_response = connect_to_endpoint(url_search[0], headers, url_search[1], next_token)

        # Use .get() so a page that lacks 'data' or 'errors' does not raise KeyError.
        page_tweets = json_response.get('data', [])
        data.extend(page_tweets)
        # In the v2 API 'includes' is an object (lists of users, tweets, media, ...);
        # iterating it would keep only its key names, so store the object per page.
        if 'includes' in json_response:
            includes.append(json_response['includes'])
        # json_response is a dict, so test for the key (hasattr() never matches a key),
        # and collect the entries in `errors` rather than `includes`.
        errors.extend(json_response.get('errors', []))
        metas.append(json_response['meta'])

        # Count the tweets retrieved and requests made, and print the stats.
        tweets_downloaded += len(page_tweets)
        requests_made += 1
        print('next_token:', next_token, "Tweets downloaded:", tweets_downloaded)
        print('Meta content:', json_response['meta'])
    except ConnectionError as err:  # the API was very unstable while we were downloading
        # Nothing was stored for the failed request, so the next iteration retries
        # it with the same next_token.
        print(err, "... waiting 2 seconds to retry...")
        sleep(2)
else:
    # Only reached when the request budget ran out, not after the break above.
    print("The expected amount of requests have been carried out. Check the last response for a next_token!")

We can run the cell below to confirm the number of requests made and the number of tweets gathered:

In [None]:
# Requests made, then tweets downloaded -- one value per line.
print(requests_made, tweets_downloaded, sep="\n")

Put the returned tweets into a JSON-compatible dictionary and save it to a file:

In [None]:
# Bundle the accumulated tweets, includes and errors under the keys written to disk.
# NOTE(review): `metas` is filled by the download loop but is not part of this file.
json_file = dict(data=data, includes=includes, errors=errors)

# The filename reflects the query and time-frame.
with open("tweets_hashtag_COP27_exclRetweets_2022-11-06_20.json", "w") as outfile:
    json.dump(obj=json_file, fp=outfile)