# Creating the dataset

In [12]:
import pandas as pd
import requests
import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor

In [13]:
import os
# Change the working directory so the relative file names used in the cells
# below resolve; replace the placeholder with a real directory before running.
os.chdir("YOUR PATH HERE")

In [14]:
# Name of the input CSV (presumably one row per author, with the works_api_url
# and diss_uid columns that search_works requires -- verify against the file).
filename = "openalex_names_full_study_df10_0_100000_slim.csv"
# Directory prefix concatenated verbatim in front of filename (no separator is
# inserted); empty means "read from the current working directory".
path = ""
in_df = pd.read_csv(path+filename)

In [15]:
# Identify ourselves to OpenAlex so that requests are served from its "polite"
# pool. OpenAlex documents two ways of supplying the contact address: a
# `mailto=` query parameter, or `mailto:<address>` somewhere in the User-Agent
# request header. A header literally named "mailto" is not one of them, so the
# address is sent through User-Agent.
headers = {'User-Agent': 'mailto:YOUR EMAIL HERE'}

# Defining Functions

In [16]:
class Bucket:
    '''
    A thread-safe token bucket to rate limit API requests.
    '''
    def __init__(self, max_tokens, refill_rate):
        """
        Initialize a new token bucket with a maximum number of tokens
        max_tokens which regenerate after refill_rate seconds. tokens represents
        the number of tokens currently in the bucket. current_time represents
        the time of the most recent refill attempt.

        token_checkout_times is a list of the times at which the last
        max_tokens tokens were returned and is initialized to current_time
        repeated max_tokens times. The list is kept in increasing order: old
        checkout times are removed from the beginning of the list (index 0) by
        get_token and new checkout times are added to the end of the list by
        update_checkout_time. It therefore usually has length max_tokens, but
        it is shorter while API calls are still in flight (a token has been
        taken but its completion time has not been recorded yet).

        An API request can only be made if tokens is at least 1.

        All state changes happen under a single lock because one bucket is
        shared by every worker thread issuing requests.
        """
        # Imported locally so the class does not depend on a module-level
        # import being present in whichever cell defines it.
        import threading

        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.current_time = time.perf_counter()
        self.token_checkout_times = [self.current_time] * self.max_tokens
        self.refill_rate = refill_rate
        self._lock = threading.Lock()

    def _refill_locked(self):
        """
        Attempt one refill; the caller must already hold the lock.

        Return False when token_checkout_times is empty (nothing to compare
        against yet), True otherwise. A token is added only when the oldest
        recorded checkout time is at least refill_rate seconds old and the
        bucket is not already full.
        """
        self.current_time = time.perf_counter()
        if not self.token_checkout_times:
            return False
        oldest = self.token_checkout_times[0]
        if (self.current_time - oldest >= self.refill_rate
                and self.tokens < self.max_tokens):
            self.tokens += 1
        return True

    def refill(self):
        """
        Increment the token count by 1 if the current time is at least
        refill_rate after the oldest recorded checkout (the first item in
        token_checkout_times). The count never exceeds max_tokens. If no
        checkout time is recorded yet, wait until one is.

        >>> bucket = Bucket(10, 2)
        >>> bucket.get_token()
        >>> bucket.tokens == 9
        True
        >>> bucket.refill()
        >>> bucket.tokens == 9
        True
        >>> time.sleep(bucket.refill_rate)
        >>> bucket.refill()
        >>> bucket.tokens == 10
        True
        """
        while True:
            with self._lock:
                if self._refill_locked():
                    return
            # Sleep outside the lock so update_checkout_time can run meanwhile.
            time.sleep(0.01)

    def get_token(self):
        """
        Block until a token is available, then "check out" that token by
        decreasing tokens by 1 and removing the first (earliest) item of
        token_checkout_times. After the API call returns, callers should call
        update_checkout_time to record its completion time.

        Together with update_checkout_time, get_token can be thought of as
        "pushing out" the first item of token_checkout_times from the start of
        the list and adding the current time to the end of the list.

        The availability check, the refill attempt and the checkout happen
        atomically, so two threads can never consume the same token or the same
        checkout time.
        """
        while True:
            with self._lock:
                if self.tokens < 1:
                    self._refill_locked()
                # Also require a recorded time to pop: with every request still
                # in flight the list is empty and we must wait for one to end.
                if self.tokens >= 1 and self.token_checkout_times:
                    self.token_checkout_times.pop(0)
                    self.tokens -= 1
                    return
            time.sleep(0.1)

    def update_checkout_time(self):
        """
        Add the current time to the end of token_checkout_times.
        update_checkout_time should only be called after get_token to ensure
        that token_checkout_times has max_tokens elements in the long run.
        """
        with self._lock:
            self.token_checkout_times.append(time.perf_counter())

In [17]:
# Output columns describing one publication, in the exact order produced by
# [pub_total] + extract_variables_works(...). Two names were misspelled
# ('put_year' and 'pub_cited_by_url,' with a comma inside the string) and are
# corrected here so the CSV headers match the values stored under them.
WORKS_COLUMNS = ['pub_total', 'pub_id', 'pub_doi', 'pub_title',
                 'pub_year', 'pub_type', 'pub_cited_by_url', 'pub_cites',
                 'pub_journal_id', 'pub_journal_name',
                 'pub_keywords', 'authorship_position', 'pub_facility_id',
                 'pub_facility_display_name', 'pub_facility_ror',
                 'pub_facility_country', 'pub_facility_type',
                 'timestamp_download']

In [37]:
def search_works(in_df: pd.DataFrame):
    """
    Return a dataframe (out_df) containing bibliographic information from
    OpenAlex about works produced by the authors listed in in_df. in_df must
    contain at least the following two columns:
     - diss_uid: a unique string identifier for each author assigned by the
       author name data source (ex. 1995c3b3df3cd2)
     - works_api_url: a URL string containing the link to the OpenAlex directory
       containing the author's works
       (ex. https://api.openalex.org/works?filter=author.id:A2001663675
       corresponds to the author name data source identifier 1995c3b3df3cd2)

    out_df has the columns works_api_url, diss_uid and then WORKS_COLUMNS, each
    corresponding to some piece of bibliographic information in the OpenAlex
    system. Each row will usually correspond to one publication. More
    specifically, if an author in in_df has:
     - NO publications listed on OpenAlex, there will be one row in out_df with
       the author's diss_uid and works_api_url. This row will have a 0 entry in
       the pub_total column and null entries in all other columns.
     - ONE publication listed, there will be one row in out_df with the author's
       diss_uid and works_api_url. This row will have a 1 entry in the pub_total
       column and entries reflecting its bibliographic information in all other
       columns.
     - N publications listed (with N > 1), there will be N rows in out_df each
       containing the same diss_uid and works_api_url. Each row will correspond
       to a different publication, and hence the entries in other columns (ex.
       pub_doi or pub_year) will differ between rows.

    Rows with a missing works_api_url are skipped. Results appear in the same
    order as the authors in in_df.
    """
    out_columns = ['works_api_url', 'diss_uid'] + WORKS_COLUMNS

    # create a token bucket with 10 max tokens and 1 second between refills
    bucket = Bucket(10, 1)

    # extract URLs to send to OpenAlex API from in_df
    df_url_clean = clean_url_list(in_df)
    works_url_list = list(df_url_clean['works_api_url'])
    diss_uids = list(df_url_clean['diss_uid'])

    # execute requests from in_df in parallel
    with ThreadPoolExecutor(max_workers=12) as pool:
        futures = [pool.submit(query_API_works, url, diss_uid, 200, bucket)
                   for (url, diss_uid) in zip(works_url_list, diss_uids)]

        # Collect the per-author frames in submission order; result() re-raises
        # any exception that occurred in the worker thread.
        frames = [future.result() for future in futures]

    if not frames:
        return pd.DataFrame(columns=out_columns)

    # Concatenate once (growing a frame inside the loop copies all earlier rows
    # on every iteration) and put the identifier columns first.
    return pd.concat(frames)[out_columns]

In [38]:
def query_API_works(works_url: str, diss_uid: str, per_page: int,
                    token_bucket: Bucket):
    """
    Return a dataframe (df) containing selected variables corresponding to the
    works returned by an OpenAlex search query for works_url. diss_uid is not
    used in the API call but is reassigned to df (which contains the search
    results) after the API call to help identify which author the results
    correspond to. per_page corresponds to the OpenAlex per_page parameter and
    controls how many results are displayed per page. token_bucket is
    shared between all parallel calls of query_API_works and is used to
    rate limit API requests to adhere to OpenAlex's 10 requests per second limit.

    df has the columns WORKS_COLUMNS followed by works_api_url and diss_uid.
    An author with no works yields a single row whose pub_total is 0 and whose
    other bibliographic columns are null.

    Paging follows the cursor returned with each page and stops as soon as all
    pub_total works were collected, a page comes back empty, or no further
    cursor is provided. The number of pages therefore no longer depends on the
    per_page value the query started with, which previously truncated the
    results whenever a 503 response lowered per_page part-way through.
    """
    print(f'Starting works query for {works_url}.')
    (per_page, response_json) = _query_page_with_fallback(
        works_url, '*', per_page, token_bucket)

    # record general information about search results
    pub_total = response_json['meta']['count']

    rows = []
    if pub_total == 0:
        # the author has no works: one entry with no bibliographic info
        rows.append([0] + [np.nan] * (len(WORKS_COLUMNS) - 1))
    else:
        while True:
            results = response_json['results']
            for result in results:
                rows.append([pub_total]
                            + extract_variables_works(works_url, result))

            # extract the cursor to navigate to the next page
            next_cursor = response_json['meta'].get('next_cursor')
            if not results or not next_cursor or len(rows) >= pub_total:
                break
            (per_page, response_json) = _query_page_with_fallback(
                works_url, next_cursor, per_page, token_bucket)

    # Build the frame in one go; DataFrame.append no longer exists in
    # pandas >= 2.0 and row-by-row .loc growth is slow for prolific authors.
    df = pd.DataFrame(rows, columns=WORKS_COLUMNS)

    # add the identification parameters (works_url, diss_uid) back into dataframe
    df = add_uid_url(df, works_url, diss_uid)
    print(f'Finished query for {works_url}.')

    return df


def _query_page_with_fallback(works_url: str, cursor: str, per_page: int,
                              token_bucket: Bucket):
    """
    Fetch one page of results, retrying with per_page set to 25 for as long as
    issue_query reports a 503 response. Return (per_page, response_json) where
    per_page is the page size that finally succeeded.
    """
    response_json = issue_query(works_url, cursor, per_page, token_bucket)[1]
    while response_json == '<Response [503]>':
        per_page = 25
        print('503 error encountered. Results per page has been ' +
              'changed to 25 (default value: 200)')
        response_json = issue_query(works_url, cursor, per_page,
                                    token_bucket)[1]
    return (per_page, response_json)

In [39]:
def add_uid_url(df: pd.DataFrame, works_url: str, diss_uid: str):
    """
    Stamp every row of df with the author's identifiers and return df.

    The columns works_api_url and diss_uid are written in place (created at the
    end of df if absent, overwritten otherwise), each filled with the single
    value passed in. The returned object is df itself, not a copy.
    """
    identifiers = (('works_api_url', works_url), ('diss_uid', diss_uid))
    for column_name, value in identifiers:
        df[column_name] = value
    return df

In [40]:
def issue_query(works_url: str, cursor: str, per_page: int,
                token_bucket: Bucket):
    """
    Request one page of results for works_url from the OpenAlex API and return
    (pub_total, response_json), where response_json is the decoded JSON body
    and pub_total is its meta.count value.

    cursor and per_page are appended to works_url as query parameters, so
    works_url must already contain a query string. Every request first takes a
    token from token_bucket.

    A response with status 503 is not retried here: (0, '<Response [503]>') is
    returned so that the caller can lower per_page and try again. Any other
    non-200 status is printed and the same request is retried until it
    succeeds; there is no retry limit.
    """
    search_url = (f'{works_url}&per_page={per_page}&cursor={cursor}')
    oa_response = _rate_limited_get(search_url, token_bucket)

    last_error = None
    while oa_response.status_code != 200:
        # save the error code to print later
        last_error = str(oa_response)
        print(last_error + search_url)
        if oa_response.status_code == 503:
            # pass control back to query_API_works
            return (0, last_error)
        oa_response = _rate_limited_get(search_url, token_bucket)

    if last_error is not None:
        print('RESOLVED ' + last_error + search_url)

    response_json = oa_response.json()
    # get total number of hits
    pub_total = response_json['meta']['count']

    return (pub_total, response_json)


def _rate_limited_get(search_url: str, token_bucket: Bucket):
    """
    Take a token from token_bucket, send a GET request for search_url and
    return the response. The checkout time is recorded even if the request
    raises, so a failed call cannot permanently shrink the bucket's list of
    checkout times.
    """
    token_bucket.get_token()
    try:
        return requests.get(search_url, headers=headers)
    finally:
        token_bucket.update_checkout_time()

In [41]:
def clean_url_list(in_df: pd.DataFrame):
    """
    Return a new dataframe holding only the works_api_url and diss_uid columns
    of in_df, without the rows whose works_api_url is missing (some author
    searches did not lead to a works URL and cannot be queried).

    in_df itself is left untouched. Dropping rows in place would silently
    shrink the caller's data and, because callers pass slices of a larger
    frame, would operate on a view/copy of that frame. Empty-string URLs are
    not treated as missing.
    """
    with_url = in_df.dropna(subset=['works_api_url'])
    return with_url[['works_api_url', 'diss_uid']]

In [42]:
def extract_variables_works(works_url: str, result: dict):
    """
    Return a list (variables) containing variables of interest corresponding to
    a specific OpenAlex API search result (result, one decoded work record).

    The list has 17 items, in the order of WORKS_COLUMNS without pub_total:
    pub_id, pub_doi, pub_title, pub_year, pub_type, pub_cited_by_url,
    pub_cites, pub_journal_id, pub_journal_name, pub_keywords (a list of
    concept names), authorship_position, pub_facility_id,
    pub_facility_display_name, pub_facility_ror, pub_facility_country,
    pub_facility_type and timestamp_download (today's date as YYYY_MM_DD).

    A field that cannot be found in result is stored as an empty string; a
    field that is present but null is stored as None.

    The authorship fields come from the first entry of result['authorships']
    whose author id equals the id following "filter=author.id:" in works_url,
    and the facility fields from that entry's first institution. If no entry
    matches, all six fields are empty strings.

    The journal fields are read from host_venue and, when the record has no
    host_venue, from primary_location.source.
    """
    import datetime

    # results
    variables = [_lookup(result, key)
                 for key in ('id', 'doi', 'title', 'publication_year', 'type',
                             'cited_by_api_url', 'cited_by_count')]

    # results / host venue (falls back to the primary location's source)
    venue = (_lookup(result, 'host_venue')
             or _lookup(result, 'primary_location', 'source'))
    variables.append(_lookup(venue, 'id'))
    variables.append(_lookup(venue, 'display_name'))

    # results / concepts
    concepts = _lookup(result, 'concepts') or []
    variables.append([_lookup(concept, 'display_name')
                      for concept in concepts])

    # results / authorships: locate the entry of the author being queried
    target_author_id = works_url.partition('filter=author.id:')[2]
    matched = None
    for authorship in _lookup(result, 'authorships') or []:
        author_id = _lookup(authorship, 'author', 'id')
        if not isinstance(author_id, str):
            continue
        # ids look like https://openalex.org/A123; compare the trailing part
        short_id = author_id.partition('openalex.org/')[2]
        if target_author_id and short_id == target_author_id:
            matched = authorship
            break

    # With no match, matched is None and every lookup below yields ''.
    variables.append(_lookup(matched, 'author_position'))
    for field in ('id', 'display_name', 'ror', 'country_code', 'type'):
        variables.append(_lookup(matched, 'institutions', 0, field))

    # Timestamp Download:
    variables.append(datetime.datetime.now().strftime("%Y_%m_%d"))

    return variables


def _lookup(obj, *path):
    """
    Follow path (dict keys and/or list indices) into obj and return the value
    found there, or '' if any step of the path does not exist.
    """
    try:
        for key in path:
            obj = obj[key]
    except (KeyError, IndexError, TypeError):
        return ''
    return obj

# Doctests

In [24]:
import doctest
# Run every doctest found in the docstrings of the definitions above (for
# example the one in Bucket.refill, which sleeps for refill_rate seconds).
doctest.testmod()

TestResults(failed=0, attempted=8)

# Defining the batch-run functions

In [None]:
def run_functions_multiple(in_df, start, stop, queries_per_file,
                           queries_per_call, master_path):
    '''
    Call run_functions to generate multiple files with query results from in_df.
    Each output file is named master_path followed by
    'openalex_works_full_study_<first>_<last>.csv', where <first> is the index
    of the first query written to the file and <last> is the index at which the
    file stops (exclusive). master_path is used as a plain prefix, so it should
    end with a path separator if it names a directory and should NOT have a
    file extension (.csv) at the end.

    Start querying at start and stop querying at stop; the last file is cut
    short at stop instead of running past it. queries_per_file decides how many
    queries will have their results written to a specific file, whereas
    queries_per_call is passed to run_functions and determines how many queries
    are sent to search_works at once. queries_per_file must be at least
    queries_per_call; otherwise a message is printed and nothing is run.
    '''
    if queries_per_file < queries_per_call:
        print('Please set queries_per_file to be greater than queries_per_call. '
              'Function execution terminated.')
        return

    for i in range(start, stop, queries_per_file):
        # never query (or label the file) beyond the requested stop index
        chunk_stop = min(i + queries_per_file, stop)
        print(f'Starting on query {str(i)}.')
        run_functions(in_df, i, chunk_stop, queries_per_call,
                      master_path+'openalex_works_full_study_'+str(i)+'_'+str(chunk_stop)+'.csv')


In [None]:
def create_slim_df(in_path: str, out_path: str, columns_to_keep: list):
    """
    Copy a subset of the columns of one CSV file into another CSV file.

    The file at in_path is read in full, reduced to columns_to_keep (in the
    order given) and written to out_path as UTF-8 with a header row and
    without the index column. Returns None.
    """
    slim_df = pd.read_csv(in_path)[columns_to_keep]
    slim_df.to_csv(out_path, encoding='utf8', header=True, index=False)

In [None]:
def run_functions(in_df: pd.DataFrame, start: int, stop: int, step: int,
                  path: str):
    '''
    Write the results of search_works to path. Take in_df and perform search
    queries on the rows from start to stop (exclusive). Starts a new search
    using search_works every step rows to increase performance, appending each
    batch of results to the file as soon as it is finished.

    Creates a new file to write output. If file already exists, prompts the user
    on whether they want to append new output to existing file; any answer other
    than Y aborts without querying. Append is useful when search_works stops
    unexpectedly, leaving an incomplete file. The user would then restart
    querying at the point where the querying stopped and would choose to append
    to an existing file.

    The header row uses the same column names, in the same order, as the frames
    returned by search_works (works_api_url, diss_uid, then WORKS_COLUMNS); the
    first column was previously mislabelled diss_name.

    Once all queries are done, a "slim" copy of the file containing only the
    pub_facility_id column is written next to it, with _slim inserted before
    the .csv extension. Returns None.
    '''
    # generate one set of headers matching the columns written below
    df_headers = pd.DataFrame(columns=['works_api_url', 'diss_uid']+WORKS_COLUMNS)

    # try to create the file if it doesn't already exist
    try:
        df_headers.to_csv(path, index=False, header=True, mode='x',
                          encoding='utf8')
        print(f'NEW FILE CREATED at {path}. \n')
    except FileExistsError:
        print(f'FILE ALREADY EXISTS at {path}. Append instead? \n')
        # request user input to prevent accidental appending
        response = input('Y/N: ')
        if response.upper() != 'Y':
            return

    # create slice of in_df
    in_df = in_df[start:stop]
    # execute search_works function for subslices of in_df determined by step;
    # the indices printed below are relative to start
    for i in range(0, len(in_df), step):
        print(f'====INITIATING queries {str(i)} to {str(i+step)}====')
        df_temp = in_df[i:i+step]
        df_results = search_works(df_temp)
        print(f'\nFINISHED queries {str(i)} to {str(i+step)}.', end=' ')
        print(f'WRITING to {path}...', end = ' ')
        df_results.to_csv(path, index=False, header=False, mode='a',
                          encoding='utf8')
        print('SUCCESSFULLY wrote to file.')

    print(f'\nSUCCESS in querying items {str(start)} to {str(stop)}. (Query end)')

    # create "slim" df for next step of OA code
    slim_columns = ['pub_facility_id']
    # splitext instead of path[:-4] so a path without a 4-character extension
    # is not mangled
    slim_path = os.path.splitext(path)[0] + '_slim' + '.csv'
    create_slim_df(path, slim_path , slim_columns)
    print(f'Created slim df at {slim_path} containing columns {str(slim_columns)}.\n')

    return None