Skip to content

Commit

Permalink
Added some comments for debugging purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
lrgll committed Nov 11, 2019
1 parent 963ab7a commit b99bb33
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 8 deletions.
46 changes: 41 additions & 5 deletions weblablib/backends/redis_manager.py
Expand Up @@ -15,15 +15,43 @@
from weblablib.utils import create_token, _current_timestamp
from weblablib.users import AnonymousUser, CurrentUser, ExpiredUser


class RedisManager(object):
"""
To manage users, sessions and tasks.
USER-RELATED STRUCTURES:
- <prefix>:weblab:sessions:<session_id> : These keys contain the session ids, storing their creation time. They are
set to expire when created, so they might need to be refreshed. These keys are used to check if a session has been
deleted, or to request that a session be deleted.
- <prefix>:weblab:active:<session_id> : These are the actual hashsets with the field values for the users. They are
set to expire too, so they might need to be refreshed as well.
TASK-RELATED STRUCTURES:
- <prefix>:weblab:tasks:<task_id> : Hashset that stores the actual task info.
- ...
"""

def __init__(self, redis_url, key_base, task_expires, weblab):
self.client = redis.StrictRedis.from_url(redis_url, decode_responses=True)
self.weblab = weblab
self.key_base = key_base
self.key_base = key_base # Redis base prefix to use. It is *not* user or session specific.

# ~lrg: What is the lifecycle for the tasks? How is the expires() scheme meant to work?
# ~lrg: The expire time seems to depend on the config, and seems to be 3600 by default.
self.task_expires = task_expires

def add_user(self, session_id, user, expiration):
"""
Adds a new user.
This will:
- Store all user fields into a <prefix>:weblab:active:<sessionid> hashset.
- Schedule this hashset to expire in a while.
- Store the sessionid with the current time in the key <prefix>:weblab:sessions:<sessionid>
- Schedule this last key to expire in a while.
"""
key = '{}:weblab:active:{}'.format(self.key_base, session_id)

pipeline = self.client.pipeline()
Expand Down Expand Up @@ -64,10 +92,10 @@ def update_data(self, session_id, data):
pipeline.hset(key_inactive, 'data', json.dumps(data))
max_date_active, max_date_inactive, _, _ = pipeline.execute()

if max_date_active is None: # Object had been removed
if max_date_active is None: # Object had been removed
self.client.delete(key_active)

if max_date_inactive is None: # Object had been removed
if max_date_inactive is None: # Object had been removed
self.client.delete(key_inactive)

def get_user(self, session_id):
Expand Down Expand Up @@ -261,16 +289,22 @@ def new_task(self, session_id, name, args, kwargs):
pipeline.expire('{}:weblab:task_ids:{}'.format(self.key_base, task_id), self.task_expires)
results = pipeline.execute()

# ~lrg: I am not sure that this actually works as expected, without previously setting the nx flag
# on the set() call.
if results[0]:
# Ensure it's unique
break

# Otherwise try with another
task_id = create_token()

# Register the new task atomically.
pipeline = self.client.pipeline()
# Add the taskid into a set where we will store all ids.
pipeline.sadd('{}:weblab:{}:tasks'.format(self.key_base, session_id), task_id)
# Expire the whole set with task ids in a while. (~lrg: Is that really what we want to do?).
pipeline.expire('{}:weblab:{}:tasks'.format(self.key_base, session_id), self.task_expires)
# Register the actual values for the task within a hashset with a task-specific key.
pipeline.hset('{}:weblab:tasks:{}'.format(self.key_base, task_id), 'name', name)
pipeline.hset('{}:weblab:tasks:{}'.format(self.key_base, task_id), 'session_id', session_id)
pipeline.hset('{}:weblab:tasks:{}'.format(self.key_base, task_id), 'args', json.dumps(args))
Expand All @@ -292,7 +326,7 @@ def lock_global_unique_task(self, task_name):
key = '{}:weblab:global-unique-tasks:{}'.format(self.key_base, task_name)
pipeline = self.client.pipeline()
pipeline.hset(key, 'running', 1)
pipeline.expire(key, 7200) # 2-hour task lock is way too long in the context of remote labs
pipeline.expire(key, 7200) # 2-hour task lock is way too long in the context of remote labs
established, _ = pipeline.execute()
return established == 1

Expand Down Expand Up @@ -384,6 +418,8 @@ def update_task_data(self, task_id, new_data):
pipeline.hget(key, 'name')
pipeline.hset(key, 'data', json.dumps(new_data))
name, _ = pipeline.execute()
# ~lrg: This will delete the whole hashset if the 'name' field is not present, but not sure how
# that would happen.
if name is None:
# Deleted in the meanwhile
self.client.delete(key)
Expand Down Expand Up @@ -452,7 +488,7 @@ def get_unfinished_tasks(self, session_id):

pending_task_ids = []
for task_id, finished in zip(task_ids, pipeline.execute()):
if finished == 'false': # If finished or failed: true; if expired: None
if finished == 'false': # If finished or failed: true; if expired: None
pending_task_ids.append(task_id)

return pending_task_ids
Expand Down
10 changes: 7 additions & 3 deletions weblablib/tasks.py
Expand Up @@ -20,6 +20,7 @@

from weblablib.exc import AlreadyRunningError, TimeoutError


class _TaskWrapper(object):
def __init__(self, weblab, func, unique):
self._func = func
Expand All @@ -43,7 +44,7 @@ def unique(self):

def __call__(self, *args, **kwargs):
"""Runs the function in the same way, directly, without catching errors"""
session_id = None # only used if unique='user'
session_id = None # only used if unique='user'
if self._unique:
if self._unique == 'global':
locked = self._backend.lock_global_unique_task(self._name)
Expand All @@ -64,8 +65,10 @@ def __call__(self, *args, **kwargs):
self._backend.unlock_user_unique_task(self._name, session_id)

def delay(self, *args, **kwargs):
"""Starts the function in a thread or in another process.
It returns a WebLabTask object"""
"""
Starts the function in a thread or in another process.
It returns a WebLabTask object
"""
session_id = _current_session_id()
task_id = self._backend.new_task(session_id, self._name, args, kwargs)
return WebLabTask(self._weblab, task_id)
Expand Down Expand Up @@ -100,6 +103,7 @@ def my_func(a, b):
task_object.join(timeout=timeout, error_on_timeout=False)
return task_object


class WebLabTask(object):
"""
WebLab-Task. You can create it by defining a task as in::
Expand Down

0 comments on commit b99bb33

Please sign in to comment.