Skip to content

Commit

Permalink
#700 add muliprocessing reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
kindly authored and ckan team committed Mar 28, 2013
1 parent 20639d2 commit b809f48
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
57 changes: 53 additions & 4 deletions ckan/lib/cli.py
@@ -1,5 +1,6 @@
import collections
import csv
import multiprocessing as mp
import os
import datetime
import sys
Expand All @@ -8,6 +9,7 @@
import ckan.include.rjsmin as rjsmin
import ckan.include.rcssmin as rcssmin
import ckan.lib.fanstatic_resources as fanstatic_resources
import sqlalchemy as sa

import paste.script
from paste.registry import Registry
Expand Down Expand Up @@ -69,7 +71,7 @@ class CkanCommand(paste.script.command.Command):
default_verbosity = 1
group_name = 'ckan'

def _load_config(self):
def _get_config(self):
from paste.deploy import appconfig
if not self.options.config:
msg = 'No config file supplied'
Expand All @@ -78,7 +80,10 @@ def _load_config(self):
if not os.path.exists(self.filename):
raise AssertionError('Config filename %r does not exist.' % self.filename)
fileConfig(self.filename)
conf = appconfig('config:' + self.filename)
return appconfig('config:' + self.filename)

def _load_config(self):
conf = self._get_config()
assert 'ckan' not in dir() # otherwise loggers would be disabled
# We have now loaded the config. Now we can import ckan for the
# first time.
Expand Down Expand Up @@ -308,11 +313,13 @@ def version(self):
print Session.execute('select version from migrate_version;').fetchall()



class SearchIndexCommand(CkanCommand):
'''Creates a search index for all datasets
Usage:
search-index [-i] [-o] [-r] [-e] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets)
search-index rebuild_fast - reindex using multiprocessing using all cores. This acts in the same way as rubuild -r [EXPERIMENTAL]
search-index check - checks for datasets not indexed
search-index show {dataset-name} - shows index of a dataset
search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance
Expand Down Expand Up @@ -344,14 +351,18 @@ def __init__(self,name):
)

def command(self):
self._load_config()

if not self.args:
# default to printing help
print self.usage
return

cmd = self.args[0]
# Do not run load_config yet
if cmd == 'rebuild_fast':
self.rebuild_fast()
return

self._load_config()
if cmd == 'rebuild':
self.rebuild()
elif cmd == 'check':
Expand Down Expand Up @@ -400,6 +411,44 @@ def clear(self):
package_id =self.args[1] if len(self.args) > 1 else None
clear(package_id)

def rebuild_fast(self):
### Get out config but without starting pylons environment ####
conf = self._get_config()

### Get ids using own engine, otherwise multiprocess will balk
db_url = conf['sqlalchemy.url']
engine = sa.create_engine(db_url)
package_ids = []
result = engine.execute("select id from package where state = 'active';")
for row in result:
package_ids.append(row[0])

def start(ids):
## load actual enviroment for each subprocess, so each have thier own
## sa session
self._load_config()
from ckan.lib.search import rebuild, commit
rebuild(package_ids=ids)
commit()

def chunks(l, n):
""" Yield n successive chunks from l.
"""
newn = int(len(l) / n)
for i in xrange(0, n-1):
yield l[i*newn:i*newn+newn]
yield l[n*newn-newn:]

for chunk in chunks(package_ids, mp.cpu_count()):
processes = []
process = mp.Process(target=start, args=(chunk,))
processes.append(process)
process.daemon = True
process.start()

for process in processes:
process.join()

class Notification(CkanCommand):
'''Send out modification notifications.
Expand Down
9 changes: 8 additions & 1 deletion ckan/lib/search/__init__.py
Expand Up @@ -134,7 +134,7 @@ def notify(self, entity, operation):
log.warn("Discarded Sync. indexing for: %s" % entity)


def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False):
def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False, package_ids=None):
'''
Rebuilds the search index.
Expand All @@ -155,6 +155,13 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False, def
log.info('Indexing just package %r...', pkg_dict['name'])
package_index.remove_dict(pkg_dict)
package_index.insert_dict(pkg_dict)
elif package_ids:
for package_id in package_ids:
pkg_dict = logic.get_action('package_show')(
{'model': model, 'ignore_auth': True, 'validate': False},
{'id': package_id})
log.info('Indexing just package %r...', pkg_dict['name'])
package_index.update_dict(pkg_dict, True)
else:
package_ids = [r[0] for r in model.Session.query(model.Package.id).
filter(model.Package.state == 'active').all()]
Expand Down

0 comments on commit b809f48

Please sign in to comment.