Skip to content

Commit

Permalink
Merge branch '940-update-task-status' into 938-datapusher
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Aug 8, 2013
2 parents 8dde66e + b7f21ed commit 13854a2
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
30 changes: 30 additions & 0 deletions ckanext/datastore/logic/action.py
Expand Up @@ -461,10 +461,15 @@ def datapusher_submit(context, data_dict):
if not datapusher_url:
return False

callback_url = p.toolkit.url_for(
controller='api', action='action', logic_function='datapusher_hook',
ver=3, qualified=True)

user = p.toolkit.get_action('user_show')(context, {'id': context['user']})
requests.post(urlparse.urljoin(datapusher_url, 'job'), data=json.dumps({
'api_key': user['apikey'],
'job_type': 'push_to_datastore',
'result_url': callback_url,
'metadata': {
'ckan_url': pylons.config['ckan.site_url'],
'resource_id': res_id
Expand All @@ -484,6 +489,31 @@ def datapusher_submit(context, data_dict):
return True


def datapusher_hook(context, data_dict):
""" Update datapusher task. This action is typically called by the
datapusher whenever the status of a job changes.
Expects a job with ``status``, ``metadata``.
"""

# TODO: use a schema to validate

p.toolkit.check_access('datapusher_submit', context, data_dict)

res_id = data_dict['metadata']['resource_id']

task = p.toolkit.get_action('task_status_show')(context, {
'entity_id': res_id,
'task_type': 'datapusher',
'key': 'datapusher'
})

task['state'] = data_dict['status']
task['last_updated'] = str(datetime.datetime.now())

p.toolkit.get_action('task_status_update')(context, task)


def _resource_exists(context, data_dict):
# Returns true if the resource exists in CKAN and in the datastore
model = _get_or_bust(context, 'model')
Expand Down
1 change: 1 addition & 0 deletions ckanext/datastore/plugin.py
Expand Up @@ -226,6 +226,7 @@ def get_actions(self):
'datastore_delete': action.datastore_delete,
'datastore_search': action.datastore_search,
'datapusher_submit': action.datapusher_submit,
'datapusher_hook': action.datapusher_hook,
'resource_show': self.resource_show_action,
}
if not self.legacy_mode:
Expand Down
53 changes: 46 additions & 7 deletions ckanext/datastore/tests/test_create.py
Expand Up @@ -2,6 +2,7 @@
import httpretty
import nose
import sys
import datetime
from nose.tools import assert_equal

import pylons
Expand Down Expand Up @@ -562,18 +563,17 @@ def test_providing_res_with_url_calls_datapusher_correctly(self):
body=json.dumps({'job_id': 'foo', 'job_key': 'bar'}))

package = model.Package.get('annakarenina')
data = {
'resource': {'package_id': package.id, 'url': 'demo.ckan.org'}
}
postparams = '%s=1' % json.dumps(data)
auth = {'Authorization': str(self.sysadmin_user.apikey)}
self.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth, status=200)

tests.call_action_api(
self.app, 'datastore_create', apikey=self.sysadmin_user.apikey,
resource=dict(package_id=package.id, url='demo.ckan.org'))

assert len(package.resources) == 4, len(package.resources)
resource = package.resources[3]
data = json.loads(httpretty.last_request().body)
assert data['metadata']['resource_id'] == resource.id, data
assert data['result_url'].endswith('/action/datapusher_hook'), data
assert data['result_url'].startswith('http://'), data

def test_cant_provide_resource_and_resource_id(self):
package = model.Package.get('annakarenina')
Expand Down Expand Up @@ -618,6 +618,45 @@ def test_send_datapusher_creates_task(self):

assert task['state'] == 'pending', task

def test_datapusher_hook(self):
package = model.Package.get('annakarenina')
resource = package.resources[0]

context = {
'user': self.sysadmin_user.name
}

p.toolkit.get_action('task_status_update')(context, {
'entity_id': resource.id,
'entity_type': 'resource',
'task_type': 'datapusher',
'key': 'datapusher',
'value': True,
'last_updated': str(datetime.datetime.now()),
'state': 'pending'
})

data = {
'status': 'success',
'metadata': {
'resource_id': resource.id
}
}
postparams = '%s=1' % json.dumps(data)
auth = {'Authorization': str(self.sysadmin_user.apikey)}
res = self.app.post('/api/action/datapusher_hook', params=postparams,
extra_environ=auth, status=200)
print res.body
res_dict = json.loads(res.body)

assert res_dict['success'] is True

task = tests.call_action_api(
self.app, 'task_status_show', entity_id=resource.id,
task_type='datapusher', key='datapusher')

assert task['state'] == 'success', task

def test_guess_types(self):
resource = model.Package.get('annakarenina').resources[1]

Expand Down

0 comments on commit 13854a2

Please sign in to comment.