Skip to content

Commit

Permalink
SIO-1930 Implement PrioritizingScheduler
Browse files Browse the repository at this point in the history
Change-Id: I78b2d6ade32b450f486c59e9be231bb86e797219
  • Loading branch information
pjkozlowski committed Apr 24, 2017
1 parent 068db1b commit b3cc895
Show file tree
Hide file tree
Showing 10 changed files with 824 additions and 119 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'Twisted>=15.2.1',
'enum34', # backport from py3
'supervisor>=3.3.1',
'sortedcontainers',
],

setup_requires = [
Expand Down
26 changes: 19 additions & 7 deletions sio/sioworkersd/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ class Scheduler(object):
def __init__(self, manager):
self.manager = manager

def __unicode__(self):
"""Admin-friendly text representation of the queue.
Used for debugging and displaying in admin panel."""
raise NotImplementedError()

def updateContest(self, contest_uid, priority, weight):
"""Update contest prioriy and weight in scheduler memory."""
pass

def addWorker(self, worker_id):
"""Will be called when a new worker appears."""
pass

def delWorker(self, worker_id):
"""Will be called when a worker disappears."""
pass

def addTask(self, env):
"""Add a new task to queue."""
raise NotImplementedError()
Expand All @@ -12,16 +29,11 @@ def delTask(self, task_id):
"""Will be called when a task is completed or cancelled."""
raise NotImplementedError()

def __unicode__(self):
"""Admin-friendly text representation of the queue.
Used for debugging and displaying in admin panel."""
raise NotImplementedError()

def schedule(self):
"""Return a list of tasks to be executed now, as a list of pairs
(task_id, worker_id)."""
raise NotImplementedError()


def get_default_scheduler_class_name():
return 'sio.sioworkersd.scheduler.fifo.FIFOScheduler'
def getDefaultSchedulerClassName():
return 'sio.sioworkersd.scheduler.prioritizing.PrioritizingScheduler'
12 changes: 6 additions & 6 deletions sio/sioworkersd/scheduler/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, wid, wdata):
self.can_run_cpu_exec = wdata.can_run_cpu_exec
self.is_running_cpu_exec = wdata.is_running_cpu_exec

def can_run(self, task):
def canRun(self, task):
if self.is_running_cpu_exec or (self.tasks_count >= self.concurrency):
return False
else:
Expand All @@ -28,7 +28,7 @@ def can_run(self, task):
return True

def assign(self, task):
assert self.can_run(task)
assert self.canRun(task)
if task.type == 'cpu':
self.is_running_cpu_exec = True
self.tasks_count += 1
Expand Down Expand Up @@ -71,7 +71,7 @@ def delTask(self, tid):
def __unicode__(self):
return unicode(self.queue)

def _schedule_queue_with(self, queue, workers):
def _scheduleQueueWith(self, queue, workers):
"""Schedule tasks from a queue using given workers.
"""
result = []
Expand All @@ -85,7 +85,7 @@ def _schedule_queue_with(self, queue, workers):
else:
workers_queue = workers['vcpu']
# Some workers may have changed, skip as many as needed.
while workers_queue and not workers_queue[-1].can_run(queue[-1]):
while workers_queue and not workers_queue[-1].canRun(queue[-1]):
workers_queue.pop()
if not workers_queue:
break
Expand Down Expand Up @@ -120,8 +120,8 @@ def schedule(self):
workers['cpu+vcpu'] = deque(sorted(workers['cpu+vcpu'],
key=lambda w: w.concurrency, reverse=True))

result = self._schedule_queue_with(self.queues['cpu+vcpu'], workers)
result = self._scheduleQueueWith(self.queues['cpu+vcpu'], workers)
while workers['vcpu'] and workers['vcpu'][0].can_run_cpu_exec:
workers['vcpu'].popleft()
result += self._schedule_queue_with(self.queues['vcpu'], workers)
result += self._scheduleQueueWith(self.queues['vcpu'], workers)
return result

0 comments on commit b3cc895

Please sign in to comment.