In [None]:
import json
import numpy
import pandas
import pathlib
import pydash
import requests
import tqdm
from wikibaseintegrator import WikibaseIntegrator, datatypes, wbi_login
from wikibaseintegrator.models import Reference, References
from wikibaseintegrator.wbi_config import config as wbi_config
from wikibaseintegrator import wbi_login, WikibaseIntegrator
from wikibaseintegrator.wbi_enums import ActionIfExists

def value_extract(row, col):

    ''' Pull the 'value' entry out of whatever is stored at row[col]. '''

    binding = row[col]
    extracted = pydash.get(binding, 'value')

    return extracted
    
def sparql_query(query, service):

    ''' Send sparql request, and formulate results into a dataframe.

    query is the SPARQL text and service the endpoint URL; the request is a
    GET asking for json. One row is built per entry under 'results.bindings',
    and every cell is reduced to the 'value' entry of its binding.

    Raises the HTTP error reported by requests when the endpoint answers with
    an error status. '''

    r = requests.get(service, params = {'format': 'json', 'query': query})

    # surface the real HTTP failure instead of a json decode error on an error page.

    r.raise_for_status()

    data = pydash.get(r.json(), 'results.bindings')
    data = pandas.DataFrame.from_dict(data)
    for x in data.columns:
        data[x] = data.apply(value_extract, col=x, axis=1)

    return data

def _written_id(report, prefix):

    ''' Return the identifier of a freshly written entity.

    report is whatever the entity's write() returned and prefix the expected
    leading letter ('P' or 'Q'). The identifier is read from report.id; when
    that is unavailable the printed report is scanned for a single "_id='..."
    line, as before. Raises Exception when neither yields an identifier. '''

    ident = getattr(report, 'id', None)
    if isinstance(ident, str) and ident.startswith(prefix):
        return ident

    # fallback for reports which do not expose an id attribute.

    marker = f"_id='{prefix}"
    ident = [x for x in str(report).split('\n') if marker in x]
    if len(ident) == 1:
        return ident[0].split("'")[1]

    raise Exception("Surprise, this method didn't work.")

def engineer(row, col, extant, entitytype):

    ''' Write entities to wikibase.

    Resolves the label held in row[col] to an identifier. extant maps labels
    to entity URIs; a label found there is answered with the last path segment
    of its URI and nothing is written. Otherwise a property (entitytype 'P',
    datatype taken from row['type']) or an item (any other entitytype) is
    created with that english label and its new identifier returned.

    For col 'O' an empty string is returned unless row['type'] is
    'wikibase-item', as only item-valued objects are entities.

    Raises Exception when the identifier of a written entity cannot be found. '''

    if col == 'O' and row['type'] != 'wikibase-item':
        return ''

    name = row[col]

    # NOTE(review): extant is keyed by label only, so a property and an item
    # sharing a label would resolve to the same identifier - confirm labels
    # are unique across both.

    if name in extant:
        return pathlib.Path(extant[name]).stem

    if entitytype == 'P':
        prefix = 'P'
        entity = wbi.property.new(datatype=row['type'])
    else:
        prefix = 'Q'
        entity = wbi.item.new()

    entity.labels.set('en', name)
    report = entity.write()

    return _written_id(report, prefix)

def extant_entities(service='https://query.filmbase.wiki/proxy/wdqs/bigdata/namespace/wdq/sparql'):

    ''' Return dictionary of entities per label.

    Queries service for every rdfs:label and returns {label: entity URI}.
    Labels are not filtered by language, and when several entities carry the
    same label only the first one returned is kept. An empty result gives an
    empty dictionary. '''

    q = sparql_query("""
        SELECT ?entity ?label WHERE {
            ?entity rdfs:label ?label . } """,
        service)

    if not len(q):
        return dict()

    q = q[['entity', 'label']].drop_duplicates(subset='label')

    # a label-indexed series converts straight to the mapping, no transposed frame needed.

    return q.set_index('label')['entity'].to_dict()

def transfer_column(row):

    ''' Pick the object value for a row: 'O2' for wikibase-item rows, 'O' for everything else. '''

    source = 'O2' if row['type'] == 'wikibase-item' else 'O'

    return row[source]

def extant_statements(service='https://query.filmbase.wiki/proxy/wdqs/bigdata/namespace/wdq/sparql'):

    ''' Return dataframe of statements.

    Queries service for every triple and returns them with the columns S2, P2
    and O2. An empty result still carries those three columns, so callers can
    address them without checking first. '''

    q = sparql_query("""
        SELECT ?S2 ?P2 ?O2 WHERE {
            ?S2 ?P2 ?O2 . } """,
        service)

    # an empty answer comes back without columns, give it the expected ones.

    if not len(q):
        return pandas.DataFrame(columns=['S2', 'P2', 'O2'], dtype=object)

    return q

# read credentials and endpoint details; the file handle keeps its own name so the parsed config does not shadow it.

with open(pathlib.Path.home() / 'filmbase_config.json') as conf_file:
    conf = json.load(conf_file)

WDUSER = conf['username']
WDPASS = conf['password']
wbi_config['MEDIAWIKI_API_URL'] = f"https://{conf['url']}/w/api.php"
wbi_config['USER_AGENT'] = conf['agent']
login_instance = wbi_login.Login(user=WDUSER, password=WDPASS)
wbi = WikibaseIntegrator(login=login_instance)

# attach the datatype declared in the data model to every dataset row.

model = pandas.read_json(pathlib.Path.cwd() / 'data-model.json').rename(columns={'name':'P'})
data = pandas.read_json(pathlib.Path.cwd() / 'dataset.json')
data = pandas.merge(data, model, on='P', how='left')
data['O'] = data['O'].astype('str').str.strip()

# rows whose property has no match in the model leave the left merge without a type.

undefined = data.loc[data['type'].isna()]

if len(undefined):
    raise Exception('Properties present in data which are not defined in Data Model.')

print(len(data))
data.head()

In [None]:
# reference property: resolve (or create) the 'cited by' string property and keep its identifier in P2.

cited_dataframe = pandas.DataFrame([{'P': 'cited by', 'type': 'string', 'O': 'string'}])
cited_dataframe = cited_dataframe.assign(
    P2=cited_dataframe.apply(engineer, axis=1, col='P', entitytype='P', extant=extant_entities()))

print(len(cited_dataframe))
cited_dataframe.head()

In [None]:
# write properties.

# drop the identifier column left by an earlier run of this cell, otherwise the merge below yields P2_x and P2_y.

data = data.drop(columns=['P2'], errors='ignore')

# one row per property label with its datatype; engineer answers with the existing or newly written identifier.

build_property = data[['P', 'type']].drop_duplicates()
build_property['P2'] = build_property.apply(engineer, col='P', extant=extant_entities(), entitytype='P', axis=1)
build_property = build_property[['P', 'P2']].drop_duplicates()
data = pandas.merge(data, build_property, on=['P'], how='left')

print(len(data))
data.head()

In [None]:
# write objects.

# drop the identifier column left by an earlier run of this cell, otherwise the merge below yields O2_x and O2_y.

data = data.drop(columns=['O2'], errors='ignore')

# engineer only writes for wikibase-item rows and answers '' for the rest, which are then filtered out.

builder = data[['O', 'type']].drop_duplicates()
builder['O2'] = builder.apply(engineer, col='O', extant=extant_entities(), entitytype='O', axis=1)
builder = builder.loc[builder['type'].isin(['wikibase-item'])]
data = pandas.merge(data, builder, on=['O', 'type'], how='left')

# rows which are not items carry their literal value over into O2.

data['O2'] = data.apply(transfer_column, axis=1)

print(len(data))
data.head()

In [None]:
# write subjects.

# drop the identifier column left by an earlier run of this cell, otherwise the merge below yields S2_x and S2_y.

data = data.drop(columns=['S2'], errors='ignore')

known = extant_entities()

# NOTE(review): assumes the query service can lag behind writes made moments ago - confirm.
# labels already resolved as item objects are offered to engineer as well, so a subject
# with the same label is not written a second time. labels the query service does return
# keep the identifier it reports.

if 'O2' in data.columns:
    linked = data.loc[data['type'] == 'wikibase-item', ['O', 'O2']].drop_duplicates()
    for label, ident in zip(linked['O'], linked['O2']):
        if isinstance(ident, str) and ident:
            known.setdefault(label, ident)

builder = data[['S']].drop_duplicates()
builder['S2'] = builder.apply(engineer, col='S', extant=known, entitytype='S', axis=1)
data = pandas.merge(data, builder, on=['S'], how='left')

print(len(data))
data.head()

In [None]:
# write statements.

# reduce the triples already in the wikibase to bare identifiers, so they compare with S2, P2 and O2.

existing = extant_statements()
for x in ['https://filmbase.wiki/prop/direct/', 'https://filmbase.wiki/entity/']:
    for y in ['S2', 'P2', 'O2']:
        existing[y] = existing[y].str.replace(x, '', regex=False)

existing['Remove'] = True

# statements follow the property order of the data model. an inner merge is used because
# a model property without any data would otherwise add a row with no subject to write to.

statements = model.copy()
statements = statements[['P']].drop_duplicates()
statements = pandas.merge(statements, data, on='P', how='inner')
statements = pandas.merge(statements, existing, on=['S2', 'P2', 'O2'], how='left')
statements = statements.loc[~statements.Remove.isin([True])]

# NOTE(review): assumes the label lookup can miss a property written moments ago - confirm.
# in that case the identifier kept by the reference property cell is used instead.

known = extant_entities()
if 'cited by' in known:
    cite_id = pathlib.Path(known['cited by']).stem
else:
    cite_id = cited_dataframe['P2'].iloc[0]

for x in tqdm.tqdm(statements.to_dict('records')):

    element = wbi.item.get(x['S2'])

    # the reference is the same for every datatype; a missing or empty R leaves the claim unreferenced.

    claim_references = References()
    cited = x.get('R')
    if isinstance(cited, str) and len(cited):
        claim_reference1 = Reference()
        claim_reference1.add(datatypes.String(prop_nr=cite_id, value=cited))
        claim_references.add(claim_reference1)

    # any type other than string or quantity is written as an item pointing at O2.

    if x['type'] == 'string':
        claim = datatypes.String(prop_nr=x['P2'], value=str(x['O']), references=claim_references)
    elif x['type'] == 'quantity':
        claim = datatypes.Quantity(prop_nr=x['P2'], amount=str(x['O']), references=claim_references)
    else:
        claim = datatypes.Item(prop_nr=x['P2'], value=str(x['O2']), references=claim_references)

    element.claims.add(claim, action_if_exists=ActionIfExists.APPEND)
    element.write()

print(len(statements))
statements.head()