def _obfuscate_author(author):
    """Return `author` with the domain of its e-mail address hidden.

    :param author: author string, e.g. ``'Jane Doe <jane@example.com>'``
    :return: the author rewritten as ``'Jane Doe <jane@not.shown>'``; the
        value is returned unchanged when it is empty or when no e-mail
        address (nothing containing ``@``) can be parsed out of it
    """
    if not author:
        # Nothing to hide; avoid turning an empty author into ' <@not.shown>'
        return author

    fullname, email_addr = email.utils.parseaddr(author)
    local_part, at_sign, _domain = email_addr.partition('@')
    if not at_sign:
        # No address was found, so rewriting would only mangle the name
        return author

    return '{0} <{1}@not.shown>'.format(fullname, local_part)


def make_upload_complete_callback(name, token):
    """Build the callback to run once a package upload is complete.

    :param name: id of the package in the package registry
    :param token: encoded authentication token of the uploading user,
        passed through to `toggle_publish`
    :return: a callable that needs no arguments (`name` and `token` are
        bound as defaults) which unpublishes the package and stores its
        descriptor back with the author's e-mail domain hidden
    """
    def on_upload_complete_callback(name=name, token=token):
        # Make package private
        toggle_publish(name, token, toggle=False, publish=False)

        # Fetch the stored record so that it can be saved back with only
        # the author field of the descriptor modified
        name, datapackage_url, datapackage, \
            model, dataset_name, author, \
            status, loaded = package_registry.get_raw(name)

        # Obfuscate email in author field; descriptors without an author
        # are left as they are instead of failing with a KeyError
        if 'author' in datapackage:
            datapackage['author'] = _obfuscate_author(datapackage['author'])

        package_registry.save_model(name, datapackage_url, datapackage,
                                    model, dataset_name, author,
                                    status, loaded)

    return on_upload_complete_callback
PUBLIC_KEY, algorithm='RS256') except jwt.InvalidTokenError: token = None - key = None if token is None: ret = { @@ -108,9 +133,16 @@ def upload(datapackage, token, cache_get, cache_set): if 'osType' in f ] } - status_cb = StatusCallback(datapackage, cache_get, cache_set) + package_id = '{0}:{1}'.format(token['userid'], + slugify(desc['name'], + separator='_', + to_lower=True)) + on_upload_complete_callback = \ + make_upload_complete_callback(package_id, encoded_token) + status_cb = StatusCallback(datapackage, cache_get, cache_set, + on_upload_complete_callback) runner.start('fiscal', json.dumps(fiscal_spec).encode('utf8'), - verbosity=2, status_cb=status_cb) + verbosity=0, status_cb=status_cb) except Exception as e: ret = { "status": "fail", @@ -128,12 +160,14 @@ def upload_status(datapackage, cache_get): class StatusCallback: - def __init__(self, datapackage_url, cache_get, cache_set): + def __init__(self, datapackage_url, cache_get, cache_set, + complete_callback): self.datapackage_url = datapackage_url self.cache_get = cache_get self.cache_set = cache_set self.statuses = {} self.error = None + self.on_complete_callback = complete_callback def status(self): statuses = self.statuses.values() @@ -168,6 +202,7 @@ def __call__(self, pipeline_id, status, errors=None, stats=None): progress = stats.get('count_of_rows') if progress: ret['progress'] = int(progress) + self.on_complete_callback() self.cache_set(key, ret, 3600)