If you are working with data that did not come from a bibliographic source, you likely have only a name in natural order and perhaps some other biographical information. For example, you may have a list of names and a couple of other data points, such as a university each person attended or the country they are from. While Wikidata is not a bibliography-centric system, its ecosystem contains a lot of data about people and works. In this tutorial we are going to use Wikidata to reconcile names and try to get the associated work titles.


Getting started checklist:
1. A CSV or TSV file with metadata; at minimum it needs to contain the author’s full name and the full title of the work.
2. Python with the Pandas and Requests modules installed, and an internet connection.

In this example we are comparing data about people who have won or judged literary awards; it also includes other data, such as universities attended and some dates. Depending on the dataset you are using you will have different data points to compare, but the approach is the same. You can see an example of the data we are using below:

Unnamed: 0,person_id,full_name,given_name,last_name,gender,elite_institution,graduate_degree,mfa_degree,iowa_mfa_person_id,stegner,role,prize_institution,prize_name,prize_year,prize_genre,prize_type,prize_amount,title_of_winning_book
150,1105,John Barkham,John,Barkham,male,,,,,,judge,Columbia University,Pulitzer Prize,1961,prose,book,15000.0,To Kill A Mockingbird
151,1789,Paul Harding,Paul,Harding,male,,graduate,University of Iowa,2455.0,,winner,Columbia University,Pulitzer Prize,2010,prose,book,15000.0,Tinkers
152,1789,Paul Harding,Paul,Harding,male,,graduate,University of Iowa,2455.0,,winner,PEN America,Robert W. Bingham Prize for Debut Short Story ...,2010,prose,book,25000.0,Tinkers
153,522,David Kennedy,David,Kennedy,male,"Stanford University, Yale University",graduate,,,,judge,Columbia University,Pulitzer Prize,2010,prose,book,15000.0,Tinkers
154,354,Charles Johnson,Charles,Johnson,male,,graduate,,,,judge,Columbia University,Pulitzer Prize,2010,prose,book,15000.0,Tinkers
155,70,Alfred Kreynborg,Alfred,Kreynborg,male,,,,,,judge,Columbia University,Pulitzer Prize,1961,poetry,book,15000.0,Times Three: Selected Verse From Three Decades
156,1430,Louis Untermeyer,Louis,Untermeyer,male,,,,,,judge,Columbia University,Pulitzer Prize,1961,poetry,book,15000.0,Times Three: Selected Verse From Three Decades
157,545,David St John,David,St John,male,,graduate,University of Iowa,3483.0,,judge,National Book Foundation,National Book Award,2007,poetry,book,10000.0,"Time And Materials: Poems, 1997-2005"
158,2305,Vijay Seshadri,Vijay,Seshadri,male,Columbia University,graduate,Columbia University,,,judge,National Book Foundation,National Book Award,2007,poetry,book,10000.0,"Time And Materials: Poems, 1997-2005"
159,1962,Robert Hass,Robert,Hass,male,Stanford University,graduate,,,,winner,National Book Foundation,National Book Award,2007,poetry,book,10000.0,"Time And Materials: Poems, 1997-2005"


Our first step is to define some variables we will be using. Set them below based on your setup:


`path_to_tsv` - the path to the TSV/CSV file you want to run it on

`id_author_name` - the name of the column header in the file that contains the author's name

`id_title_name` - the name of the column header in the file that contains the title of the work

`user_agent` - the value sent in the `User-Agent` header on each request; it is good practice to identify your client/project

`pause_between_req` - the number of seconds to pause after each row's API calls; increase it if you want the script to run slower

We also will load the modules we will be using.

In [None]:
path_to_tsv = "/path/to/the_file.tsv"
id_author_name = 'author'
id_title_name = 'title'
user_agent = 'YOUR PROJECT NAME HERE'
pause_between_req = 0

import pandas as pd
import requests
import time
import string
import unicodedata
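
Wikimedia's User-Agent policy asks clients to send a descriptive value that names the tool and gives a way to contact whoever runs it. A minimal sketch of building such a string is shown here. The helper name `build_user_agent`, the project name, and the contact address are all placeholders for illustration, not values from this tutorial.

```python
import platform


def build_user_agent(project, version, contact):
    """Return a User-Agent string like 'name/version (contact) python/x.y.z'."""
    # Hypothetical helper: the three arguments are whatever identifies your project.
    return f"{project}/{version} ({contact}) python/{platform.python_version()}"


# Placeholder values, replace them with your own project details.
user_agent = build_user_agent("NameReconciler", "0.1", "mailto:you@example.org")
print(user_agent)
```

The resulting string can be assigned to the `user_agent` variable defined above.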

Before we start working with the file we need to define some helper functions. The first function (`add_qid`) is what each row of the file is run through to modify its values. We also define `normalize_string` and `levenshtein` to compare strings.

In [None]:
def add_qid(d):


    # if there is already a value skip it
    if 'author_wikidata' in d:
        if type(d['author_wikidata']) == str:
            # print('Skip', d[id_author_name])
            return d


    # We search Wikidata for the author name first using the `query` action with `list=search`;
    # we pass the string and it returns the matching Items
    url = "https://www.wikidata.org/w/api.php"
    params = {
        'action':'query',
        'srsearch':d[id_author_name],  # id_author_name is set in the setup cell
        'format':'json',
        'list':'search',
        'srlimit':'10'
    }
    headers = {
        'Accept' : 'application/json',
        'User-Agent': user_agent
    }
    r = requests.get(url, params=params, headers=headers)

    data = r.json()

    # make a list of the qids to check
    qids = []
    for s in data['query']['search']:
        qids.append(s['title'])

    total_hits = data['query']['searchinfo']['totalhits']

    # no hits, change nothing
    if len(qids) == 0:
        return d

    # we now have a bunch of Items that have matched the string, we are going to ask
    # Wikidata to return some other data we can use to compare.
    # This part of the script really depends on what data you have to compare to;
    # below is an example of having a decent amount of additional information to
    # compare with. You may not have some of this data, of course, and would not use those sections.
    # build the list of prefixed QIDs for the SPARQL query we are going to use
    qids_with_quotes = []
    for q in qids:
        qids_with_quotes.append(f'wd:{q}')

    # this SPARQL query asks for all the data points that we have in our dataset to compare with;
    # you can look at Items in Wikidata to see what properties are available
    sparql = f"""
        SELECT ?item ?itemLabel ?occupation ?occupationLabel ?birth ?death ?award ?awardLabel ?viaf ?lccn ?education ?educationLabel
        WHERE 
        {{

            VALUES ?item {{ { " ".join(qids_with_quotes)  }}}

            ?item wdt:P31 wd:Q5.
          
            optional{{
              ?item wdt:P106 ?occupation.
            }}
            optional{{
              ?item wdt:P569 ?birth.
            }}
            optional{{
              ?item wdt:P570 ?death.
            }}          
            optional{{
              ?item wdt:P166 ?award.
            }}    
            optional{{
              ?item wdt:P214 ?viaf.
            }}              
            optional{{
              ?item wdt:P244 ?lccn.
            }}
            optional{{
              ?item wdt:P69 ?education.
            }}

            SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
        }}
    """
    params = {
        'query' : sparql
    }

    headers = {
        'Accept' : 'application/json',
        'User-Agent': user_agent
    }
    url = "https://query.wikidata.org/sparql"

    
    r = requests.get(url, params=params, headers=headers)

    data = r.json()

    people = {}

    # did we get any results
    if len(data['results']['bindings']) > 0:

      # build out our people dictionary with the data we have received
      # we are going to use a score system where when data matches we increase the score and the 
      # Wikidata item with the highest score will be our match

      for result in data['results']['bindings']:

        qid = result['item']['value'].split('/')[-1]

        if qid not in people:

          people[qid] = {
            'qid':qid,
            'score':0,
            'score_log':[],
            'occupation':[],
            'birth':None,
            'death':None,
            'viaf':None,
            'lccn':None,
            'label':None,
            'award': [],
            'education':[]
          }
        
        # make string lists out of these categories
        if 'occupationLabel' in result:
          people[qid]['occupation'].append(result['occupationLabel']['value'])
          people[qid]['occupation'] = list(set(people[qid]['occupation']))

        if 'awardLabel' in result:
          people[qid]['award'].append(result['awardLabel']['value'])
          people[qid]['award'] = list(set(people[qid]['award']))

        if 'educationLabel' in result:
          people[qid]['education'].append(result['educationLabel']['value'])
          people[qid]['education'] = list(set(people[qid]['education']))


        if 'itemLabel' in result:
          people[qid]['label'] = result['itemLabel']['value']

        if 'birth' in result:
          try:            
            # this value stands for "20th century", which is unhelpful for our use
            if result['birth']['value'] != '2000-01-01T00:00:00Z':
              people[qid]['birth'] = int(result['birth']['value'].split('-')[0])
          except:
            people[qid]['birth'] = None

        if 'death' in result:
          try:
            if result['death']['value'] != '2000-01-01T00:00:00Z':
              people[qid]['death'] = int(result['death']['value'].split('-')[0])
          except:
            people[qid]['death'] = None

        if 'viaf' in result:
          people[qid]['viaf'] = result['viaf']['value']

        if 'lccn' in result:
          people[qid]['lccn'] = result['lccn']['value']


    # You can set up different types of checks;
    # for example, in our data set we are looking at writers, so if they do not have a "writerly" occupation
    # then we can exclude them by default
    for p in people:

      has_writerly_occ = False
      
      for occ in ['writer', 'poet', 'novelist', 'short story writer', 'author', 'literary critic', 'journalist', 'biographer', 'historian', 'comics artist', 'playwright']:
        if occ in people[p]['occupation']:
          has_writerly_occ = True
      
      if has_writerly_occ == False:
        people[p]['score'] = -1
        people[p]['score_log'].append("No writerly occ")
        continue
      else:
        people[p]['score'] = people[p]['score'] +  1
        people[p]['score_log'].append("Has writerly occ")
      

      # Here is one option to check the name to see how similar it is to the name you have,
      # using Levenshtein distance to compare. The distance is computed once and the
      # largest thresholds are tested first so that every penalty branch can be reached.
      name_distance = levenshtein(normalize_string(people[p]['label']), normalize_string(d[id_author_name]))
      if name_distance < 3:
        people[p]['score'] = people[p]['score'] + 1
        people[p]['score_log'].append("Has very similar name")
      elif name_distance >= 8:
        people[p]['score'] = people[p]['score'] - 3
      elif name_distance >= 5:
        people[p]['score'] = people[p]['score'] - 2
      else:
        people[p]['score'] = people[p]['score'] - 1
      

      # points for having a VIAF or LCCN
      # in this dataset we add extra points for records that have LCCN or VIAF values,
      # which suggests that they are the bibliographic person we want

      if people[p]['lccn'] != None or people[p]['viaf'] != None:
        people[p]['score'] = people[p]['score'] + 1
        people[p]['score_log'].append("Has VIAF OR LCCN")
      
      # points for being alive on the important date in our data set
      if people[p]['death'] != None:
        try:
          # there is a try here for bad data in d['prize_year'], if the int(d['prize_year']) fails just skip it
          award_year = int(d['prize_year'])
          if award_year <= people[p]['death']:
            people[p]['score'] = people[p]['score'] + 1     
            people[p]['score_log'].append("not dead when awarded")
        except:
          pass
      
      # if they were not born when the award was granted then very bad
      if people[p]['birth'] != None:
        try:          
          award_year = int(d['prize_year'])
          if award_year <= people[p]['birth']:
            people[p]['score'] = -1
            people[p]['score_log'].append("not alive when awarded = -1")        
        except:
          pass
      
      # compare awards won compared to our award won
      if len(people[p]['award']) >0 and type(d['prize_name']) == str:
        awards = " ".join(people[p]['award']).lower().replace('fellowship','').replace('prize','').replace('award','')
        if d['prize_name'].lower().replace('fellowship','').replace('prize','').replace('award','') in awards:
          people[p]['score'] = people[p]['score'] + 1
          people[p]['score_log'].append("matched on award")        

      # compare education
      if len(people[p]['education']) > 0 and type(d['elite_institution']) == str:
        edu = " ".join(people[p]['education']).lower().replace('university of ','').replace('university','').replace('college','')
        if d['elite_institution'].lower().replace('university of ','').replace('university','').replace('college','') in edu:
          people[p]['score'] = people[p]['score'] + 1
          people[p]['score_log'].append("matched on edu")

      if len(people[p]['education']) > 0 and type(d['mfa_degree']) == str:
        edu = " ".join(people[p]['education']).lower().replace('university of ','').replace('university','').replace('college','')
        if d['mfa_degree'].lower().replace('university of ','').replace('university','').replace('college','') in edu:
          people[p]['score'] = people[p]['score'] + 1
          people[p]['score_log'].append("matched on edu")


      

    best_person_score = 0
    best_person = None
    for p in people:
      if people[p]['score'] > best_person_score:
        best_person_score = people[p]['score']
        best_person = people[p]

    if best_person != None:
      if best_person_score > 1:

        d['author_wikidata'] = best_person['qid']
        d['author_wikidata_label'] = best_person['label']

        if best_person['viaf'] != None:
          d['author_viaf'] = best_person['viaf']
        if best_person['lccn'] != None:
          d['author_lccn'] = best_person['lccn']

        # some debug fields, the score they got and the log of activity
        d['match_score'] = best_person_score
        d['match_log'] = "|".join(best_person['score_log'])

    time.sleep(pause_between_req)

    return d

def normalize_string(s):
    s = str(s)
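    s = s.translate(str.maketrans('', '', string.punctuation))
    s = s.casefold()  # casefold() also lowercases, so a separate lower() is not needed
    s = ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
    # drop "the" only as a whole word (a plain replace would also alter names such as "Catherine")
    # and collapse runs of whitespace
    s = " ".join(w for w in s.split() if w != 'the')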
    return s

def levenshtein(s1, s2):
    if len(s1) < len(s2):
        return levenshtein(s2, s1)

    # len(s1) >= len(s2)
    if len(s2) == 0:
        return len(s1)

    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1 # j+1 instead of j since previous_row and current_row are one character longer
            deletions = current_row[j] + 1       # than s2
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    
    return previous_row[-1]
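
To see how normalization and edit distance work together, here is a small self-contained sketch. It uses compact stand-ins named `simplify` and `edit_distance` (illustrative names, not part of the script) that follow the same idea as `normalize_string` and `levenshtein`. The first two names on the left come from the sample data; the names on the right are assumed labels chosen only to show a near match, a punctuation-only difference, and a clear mismatch.

```python
import string
import unicodedata


def simplify(s):
    """Casefold, strip punctuation and accents, collapse whitespace."""
    s = str(s).translate(str.maketrans('', '', string.punctuation)).casefold()
    s = ''.join(c for c in unicodedata.normalize('NFD', s)
                if unicodedata.category(c) != 'Mn')
    return " ".join(s.split())


def edit_distance(a, b):
    """Levenshtein distance, computed one row of the table at a time."""
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            current.append(min(previous[j] + 1,                # delete ca
                               current[j - 1] + 1,             # insert cb
                               previous[j - 1] + (ca != cb)))  # substitute
        previous = current
    return previous[-1]


pairs = [
    ("Alfred Kreynborg", "Alfred Kreymborg"),  # one-letter spelling difference
    ("David St John", "David St. John"),       # punctuation only
    ("Robert Hass", "Robert Frost"),           # a different person
]
for ours, label in pairs:
    print(ours, '|', label, '|', edit_distance(simplify(ours), simplify(label)))
```

With the threshold of 3 used in `add_qid`, the first two pairs would count as very similar names and the third would not.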

Our next step is to load the data we are using with Pandas. You can adjust the `sep` argument to change which delimiter is used (for example, if you are using a CSV file, change it to ","). Once the file is loaded, we pass each record to the `add_qid()` function to add the data to the record.



In [None]:
df = pd.read_csv(path_to_tsv, sep='\t', header=0, low_memory=False)
df = df.apply(lambda d: add_qid(d),axis=1 )  

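# we are writing out the file to the same location here; you may want to modify the filename to create a new file, and change the sep argument if using a CSV
# index=False keeps pandas from adding an extra unnamed index column each time the file is saved
df.to_csv(path_to_tsv, sep='\t', index=False)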
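
The sample data repeats people across rows (Paul Harding appears twice), and every row triggers its own API calls. One possible optimization, which is not part of the script above, is to remember the search result for each name. The sketch below shows the idea with `functools.lru_cache`; `search_wikidata` is a stand-in that only records calls and returns a fake value instead of making a request, and `cached_search` is a hypothetical wrapper name.

```python
from functools import lru_cache

calls = []  # records every name that actually reaches the stand-in "request"


def search_wikidata(name):
    """Stand-in for the real search request; returns a fake tuple of QIDs."""
    calls.append(name)
    return ("Q_PLACEHOLDER_FOR_" + name.replace(" ", "_"),)


@lru_cache(maxsize=None)
def cached_search(name):
    # lru_cache keys on the argument, so a repeated name skips the request
    return search_wikidata(name)


for name in ["Paul Harding", "Paul Harding", "David Kennedy"]:
    cached_search(name)

print(len(calls), "lookups for 3 rows")
```

Only the name search is a safe candidate for this kind of caching, because the scoring step depends on row-specific values such as `prize_year`.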




The code below does the same thing as the single-pass `df.apply` cell above, but it breaks the CSV/TSV into multiple chunks and writes the file out after each chunk. This allows recovery from errors such as an internet timeout, which would otherwise cause you to lose all progress unless the script runs flawlessly. You would likely want to use this approach for larger datasets.

In [None]:
# load the tsv
df = pd.read_csv(path_to_tsv, sep='\t', header=0, low_memory=False)

# we are going to split the dataframe into chunks so we can save our progress as we go without saving the entire file after every record
n = 100  #chunk row size
list_df = [df[i:i+n] for i in range(0,df.shape[0],n)]

# loop through each chunk
for idx, df_chunk in enumerate(list_df):

    # if you want it to skip X number of chunks uncomment this, the number is the chunk index to skip to
    # if idx < 10:
    #     continue

    print("Working on chunk ", idx, 'of', len(list_df))
    list_df[idx] = list_df[idx].apply(lambda d: add_qid(d),axis=1 )  

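    # save everything processed so far; index=False avoids adding an unnamed index column on each save
    reformed_df = pd.concat(list_df)
    reformed_df.to_csv(path_to_tsv, sep='\t', index=False)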
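
Chunking saves progress, but a single failed request still raises an exception and stops the run. A minimal sketch of a retry wrapper is shown below. `call_with_retry` is a hypothetical helper, not something the script defines, and the demo uses a deliberately flaky function rather than a real network call.

```python
import time


def call_with_retry(func, attempts=3, wait_seconds=1.0):
    """Call func(); on an exception, wait and try again, up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as err:
            if attempt == attempts:
                raise  # out of attempts, let the caller see the error
            print(f"attempt {attempt} failed ({err}), retrying")
            time.sleep(wait_seconds * attempt)  # wait a little longer each time


# Demo with a function that fails twice and then succeeds.
state = {'calls': 0}


def flaky():
    state['calls'] += 1
    if state['calls'] < 3:
        raise ConnectionError("simulated timeout")
    return "ok"


result = call_with_retry(flaky, attempts=3, wait_seconds=0)
print(result, state['calls'])
```

Inside `add_qid`, a request could be wrapped along the lines of `call_with_retry(lambda: requests.get(url, params=params, headers=headers, timeout=30).json())`; the 30-second timeout is an arbitrary illustrative value.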


Once you have the QID for the author, you can ask Wikidata for the titles that author has created using a SPARQL query:

In [1]:
def getTitle(author_qid):

    sparql = f"""
        SELECT ?item ?itemLabel 
        WHERE 
        {{
        ?item wdt:P50 wd:{author_qid}. # P50 is the author property; single braces so the QID is interpolated
        SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
        }}       
    """
    params = {
        'query' : sparql
    }

    headers = {
        'Accept' : 'application/json',
        'User-Agent': user_agent
    }
    url = "https://query.wikidata.org/sparql"

    r = requests.get(url, params=params, headers=headers)

    data = r.json()


    # did we get any results
    if len(data['results']['bindings']) > 0:

      for result in data['results']['bindings']:
        title_qid = result['item']['value'].split('/')[-1]
        title_label = result['itemLabel']['value']
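
As written, the loop above reads each title's QID and label but does not return or store them. One way to collect them is sketched below, assuming the standard SPARQL JSON results shape returned by the query service. `titles_from_bindings` is a hypothetical helper, and the sample response uses made-up placeholder QIDs and titles.

```python
def titles_from_bindings(data):
    """Return a list of (qid, label) tuples from a SPARQL JSON response."""
    titles = []
    for result in data.get('results', {}).get('bindings', []):
        qid = result['item']['value'].split('/')[-1]
        # fall back to the QID if the label is missing from the binding
        label = result.get('itemLabel', {}).get('value', qid)
        titles.append((qid, label))
    return titles


# Made-up response in the SPARQL JSON results shape (placeholder QIDs and titles).
sample = {
    'head': {'vars': ['item', 'itemLabel']},
    'results': {'bindings': [
        {'item': {'type': 'uri', 'value': 'http://www.wikidata.org/entity/Q00001'},
         'itemLabel': {'type': 'literal', 'value': 'Example Title One'}},
        {'item': {'type': 'uri', 'value': 'http://www.wikidata.org/entity/Q00002'},
         'itemLabel': {'type': 'literal', 'value': 'Example Title Two'}},
    ]},
}
print(titles_from_bindings(sample))
```

The returned labels could then be compared against the title column named by `id_title_name`, for example with `normalize_string` and `levenshtein`.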
