Skip to content

Commit

Permalink
[#2788] Commit only once at the end of a search index rebuild
Browse files Browse the repository at this point in the history
Commiting changes in the Solr search index is a heavy task that takes
significant time. We are currently commiting changes after each update,
which is probably fine for individual updated (ie users editing or
creating a dataset), but when rebuilding the whole index it is
unnecessary. A single commit at the end of the process is needed, and
that speeds the reindexing about a 85%.

A flag has been added (`-e` or `--commit-each`) to allow the old
behaviour (commiting after each edit).
  • Loading branch information
amercader committed Aug 2, 2012
1 parent 56fa097 commit 4ad7ac0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 13 deletions.
21 changes: 18 additions & 3 deletions ckan/lib/cli.py
Expand Up @@ -277,7 +277,7 @@ class SearchIndexCommand(CkanCommand):
'''Creates a search index for all datasets
Usage:
search-index [-i] [-o] [-r] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets)
search-index [-i] [-o] [-r] [-e] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets)
search-index check - checks for datasets not indexed
search-index show {dataset-name} - shows index of a dataset
search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance
Expand All @@ -301,6 +301,13 @@ def __init__(self,name):
self.parser.add_option('-r', '--refresh', dest='refresh',
action='store_true', default=False, help='Refresh current index (does not clear the existing one)')

self.parser.add_option('-e', '--commit-each', dest='commit_each',
action='store_true', default=False, help=
'''Perform a commit after indexing each dataset. This ensures that changes are
immediately available on the search, but slows significantly the process.
Default is false.'''
)

def command(self):
self._load_config()

Expand All @@ -322,14 +329,22 @@ def command(self):
print 'Command %s not recognized' % cmd

def rebuild(self):
from ckan.lib.search import rebuild
from ckan.lib.search import rebuild, commit

# BY default we don't commit after each request to Solr, as it is
# a really heavy operation and slows things a lot

if len(self.args) > 1:
rebuild(self.args[1])
else:
rebuild(only_missing=self.options.only_missing,
force=self.options.force,
refresh=self.options.refresh)
refresh=self.options.refresh,
defer_commit=(not self.options.commit_each))

if not self.options.commit_each:
commit()

def check(self):
from ckan.lib.search import check

Expand Down
12 changes: 9 additions & 3 deletions ckan/lib/search/__init__.py
Expand Up @@ -131,7 +131,7 @@ def notify(self, entity, operation):
log.warn("Discarded Sync. indexing for: %s" % entity)


def rebuild(package_id=None, only_missing=False, force=False, refresh=False):
def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False):
'''
Rebuilds the search index.
Expand Down Expand Up @@ -175,12 +175,13 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False):

for pkg_id in package_ids:
try:
package_index.insert_dict(
package_index.update_dict(
get_action('package_show')(
{'model': model, 'ignore_auth': True,
'validate': False},
{'id': pkg_id}
)
),
defer_commit
)
except Exception, e:
log.error('Error while indexing dataset %s: %s' %
Expand All @@ -195,6 +196,11 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False):
log.info('Finished rebuilding search index.')


def commit():
package_index = index_for(model.Package)
package_index.commit()
log.info('Commited pending changes on the search index')

def check():
from ckan import model
package_query = query_for(model.Package)
Expand Down
23 changes: 18 additions & 5 deletions ckan/lib/search/index.py
Expand Up @@ -92,10 +92,10 @@ class PackageSearchIndex(SearchIndex):
def remove_dict(self, pkg_dict):
self.delete_package(pkg_dict)

def update_dict(self, pkg_dict):
self.index_package(pkg_dict)
def update_dict(self, pkg_dict, defer_commit=False):
self.index_package(pkg_dict, defer_commit)

def index_package(self, pkg_dict):
def index_package(self, pkg_dict, defer_commit=False):
if pkg_dict is None:
return
pkg_dict['data_dict'] = json.dumps(pkg_dict)
Expand Down Expand Up @@ -222,14 +222,27 @@ def index_package(self, pkg_dict):
# send to solr:
try:
conn = make_connection()
conn.add_many([pkg_dict], _commit=True)
commit = not defer_commit
conn.add_many([pkg_dict], _commit=commit)
except Exception, e:
log.exception(e)
raise SearchIndexError(e)
finally:
conn.close()

commit_debug_msg = 'Not commited yet' if defer_commit else 'Commited'
log.debug('Updated index for %s [%s]' % (pkg_dict.get('name'), commit_debug_msg))

def commit(self):
try:
conn = make_connection()
conn.commit(wait_flush=False, wait_searcher=False)
except Exception, e:
log.exception(e)
raise SearchIndexError(e)
finally:
conn.close()

log.debug("Updated index for %s" % pkg_dict.get('name'))

def delete_package(self, pkg_dict):
conn = make_connection()
Expand Down
4 changes: 2 additions & 2 deletions ckan/tests/lib/test_cli.py
Expand Up @@ -80,7 +80,7 @@ def test_clear_and_rebuild_index(self):

# Rebuild index
self.search.args = ()
self.search.options = FakeOptions(only_missing=False,force=False,refresh=False)
self.search.options = FakeOptions(only_missing=False,force=False,refresh=False,commit_each=False)
self.search.rebuild()
pkg_count = model.Session.query(model.Package).filter(model.Package.state==u'active').count()

Expand All @@ -103,7 +103,7 @@ def test_clear_and_rebuild_only_one(self):

# Rebuild index for annakarenina
self.search.args = ('rebuild annakarenina').split()
self.search.options = FakeOptions(only_missing=False,force=False,refresh=False)
self.search.options = FakeOptions(only_missing=False,force=False,refresh=False,commit_each=False)
self.search.rebuild()

self.query.run({'q':'*:*'})
Expand Down

0 comments on commit 4ad7ac0

Please sign in to comment.