Skip to content

Commit

Permalink
feat(parametric jobs): set up basic schema objects for parametric job…
Browse files Browse the repository at this point in the history
… and status
  • Loading branch information
AntoineDao committed Feb 10, 2021
1 parent 007c2fa commit db86f5f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
26 changes: 25 additions & 1 deletion queenbee/job/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict, List, Generator
from pydantic import Field, constr

from ..base.basemodel import BaseModel
Expand Down Expand Up @@ -40,3 +40,27 @@ class Job(BaseModel):
description='Optional user data as a dictionary. User data is for user reference'
' only and will not be used in the execution of the job.'
)


class ParametricJob(Job):
"""Queenbee Parametric Job.
A ParametricJob is an object to submit a list of arguments to execute
the same Queenbee Recipe using multiple combinations of arguments.
"""
api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True)

type: constr(regex='^ParametricJob$') = 'ParametricJob'

arguments: List[List[JobArguments]] = Field(
None,
description='Input arguments for this job.'
)

def yield_jobs(self) -> Generator[Job]:
for args in self.arguments:
yield Job(
source=self.source,
arguments=args,
labels=self.labels,
)
62 changes: 61 additions & 1 deletion queenbee/job/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import List, Dict

from ..io.common import IOBase
from ..io.inputs.step import StepInputs
from ..io.inputs.step import StepStringInput, StepInputs
from ..io.outputs.step import StepOutputs


Expand Down Expand Up @@ -143,3 +143,63 @@ class JobStatus(BaseStatus):
...,
description='The outputs produced by this job.'
)


class ParametricJobChildStatusCount(BaseModel):

pending: int = 0

running: int = 0

completed: int = 0

failed: int = 0

class ParametricJobStatus(BaseStatus):
"""Parametric Job Status."""

api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True)

type: constr(regex='^ParametricJobStatus$') = 'ParametricJobStatus'

id: str = Field(
...,
description='The ID of the individual job.'
)

children_status: ParametricJobChildStatusCount = Field(
ParametricJobChildStatusCount(),
description='A count of the status type for each child run of this parametric job'
)

children: List[JobStatus] = Field(
[],
description='A list of job status objects corresponding to each child of the parametric job'
)

class ParametricJobResults(List[Union[StepInputs, StepOutputs]]):

@classmethod
def from_children(cls, children: List[JobStatus]) -> 'ParametricJobResults':
res = []

for job in children:
row = [
StepStringInput(
name='parametric-child-job-id',
description='The ID of the job this result row should be associated with',
value=job.id,
),
StepStringInput(
name='parametric-child-job-status',
description='The stauts of the job this result row should be associated with',
value=job.status,
),
]

row.extend(job.inputs)
row.extend(job.outputs)

res.append(row)

return res

0 comments on commit db86f5f

Please sign in to comment.