diff --git a/backtrac/client/client.py b/backtrac/client/client.py index dd84ddd..31057f5 100644 --- a/backtrac/client/client.py +++ b/backtrac/client/client.py @@ -5,6 +5,7 @@ from twisted.spread import pb from twisted.python import failure from twisted.internet import defer, reactor +from twisted.internet.task import LoopingCall from twisted.python.filepath import FilePath from django.conf import settings @@ -12,20 +13,27 @@ from backtrac.client import utils from backtrac.client.broker import BackupBroker from backtrac.client.job import BackupJob -from backtrac.client.queue import BackupQueue, TransferQueue +from backtrac.client.queue import BackupQueue, TransferQueue, QueueManager from backtrac.client.platform import FileSystemMonitor from backtrac.utils import makedirs from backtrac.utils.transfer import PageCollector from backtrac.apps.catalog.utils import normpath +BACKUP_QUEUE_SLOTS = 5 +TRANSFER_QUEUE_SLOTS = 5 + class ClientError(Exception): pass class BackupClient(pb.Referenceable): def __init__(self, broker): self.broker = broker - self.backup_queue = BackupQueue(self) - self.transfer_queue = TransferQueue(self) + self.backup_queue = QueueManager( + [ BackupQueue(self) for i in range(BACKUP_QUEUE_SLOTS) ] + ) + self.transfer_queue = QueueManager( + [ TransferQueue(self) for i in range(TRANSFER_QUEUE_SLOTS) ] + ) self.monitor = FileSystemMonitor(self.backup_queue) @defer.inlineCallbacks diff --git a/backtrac/client/queue.py b/backtrac/client/queue.py index c559aa8..836a102 100644 --- a/backtrac/client/queue.py +++ b/backtrac/client/queue.py @@ -10,12 +10,14 @@ class ConsumerQueue(object): def __init__(self, stop_on_error=False): self.stop_on_error = stop_on_error self.queue = DeferredQueue() + self.size = 0 def _consume_next(self, *args): + self.size -= 1 self.queue.get().addCallbacks(self._consumer, self._error) - def _consumer(self, obj): - r = self.consume(obj) + def _consumer(self, item): + r = self.consume(item) if isinstance(r, Deferred): r.addCallbacks(self._consume_next, self._consume_next) else: @@ -26,10 +28,11 @@ def _error(self, fail): if not self.stop_on_error: self._consume_next() - def add(self, filepath): - self.queue.put(filepath) + def add(self, item): + self.size += 1 + self.queue.put(item) - def consume(self, obj): + def consume(self, item): raise NotImplementedError def fail(self, fail): @@ -103,3 +106,18 @@ def _transfer(self, collector, filepath, d=None): print '%s, %d bytes' % (filepath.path, filepath.getsize()) except (OSError, IOError): pass + +class QueueManager(object): + def __init__(self, queues): + self.queues = queues + + def add(self, item): + queue = min(self.queues, key=lambda q:q.size) + queue.add(item) + + def start(self): + for queue in self.queues: + queue.start() + + def get_size(self): + return sum([ queue.size for queue in self.queues ])