Skip to content

Commit

Permalink
[#940] Add datapusher callback hook
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Aug 8, 2013
1 parent af012e5 commit 8049691
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 0 deletions.
25 changes: 25 additions & 0 deletions ckanext/datastore/logic/action.py
Expand Up @@ -485,6 +485,31 @@ def datapusher_submit(context, data_dict):
return True


def datapusher_hook(context, data_dict):
""" Update datapusher task. This action is typically called by the
datapusher whenever the status of a job changes.
Expects a job with ``status``, ``metadata``.
"""

# TODO: use a schema to validate

p.toolkit.check_access('datapusher_submit', context, data_dict)

res_id = data_dict['metadata']['resource_id']

task = p.toolkit.get_action('task_status_show')(context, {
'entity_id': res_id,
'task_type': 'datapusher',
'key': 'datapusher'
})

task['state'] = data_dict['status']
task['last_updated'] = str(datetime.datetime.now())

p.toolkit.get_action('task_status_update')(context, task)


def _resource_exists(context, data_dict):
# Returns true if the resource exists in CKAN and in the datastore
model = _get_or_bust(context, 'model')
Expand Down
1 change: 1 addition & 0 deletions ckanext/datastore/plugin.py
Expand Up @@ -226,6 +226,7 @@ def get_actions(self):
'datastore_delete': action.datastore_delete,
'datastore_search': action.datastore_search,
'datapusher_submit': action.datapusher_submit,
'datapusher_hook': action.datapusher_hook,
'resource_show': self.resource_show_action,
}
if not self.legacy_mode:
Expand Down
40 changes: 40 additions & 0 deletions ckanext/datastore/tests/test_create.py
Expand Up @@ -2,6 +2,7 @@
import httpretty
import nose
import sys
import datetime
from nose.tools import assert_equal

import pylons
Expand Down Expand Up @@ -619,6 +620,45 @@ def test_send_datapusher_creates_task(self):

assert task['state'] == 'pending', task

def test_datapusher_hook(self):
package = model.Package.get('annakarenina')
resource = package.resources[0]

context = {
'user': self.sysadmin_user.name
}

p.toolkit.get_action('task_status_update')(context, {
'entity_id': resource.id,
'entity_type': 'resource',
'task_type': 'datapusher',
'key': 'datapusher',
'value': True,
'last_updated': str(datetime.datetime.now()),
'state': 'pending'
})

data = {
'status': 'success',
'metadata': {
'resource_id': resource.id
}
}
postparams = '%s=1' % json.dumps(data)
auth = {'Authorization': str(self.sysadmin_user.apikey)}
res = self.app.post('/api/action/datapusher_hook', params=postparams,
extra_environ=auth, status=200)
print res.body
res_dict = json.loads(res.body)

assert res_dict['success'] is True

task = tests.call_action_api(
self.app, 'task_status_show', entity_id=resource.id,
task_type='datapusher', key='datapusher')

assert task['state'] == 'success', task

def test_guess_types(self):
resource = model.Package.get('annakarenina').resources[1]

Expand Down

0 comments on commit 8049691

Please sign in to comment.