Merge 9d8ddf0 into 9f20e0d
wetneb committed Sep 25, 2020
2 parents 9f20e0d + 9d8ddf0 commit 6646d34
Showing 10 changed files with 181 additions and 46 deletions.
16 changes: 16 additions & 0 deletions docs/indexing.rst
@@ -83,6 +83,7 @@ provided to index people, organizations and places at
{
    "language": "en",                       # The preferred language
    "name": "human_organization_location",  # An identifier for the profile
    "solrconfig": "tapioca",                # The name of the Solr configset used to index the dumps
    "restrict_properties": [
        "P2427", "P1566", "P496",           # Include all items bearing any of these properties
    ],
@@ -118,3 +119,18 @@ bottleneck:
    bunzip2 < latest-all.json.bz2 | tapioca index-dump my_collection_name - --profile profiles/human_organization_place.json


Indexing via SPARQL
-------------------

If the collection of items to index is small enough, it can be fetched with a SPARQL query. In that
case we avoid processing an entire dump and selectively index only the items returned by the
query. The SPARQL query is written in a file which is passed to the indexer:

::

    tapioca index-sparql my_collection_name my_sparql_query_file --profile profiles/human_organization_place.json


The SPARQL query must bind a variable `item` which ranges over the items to index. It is recommended
that the query return distinct items.
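
For instance, a query file selecting all railway stations could look like this (a minimal sketch;
`wdt:P31` is Wikidata's instance-of property and `Q55488` the railway station class):

::

    SELECT DISTINCT ?item WHERE {
      ?item wdt:P31 wd:Q55488 .
    }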

21 changes: 21 additions & 0 deletions opentapioca/cli.py
@@ -117,6 +117,26 @@ def index_dump(collection_name, filename, profile, shards, skip, solr='http://lo
    tagger.index_stream(collection_name, dump, indexing_profile,
                        batch_size=2000, commit_time=10, delete_excluded=False, skip_docs=skip)

@click.command()
@click.argument('collection_name')
@click.argument('sparql_query_file')
@click.option('-p', '--profile', help='Filename of the indexing profile to use')
@click.option('-s', '--shards', default=1, help='Number of shards to use when creating the collection, if needed')
def index_sparql(collection_name, sparql_query_file, profile, shards, solr='http://localhost:8983/solr/'):
    """
    Indexes the results of a SPARQL query which contains an "item" variable pointing to the items to index
    """
    tagger = TaggerFactory(solr)
    indexing_profile = IndexingProfile.load(profile)
    # Create the target collection with the profile's Solr configset, unless it already exists
    try:
        tagger.create_collection(collection_name, num_shards=shards, configset=indexing_profile.solrconfig)
    except CollectionAlreadyExists:
        pass
    # Read the query from the given file and index the items it returns
    with open(sparql_query_file, 'r') as f:
        query = f.read()
    query_results = SparqlReader(query)
    tagger.index_stream(collection_name, query_results, indexing_profile,
                        batch_size=50, commit_time=10, delete_excluded=False)
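
Note the smaller batch size compared to dump indexing (50 versus 2000): here the items are fetched
live from the MediaWiki API, whose `wbgetentities` action accepts at most 50 ids per request for
regular clients.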

@click.command()
@click.argument('collection_name')
@click.option('-p', '--profile', help='Filename of the indexing profile to use')
@@ -193,6 +213,7 @@ def train_classifier(collection, bow, pagerank, dataset, output, max_iter):
cli.add_command(compute_pagerank)
cli.add_command(pagerank_shell)
cli.add_command(index_dump)
cli.add_command(index_sparql)
cli.add_command(index_stream)
cli.add_command(delete_collection)
cli.add_command(train_classifier)
46 changes: 46 additions & 0 deletions opentapioca/readers/apireaderbase.py
@@ -0,0 +1,46 @@
import logging
import requests

from time import sleep
from opentapioca.wditem import WikidataItemDocument

logger = logging.getLogger(__name__)

class APIReaderBase(object):
    """
    Base class for a reader that relies on the MediaWiki API to fetch
    item contents.
    """

    def __init__(self, mediawiki_api):
        self.mediawiki_api = mediawiki_api
        self.retries = 5
        self.delay = 5

    def fetch_items(self, qids):
        """
        Given a list of qids, fetch the corresponding documents via the MediaWiki API.
        """
        if not qids:
            return []
        for retries in range(self.retries):
            try:
                req = requests.get(self.mediawiki_api, {
                    'format':'json',
                    'action':'wbgetentities',
                    'ids':'|'.join(qids)})
                req.raise_for_status()
                result = req.json().get('entities').values()
                return [WikidataItemDocument(payload) for payload in result if 'missing' not in payload]
            except (requests.exceptions.RequestException, ValueError, TypeError, AttributeError) as e:
                logger.warning(e)
                if retries < self.retries-1:
                    # Back off linearly before retrying
                    sleep_time = (1+retries)*self.delay
                    logger.info('Retrying wbgetentities in {} seconds'.format(sleep_time))
                    sleep(sleep_time)
                else:
                    logger.error('Failed to fetch entities')
                    logger.error(req.url)
                    raise
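
For illustration, a minimal sketch of how `fetch_items` can be called directly (the QIDs are
arbitrary examples; in this commit the method is only invoked by the reader subclasses):

    from opentapioca.readers.apireaderbase import APIReaderBase

    reader = APIReaderBase('https://www.wikidata.org/w/api.php')
    # Items reported as missing by the API are skipped by fetch_items
    for doc in reader.fetch_items(['Q42', 'Q64']):
        print(doc.get('id'))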


47 changes: 47 additions & 0 deletions opentapioca/readers/sparqlreader.py
@@ -0,0 +1,47 @@
import json
import re
import logging

from .apireaderbase import APIReaderBase
from opentapioca.sparqlwikidata import sparql_wikidata
from opentapioca.utils import to_q

logger = logging.getLogger(__name__)

class SparqlReader(APIReaderBase):
    """
    Generates a collection of `WikidataItemDocument` from
    a SPARQL query which contains an "item" variable.
    """

    def __init__(self,
                 query,
                 endpoint='https://query.wikidata.org/sparql',
                 mediawiki_api='https://www.wikidata.org/w/api.php'):
        super(SparqlReader, self).__init__(mediawiki_api)
        self.endpoint = endpoint
        self.query = query
        self.batch_size = 50
        self.query_results = None

    def __enter__(self):
        self.query_results = sparql_wikidata(self.query, endpoint=self.endpoint)['bindings']
        return self

    def __exit__(self, *args, **kwargs):
        return None

    def __iter__(self):
        if self.query_results is None:
            raise ValueError('Query results have not been fetched.')
        while self.query_results:
            batch = self.query_results[:self.batch_size]
            self.query_results = self.query_results[self.batch_size:]

            # Convert entity URIs to qids, dropping malformed bindings
            qids = [to_q(result['item']['value']) for result in batch if 'item' in result]
            qids_without_none = [qid for qid in qids if qid]

            # Fetch item contents
            for item in self.fetch_items(qids_without_none):
                yield item
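
A usage sketch (the query is a hypothetical example; the unit test later in this commit exercises
the same flow): entering the context runs the SPARQL query, and iteration resolves the bindings to
item documents in batches of 50:

    from opentapioca.readers.sparqlreader import SparqlReader

    query = 'SELECT DISTINCT ?item WHERE { ?item wdt:P31 wd:Q5 } LIMIT 100'
    with SparqlReader(query) as reader:
        for item in reader:
            print(item.get('id'))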

45 changes: 8 additions & 37 deletions opentapioca/readers/streamreader.py
@@ -1,36 +1,31 @@
import json
import re
import requests
import requests.exceptions
import logging

from opentapioca.wditem import WikidataItemDocument
from time import sleep
from sseclient import SSEClient
from .apireaderbase import APIReaderBase

logger = logging.getLogger(__name__)

class WikidataStreamReader(object):
class WikidataStreamReader(APIReaderBase):
    """
    Generates a stream of `WikidataItemDocument` from
    the Wikidata edit stream.
    """

    def __init__(self,
                 endpoint='https://stream.wikimedia.org/v2/stream/recentchange',
                 wiki='wikidatawiki',
                 mediawiki_api='https://www.wikidata.org/w/api.php',
                 from_time=None):
        super(WikidataStreamReader, self).__init__(mediawiki_api)
        self.endpoint = endpoint
        self.wiki = wiki
        self.mediawiki_api = mediawiki_api
        self.from_time = from_time
        self.stream = None
        self.item_buffer = []
        self.batch_size = 50
        self.namespaces = [0]
        self.retries = 5
        self.delay = 5
        self.id_re = re.compile(r'^Q[1-9]\d+$')

    def __enter__(self):
@@ -39,7 +34,7 @@ def __enter__(self):
        url += '?since='+self.from_time.isoformat().replace('+00:00', 'Z')
        self.stream = SSEClient(url)
        return self

    def __exit__(self, *args, **kwargs):
        return None

@@ -50,14 +45,14 @@ def __iter__(self):
        while not stream_ended:
            # Fetch new batch of events
            qids = [self.fetch_next_qid() for _ in range(self.batch_size)]

            stream_ended = None in qids
            qids_without_none = {qid for qid in qids if qid}

            # Fetch item contents
            for item in self.fetch_items(qids_without_none):
                yield item

    def fetch_next_qid(self):
        """
        Fetches the next Qid in the Wikidata edit stream
@@ -73,29 +68,5 @@ def fetch_next_qid(self):
                return change['title']
            except ValueError:
                pass

    def fetch_items(self, qids):
        """
        Given a list of qids, fetch the corresponding documents via the Wikidata API.
        """
        for retries in range(self.retries):
            try:
                req = requests.get(self.mediawiki_api, {
                    'format':'json',
                    'action':'wbgetentities',
                    'ids':'|'.join(qids)})
                req.raise_for_status()
                result = req.json().get('entities').values()
                return [WikidataItemDocument(payload) for payload in result if 'missing' not in payload]
            except (requests.exceptions.RequestException, ValueError, TypeError, AttributeError) as e:
                logger.warning(e)
                if retries < self.retries-1:
                    sleep_time = (1+retries)*self.delay
                    logger.info('Retrying wbgetentities in {}'.format(sleep_time))
                    sleep(sleep_time)
                else:
                    logger.error('Failed to fetch entities')
                    logger.error(req.url)
                    raise


4 changes: 2 additions & 2 deletions opentapioca/sparqlwikidata.py
@@ -1,5 +1,5 @@
import requests

def sparql_wikidata(query_string):
    results = requests.get('https://query.wikidata.org/sparql', {'query': query_string, 'format': 'json'}).json()
def sparql_wikidata(query_string, endpoint='https://query.wikidata.org/sparql'):
    results = requests.get(endpoint, {'query': query_string, 'format': 'json'}).json()
    return results['results']
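
A quick sketch of the helper with its new `endpoint` parameter (the query is a hypothetical
example; the default endpoint remains Wikidata's query service):

    from opentapioca.sparqlwikidata import sparql_wikidata

    bindings = sparql_wikidata(
        'SELECT ?item WHERE { ?item wdt:P31 wd:Q5 } LIMIT 5',
        endpoint='https://query.wikidata.org/sparql')['bindings']
    for row in bindings:
        print(row['item']['value'])  # full entity URI, e.g. http://www.wikidata.org/entity/Q5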
6 changes: 6 additions & 0 deletions opentapioca/tests/data/dummy_sparql_query_response.json
@@ -0,0 +1,6 @@
{"results":{"bindings":[
{"item":{"value":"http://www.wikidata.org/entity/Q123"}},
{"item":{"value":"http://www.wikidata.org/entity/Q456"}},
{"item":{"value":null}},
{"item":{"value":"http://www.wikidata.org/entity/Q789"}}
]}}
10 changes: 9 additions & 1 deletion opentapioca/tests/test_fixtures.py
@@ -12,7 +12,7 @@ def cache_requests():
    requests_cache.install_cache(cache_name=location)
    yield
    requests_cache.uninstall_cache()

@pytest.fixture
def testdir():
    return os.path.dirname(os.path.abspath(__file__))
@@ -24,3 +24,11 @@ def load(qid):
        with open(filename, 'r') as f:
            return WikidataItemDocument(json.load(f))
    return load

@pytest.fixture
def wbgetentities_response(testdir):
    with open(os.path.join(testdir, 'data', 'wbgetentities_response.json'), 'r') as f:
        return f.read()



25 changes: 25 additions & 0 deletions opentapioca/tests/test_sparqlreader.py
@@ -0,0 +1,25 @@

import pytest
import requests_mock
import os

from opentapioca.readers.sparqlreader import SparqlReader
from .test_fixtures import wbgetentities_response
from .test_fixtures import testdir

@pytest.fixture
def dummy_sparql_query_response(testdir):
    with open(os.path.join(testdir, 'data', 'dummy_sparql_query_response.json'), 'r') as f:
        return f.read()

def test_iterate(wbgetentities_response, dummy_sparql_query_response):
    query = "mysparqlquery"
    reader = SparqlReader(query)
    with requests_mock.mock() as mocker:
        mocker.get('https://www.wikidata.org/w/api.php?format=json&action=wbgetentities&ids=Q123%7CQ456%7CQ789', text=wbgetentities_response)
        mocker.get('https://query.wikidata.org/sparql?format=json&query=mysparqlquery', text=dummy_sparql_query_response)

        with reader as entered_reader:
            items = list(entered_reader)

        assert [item.get('id') for item in items] == ['Q123', 'Q456']
7 changes: 1 addition & 6 deletions opentapioca/tests/test_streamreader.py
@@ -8,6 +8,7 @@
from opentapioca.readers.streamreader import WikidataStreamReader
from collections import namedtuple
from .test_fixtures import testdir
from .test_fixtures import wbgetentities_response
from opentapioca.wditem import WikidataItemDocument

EventStubBase = namedtuple('EventStubBase', ['data', 'event'])
@@ -19,12 +20,6 @@ def EventStub(event='message', wiki='wikidatawiki', namespace=0, title='Q123'):
    ))


@pytest.fixture
def wbgetentities_response(testdir):
    with open(os.path.join(testdir, 'data', 'wbgetentities_response.json'), 'r') as f:
        return f.read()


@pytest.fixture
def event_stream():
    return [
