diff --git a/packtivity/cli.py b/packtivity/cli.py index 501d0fe..4357630 100644 --- a/packtivity/cli.py +++ b/packtivity/cli.py @@ -13,12 +13,6 @@ log = logging.getLogger(__name__) -def finalize_input(jsondata,state): - for path,value in utils.leaf_iterator(jsondata): - actualval = state.contextualize_data(value) - path.set(jsondata,actualval) - return jsondata - def getinit_data(initfiles,parameters): ''' get initial data from both a list of files and a list of 'pname=pvalue' @@ -38,7 +32,6 @@ def getinit_data(initfiles,parameters): @click.option('--parameter', '-p', multiple=True) @click.option('-r', '--read', multiple=True, default = []) @click.option('-w', '--write', multiple=True, default = [os.curdir]) -@click.option('--contextualize/--no-contextualize', default = True) @click.option('-s','--state', default = '') @click.option('-t','--toplevel', default = os.getcwd()) @click.option('-c','--schemasource', default = yadageschemas.schemadir) @@ -49,7 +42,7 @@ def getinit_data(initfiles,parameters): @click.option('-x','--proxyfile',default = 'proxy.json') @click.argument('spec') @click.argument('parfiles', nargs = -1) -def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,contextualize,validate,verbosity,backend,proxyfile): +def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,validate,verbosity,backend,proxyfile): logging.basicConfig(level = getattr(logging,verbosity)) spec = utils.load_packtivity(spec,toplevel,schemasource,validate) @@ -62,8 +55,6 @@ def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncw state = LocalFSState(state['readwrite'],state['readonly']) state.ensure() - if contextualize: - parameters = finalize_input(parameters,state) is_sync, backend = bkutils.backend_from_string(backend) backend_kwargs = { diff --git a/packtivity/syncbackends.py b/packtivity/syncbackends.py index 271f6a3..83be786 100644 --- a/packtivity/syncbackends.py +++ b/packtivity/syncbackends.py @@ -5,6 +5,8 @@ import packtivity.logutils as logutils from packtivity.handlers import enable_plugins +from utils import leaf_iterator + enable_plugins() class packconfig(object): @@ -64,11 +66,23 @@ def publish(publisher,parameters,state, pack_config): handler = pub_handlers[pub_type][impl] return handler(publisher,parameters,state) + +def contextualize_parameters(parameters, state): + if not state: return parameters + + contextualized_parameters = copy.deepcopy(parameters) + for leaf_pointer, leaf_value in leaf_iterator(parameters): + leaf_pointer.set(contextualized_parameters,state.contextualize_data(leaf_value)) + + return contextualized_parameters + def prepublish(spec, parameters, state, pack_config): ''' attempts to prepublish output data, returns None if not possible ''' + parameters = contextualize_parameters(parameters, state) pub = spec['publisher'] + if pub['publisher_type'] in ['frompar-pub','constant-pub']: return publish(pub,parameters,state,pack_config) if pub['publisher_type'] in ['interpolated-pub', 'fromparjq-pub']: @@ -76,17 +90,25 @@ def prepublish(spec, parameters, state, pack_config): if not state: return publish(pub,parameters,state,pack_config) if type(state) == LocalFSState: - if pub['glob'] == False or len(state.readwrite)==0: + print state.readwrite + print (pub['glob'] == False), len(state.readwrite)==0 + if pub['glob'] == False or len(state.readwrite)==0: + print 'CAN PREPUBLISH!!' return publish(pub,parameters,state,pack_config) return None def run_packtivity(spec, parameters,state,metadata,config): with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log: try: + parameters = contextualize_parameters(parameters, state) + log.info('contextualized... %s', parameters) if spec['process'] and spec['environment']: + log.info('run..... %s', parameters) job = build_job(spec['process'], parameters, state, config) env = build_env(spec['environment'], parameters, state, config) run_in_env(env,job,state,metadata,config) + log.info('done..... %s', parameters) + pubdata = publish(spec['publisher'], parameters,state, config) log.info('publishing data: %s',pubdata) return pubdata diff --git a/tests/test_asyncbackends.py b/tests/test_asyncbackends.py index 8c7051f..27c34aa 100644 --- a/tests/test_asyncbackends.py +++ b/tests/test_asyncbackends.py @@ -5,7 +5,7 @@ def test_create_multiproc(): def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} backend = MultiProcBackend(2) proxy = backend.submit(localproc_packspec,pars,basic_localfs_state) while not backend.ready(proxy): @@ -17,7 +17,7 @@ def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec): def test_multiproc_fail(tmpdir,basic_localfs_state,localproc_pack_fail): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} backend = MultiProcBackend(2) proxy = backend.submit(localproc_pack_fail,pars,basic_localfs_state) @@ -25,4 +25,3 @@ def test_multiproc_fail(tmpdir,basic_localfs_state,localproc_pack_fail): pass assert backend.successful(proxy) == False backend.fail_info(proxy) - diff --git a/tests/test_main.py b/tests/test_main.py index 6b35875..a290a21 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -3,50 +3,50 @@ def test_pack_call_local(tmpdir,basic_localfs_state,localproc_pack): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} localproc_pack(parameters = pars, state = basic_localfs_state) assert tmpdir.join('helloworld.txt').check() def test_pack_call_docker(tmpdir,basic_localfs_state,dockeproc_pack): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} dockeproc_pack(parameters = pars, state = basic_localfs_state) assert tmpdir.join('helloworld.txt').check() def test_pack_call_local_fail(tmpdir,basic_localfs_state,localproc_pack_fail,default_async): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} with pytest.raises(RuntimeError): localproc_pack_fail(parameters = pars, state = basic_localfs_state) assert tmpdir.join('helloworld.txt').check() def test_pack_call_docker_fail(tmpdir,basic_localfs_state,docker_pack_fail,default_async): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} with pytest.raises(RuntimeError): docker_pack_fail(parameters = pars, state = basic_localfs_state) def test_pack_call_docker_script_fail(tmpdir,basic_localfs_state,docker_script_pack_fail,default_async): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} with pytest.raises(RuntimeError): docker_script_pack_fail(parameters = pars, state = basic_localfs_state) def test_pack_call_docker_script(tmpdir,basic_localfs_state,dockeproc_script_pack): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} dockeproc_script_pack(parameters = pars, state = basic_localfs_state) assert tmpdir.join('helloworld.txt').check() def test_pack_call_docker_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async, asyncwait = True) assert tmpdir.join('helloworld.txt').check() def test_pack_call_docker_script_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} proxy = dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async) while not default_async.ready(proxy): pass default_async.result(proxy) @@ -54,7 +54,7 @@ def test_pack_call_docker_script_async(tmpdir,basic_localfs_state,dockeproc_scri def test_pack_prepublish(tmpdir,basic_localfs_state,localproc_pack,default_sync): basic_localfs_state.ensure() - pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')} + pars = {'outputfile': '{workdir}/helloworld.txt'} assert default_sync.prepublish(localproc_pack.spec,pars,basic_localfs_state) == { 'output': str(tmpdir.join('helloworld.txt'))