Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contextualize in packtivity instead of yadage -- it is a job-builder … #23

Merged
merged 4 commits into from
Dec 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions packtivity/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@

log = logging.getLogger(__name__)

def finalize_input(jsondata,state):
for path,value in utils.leaf_iterator(jsondata):
actualval = state.contextualize_data(value)
path.set(jsondata,actualval)
return jsondata

def getinit_data(initfiles,parameters):
'''
get initial data from both a list of files and a list of 'pname=pvalue'
Expand All @@ -38,7 +32,6 @@ def getinit_data(initfiles,parameters):
@click.option('--parameter', '-p', multiple=True)
@click.option('-r', '--read', multiple=True, default = [])
@click.option('-w', '--write', multiple=True, default = [os.curdir])
@click.option('--contextualize/--no-contextualize', default = True)
@click.option('-s','--state', default = '')
@click.option('-t','--toplevel', default = os.getcwd())
@click.option('-c','--schemasource', default = yadageschemas.schemadir)
Expand All @@ -49,7 +42,7 @@ def getinit_data(initfiles,parameters):
@click.option('-x','--proxyfile',default = 'proxy.json')
@click.argument('spec')
@click.argument('parfiles', nargs = -1)
def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,contextualize,validate,verbosity,backend,proxyfile):
def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncwait,validate,verbosity,backend,proxyfile):
logging.basicConfig(level = getattr(logging,verbosity))

spec = utils.load_packtivity(spec,toplevel,schemasource,validate)
Expand All @@ -62,8 +55,6 @@ def runcli(spec,parfiles,state,parameter,read,write,toplevel,schemasource,asyncw
state = LocalFSState(state['readwrite'],state['readonly'])
state.ensure()

if contextualize:
parameters = finalize_input(parameters,state)

is_sync, backend = bkutils.backend_from_string(backend)
backend_kwargs = {
Expand Down
18 changes: 17 additions & 1 deletion packtivity/syncbackends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import packtivity.logutils as logutils
from packtivity.handlers import enable_plugins
from .utils import leaf_iterator

enable_plugins()

class packconfig(object):
Expand Down Expand Up @@ -64,29 +66,43 @@ def publish(publisher,parameters,state, pack_config):
handler = pub_handlers[pub_type][impl]
return handler(publisher,parameters,state)


def contextualize_parameters(parameters, state):
if not state: return parameters

contextualized_parameters = copy.deepcopy(parameters)
for leaf_pointer, leaf_value in leaf_iterator(parameters):
leaf_pointer.set(contextualized_parameters,state.contextualize_data(leaf_value))

return contextualized_parameters

def prepublish(spec, parameters, state, pack_config):
'''
attempts to prepublish output data, returns None if not possible
'''
parameters = contextualize_parameters(parameters, state)
pub = spec['publisher']

if pub['publisher_type'] in ['frompar-pub','constant-pub']:
return publish(pub,parameters,state,pack_config)
if pub['publisher_type'] in ['interpolated-pub', 'fromparjq-pub']:
from .statecontexts.posixfs_context import LocalFSState
if not state:
return publish(pub,parameters,state,pack_config)
if type(state) == LocalFSState:
if pub['glob'] == False or len(state.readwrite)==0:
if pub['glob'] == False or len(state.readwrite)==0:
return publish(pub,parameters,state,pack_config)
return None

def run_packtivity(spec, parameters,state,metadata,config):
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
try:
parameters = contextualize_parameters(parameters, state)
if spec['process'] and spec['environment']:
job = build_job(spec['process'], parameters, state, config)
env = build_env(spec['environment'], parameters, state, config)
run_in_env(env,job,state,metadata,config)

pubdata = publish(spec['publisher'], parameters,state, config)
log.info('publishing data: %s',pubdata)
return pubdata
Expand Down
5 changes: 2 additions & 3 deletions tests/test_asyncbackends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def test_create_multiproc():

def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
backend = MultiProcBackend(2)
proxy = backend.submit(localproc_packspec,pars,basic_localfs_state)
while not backend.ready(proxy):
Expand All @@ -17,12 +17,11 @@ def test_multiproc(tmpdir,basic_localfs_state,localproc_packspec):

def test_multiproc_fail(tmpdir,basic_localfs_state,localproc_pack_fail):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}

backend = MultiProcBackend(2)
proxy = backend.submit(localproc_pack_fail,pars,basic_localfs_state)
while not backend.ready(proxy):
pass
assert backend.successful(proxy) == False
backend.fail_info(proxy)

18 changes: 9 additions & 9 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,58 @@

def test_pack_call_local(tmpdir,basic_localfs_state,localproc_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
localproc_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker(tmpdir,basic_localfs_state,dockeproc_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_local_fail(tmpdir,basic_localfs_state,localproc_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
localproc_pack_fail(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_fail(tmpdir,basic_localfs_state,docker_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
docker_pack_fail(parameters = pars, state = basic_localfs_state)

def test_pack_call_docker_script_fail(tmpdir,basic_localfs_state,docker_script_pack_fail,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
with pytest.raises(RuntimeError):
docker_script_pack_fail(parameters = pars, state = basic_localfs_state)

def test_pack_call_docker_script(tmpdir,basic_localfs_state,dockeproc_script_pack):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_script_pack(parameters = pars, state = basic_localfs_state)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async, asyncwait = True)
assert tmpdir.join('helloworld.txt').check()

def test_pack_call_docker_script_async(tmpdir,basic_localfs_state,dockeproc_script_pack,default_async):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}
proxy = dockeproc_script_pack(parameters = pars, state = basic_localfs_state, asyncbackend = default_async)
while not default_async.ready(proxy): pass
default_async.result(proxy)
assert tmpdir.join('helloworld.txt').check()

def test_pack_prepublish(tmpdir,basic_localfs_state,localproc_pack,default_sync):
basic_localfs_state.ensure()
pars = {'outputfile': basic_localfs_state.contextualize_data('{workdir}/helloworld.txt')}
pars = {'outputfile': '{workdir}/helloworld.txt'}

assert default_sync.prepublish(localproc_pack.spec,pars,basic_localfs_state) == {
'output': str(tmpdir.join('helloworld.txt'))
Expand Down