This repository has been archived by the owner on Jan 28, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
George Schneeloch
committed
Oct 28, 2015
1 parent
49fcbde
commit 6a315e6
Showing
12 changed files
with
571 additions
and
318 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
""" | ||
Functions to manipulate tasks via REST API. | ||
""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
from celery.result import AsyncResult | ||
from celery.states import FAILURE, SUCCESS, REVOKED | ||
from django.contrib.auth.models import User | ||
from rest_framework.exceptions import ValidationError | ||
|
||
from exporter.tasks import export_resources | ||
from learningresources.api import get_repo | ||
from learningresources.models import LearningResource | ||
|
||
TASK_KEY = 'tasks' | ||
EXPORT_TASK_TYPE = 'resource_export' | ||
EXPORTS_KEY = 'learning_resource_exports' | ||
|
||
|
||
def create_initial_task_dict(task, task_type, task_info): | ||
""" | ||
Create initial task data about a newly created Celery task. | ||
Args: | ||
task (Task): A Celery task. | ||
task_type (unicode): Type of task. | ||
task_info (dict): Extra information about a task. | ||
Returns: | ||
dict: Initial data about task. | ||
""" | ||
|
||
result = None | ||
if task.successful(): | ||
result = task.get() | ||
|
||
return { | ||
"id": task.id, | ||
"initial_state": task.state, | ||
"task_type": task_type, | ||
"task_info": task_info, | ||
"result": result | ||
} | ||
|
||
|
||
def create_task_result_dict(initial_data): | ||
""" | ||
Convert initial data we put in session to dict for REST API. | ||
This will use the id to look up current data about task to return | ||
to user. | ||
Args: | ||
task (dict): Initial data about task stored in session. | ||
Returns: | ||
dict: Updated data about task. | ||
""" | ||
initial_state = initial_data['initial_state'] | ||
task_id = initial_data['id'] | ||
task_type = initial_data['task_type'] | ||
task_info = initial_data['task_info'] | ||
|
||
state = "processing" | ||
result = None | ||
# initial_state is a workaround for EagerResult used in testing. | ||
# In production initial_state should usually be pending. | ||
if initial_state == SUCCESS: | ||
state = "success" | ||
result = initial_data['result'] | ||
elif initial_state in (FAILURE, REVOKED): | ||
state = "failure" | ||
else: | ||
async_result = AsyncResult(task_id) | ||
|
||
if async_result.successful(): | ||
state = "success" | ||
elif async_result.failed(): | ||
state = "failure" | ||
|
||
if async_result.successful(): | ||
result = async_result.get() | ||
|
||
return { | ||
"id": task_id, | ||
"status": state, | ||
"result": result, | ||
"task_type": task_type, | ||
"task_info": task_info | ||
} | ||
|
||
|
||
def get_tasks(session): | ||
""" | ||
Get initial task data for session. | ||
Args: | ||
session (SessionStore): The request session. | ||
Returns: | ||
dict: | ||
The initial task data stored in session for all user's tasks. The | ||
keys are task ids and the values are initial task data. | ||
""" | ||
try: | ||
return session[TASK_KEY] | ||
except KeyError: | ||
return {} | ||
|
||
|
||
def get_task(session, task_id): | ||
""" | ||
Get initial task data for a single task. | ||
Args: | ||
session (SessionStore): The request session. | ||
task_id (unicode): The task id. | ||
Returns: | ||
dict: The initial task data stored in session. | ||
""" | ||
try: | ||
return session[TASK_KEY][task_id] | ||
except KeyError: | ||
return None | ||
|
||
|
||
def create_task(session, user_id, task_type, task_info): | ||
""" | ||
Start a new Celery task. This will stop other tasks of the same type. | ||
Args: | ||
session (SessionStore): The request session. | ||
user_id (int): The id for user creating task. | ||
task_type (unicode): The type of task being started. | ||
task_info (dict): Extra information about the task. | ||
Returns: | ||
dict: The initial task data (will also be stored in session). | ||
""" | ||
|
||
if task_type == EXPORT_TASK_TYPE: | ||
try: | ||
repo_slug = task_info['repo_slug'] | ||
except KeyError: | ||
raise ValidationError("Missing repo_slug") | ||
|
||
# Verify repository ownership. | ||
get_repo(repo_slug, user_id) | ||
|
||
try: | ||
exports = set(session[EXPORTS_KEY][repo_slug]) | ||
except KeyError: | ||
exports = set() | ||
|
||
try: | ||
ids = task_info['ids'] | ||
except KeyError: | ||
raise ValidationError("Missing ids") | ||
|
||
for resource_id in ids: | ||
if resource_id not in exports: | ||
raise ValidationError("id {id} is not in export list".format( | ||
id=resource_id | ||
)) | ||
|
||
# Cancel any old tasks of same type and repo, remove from list. | ||
other_tasks = {} | ||
for task_id, data in session.get(TASK_KEY, {}).items(): | ||
if data['task_type'] == task_type and \ | ||
data['task_info']['repo_slug'] == repo_slug: | ||
AsyncResult(task_id).revoke() | ||
else: | ||
other_tasks[task_id] = data | ||
session[TASK_KEY] = other_tasks | ||
|
||
learning_resources = LearningResource.objects.filter(id__in=ids).all() | ||
user = User.objects.get(id=user_id) | ||
result = export_resources.delay(learning_resources, user.username) | ||
|
||
# Put new task in session. | ||
initial_data = create_initial_task_dict( | ||
result, task_type, task_info) | ||
session[TASK_KEY][result.id] = initial_data | ||
session.modified = True | ||
|
||
return initial_data | ||
else: | ||
raise ValidationError("Unknown task_type {task_type}".format( | ||
task_type=task_type | ||
)) |
Oops, something went wrong.