Skip to content

Commit

Permalink
Add QueueManager to client so queries/transfers can be multiplexed. Y…
Browse files Browse the repository at this point in the history
…eah!
  • Loading branch information
robgolding committed Mar 9, 2011
1 parent b8f4451 commit 2908f50
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
14 changes: 11 additions & 3 deletions backtrac/client/client.py
Expand Up @@ -5,27 +5,35 @@
from twisted.spread import pb
from twisted.python import failure
from twisted.internet import defer, reactor
from twisted.internet.task import LoopingCall
from twisted.python.filepath import FilePath

from django.conf import settings

from backtrac.client import utils
from backtrac.client.broker import BackupBroker
from backtrac.client.job import BackupJob
from backtrac.client.queue import BackupQueue, TransferQueue
from backtrac.client.queue import BackupQueue, TransferQueue, QueueManager
from backtrac.client.platform import FileSystemMonitor
from backtrac.utils import makedirs
from backtrac.utils.transfer import PageCollector

from backtrac.apps.catalog.utils import normpath

BACKUP_QUEUE_SLOTS = 5
TRANSFER_QUEUE_SLOTS = 5

class ClientError(Exception): pass

class BackupClient(pb.Referenceable):
def __init__(self, broker):
self.broker = broker
self.backup_queue = BackupQueue(self)
self.transfer_queue = TransferQueue(self)
self.backup_queue = QueueManager(
[ BackupQueue(self) for i in range(BACKUP_QUEUE_SLOTS) ]
)
self.transfer_queue = QueueManager(
[ TransferQueue(self) for i in range(TRANSFER_QUEUE_SLOTS) ]
)
self.monitor = FileSystemMonitor(self.backup_queue)

@defer.inlineCallbacks
Expand Down
28 changes: 23 additions & 5 deletions backtrac/client/queue.py
Expand Up @@ -10,12 +10,14 @@ class ConsumerQueue(object):
def __init__(self, stop_on_error=False):
self.stop_on_error = stop_on_error
self.queue = DeferredQueue()
self.size = 0

def _consume_next(self, *args):
self.size -= 1
self.queue.get().addCallbacks(self._consumer, self._error)

def _consumer(self, obj):
r = self.consume(obj)
def _consumer(self, item):
r = self.consume(item)
if isinstance(r, Deferred):
r.addCallbacks(self._consume_next, self._consume_next)
else:
Expand All @@ -26,10 +28,11 @@ def _error(self, fail):
if not self.stop_on_error:
self._consume_next()

def add(self, filepath):
self.queue.put(filepath)
def add(self, item):
self.size += 1
self.queue.put(item)

def consume(self, obj):
def consume(self, item):
raise NotImplementedError

def fail(self, fail):
Expand Down Expand Up @@ -103,3 +106,18 @@ def _transfer(self, collector, filepath, d=None):
print '%s, %d bytes' % (filepath.path, filepath.getsize())
except (OSError, IOError):
pass

class QueueManager(object):
def __init__(self, queues):
self.queues = queues

def add(self, item):
queue = min(self.queues, key=lambda q:q.size)
queue.add(item)

def start(self):
for queue in self.queues:
queue.start()

def get_size(self):
return sum([ queue.size for queue in self.queues ])

0 comments on commit 2908f50

Please sign in to comment.