Skip to content

Commit

Permalink
Add ability to resume a run from the management command #22
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Druez <tdruez@nexb.com>
  • Loading branch information
tdruez committed Sep 24, 2020
1 parent 68e7c2b commit e758d84
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 11 deletions.
2 changes: 1 addition & 1 deletion scanpipe/management/commands/graph.py
Expand Up @@ -37,7 +37,7 @@ def graphviz_installed():


class Command(BaseCommand):
help = "Generate Pipeline graph with Graphviz."
help = "Generate pipeline graph with Graphviz."

def add_arguments(self, parser):
parser.add_argument(
Expand Down
39 changes: 34 additions & 5 deletions scanpipe/management/commands/run.py
Expand Up @@ -20,6 +20,8 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.

import sys

from django.core.management import CommandError

from scanpipe.management.commands import ProjectCommand
Expand All @@ -28,13 +30,40 @@
class Command(ProjectCommand):
help = "Run pipelines of a project."

def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument(
"--resume",
action="store_true",
help="Resume the latest failed pipeline execution.",
)

def handle(self, *args, **options):
super().handle(*args, **options)

run = self.project.get_next_run()
if options["resume"]:
action = "resume"
run = self.project.get_latest_failed_run()
task_function = "resume_pipeline_task_async"
else:
action = "run"
run = self.project.get_next_run()
task_function = "run_pipeline_task_async"

if not run:
raise CommandError(f"No pipelines to run on Project {self.project}")
raise CommandError(f"No pipelines to {action} on project {self.project}")

msg = f"Pipeline {run.pipeline} {action} in progress..."
self.stdout.write(msg)

getattr(run, task_function)()

msg = f"Pipeline {run.pipeline} run in progress..."
self.stdout.write(self.style.SUCCESS(msg))
run.run_pipeline_task_async()
run.refresh_from_db()
if run.task_succeeded:
msg = f"{run.pipeline} successfully executed on project {self.project}"
self.stdout.write(self.style.SUCCESS(msg))
else:
msg = f"Error during {run.pipeline} execution:\n"
self.stderr.write(self.style.ERROR(msg))
self.stderr.write(run.task_output)
sys.exit(1)
4 changes: 3 additions & 1 deletion scanpipe/tasks.py
Expand Up @@ -80,9 +80,11 @@ def resume_pipeline_task(self, run_pk):
info(f"Enter `{self.name}` Task.id={task_id}", run_pk)

run = get_run_instance(run_pk)
# Capture the run_id before resetting the task
run_id = run.get_run_id()

run.reset_task_values()
run.set_task_started(task_id)
run_id = run.get_run_id()

info(f'Resume pipeline: "{run.pipeline}" on project: "{run.project.name}"', run_pk)
cmd = f"{python} {run.pipeline} resume --origin-run-id {run_id}"
Expand Down
30 changes: 26 additions & 4 deletions scanpipe/tests/test_commands.py
Expand Up @@ -23,6 +23,7 @@
import tempfile
from io import StringIO
from pathlib import Path
from unittest import mock

from django.core.management import CommandError
from django.core.management import call_command
Expand Down Expand Up @@ -191,18 +192,39 @@ def test_scanpipe_management_command_show_pipeline(self):
self.assertEqual(expected, out.getvalue())

def test_scanpipe_management_command_run(self):
out = StringIO()

project = Project.objects.create(name="my_project")
options = ["--project", project.name]

expected = "No pipelines to run on Project my_project"
out = StringIO()
expected = "No pipelines to run on project my_project"
with self.assertRaisesMessage(CommandError, expected):
call_command("run", *options, stdout=out)

pipeline = "scanpipe/pipelines/docker.py"
project.add_pipeline(pipeline)
call_command("run", *options, stdout=out)

def task_success(run):
run.task_exitcode = 0
run.save()

out = StringIO()
with mock.patch("scanpipe.models.Run.run_pipeline_task_async", task_success):
call_command("run", *options, stdout=out)
expected = "Pipeline scanpipe/pipelines/docker.py run in progress..."
self.assertIn(expected, out.getvalue())
expected = "successfully executed on project my_project"
self.assertIn(expected, out.getvalue())

def task_failure(run):
run.task_output = "Error log"
run.task_exitcode = 1
run.save()

err = StringIO()
project.add_pipeline(pipeline)
with mock.patch("scanpipe.models.Run.run_pipeline_task_async", task_failure):
with self.assertRaisesMessage(SystemExit, "1"):
call_command("run", *options, stdout=out, stderr=err)
expected = "Error during scanpipe/pipelines/docker.py execution:"
self.assertIn(expected, err.getvalue())
self.assertIn("Error log", err.getvalue())

0 comments on commit e758d84

Please sign in to comment.