In [None]:
from rapidfuzz import process, fuzz
import json
import numpy
import pandas
import pathlib
import pydash
import requests
import time
import tqdm
import unidecode

def value_extract(row, col):

    ''' Return the "value" entry of whatever is stored in row[col]. '''

    cell = row[col]

    return pydash.get(cell, "value")
   
def sparql_query(query, service, timeout=120):

    ''' Send sparql request, and formulate results into a dataframe.

    query: SPARQL query text, sent as the "query" URL parameter.
    service: URL of the SPARQL endpoint.
    timeout: seconds requests waits on the endpoint before giving up.

    Returns a dataframe built from "results.bindings" in the JSON response,
    with every cell reduced to the "value" entry of its binding.
    Raises requests.HTTPError when the endpoint answers with an error status. '''

    r = requests.get(service, params={"format": "json", "query": query}, timeout=timeout)
    # Surface rate limiting or server errors as an HTTP error instead of an opaque JSON decode failure.
    r.raise_for_status()
    data = pydash.get(r.json(), "results.bindings")
    data = pandas.DataFrame.from_dict(data)
    for x in data.columns:
        # Column-wise map gives the same values as a row-wise apply without building a Series per row.
        data[x] = data[x].map(lambda cell: pydash.get(cell, "value"))

    return data

def normalise(row, col):

    ''' Produce a lower-cased, unidecoded, whitespace-trimmed copy of row[col] for matching. '''

    lowered = str(row[col]).lower()
    flattened = unidecode.unidecode(lowered)

    return flattened.strip()

def median_score(a_list, b_id, f):

    ''' Score every title in a_list by its best fuzzy match among the film labels
    the module-level wikidata frame holds for director b_id, and return the median
    of those scores. Returns 0 when either list has fewer than f entries. '''

    by_director = wikidata[wikidata.director_wikidata.isin([b_id])]
    b_list = by_director.film_label.unique()
    if min(len(a_list), len(b_list)) < f:
        return 0

    best_scores = []
    for a in a_list:
        # extractOne returns a tuple whose second item is the score.
        best = process.extractOne(a, b_list, scorer=fuzz.WRatio)
        best_scores.append(best[1])

    return numpy.median(best_scores)

data_path = pathlib.Path.cwd() / 'sight_and_sound.parquet'

# Scrape only when no cached copy exists: the crawl below pauses four seconds per voter page.
if not data_path.exists():

    # The voter index is embedded in the page as JSON assigned to "initialPageState".
    response = requests.get('https://www.bfi.org.uk/sight-and-sound/greatest-films-all-time/all-voters', timeout=60)
    # Stop on an error page here rather than failing later on the split below.
    response.raise_for_status()
    index = response.text
    index = index.split('<script type="text/javascript">var initialPageState = ')[1].split('</script>')[0]
    dataframe = pandas.DataFrame(pydash.get(json.loads(index), 'componentState.allVoters'))
    dataframe = dataframe[['firstname', 'surname', 'type', 'country', 'url']]

    # Collect each voter's table and concatenate once after the loop;
    # growing a frame with concat on every iteration recopies all earlier rows.
    vote_pages = []
    for x in tqdm.tqdm(dataframe.url.unique()):

        time.sleep(4)

        vote_page = pandas.read_html('https://www.bfi.org.uk'+x, encoding='utf8')[0]
        vote_page['url'] = x
        # Report pages whose first table does not hold exactly ten rows.
        if len(vote_page) != 10:
            print(pathlib.Path(x).stem, 'should be only ten votes')
        vote_pages.append(vote_page)
    votes = pandas.concat(vote_pages)

    # Left join keeps every voter, including any whose page contributed no rows.
    dataframe = pandas.merge(dataframe, votes, on='url', how='left')
    dataframe = dataframe.astype(str)
    dataframe.to_parquet(data_path)
else:
    dataframe = pandas.read_parquet(data_path)

# Normalise on every run (fresh scrape or cache) so later matching compares like with like.
dataframe['Film'] = dataframe.apply(normalise, col='Film', axis=1)
dataframe['Director'] = dataframe.apply(normalise, col='Director', axis=1)

print(len(dataframe))
dataframe.head()

In [None]:
wikidata_path = pathlib.Path.cwd() / 'wikidata.parquet'

# Query Wikidata only when no cached copy exists.
if not wikidata_path.exists():
    # One query per publication year; extracts are collected and concatenated once
    # instead of re-copying the accumulated frame on every iteration.
    extracts = []
    for year in tqdm.tqdm(range(1880, 2025)):
        query = '''select ?film ?filmLabel ?title ?director ?directorLabel (year(?publication_date) as ?year) 
            where {
                ?film p:P31/wdt:P279* ?state .
                ?state ps:P31/wdt:P279* wd:Q11424 .
                ?film  wdt:P577 ?publication_date .
                filter (year(?publication_date) = '''+str(year)+''') .
                ?film wdt:P57 ?director
                optional { ?film wdt:P1476 ?title } .
                service wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}'''
        extract = sparql_query(query, "https://query.wikidata.org/sparql")
        extracts.append(extract)
    wikidata = pandas.concat(extracts)

    # Reduce entity URIs to their final path segment.
    for x in ['film', 'director']:
        wikidata[x] = wikidata[x].str.split('/').str[-1]

    # Stack two copies of the rows: one keeping ?title, one using ?filmLabel in the
    # title column, so a film can be matched under either name. Rows with any
    # missing value and exact duplicates are then dropped.
    wikidata = pandas.concat([
        wikidata[[x for x in wikidata.columns.values if x != 'filmLabel']],
        wikidata[[x for x in wikidata.columns.values if x != 'title']].rename(columns={'filmLabel':'title'})
        ]).dropna().drop_duplicates()

    wikidata = wikidata.rename(columns={
        'film':'film_wikidata', 'director':'director_wikidata', 'title':'film_label', 'directorLabel':'director_label'})

    wikidata = wikidata.astype(str)
    wikidata.to_parquet(wikidata_path)
else:
    wikidata = pandas.read_parquet(wikidata_path)

# Normalise on every run (fresh harvest or cache) so labels compare with the poll data.
wikidata['film_label'] = wikidata.apply(normalise, col='film_label', axis=1)
wikidata['director_label'] = wikidata.apply(normalise, col='director_label', axis=1)

print(len(wikidata)) 
wikidata.head()

In [None]:
match_path = pathlib.Path.cwd() / 'match.parquet'

if not match_path.exists():

    name_match_score = 60 # name matching tolerance
    title_match_score = 100 # title matching tolerance
    minimum_match_candidates = 4 # minimum matching options.
    director_limit = 4 # development cap on directors processed; set to None for a full run.

    # Loop-invariant: the pool of candidate names is the same for every director.
    director_labels = wikidata.director_label.unique()

    # Accepted (Director, director_wikidata) pairs, turned into a frame once after the loop.
    matches = []
    for x in tqdm.tqdm(dataframe.Director.unique()[:director_limit]):
        focus = dataframe.loc[dataframe.Director.isin([x])]
        # Shortlist Wikidata director names scoring above the name tolerance.
        c = process.extract(x, director_labels, scorer=fuzz.WRatio, limit=200)
        c = [y[0] for y in c if y[1] > name_match_score]
        candidates = wikidata.loc[wikidata.director_label.isin(c)]
        # Score each candidate id by how well its films match the films voted for this director.
        result = {y:median_score(focus.Film.unique(), y, minimum_match_candidates) for y in candidates.director_wikidata.unique()}
        result = [k for k,v in result.items() if v == title_match_score]
        # Accept only an unambiguous match: exactly one candidate at the required score.
        if len(result) == 1:
            matches.append([x, result[0]])

    result_dataframe = pandas.DataFrame(matches, columns=['Director', 'director_wikidata'])

    # Cache only complete runs, so a capped development run never masquerades as the full result.
    if director_limit is None:
        result_dataframe = result_dataframe.astype(str)
        result_dataframe.to_parquet(match_path)
else:
    # Without this branch result_dataframe is undefined whenever the cache file exists.
    result_dataframe = pandas.read_parquet(match_path)

print(len(result_dataframe))
result_dataframe.head()