diff --git a/queenbee/job/job.py b/queenbee/job/job.py index 0fe37f8d..8b37cb3f 100644 --- a/queenbee/job/job.py +++ b/queenbee/job/job.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Generator from pydantic import Field, constr from ..base.basemodel import BaseModel @@ -40,3 +40,27 @@ class Job(BaseModel): description='Optional user data as a dictionary. User data is for user reference' ' only and will not be used in the execution of the job.' ) + + +class ParametricJob(Job): + """Queenbee Parametric Job. + + A ParametricJob is an object to submit a list of arguments to execute + the same Queenbee Recipe using multiple combinations of arguments. + """ + api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True) + + type: constr(regex='^ParametricJob$') = 'ParametricJob' + + arguments: List[List[JobArguments]] = Field( + None, + description='Input arguments for this job.' + ) + + def yield_jobs(self) -> Generator[Job]: + for args in self.arguments: + yield Job( + source=self.source, + arguments=args, + labels=self.labels, + ) diff --git a/queenbee/job/status.py b/queenbee/job/status.py index 37203f56..4e3a5d39 100644 --- a/queenbee/job/status.py +++ b/queenbee/job/status.py @@ -8,7 +8,7 @@ from typing import List, Dict from ..io.common import IOBase -from ..io.inputs.step import StepInputs +from ..io.inputs.step import StepStringInput, StepInputs from ..io.outputs.step import StepOutputs @@ -143,3 +143,63 @@ class JobStatus(BaseStatus): ..., description='The outputs produced by this job.' ) + + +class ParametricJobChildStatusCount(BaseModel): + + pending: int = 0 + + running: int = 0 + + completed: int = 0 + + failed: int = 0 + +class ParametricJobStatus(BaseStatus): + """Parametric Job Status.""" + + api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True) + + type: constr(regex='^ParametricJobStatus$') = 'ParametricJobStatus' + + id: str = Field( + ..., + description='The ID of the individual job.' + ) + + children_status: ParametricJobChildStatusCount = Field( + ParametricJobChildStatusCount(), + description='A count of the status type for each child run of this parametric job' + ) + + children: List[JobStatus] = Field( + [], + description='A list of job status objects corresponding to each child of the parametric job' + ) + +class ParametricJobResults(List[Union[StepInputs, StepOutputs]]): + + @classmethod + def from_children(cls, children: List[JobStatus]) -> 'ParametricJobResults': + res = [] + + for job in children: + row = [ + StepStringInput( + name='parametric-child-job-id', + description='The ID of the job this result row should be associated with', + value=job.id, + ), + StepStringInput( + name='parametric-child-job-status', + description='The stauts of the job this result row should be associated with', + value=job.status, + ), + ] + + row.extend(job.inputs) + row.extend(job.outputs) + + res.append(row) + + return res \ No newline at end of file