Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds a scoped object to store values under scope-wide names #55

Merged
merged 8 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM fedora
ARG VCS_REF=none
LABEL org.label-schema.vcs-ref=$VCS_REF \
org.label-schema.vcs-url="https://github.com/diana-hep/yadage"
RUN dnf install -y gcc gcc-c++ graphviz-devel ImageMagick python-devel libffi-devel openssl openssl-devel unzip nano autoconf automake libtool redhat-rpm-config make; dnf clean all
RUN dnf install -y gcc gcc-c++ graphviz-devel ImageMagick python python-devel libffi-devel openssl openssl-devel unzip nano autoconf automake libtool redhat-rpm-config make; dnf clean all
COPY . /yadage
WORKDIR /yadage
RUN curl https://bootstrap.pypa.io/get-pip.py | python -
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ def singlestage_cases(tmpdir,localfs_state_provider):
wflow = YadageWorkflow.createFromJSON(data,localfs_state_provider)
return wflow

@pytest.fixture()
def value_registering_workflow(tmpdir,localfs_state_provider):
    '''a workflow object built from the registering_values test spec, whose first stage stores a value under a scope-wide name via register_values'''
    data = yadage.workflow_loader.workflow('workflow.yml','tests/testspecs/registering_values')
    wflow = YadageWorkflow.createFromJSON(data,localfs_state_provider)
    return wflow


@pytest.fixture()
def multiproc_backend():
backend = setupbackend_fromstring('multiproc:4')
Expand Down
22 changes: 22 additions & 0 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ def test_singlestep_cases_first(singlestage_cases, foregroundasync_backend):

assert wflow.dag.getNodeByName('hello_world').task.spec == right_taskspec

def test_value_registration(value_registering_workflow, foregroundasync_backend):
    '''applying a stage with register_values stores the expression under its scope-wide name'''
    wflow = value_registering_workflow

    inputdata = {'msg': 'Hello World'}
    wflow.view().init(inputdata)

    # nothing is schedulable until the init stage has been applied
    assert not wflow.rules[0].applicable(wflow)
    wflow.view().rules[-1].apply(wflow)  # apply init
    assert len(wflow.dag.nodes()) == 1  # init applied

    assert wflow.rules[0].applicable(wflow)  # first stage is good
    wflow.rules[0].apply(wflow)  # apply first stage

    # the registered value is the raw expression dict from the spec
    value = wflow.view().getValue('ascopedfile')
    assert value
    assert isinstance(value, dict)
    assert value['expression_type'] == 'stage-output-selector'
    assert value['stages'] == 'hello'
    assert value['output'] == 'output'
    assert value['unwrap'] is True


def test_singlestep_cases_second(singlestage_cases):
wflow = singlestage_cases
inputdata = {'par': 1}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_wflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_serialize():
json.dumps(data)

def test_deserialize():
    '''an empty serialized workflow round-trips through fromJSON/json unchanged'''
    data = {
        'rules': [],
        'applied': [],
        'dag': {'nodes': [], 'edges': []},
        'stepsbystage': {},
        'bookkeeping': {},
        'values': {},
    }
    wflow = YadageWorkflow.fromJSON(data)
    assert wflow.json() == data

Expand Down
41 changes: 41 additions & 0 deletions tests/testspecs/registering_values/workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Test spec for scope-wide value registration:
# the 'hello' stage registers an expression under the name 'ascopedfile',
# and 'anotherstage' resolves it via a 'fromvalue' expression.
# NOTE(review): leading indentation was lost in this rendering — restore
# nesting per the other specs in tests/testspecs before use.
stages:
- name: hello
dependencies: [init]
scheduler:
scheduler_type: singlestep-stage
parameters:
msg: {stages: init, output: msg, unwrap: true}
outputfile: "{workdir}/hello.txt"
# store this stage's 'output' under the scope-wide name 'ascopedfile'
register_values:
ascopedfile: {
expression_type: "stage-output-selector",
stages: "hello", output: "output", unwrap: true
}
step:
process:
process_type: 'string-interpolated-cmd'
cmd: echo {msg} > {outputfile}
environment:
environment_type: 'localproc-env'
publisher:
publisher_type: 'interpolated-pub'
publish:
output: '{outputfile}'

- name: anotherstage
dependencies: [hello]
scheduler:
scheduler_type: singlestep-stage
parameters:
# look up the value registered above instead of selecting a stage output directly
inputfile: {expression_type: fromvalue, key: ascopedfile}
outputfile: "{workdir}/world.txt"
step:
process:
process_type: 'string-interpolated-cmd'
cmd: cat {inputfile} | tr '[a-z]' '[A-Z]' > {outputfile}
environment:
environment_type: 'localproc-env'
publisher:
publisher_type: 'interpolated-pub'
publish:
output: '{outputfile}'
11 changes: 11 additions & 0 deletions yadage/handlers/expression_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,14 @@ def stage_output_selector(stageview, selection):
selection.get('unwrap', False))
log.debug('selected outputs %s', outputs)
return outputs

@expression('fromvalue')
def value_resolver(view, expression):
    '''look up a registered value by key; if the value is itself an expression, evaluate it recursively via its handler'''
    resolved = view.getValue(expression['key'])
    log.debug('resolved to %s', resolved)
    # plain (non-expression) values are returned as-is
    if not (isinstance(resolved, dict) and 'expression_type' in resolved):
        log.info('not an expression value')
        return resolved
    log.debug('looking up expression %s', resolved)
    return handlers[resolved['expression_type']](view, resolved)
11 changes: 10 additions & 1 deletion yadage/handlers/scheduler_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def step_or_stages(name, spec, state_provider, inputs, parameters, dependencies)
return None, None
raise RuntimeError('do not know what kind of stage spec we are dealing with. %s', spec.keys())

def register_expressions(stage, expressions):
    '''register each value expression from a stage spec's register_values mapping under its key on the stage's workflow view; no-op when the mapping is absent or empty'''
    for key, expression in (expressions or {}).items():
        stage.view.addValue(key, expression)

def addStepOrWorkflow(name, stage, parameters, inputs, spec):
'''
adds a step or a sub-workflow based on a init step
Expand Down Expand Up @@ -163,6 +168,7 @@ def singlestep_stage(stage, spec):
finalized, inputs = finalize_input(parameters, stage.view)
finalized = TypedLeafs(finalized, getattr(stage.state_provider,'datamodel',None))
addStepOrWorkflow(stage.name, stage, finalized, inputs, spec)
register_expressions(stage, spec.get('register_values'))

def chunk(alist, chunksize):
'''split a list into equal-sized chunks of size chunksize'''
Expand Down Expand Up @@ -257,6 +263,8 @@ def multistep_stage(stage, spec):
finalized, inputs = finalize_input(pars, stage.view)
finalized = TypedLeafs(finalized, getattr(stage.state_provider,'datamodel',None))
addStepOrWorkflow(singlename, stage, finalized, inputs, spec)
register_expressions(stage, spec.get('register_values'))


def process_noderef(leafobj,resultscript,view):
n = view.dag.getNode(leafobj['_nodeid'])
Expand Down Expand Up @@ -300,7 +308,7 @@ def jq_stage(stage, spec):

log.info('finalized to: %s',after_post)
addStepOrWorkflow(singlename, stage, after_post, inputs, spec)

register_expressions(stage, spec.get('register_values'))

@scheduler('init-stage')
def init_stage(stage, spec):
Expand Down Expand Up @@ -329,3 +337,4 @@ def init_stage(stage, spec):
task.s(**spec['parameters'])
task.used_inputs(inputs)
stage.addStep(task)
register_expressions(stage, spec.get('register_values'))
3 changes: 3 additions & 0 deletions yadage/wflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self):
super(YadageWorkflow, self).__init__()
self.stepsbystage = {}
self.bookkeeping = {}
self.values = {}

def view(self, offset=''):
return WorkflowView(self, offset)
Expand All @@ -30,6 +31,7 @@ def json(self):
)
data['bookkeeping'] = self.bookkeeping
data['stepsbystage'] = self.stepsbystage
data['values'] = self.values
return data

@classmethod
Expand All @@ -43,6 +45,7 @@ def fromJSON(cls, data,
instance.applied_rules = [OffsetStage.fromJSON(x,state_provider_deserializer) for x in data['applied'] ]
instance.bookkeeping = data['bookkeeping']
instance.stepsbystage = data['stepsbystage']
instance.values = data['values']

instance.dag = adage.serialize.dag_from_json(
data['dag'],
Expand Down
14 changes: 14 additions & 0 deletions yadage/wflowview.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, workflowobj, offset=''):
self.offset = offset
self.steps = JsonPointer(self.offset).resolve(workflowobj.stepsbystage)
self.bookkeeper = JsonPointer(self.offset).resolve(workflowobj.bookkeeping)
self.values = JsonPointer(self.offset).resolve(workflowobj.values)

def view(self, offset):
'''
Expand Down Expand Up @@ -108,6 +109,15 @@ def addRule(self, rule, offset=''):
thisoffset.resolve(self.bookkeeper)['_meta']['stages'] += [offsetstage.identifier]
return offsetstage.identifier

def addValue(self, key, value):
    '''register value under key in this view's scope; keys are write-once and a second registration raises RuntimeError'''
    registry = self.values.setdefault('_values', {})
    if key in registry:
        raise RuntimeError('cannot overwrite value')
    registry[key] = value

def getValue(self, key):
    '''return the value registered under `key` in this view's scope (raises KeyError if no value was registered)'''
    return self.values['_values'][key]

def addStep(self, task, stage, depends_on=None):
'''
adds a node to the DAG connecting it to the passed depending nodes
Expand Down Expand Up @@ -146,6 +156,10 @@ def addWorkflow(self, rules, stage=None):
offset = JsonPointer.from_parts([stage, len(self.steps[stage]) - 1]).path
self.steps[stage][-1]['_offset'] = offset

self.values.setdefault(stage,[]).append({})
offset = JsonPointer.from_parts([stage, len(self.values[stage]) - 1]).path
self.values[stage][-1] = {}

for rule in rules:
self.addRule(rule, offset)

Expand Down