Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.
/ pulp Public archive

Commit

Permalink
Consolidate tasking moduels in the pluginAPI.
Browse files Browse the repository at this point in the history
closes #3280
  • Loading branch information
jortel committed Jan 24, 2018
1 parent 6d8e79b commit fbe5f15
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 143 deletions.
2 changes: 1 addition & 1 deletion docs/plugins/plugin-api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Plugin API reaches stability with v1.0. For the latest version of the Plugin API
serializers
storage
viewsets
tasks
tasking
download
changeset

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pulp.plugin.tasks
==================

All models documented here should be imported directly from the ``pulpcore.plugin.tasks`` namespace.
All models documented here should be imported directly from the ``pulpcore.plugin.tasking`` namespace.

.. automodule:: pulpcore.plugin.tasks
.. automodule:: pulpcore.plugin.tasking
:imported-members:
8 changes: 7 additions & 1 deletion plugin/pulpcore/plugin/tasking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
from pulpcore.exceptions import exception_to_dict
from pulpcore.tasking.util import get_current_task_id

# Support plugins creating Celery tasks.
from pulpcore.tasking.tasks import UserFacingTask # noqa

class Task(object):
# Support plugins working with the working directory.
from pulpcore.tasking.services.storage import WorkingDirectory # noqa


class Task:
"""
The task which is currently executing.
Expand Down
5 changes: 0 additions & 5 deletions plugin/pulpcore/plugin/tasks.py

This file was deleted.

244 changes: 110 additions & 134 deletions pulpcore/pulpcore/tasking/services/storage.py
Original file line number Diff line number Diff line change
@@ -1,152 +1,128 @@
import errno
import os
import shutil
import stat
from contextlib import contextmanager, suppress

from celery import task
from django.conf import settings as pulp_settings


def _working_dir_root(worker_name):
"""Return the path to the working directory of a worker.
:param worker_name: Name of worker for which path is requested
:type worker_name: basestring
:returns: working_directory setting from server config
:rtype: str
"""
return os.path.join(pulp_settings.SERVER['working_directory'], worker_name)


def create_worker_working_directory(worker_name):
"""Create a working directory for a worker.
Create a working directory inside the worker working_directory as specified in the setting file.
from gettext import gettext as _

:param worker_name: Name of worker that uses the working directory created
:type worker_name: basestring
:returns: Path to the working directory for a worker
:rtype: str
"""
working_dir_root = _working_dir_root(worker_name)
os.mkdir(working_dir_root)
return working_dir_root


def delete_worker_working_directory(worker_name):
"""Delete worker's working directory.
from celery import task
from django.conf import settings

Delete a working directory inside the working_directory as specified in the setting file.

:param worker_name: Name of worker that uses the working directory being deleted
:type worker_name: basestring
class WorkingDirectory:
"""
_rmtree(_working_dir_root(worker_name))
A context manager used to manage a Celery task's working directory.

def _working_directory_path():
"""Get path of task working directory.
Get the path for a task working directory inside a workers working directory.
:returns: full path on disk to the working directory for current task
:rtype: basestring
Examples:
>>>
>>> with WorkingDirectory.create() as working_dir:
>>> ...
>>>
"""
current_task = task.current
with suppress(AttributeError):
task_id = current_task.request.id
worker_name = current_task.request.hostname
if current_task and current_task.request and task_id and worker_name:
return os.path.join(_working_dir_root(worker_name), task_id)


def delete_working_directory():
"""Delete working directory for a particular task."""
_rmtree(_working_directory_path())
# Directory permissions.
MODE = 0o700

@classmethod
def create(cls):
"""
Create the directory on the Filesystem.
def get_working_directory():
"""Create a working directory with task id as name.
The directory (tree) is deleted and recreated when already exist.
This directory resides inside a particular worker's working directory.
The 'working_directory' setting in ``server`` section of settings defines the local path
to the root of working directories.
The directoryis created only once per task. The path to the existing working directory
is returned for all subsequent calls for that task.
:returns: full path on disk to the working directory for current task
:rtype: str
"""
working_dir_root = _working_directory_path()

if working_dir_root:
Returns:
pulpcore.tasking.services.storage.WorkingDirectory: The created directory.
"""
path = cls._build_path()
try:
os.mkdir(working_dir_root)
except OSError as error:
if error.errno is errno.EEXIST:
return working_dir_root
else:
raise
else:
return working_dir_root
else:
# If path is None, this method is called outside of an asynchronous task
raise RuntimeError("Working Directory requested outside of asynchronous task.")


def _rmtree(path):
"""Delete if exists an entire directory tree in path.
Uses _rmtree_fix_permissions but suppresses 'No such file or directory' Exception.
"""
if path is None:
return
try:
_rmtree_fix_permissions(path)
except OSError as error:
if error.errno is errno.ENOENT:
pass
os.makedirs(path, mode=cls.MODE)
except FileExistsError:
_dir = cls(path)
_dir.delete()
os.makedirs(path, mode=cls.MODE)
return _dir
else:
raise
return cls(path)

@staticmethod
def _build_path():
"""
Build the directory path using format: <root>/<worker-name>/<task_id>
def _rmtree_fix_permissions(directory_path):
"""Delete an entire directory tree in path.
Returns:
str: The absolute directory path.
Recursively remove a directory. If permissions on the directory or it's contents
block removal, attempt to fix the permissions to allow removal and attempt the removal
again.
:param directory_path: The directory to remove
:type directory_path: str
"""
try:
shutil.rmtree(directory_path)
except OSError as error:
# if perm denied (13) add rwx permissions to all directories and retry
# so that we are not blocked by users creating directories with no permissions
if error.errno is errno.EACCES:
for root, dirs, files in os.walk(directory_path):
for dir_path in dirs:
os.chmod(os.path.join(root, dir_path),
stat.S_IXUSR | stat.S_IWUSR | stat.S_IREAD)
shutil.rmtree(directory_path)
else:
raise


@contextmanager
def working_dir_context():
"""
Prepares a working directory that is removed after the context has ended.
"""
try:
working_dir = get_working_directory()
os.chdir(working_dir)
yield working_dir
finally:
delete_working_directory()
Raises:
RuntimeError: When used outside a Celery task.
"""
root = settings.SERVER['working_directory']
try:
return os.path.join(
root,
task.current.request.hostname,
task.current.request.id)
except AttributeError:
raise RuntimeError(_('WorkingDirectory may only be used within a Task.'))

def __init__(self, path):
"""
Args:
The absolute path to the directory.
Raises:
RuntimeError: When used outside a Celery task.
"""
self._path = path
assert os.path.isdir(path), _('{p} must be real directory'.format(p=path))

@property
def path(self):
"""
The absolute path to the directory.
Returns:
str: The absolute directory path.
"""
return self._path

def delete(self):
"""
Delete the directory (tree).
On permission denied - an attempt is made to recursively fix the
permissions on the tree and the delete is retried.
"""
try:
shutil.rmtree(self.path)
except PermissionError:
self._set_permissions()
self.delete()

def _set_permissions(self):
"""
Set appropriate permissions on the directory tree.
"""
for path in os.walk(self.path):
os.chmod(path[0], mode=self.MODE)

def __enter__(self):
"""
Create the directory and set the CWD to the path.
Returns:
WorkingDirectory: self
Raises:
OSError: On failure.
"""
self._prev_dir = os.getcwd()
os.chdir(self.path)
return self

def __exit__(self, *unused):
"""
Delete the directory (tree) and restore the original CWD.
"""
os.chdir(self._prev_dir)
self.delete()

def __str__(self):
return self.path

0 comments on commit fbe5f15

Please sign in to comment.