Skip to content

Commit

Permalink
[#1616][search] Improvements on the search index CLI
Browse files Browse the repository at this point in the history
Several fixes and new options for the `search-index` paster command:

* Add -o option to only reindex datasets not already indexed
* Add -i option to ignore exceptions when rebuilding
* Add -r option to just refresh the index (not clearing it first)
* Fix show command to show the index stored for a dataset
* Add support for clearing the index of just one dataset
  • Loading branch information
amercader committed Mar 20, 2012
1 parent a9385df commit 10cfd16
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 104 deletions.
113 changes: 74 additions & 39 deletions ckan/lib/cli.py
Expand Up @@ -8,12 +8,12 @@
from paste.script.util.logging_config import fileConfig
import re

class MockTranslator(object):
def gettext(self, value):
return value
class MockTranslator(object):
def gettext(self, value):
return value

def ugettext(self, value):
return value
def ugettext(self, value):
return value

def ungettext(self, singular, plural, n):
if n > 1:
Expand Down Expand Up @@ -54,13 +54,13 @@ def _load_config(self):
self.registry.register(pylons.translator, self.translator_obj)

def _setup_app(self):
cmd = paste.script.appinstall.SetupCommand('setup-app')
cmd.run([self.filename])
cmd = paste.script.appinstall.SetupCommand('setup-app')
cmd.run([self.filename])


class ManageDb(CkanCommand):
'''Perform various tasks on the database.
db create # alias of db upgrade
db init # create and put in default data
db clean
Expand All @@ -82,7 +82,7 @@ class ManageDb(CkanCommand):
max_args = None
min_args = 1

def command(self):
def command(self):
self._load_config()
from ckan import model
import ckan.lib.search as search
Expand Down Expand Up @@ -170,7 +170,7 @@ def _postgres_load(self, filepath):
self._run_cmd(pg_cmd)

def _run_cmd(self, command_line):
import subprocess
import subprocess
retcode = subprocess.call(command_line, shell=True)
if retcode != 0:
raise SystemError('Command exited with errorcode: %i' % retcode)
Expand All @@ -196,7 +196,7 @@ def load(self, only_load=False):
print 'Upgrading DB'
from ckan import model
model.repo.upgrade_db()

print 'Rebuilding search index'
import ckan.lib.search
ckan.lib.search.rebuild()
Expand Down Expand Up @@ -270,47 +270,82 @@ class SearchIndexCommand(CkanCommand):
'''Creates a search index for all datasets
Usage:
search-index rebuild [package-name] - reindex package-name if given, if not then rebuild full search index (all packages)
search-index check - checks for packages not indexed
search-index show {package-name} - shows index of a package
search-index clear - clears the search index for this ckan instance
search-index [-i] [-o] [-r] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets)
search-index check - checks for datasets not indexed
search-index show {dataset-name} - shows index of a dataset
search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance
'''

summary = __doc__.split('\n')[0]
usage = __doc__
max_args = 2
min_args = 0

def __init__(self,name):

super(SearchIndexCommand,self).__init__(name)

self.parser.add_option('-i', '--force', dest='force',
action='store_true', default=False, help='Ignore exceptions when rebuilding the index')

self.parser.add_option('-o', '--only-missing', dest='only_missing',
action='store_true', default=False, help='Index non indexed datasets only')

self.parser.add_option('-r', '--refresh', dest='refresh',
action='store_true', default=False, help='Refresh current index (does not clear the existing one)')

def command(self):
self._load_config()
from ckan.lib.search import rebuild, check, show, clear

if not self.args:
# default to printing help
print self.usage
return

cmd = self.args[0]
cmd = self.args[0]
if cmd == 'rebuild':
if len(self.args) > 1:
rebuild(self.args[1])
else:
rebuild()
self.rebuild()
elif cmd == 'check':
check()
self.check()
elif cmd == 'show':
if not len(self.args) == 2:
import pdb; pdb.set_trace()
self.args
show(self.args[1])
self.show()
elif cmd == 'clear':
clear()
self.clear()
else:
print 'Command %s not recognized' % cmd

def rebuild(self):
from ckan.lib.search import rebuild

if len(self.args) > 1:
rebuild(self.args[1])
else:
rebuild(only_missing=self.options.only_missing,
force=self.options.force,
refresh=self.options.refresh)
def check(self):
from ckan.lib.search import check

check()

def show(self):
from ckan.lib.search import show

if not len(self.args) == 2:
print 'Missing parameter: dataset-name'
return
index = show(self.args[1])
pprint(index)

def clear(self):
from ckan.lib.search import clear

package_id =self.args[1] if len(self.args) > 1 else None
clear(package_id)

class Notification(CkanCommand):
'''Send out modification notifications.
In "replay" mode, an update signal is sent for each dataset in the database.
Usage:
Expand All @@ -332,7 +367,7 @@ def command(self):
cmd = 'replay'
else:
cmd = self.args[0]

if cmd == 'replay':
dome = DomainObjectModificationExtension()
for package in Session.query(Package):
Expand Down Expand Up @@ -466,12 +501,12 @@ def get_user_str(self, user):
if user.name != user.display_name:
user_str += ' display=%s' % user.display_name
return user_str

def list(self):
from ckan import model
print 'Users:'
users = model.Session.query(model.User)
print 'count = %i' % users.count()
print 'count = %i' % users.count()
for user in users:
print self.get_user_str(user)

Expand All @@ -484,7 +519,7 @@ def show(self):

def setpass(self):
from ckan import model

if len(self.args) < 2:
print 'Need name of the user.'
return
Expand Down Expand Up @@ -524,7 +559,7 @@ def password_prompt(cls):

def add(self):
from ckan import model

if len(self.args) < 2:
print 'Need name of the user.'
return
Expand Down Expand Up @@ -561,10 +596,10 @@ def add(self):

if not password:
password = self.password_prompt()

print('Creating user: %r' % username)


user_params = {'name': unicode(username),
'password': password}
if apikey:
Expand Down Expand Up @@ -641,7 +676,7 @@ def _get_dataset(self, dataset_ref):
dataset = model.Package.get(unicode(dataset_ref))
assert dataset, 'Could not find dataset matching reference: %r' % dataset_ref
return dataset

def show(self, dataset_ref):
from ckan import model
import pprint
Expand Down Expand Up @@ -670,7 +705,7 @@ def purge(self, dataset_ref):
dataset.purge()
model.repo.commit_and_remove()
print '%s purged' % name


class Celery(CkanCommand):
'''Run celery daemon
Expand All @@ -686,7 +721,7 @@ def command(self):
os.environ['CKAN_CONFIG'] = os.path.abspath(self.options.config)
from ckan.lib.celery_app import celery
celery.worker_main(argv=['celeryd', '--loglevel=INFO'])


class Ratings(CkanCommand):
'''Manage the ratings stored in the db
Expand Down Expand Up @@ -721,7 +756,7 @@ def count(self):
q = model.Session.query(model.Rating)
print "%i ratings" % q.count()
q = q.filter(model.Rating.user_id == None)
print "of which %i are anonymous ratings" % q.count()
print "of which %i are anonymous ratings" % q.count()

def clean(self, user_ratings=True):
from ckan import model
Expand Down
80 changes: 64 additions & 16 deletions ckan/lib/search/__init__.py
Expand Up @@ -13,6 +13,17 @@

log = logging.getLogger(__name__)

import sys
import cgitb
import warnings
def text_traceback():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
res = 'the original traceback:'.join(
cgitb.text(sys.exc_info()).split('the original traceback:')[1:]
).strip()
return res

SIMPLE_SEARCH = config.get('ckan.simple_search', False)

SUPPORTED_SCHEMA_VERSIONS = ['1.3']
Expand Down Expand Up @@ -111,29 +122,61 @@ def notify(self, entity, operation):
else:
log.warn("Discarded Sync. indexing for: %s" % entity)

def rebuild(package=None):
def rebuild(package_id=None,only_missing=False,force=False,refresh=False):
'''
Rebuilds the search index.
If a dataset id is provided, only this dataset will be reindexed.
When reindexing all datasets, if only_missing is True, only the
datasets not already indexed will be processed. If force equals
True, if an execption is found, the exception will be logged, but
the process will carry on.
'''
from ckan import model
log.debug("Rebuilding search index...")

package_index = index_for(model.Package)

if package:
if package_id:
pkg_dict = get_action('package_show_rest')(
{'model': model, 'ignore_auth': True, 'api_version':1},
{'id': package}
{'id': package_id}
)
package_index.remove_dict(pkg_dict)
package_index.insert_dict(pkg_dict)
else:
# rebuild index
package_index.clear()
for pkg in model.Session.query(model.Package).filter(model.Package.state == 'active').all():
package_index.insert_dict(
get_action('package_show_rest')(
{'model': model, 'ignore_auth': True, 'api_version':1},
{'id': pkg.id}
package_ids = [r[0] for r in model.Session.query(model.Package.id).filter(model.Package.state == 'active').all()]
if only_missing:
log.debug('Indexing only missing packages...')
package_query = query_for(model.Package)
indexed_pkg_ids = set(package_query.get_all_entity_ids(max_results=len(package_ids)))
package_ids = set(package_ids) - indexed_pkg_ids # Packages not indexed

if len(package_ids) == 0:
log.debug('All datasets are already indexed')
return
else:
log.debug('Rebuilding the whole index...')
# When refreshing, the index is not previously cleared
if not refresh:
package_index.clear()

for pkg_id in package_ids:
try:
package_index.insert_dict(
get_action('package_show_rest')(
{'model': model, 'ignore_auth': True, 'api_version':1},
{'id': pkg_id}
)
)
)
except Exception,e:
log.error('Error while indexing dataset %s: %s' % (pkg_id,str(e)))
if force:
log.error(text_traceback())
continue
else:
raise

model.Session.commit()
log.debug('Finished rebuilding search index.')

Expand All @@ -153,14 +196,19 @@ def check():

def show(package_reference):
from ckan import model
package_index = index_for(model.Package)
print package_index.get_index(package_reference)
package_query = query_for(model.Package)

def clear():
return package_query.get_index(package_reference)

def clear(package_reference=None):
from ckan import model
log.debug("Clearing search index...")
package_index = index_for(model.Package)
package_index.clear()
if package_reference:
log.debug("Clearing search index for dataset %s..." % package_reference)
package_index.delete_package({'id':package_reference})
else:
log.debug("Clearing search index...")
package_index.clear()


def check_solr_schema_version(schema_file=None):
Expand Down
4 changes: 2 additions & 2 deletions ckan/lib/search/index.py
Expand Up @@ -158,8 +158,8 @@ def index_package(self, pkg_dict):

def delete_package(self, pkg_dict):
conn = make_connection()
query = "+%s:%s +id:\"%s\" +site_id:\"%s\"" % (TYPE_FIELD, PACKAGE_TYPE,
pkg_dict.get('id'),
query = "+%s:%s (+id:\"%s\" OR +name:\"%s\") +site_id:\"%s\"" % (TYPE_FIELD, PACKAGE_TYPE,
pkg_dict.get('id'), pkg_dict.get('id'),
config.get('ckan.site_id'))
try:
conn.delete_query(query)
Expand Down

0 comments on commit 10cfd16

Please sign in to comment.