Skip to content

Commit

Permalink
Converts tasking system to use Django models
Browse files Browse the repository at this point in the history
Moves all tasking code to pulp.platform.tasks and updates
existing usage to use the new location. Also the code
itself is converted to use the new Django models.

https://pulp.plan.io/issues/2154
closes #2154
  • Loading branch information
Brian Bouterse committed Oct 3, 2016
1 parent 236a0e4 commit aba8ba2
Show file tree
Hide file tree
Showing 72 changed files with 1,062 additions and 1,311 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ addons:
install:
- "pip install -r test_requirements.txt"
- "pushd app/ && python setup.py develop && popd"
- "pushd common/ && python setup.py develop && popd"
- "pushd exceptions/ && python setup.py develop && popd"
- "pushd plugin/ && python setup.py develop && popd"
- "pushd tasking/ && python setup.py develop && popd"
before_script:
- psql -U postgres -c "CREATE USER pulp WITH SUPERUSER LOGIN;"
- psql -U postgres -c "CREATE DATABASE pulp OWNER pulp;"
Expand Down
13 changes: 13 additions & 0 deletions app/etc/pulp/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@
# pulp.app:
# level: INFO

# Broker configuration
#
# `broker`: Broker configuration for Pulp. By default, Pulp uses the Qpid
# broker without SSL. It is possible to change this by providing a different
# broker string. The remaining options are used to configure SSL settings.
#
# broker:
# url: qpid://localhost/
# celery_require_ssl: False
# ssl_ca_certificate: /etc/pki/pulp/qpid/ca.crt
# ssl_client_key: /etc/pki/pulp/qpid/client.crt
# ssl_client_certificate: /etc/pki/pulp/qpid/client.crt
# login_method:
4 changes: 2 additions & 2 deletions app/pulp/app/models/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ class DownloadCatalog(Model):
Relations:
:cvar artifact: The artifact that is expected to be present at ``url``.
:type artifact: pulp.platform.models.Artifact
:type artifact: pulp.app.models.Artifact
:cvar importer: The importer that contains the configuration necessary
to access ``url``.
:type importer: pulp.platform.models.Importer
:type importer: pulp.app.models.Importer
"""
# Although there is a Django field for URLs based on CharField, there is
# not technically any limit on URL lengths so it's simplier to allow any
Expand Down
20 changes: 10 additions & 10 deletions app/pulp/app/models/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.db import models

from pulp.app.models import Model, Task
from pulp.app.tasks.task_system import get_current_task_id
from pulp.tasking import get_current_task_id


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -44,14 +44,14 @@ class ProgressReport(Model):
SKIPPED = 'skipped'
RUNNING = 'running'
COMPLETED = 'completed'
ERRORED = 'errored'
FAILED = 'failed'
CANCELED = 'canceled'
STATES = (
(WAITING, 'Waiting'),
(SKIPPED, 'Skipped'),
(RUNNING, 'Running'),
(COMPLETED, 'Completed'),
(ERRORED, 'Errored'),
(FAILED, 'Failed'),
(CANCELED, 'Canceled')
)
message = models.TextField()
Expand Down Expand Up @@ -88,9 +88,9 @@ def __enter__(self):

def __exit__(self, type, value, traceback):
"""
Update the progress report state to COMPLETED or ERRORED.
Update the progress report state to COMPLETED or FAILED.
If an exception occurs the progress report state is saved as ERRORED and the exception is
If an exception occurs the progress report state is saved as FAILED and the exception is
not suppressed. If the context manager exited without exception the progress report state
is saved as COMPLETED.
Expand All @@ -100,7 +100,7 @@ def __exit__(self, type, value, traceback):
self.state = self.COMPLETED
self.save()
else:
self.state = self.ERRORED
self.state = self.FAILED
self.save()


Expand All @@ -122,13 +122,13 @@ class ProgressSpinner(ProgressReport):
>>> metadata_progress.save()
The ProgressSpinner() is a context manager that provides automatic state transitions for the
RUNNING COMPLETED and ERRORED states. Use it as follows:
RUNNING COMPLETED and FAILED states. Use it as follows:
>>> spinner = ProgressSpinner('Publishing Metadata')
>>> with spinner:
>>> # spinner is at 'running'
>>> publish_metadata()
>>> # spinner is at 'completed' if no exception or 'errored' if an exception was raised
>>> # spinner is at 'completed' if no exception or 'failed' if an exception was raised
You can also use this short form:
Expand Down Expand Up @@ -162,7 +162,7 @@ class ProgressBar(ProgressReport):
>>> progress_bar.save()
The ProgressBar() is a context manager that provides automatic state transitions for the RUNNING
COMPLETED and ERRORED states. The increment() method can be called in the loop as work is
COMPLETED and FAILED states. The increment() method can be called in the loop as work is
completed. Use it as follows:
>>> progress_bar = ProgressBar(message='Publishing files', total=len(files_iterator))
Expand All @@ -172,7 +172,7 @@ class ProgressBar(ProgressReport):
>>> for file in files_iterator:
>>> handle(file)
>>> progress_bar.increment() # increments and saves
>>> # progress_bar is at 'completed' if no exception or 'errored' if an exception was raised
>>> # progress_bar is at 'completed' if no exception or 'failed' if an exception was raised
A convenience method called iter() allows you to avoid calling increment() directly:
Expand Down
86 changes: 74 additions & 12 deletions app/pulp/app/models/task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
"""
Django models related to the Tasking system
"""
from gettext import gettext as _
import logging

from django.db import models
from django.utils import timezone

from pulp.app.models import Model
from pulp.app.fields import JSONField
from pulp.common import TASK_FINAL_STATES
from pulp.exceptions import PulpException


_logger = logging.getLogger(__name__)


class ReservedResource(Model):
Expand All @@ -27,7 +36,7 @@ class ReservedResource(Model):
resource = models.TextField()

task = models.OneToOneField("Task")
worker = models.ForeignKey("Worker")
worker = models.ForeignKey("Worker", on_delete=models.CASCADE, related_name="reservations")


class Worker(Model):
Expand All @@ -43,7 +52,7 @@ class Worker(Model):
:type last_heartbeat: models.DateTimeField
"""
name = models.TextField(db_index=True, unique=True)
last_heartbeat = models.DateTimeField()
last_heartbeat = models.DateTimeField(auto_now=True)


class TaskLock(Model):
Expand Down Expand Up @@ -92,8 +101,8 @@ class Task(Model):
:cvar finished_at: The time the task finished executing
:type finished_at: models.DateTimeField
:cvar error: Collection of errors that might have occurred while task was running
:type error: models.JSONField
:cvar non_fatal_errors: Dictionary of non-fatal errors that occurred while task was running.
:type non_fatal_errors: models.JSONField
:cvar result: Return value of the task
:type result: models.JSONField
Expand All @@ -109,20 +118,16 @@ class Task(Model):

WAITING = 'waiting'
SKIPPED = 'skipped'
ACCEPTED = 'accepted'
RUNNING = 'running'
SUSPENDED = 'suspended'
COMPLETED = 'completed'
ERRORED = 'errored'
FAILED = 'failed'
CANCELED = 'canceled'
STATES = (
(WAITING, 'Waiting'),
(SKIPPED, 'Skipped'),
(ACCEPTED, 'Accepted'),
(RUNNING, 'Running'),
(SUSPENDED, 'Suspended'),
(COMPLETED, 'Completed'),
(ERRORED, 'Errored'),
(FAILED, 'Failed'),
(CANCELED, 'Canceled')
)
group = models.UUIDField(null=True)
Expand All @@ -131,11 +136,68 @@ class Task(Model):
started_at = models.DateTimeField(null=True)
finished_at = models.DateTimeField(null=True)

error = JSONField()
non_fatal_errors = JSONField()
result = JSONField()

parent = models.ForeignKey("Task", null=True, related_name="spawned_tasks")
worker = models.ForeignKey("Worker", null=True)
worker = models.ForeignKey("Worker", null=True, related_name="tasks")

def set_running(self):
"""
Set this Task to the running state, save it, and log output in warning cases.
This updates the :attr: `started_at` and sets the :attr: `state` to :attr: `RUNNING`.
"""
if self.state != self.WAITING:
msg = _('Task __call__() occurred but Task %s is not at WAITING')
_logger.warning(msg % self.request.id)
self.state = Task.RUNNING
self.started_at = timezone.now()
self.save()

def set_completed(self, result):
"""
Set this Task to the completed state, save it, and log output in warning cases.
This updates the :attr: `finished_at` and sets the :attr: `state` to :attr: `COMPLETED`.
:param result: The result to save on the :class: `~pulp.app.models.Task`
:type result: dict
"""
self.finished_at = timezone.now()
self.result = result

# Only set the state to finished if it's not already in a complete state. This is
# important for when the task has been canceled, so we don't move the task from canceled
# to finished.
if self.state not in TASK_FINAL_STATES:
self.state = Task.COMPLETED
else:
msg = _('Task set_completed() occurred but Task %s is already in final state')
_logger.warning(msg % self.pk)

self.save()

def set_failed(self, exc, einfo):
"""
Set this Task to the failed state and save it.
This updates the :attr: `finished_at` attribute, sets the :attr: `state` to
:attr: `FAILED`, and sets the :attr: `result` attribute.
:param exc: The exception raised by the task.
:type exc: ???
:param einfo: celery's ExceptionInfo instance, containing serialized traceback.
:type einfo: ???
"""
self.state = Task.FAILED
self.finished_at = timezone.now()
if isinstance(exc, PulpException):
self.result = exc.to_dict()
else:
self.result = {'traceback': einfo.traceback}
self.save()


class TaskTag(Model):
Expand Down
8 changes: 8 additions & 0 deletions app/pulp/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@
},
}
},
'broker': {
'url': 'qpid://localhost/',
'celery_require_ssl': False,
'ssl_ca_certificate': '/etc/pki/pulp/qpid/ca.crt',
'ssl_client_key': '/etc/pki/pulp/qpid/client.crt',
'ssl_client_certificate': '/etc/pki/pulp/qpid/client.crt',
'login_method': None
},
}


Expand Down
14 changes: 0 additions & 14 deletions app/pulp/app/tasks/task_system.py

This file was deleted.

1 change: 1 addition & 0 deletions common/pulp/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .constants import TASK_FINAL_STATES, TASK_INCOMPLETE_STATES, TASK_STATES
15 changes: 15 additions & 0 deletions common/pulp/common/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from types import SimpleNamespace


TASK_STATES = SimpleNamespace(
WAITING='waiting',
SKIPPED='skipped',
RUNNING='running',
COMPLETED='completed',
FAILED='failed',
CANCELED='canceled'
)

TASK_FINAL_STATES = (TASK_STATES.SKIPPED, TASK_STATES.COMPLETED, TASK_STATES.FAILED,
TASK_STATES.CANCELED)
TASK_INCOMPLETE_STATES = (TASK_STATES.WAITING, TASK_STATES.RUNNING)
21 changes: 21 additions & 0 deletions common/pulp/common/error_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from collections import namedtuple
from gettext import gettext as _


Error = namedtuple('Error', ['code', 'message', 'required_fields'])
"""
The error named tuple has 3 components:
code: The 7 character uniquely identifying code for this error, 3 A-Z identifying the module
followed by 4 numeric characters for the msg id. All general pulp server errors start
with PLP
message: The message that will be printed for this error
required_files: A list of required fields for printing the message
"""

# The PLP0000 error is to wrap non-pulp exceptions
PLP0000 = Error("PLP0000", "%(message)s", ['message'])
PLP0001 = Error("PLP0001", _("A general pulp exception occurred"), [])
PLP0008 = Error("PLP0008", _("Error raising error %(code)s. "
"The field [%(field)s] was not included in the error_data."),
['code', 'field'])
PLP0009 = Error("PLP0009", _("Missing resource(s): %(resources)s"), ['resources'])
File renamed without changes.
2 changes: 2 additions & 0 deletions exceptions/pulp/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .base import PulpCodedException, PulpException
from .http import MissingResource
Loading

0 comments on commit aba8ba2

Please sign in to comment.