Skip to content

Commit

Permalink
Merge 4b3a76e into 4f9d358
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineDao committed Feb 16, 2021
2 parents 4f9d358 + 4b3a76e commit 7fa912b
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 124 deletions.
2 changes: 1 addition & 1 deletion docs/schemas/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ executor.

plugins
recipes
repository
jobs
repository
14 changes: 11 additions & 3 deletions docs/schemas/jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Schema
NProgress.start();
</script>
<script>
jsonSchemaViewer(window, '../_static/schemas/job-schema.json')
jsonSchemaViewer(window, '../_static/schemas/job-schemas-combined.json')
</script>
<script>
NProgress.done();
Expand All @@ -40,15 +40,23 @@ Schema

OpenAPI Docs
-------------
You can find the Open API Docs formatted by redoc `here <../_static/redoc-job.html#tag/job_model>`_.
You can find the Open API Docs formatted by redoc:

- `Job Schema <../_static/redoc-job.html#tag/job_model>`_.
- `Job Status Schema <../_static/redoc-job.html#tag/jobstatus_model>`_.
- `Run Status Schema <../_static/redoc-job.html#tag/runstatus_model>`_.

OpenAPI Definition
-------------------
You can find the OpenAPI JSON definition `here <../_static/schemas/job-openapi.json>`_.

JSON Schema Definition
-----------------------
You can find the JSON Schema definition `here <../_static/schemas/job-schema.json>`_.
You can find the JSON Schema definitions:

- `Job Schema <../_static/schemas/job-schema.json>`_.
- `Job Status Schema <../_static/schemas/job-status-schema.json>`_.
- `Run Status Schema <../_static/schemas/run-status-schema.json>`_.


Examples
Expand Down
26 changes: 23 additions & 3 deletions gen_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pydantic_openapi_helper.inheritance import class_mapper

from queenbee.repository import RepositoryIndex
from queenbee.job import Job, JobStatus
from queenbee.job import Job, JobStatus, RunStatus
from queenbee.recipe import Recipe, RecipeInterface
from queenbee.plugin import Plugin

Expand Down Expand Up @@ -52,7 +52,7 @@
with open(os.path.join(folder, 'job-openapi.json'), 'w') as out_file:
json.dump(
get_openapi(
base_object=[Job], title='Queenbee Job Schema',
base_object=[Job, JobStatus, RunStatus], title='Queenbee Job Schema',
description='Schema documentation for Queenbee Jobs',
version=VERSION
),
Expand Down Expand Up @@ -93,9 +93,29 @@
indent=2
)


with open(os.path.join(folder, 'job-schemas-combined.json'), 'w') as out_file:
from pydantic import BaseModel
from queenbee.io.inputs.step import StepInputs
from queenbee.io.outputs.step import StepOutputs
from typing import List, Union
class JobSchemas(BaseModel):
job: Job
job_status: JobStatus
run_status: RunStatus
results: List[Union[StepInputs, StepOutputs]]

out_file.write(JobSchemas.schema_json())

with open(os.path.join(folder, 'job-schema.json'), 'w') as out_file:
out_file.write(Job.schema_json())

with open(os.path.join(folder, 'job-status-schema.json'), 'w') as out_file:
out_file.write(JobStatus.schema_json())

with open(os.path.join(folder, 'run-status-schema.json'), 'w') as out_file:
out_file.write(RunStatus.schema_json())

with open(os.path.join(folder, 'plugin-schema.json'), 'w') as out_file:
out_file.write(Plugin.schema_json())

Expand All @@ -112,7 +132,7 @@
"url": "./queenbee_inheritance.json"
}

models = [Recipe, Plugin, Job, RepositoryIndex, RecipeInterface, JobStatus]
models = [Recipe, Plugin, Job, RepositoryIndex, RecipeInterface, JobStatus, RunStatus]
openapi = get_openapi(
models,
title='Queenbee Schema',
Expand Down
4 changes: 2 additions & 2 deletions queenbee/io/inputs/job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Input objects for Queenbee jobs."""
from typing import Dict, List, Union
from typing import Dict, List, Union, Any

from pydantic import Field, constr

Expand All @@ -19,7 +19,7 @@ class JobArgument(BaseModel):
'Job\'s DAG template.'
)

value: str = Field(
value: Any = Field(
...,
description='The value of the job argument.'
)
Expand Down
2 changes: 1 addition & 1 deletion queenbee/io/inputs/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import json
from typing import Union, List, Dict, Any
from pydantic import constr, Field
from pydantic import constr, Field, root_validator

from .function import FunctionStringInput, FunctionIntegerInput, \
FunctionNumberInput, FunctionBooleanInput, FunctionFolderInput, \
Expand Down
4 changes: 2 additions & 2 deletions queenbee/job/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .job import Job, JobArguments
from .status import JobStatus
from .job import Job, JobArguments, JobStatus
from .run import RunStatus, StepStatus
68 changes: 65 additions & 3 deletions queenbee/job/job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Dict, List
from typing import Dict, List, Generator
from datetime import datetime

from pydantic import Field, constr

from ..base.basemodel import BaseModel
from ..io.inputs.job import JobArguments

from .status import BaseStatus

class Job(BaseModel):
"""Queenbee Job.
Expand All @@ -19,7 +21,7 @@ class Job(BaseModel):
description='The source url for downloading the recipe.'
)

arguments: List[JobArguments] = Field(
arguments: List[List[JobArguments]] = Field(
None,
description='Input arguments for this job.'
)
Expand All @@ -40,3 +42,63 @@ class Job(BaseModel):
description='Optional user data as a dictionary. User data is for user reference'
' only and will not be used in the execution of the job.'
)


class JobStatus(BaseModel):
"""Parametric Job Status."""

api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True)

type: constr(regex='^JobStatus$') = 'JobStatus'

id: str = Field(
...,
description='The ID of the individual job.'
)

status: str = Field(
...,
description='The status of this task. Can be "Running", "Succeeded", "Failed" '
'or "Error"'
)

message: str = Field(
None,
description='Any message produced by the task. Usually error/debugging hints.'
)

started_at: datetime = Field(
...,
description='The time at which the task was started'
)

finished_at: datetime = Field(
None,
description='The time at which the task was completed'
)

source: str = Field(
None,
description='Source url for the status object. It can be a recipe or a function.'
)

runs_pending: int = Field(
0,
description='The count of runs that are pending'
)

runs_running: int = Field(
0,
description='The count of runs that are running'
)

runs_completed: int = Field(
0,
description='The count of runs that have completed'
)

runs_failed: int = Field(
0,
description='The count of runs that have failed'
)

42 changes: 42 additions & 0 deletions queenbee/job/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Queenbee Results
The results are a combination of step inputs and outputs for a list of
runs.
"""

from typing import List, Dict, Union
from ..io.inputs.step import StepStringInput, StepInputs
from ..io.outputs.step import StepOutputs
from .run import RunStatus

class Results(List[Union[StepInputs, StepOutputs]]):

@classmethod
def from_runs(cls, runs: List[RunStatus]) -> 'Results':
res = []

for run in runs:
row = [
StepStringInput(
name='job-id',
description='The ID of the job that generated this run',
value=run.job_id,
),
StepStringInput(
name='run-id',
description='The ID of the run this result row should be associated with',
value=run.id,
),
StepStringInput(
name='run-status',
description='The stauts of the run this result row should be associated with',
value=run.status,
),
]

row.extend(run.inputs)
row.extend(run.outputs)

res.append(row)

return res
121 changes: 121 additions & 0 deletions queenbee/job/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Queenbee run class.
A Run contains the status of an individual recipe being executed
"""
from enum import Enum
from datetime import datetime
from pydantic import Field, constr
from typing import List, Dict, Union

from ..base.basemodel import BaseModel
from ..io.common import IOBase
from ..io.inputs.step import StepStringInput, StepInputs
from ..io.outputs.step import StepOutputs
from .status import BaseStatus


class StatusType(str, Enum):
"""Type enum for status type."""
Function = 'Function'

DAG = 'DAG'

Loop = 'Loop'

Unknown = 'Unknown'


class StepStatus(BaseStatus):
"""The Status of a Job Step"""
type: constr(regex='^StepStatus$') = 'StepStatus'

id: str = Field(
...,
description='The step unique ID'
)

name: str = Field(
...,
description='A human readable name for the step. Usually defined by the '
'DAG task name but can be extended if the step is part of a loop for example. '
'This name is unique within the boundary of the DAG/Job that generated it.'
)

status_type: StatusType = Field(
...,
description='The type of step this status is for. Can be "Function", "DAG" or '
'"Loop"'
)

template_ref: str = Field(
...,
description='The name of the template that spawned this step'
)

command: str = Field(
None,
description='The command used to run this step. Only applies to Function steps.'
)

inputs: List[StepInputs] = Field(
...,
description='The inputs used by this step.'
)

outputs: List[StepOutputs] = Field(
...,
description='The outputs produced by this step.'
)

boundary_id: str = Field(
None,
description='This indicates the step ID of the associated template root \
step in which this step belongs to. A DAG step will have the id of the \
parent DAG for example.'
)

children_ids: List[str] = Field(
...,
description='A list of child step IDs'
)

outbound_steps: List[str] = Field(
...,
description='A list of the last step to ran in the context of this '
'step. In the case of a DAG or a job this will be the last step that has '
'been executed. It will remain empty for functions.'
)


class RunStatus(BaseStatus):
"""Job Status."""
api_version: constr(regex='^v1beta1$') = Field('v1beta1', readOnly=True)

type: constr(regex='^RunStatus$') = 'RunStatus'

id: str = Field(
...,
description='The ID of the individual run.'
)

job_id: str = Field(
...,
description='The ID of the job that generated this run'
)

entrypoint: str = Field(
None,
description='The ID of the first step in the run.'
)

steps: Dict[str, StepStatus] = {}

inputs: List[StepInputs] = Field(
...,
description='The inputs used for this run.'
)

outputs: List[StepOutputs] = Field(
...,
description='The outputs produced by this run.'
)
Loading

0 comments on commit 7fa912b

Please sign in to comment.