|
|
@@ -1,21 +1,43 @@ |
|
|
import logging
|
|
|
import os
|
|
|
import json
|
|
|
from six import StringIO
|
|
|
|
|
|
import jwt
|
|
|
import requests
|
|
|
from six import StringIO
|
|
|
from dpp_runner.lib import DppRunner
|
|
|
|
|
|
from datapackage import Package
|
|
|
|
|
|
from conductor.blueprints.user.controllers import PUBLIC_KEY
|
|
|
from .models import package_registry
|
|
|
|
|
|
# Base URL read from the OS_API_URL environment variable; this is None when
# the variable is not set, so string concatenation on it would then fail.
os_api_url = os.environ.get('OS_API_URL')
|
|
|
# Module-level DppRunner instance; upload() calls runner.start(...) on it.
runner = DppRunner()
|
|
|
|
|
|
def copy_except(obj, fields):
    """Return a shallow copy of ``obj`` without the keys listed in ``fields``.

    :param obj: mapping to copy (anything exposing ``items()``)
    :param fields: container of keys to leave out of the copy
    :return: a new ``dict`` holding every ``(key, value)`` pair of ``obj``
        whose key is not in ``fields``; ``obj`` itself is not modified
    """
    return {k: v for k, v in obj.items() if k not in fields}
|
|
|
|
|
|
def upload(datapackage, callback, token, cache_set):
|
|
|
|
|
|
def prepare_field(field):
    """Build the fiscal-spec entry for a single schema field.

    :param field: field descriptor dict; must contain ``name`` and ``osType``
    :return: dict with ``header`` (the field name), ``osType``, ``title``
        (only when the field has one) and ``options`` (every other key of
        the field except ``name``, ``title``, ``osType`` and ``type``)
    """
    spec = dict(header=field['name'], osType=field['osType'])
    if 'title' in field:
        spec['title'] = field['title']
    # Whatever is not consumed above (and is not the raw type) travels along
    # as free-form options.
    consumed = ('name', 'title', 'osType', 'type')
    spec['options'] = copy_except(field, consumed)
    return spec
|
|
|
|
|
|
|
|
|
def upload(datapackage, token, cache_get, cache_set):
|
|
|
"""Initiate a package load to the database
|
|
|
:param datapackage: URL for datapackage to load
|
|
|
:param callback: URL for callback to send to loader
|
|
|
:param token: authentication token for user performing the upload
|
|
|
"""
|
|
|
try:
|
|
|
@@ -32,25 +54,33 @@ def upload(datapackage, callback, token, cache_set): |
|
|
"error": 'unauthorized'
|
|
|
}
|
|
|
else:
|
|
|
params = {
|
|
|
'package': datapackage,
|
|
|
'callback': callback
|
|
|
key = 'os-conductor:package:'+datapackage
|
|
|
ret = {
|
|
|
"progress": 0,
|
|
|
"status": "queued"
|
|
|
}
|
|
|
load_url = os_api_url+'/api/3/loader/'
|
|
|
response = requests.get(load_url, params=params)
|
|
|
if response.status_code == 200:
|
|
|
key = 'os-conductor:package:'+datapackage
|
|
|
ret = {
|
|
|
"progress": 0,
|
|
|
"status": "queued"
|
|
|
}
|
|
|
else:
|
|
|
ret = {
|
|
|
"status": "fail",
|
|
|
"error": 'HTTP %s' % response.status_code
|
|
|
}
|
|
|
if key is not None:
|
|
|
cache_set(key, ret, 3600)
|
|
|
package = Package(datapackage)
|
|
|
desc = package.descriptor
|
|
|
|
|
|
fiscal_spec = {
|
|
|
'dataset-name:': desc['name'],
|
|
|
'resource-name': package.resources[0].name,
|
|
|
'title': desc.get('title', desc['name']),
|
|
|
'owner-id': token['userid'],
|
|
|
'sources': [
|
|
|
{
|
|
|
'url': package.resources[0].source
|
|
|
}
|
|
|
],
|
|
|
'fields': [
|
|
|
prepare_field(f)
|
|
|
for f in package.resources[0].descriptor['schema']['fields']
|
|
|
if 'osType' in f
|
|
|
]
|
|
|
}
|
|
|
runner.start('fiscal', json.dumps(fiscal_spec).encode('utf8'),
|
|
|
verbosity=2, status_cb=StatusCallback(datapackage, cache_get, cache_set))
|
|
|
return ret
|
|
|
|
|
|
|
|
|
@@ -60,24 +90,44 @@ def upload_status(datapackage, cache_get): |
|
|
return ret
|
|
|
|
|
|
|
|
|
def upload_status_update(datapackage, status, error,
|
|
|
progress, cache_get, cache_set):
|
|
|
logging.error('upload_status_update: %s sts:%s, err:%s, prog:%s',
|
|
|
datapackage, status, error, progress)
|
|
|
if datapackage is not None and status is not None:
|
|
|
key = 'os-conductor:package:'+datapackage
|
|
|
ret = cache_get(key)
|
|
|
class StatusCallback:
    """Callable that folds per-pipeline status reports into one cached status.

    An instance is handed to the pipeline runner as its status callback. Each
    call records the reporting pipeline's status, derives an overall status
    for the datapackage and writes it to the cache under
    ``'os-conductor:package:' + datapackage_url``.
    """

    def __init__(self, datapackage_url, cache_get, cache_set):
        """Remember the package URL and the cache accessors.

        :param datapackage_url: URL of the datapackage; used to build the
            cache key
        :param cache_get: callable ``cache_get(key)`` returning the cached
            status dict, or ``None`` when nothing is cached
        :param cache_set: callable ``cache_set(key, value, ttl)``
        """
        self.datapackage_url = datapackage_url
        self.cache_get = cache_get
        self.cache_set = cache_set
        # Last status reported by each pipeline, keyed by pipeline id.
        self.statuses = {}
        # First failure message seen; later failures never overwrite it.
        self.error = None

    def status(self):
        """Return the overall status derived from all pipelines seen so far.

        :return: ``'fail'`` if any pipeline reported ``'FAILED'``, otherwise
            ``'loading-data'`` if any reported ``'INPROGRESS'``, otherwise
            ``'done'`` (also the result when no pipeline has reported yet)
        """
        statuses = self.statuses.values()
        if 'FAILED' in statuses:
            return 'fail'
        if 'INPROGRESS' in statuses:
            return 'loading-data'
        return 'done'

    def __call__(self, pipeline_id, status, errors=None, stats=None):
        """Record one pipeline status report and refresh the cached entry.

        :param pipeline_id: identifier of the reporting pipeline
        :param status: status string reported for that pipeline
        :param errors: optional iterable of error strings; only used when
            ``status`` is ``'FAILED'``
        :param stats: optional dict; its ``count_of_rows`` entry, when
            truthy, becomes the cached ``progress`` once the status is done
        """
        logging.error('upload_status_update: %s pipeline:%s, status:%s, err:%s, stats:%s',
                      self.datapackage_url, pipeline_id, status, errors, stats)
        key = 'os-conductor:package:'+self.datapackage_url
        ret = self.cache_get(key)
        if ret is None:
            ret = {
                'status': status,
                'progress': 0
            }
        # Keep only the first failure's message so the root cause is not
        # replaced by follow-up errors from other pipelines.
        if status == 'FAILED' and errors is not None and self.error is None:
            self.error = '\n'.join(errors)
            ret['error'] = self.error
        self.statuses[pipeline_id] = status
        ret['status'] = self.status()
        if ret['status'] == 'done':
            if stats is not None:
                progress = stats.get('count_of_rows')
                if progress:
                    ret['progress'] = int(progress)
        # The third argument is passed through to cache_set unchanged
        # (presumably a TTL in seconds -- confirm against the cache backend).
        self.cache_set(key, ret, 3600)
|
|
|
|
|
|
|
|
|
def toggle_publish(name, token, toggle=False, publish=False):
|
|
|
|
0 comments on commit
4f195b7