Skip to content

Commit

Permalink
Merge branch '1350/post-upload-tasks'
Browse files Browse the repository at this point in the history
  • Loading branch information
brew committed Apr 20, 2018
2 parents be2235f + 8e8b26c commit 336ff89
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
2 changes: 1 addition & 1 deletion conductor/blueprints/package/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import json

from flask import Blueprint, Response, request, abort, url_for
from flask import Blueprint, Response, request, abort
from flask.ext.jsonpify import jsonpify
from werkzeug.contrib.cache import MemcachedCache, SimpleCache

Expand Down
43 changes: 39 additions & 4 deletions conductor/blueprints/package/controllers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import email.utils
import logging
import os
import json
Expand Down Expand Up @@ -60,18 +61,42 @@ def prepare_field(field, slugs):
return ret


def make_upload_complete_callback(name, token):
"""Callback function when upload is complete."""
def on_upload_complete_callback(name=name, token=token):
# Make package private
toggle_publish(name, token, toggle=False, publish=False)

# Obfuscate email in author field
name, datapackage_url, datapackage, \
model, dataset_name, author,\
status, loaded = package_registry.get_raw(name)

# Get the full name from the author field, and rewrite it without
# domain in the email
fullname, email_addr = email.utils.parseaddr(datapackage['author'])
email_addr = '{0}@not.shown'.format(email_addr.split('@')[0])
datapackage['author'] = '{0} <{1}>'.format(fullname, email_addr)

package_registry.save_model(name, datapackage_url, datapackage,
model, dataset_name, author,
status, loaded)

return on_upload_complete_callback


def upload(datapackage, token, cache_get, cache_set):
"""Initiate a package load to the database
:param datapackage: URL for datapackage to load
:param token: authentication token for user performing the upload
"""
encoded_token = token
try:
token = jwt.decode(token.encode('ascii'),
PUBLIC_KEY,
algorithm='RS256')
except jwt.InvalidTokenError:
token = None

key = None
if token is None:
ret = {
Expand Down Expand Up @@ -108,9 +133,16 @@ def upload(datapackage, token, cache_get, cache_set):
if 'osType' in f
]
}
status_cb = StatusCallback(datapackage, cache_get, cache_set)
package_id = '{0}:{1}'.format(token['userid'],
slugify(desc['name'],
separator='_',
to_lower=True))
on_upload_complete_callback = \
make_upload_complete_callback(package_id, encoded_token)
status_cb = StatusCallback(datapackage, cache_get, cache_set,
on_upload_complete_callback)
runner.start('fiscal', json.dumps(fiscal_spec).encode('utf8'),
verbosity=2, status_cb=status_cb)
verbosity=0, status_cb=status_cb)
except Exception as e:
ret = {
"status": "fail",
Expand All @@ -128,12 +160,14 @@ def upload_status(datapackage, cache_get):

class StatusCallback:

def __init__(self, datapackage_url, cache_get, cache_set):
def __init__(self, datapackage_url, cache_get, cache_set,
complete_callback):
self.datapackage_url = datapackage_url
self.cache_get = cache_get
self.cache_set = cache_set
self.statuses = {}
self.error = None
self.on_complete_callback = complete_callback

def status(self):
statuses = self.statuses.values()
Expand Down Expand Up @@ -168,6 +202,7 @@ def __call__(self, pipeline_id, status, errors=None, stats=None):
progress = stats.get('count_of_rows')
if progress:
ret['progress'] = int(progress)
self.on_complete_callback()
self.cache_set(key, ret, 3600)


Expand Down

0 comments on commit 336ff89

Please sign in to comment.