Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max job retained pipelined runs #608

30 changes: 24 additions & 6 deletions services/orchest-api/app/app/apis/namespace_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ def put(self, job_uuid):

job_update = request.get_json()

name = job_update.get("name", None)
cron_schedule = job_update.get("cron_schedule", None)
parameters = job_update.get("parameters", None)
env_variables = job_update.get("env_variables", None)
next_scheduled_time = job_update.get("next_scheduled_time", None)
strategy_json = job_update.get("strategy_json", None)
name = job_update.get("name")
cron_schedule = job_update.get("cron_schedule")
parameters = job_update.get("parameters")
env_variables = job_update.get("env_variables")
next_scheduled_time = job_update.get("next_scheduled_time")
strategy_json = job_update.get("strategy_json")
max_retained_pipeline_runs = job_update.get("max_retained_pipeline_runs")
confirm_draft = "confirm_draft" in job_update

try:
Expand All @@ -171,6 +172,7 @@ def put(self, job_uuid):
env_variables,
next_scheduled_time,
strategy_json,
max_retained_pipeline_runs,
confirm_draft,
)
except Exception as e:
Expand Down Expand Up @@ -822,6 +824,10 @@ def _transaction(
"status": "DRAFT",
"strategy_json": job_spec.get("strategy_json", {}),
"created_time": datetime.now(timezone.utc),
# If not specified -> no max limit -> -1.
"max_retained_pipeline_runs": job_spec.get(
"max_retained_pipeline_runs", -1
),
}
db.session.add(models.Job(**job))

Expand Down Expand Up @@ -860,6 +866,7 @@ def _transaction(
env_variables: Dict[str, str],
next_scheduled_time: str,
strategy_json: Dict[str, Any],
max_retained_pipeline_runs: int,
confirm_draft,
):
job = models.Job.query.with_for_update().filter_by(uuid=job_uuid).one()
Expand Down Expand Up @@ -954,6 +961,17 @@ def _transaction(
)
job.strategy_json = strategy_json

if max_retained_pipeline_runs is not None:
yannickperrenet marked this conversation as resolved.
Show resolved Hide resolved
if job.schedule is None and job.status != "DRAFT":
raise ValueError(
(
"Failed update operation. Cannot update the "
"max_retained_pipeline_runs of a job which is not a draft nor "
"a cron job."
)
)
job.max_retained_pipeline_runs = max_retained_pipeline_runs

if confirm_draft:
if job.status != "DRAFT":
raise ValueError("Failed update operation. The job is not a draft.")
Expand Down
9 changes: 9 additions & 0 deletions services/orchest-api/app/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ class Job(BaseModel):
server_default=text("timezone('utc', now())"),
)

# Max number of pipeline runs to retain. So that any newly created
# runs (e.g. in a cronjob) will lead to the deletion of the
# existing, oldest runs that are in an end state if the total number
# of job runs in the DB gets past this value. A value of -1 means
# that there is no such limit. The default value is -1.
max_retained_pipeline_runs = db.Column(
db.Integer, nullable=False, server_default=text("-1")
)
yannickperrenet marked this conversation as resolved.
Show resolved Hide resolved

def __repr__(self):
return f"<Job: {self.uuid}>"

Expand Down
21 changes: 21 additions & 0 deletions services/orchest-api/app/app/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@
"env_variables": fields.Raw(
required=True, description="Environment variables of the job"
),
"max_retained_pipeline_runs": fields.Integer(
required=False,
description=(
"Max number of pipeline runs to retain. The oldest pipeline runs that "
"are in an end state that are over this number will be deleted."
),
),
},
)

Expand Down Expand Up @@ -470,6 +477,13 @@
description="List of run parameters.",
),
"strategy_json": fields.Raw(required=False, description="Strategy json."),
"max_retained_pipeline_runs": fields.Integer(
required=False,
description=(
"Max number of pipeline runs to retain. The oldest pipeline runs that "
"are in an end state that are over this number will be deleted."
),
),
},
)

Expand Down Expand Up @@ -527,6 +541,13 @@
"env_variables": fields.Raw(
required=False, description="Environment variables of the job"
),
"max_retained_pipeline_runs": fields.Integer(
required=True,
description=(
"Max number of pipeline runs to retain. The oldest pipeline runs that "
"are in an end state that are over this number will be deleted."
),
),
},
)

Expand Down
35 changes: 35 additions & 0 deletions services/orchest-api/app/migrations/versions/a25c8a60484c_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""empty message

Revision ID: a25c8a60484c
Revises: 439807debe51
Create Date: 2021-12-21 09:32:21.827493

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "a25c8a60484c"
down_revision = "439807debe51"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"jobs",
sa.Column(
"max_retained_pipeline_runs",
sa.Integer(),
server_default=sa.text("-1"),
nullable=False,
),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("jobs", "max_retained_pipeline_runs")
# ### end Alembic commands ###