Skip to content

Commit

Permalink
Fix queue sticking problem when an error occurs on the client
Browse files Browse the repository at this point in the history
  • Loading branch information
robgolding committed Mar 29, 2011
1 parent c00f965 commit 89f9ce3
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
19 changes: 12 additions & 7 deletions backtrac/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,22 @@ def get_index(self, path):
for f in files:
path = os.path.join(root, f)
filepath = FilePath(path)
index[path] = {
'mtime': filepath.getModificationTime(),
'size': filepath.getsize(),
}
try:
index[path] = {
'mtime': filepath.getModificationTime(),
'size': filepath.getsize(),
}
except OSError:
# file could be a broken symlink, or deleted mid-scan
continue
return index

@defer.inlineCallbacks
def check_index(self, path, index):
backup = yield self.broker.check_index(path, index)
for path in backup:
self.backup_queue.add(BackupJob(path))
#self.backup_queue.add(BackupJob(path))
self.transfer_queue.add(FilePath(path))

def remote_put_file(self, path):
self.monitor.add_exclusion(path)
Expand All @@ -83,8 +88,6 @@ def remote_put_file(self, path):
@defer.inlineCallbacks
def start(self):
broker = yield self.broker.connect(client=self)
self.backup_queue.start()
self.transfer_queue.start()
paths = yield self.broker.get_paths()
for path in paths:
path = normpath(path)
Expand All @@ -93,6 +96,8 @@ def start(self):
self.check_present_state(path)
self.check_index(path, index)
#self.walk_path(path)
self.backup_queue.start()
self.transfer_queue.start()

def get_server_status():
broker = BackupBroker(server='localhost', secret_key=settings.SECRET_KEY,
Expand Down
14 changes: 12 additions & 2 deletions backtrac/client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def consume_create(self, filepath):
return self.client.broker.create_item(filepath.path, 'd')

def consume_update(self, filepath):
if filepath.isdir():
# we can't backup a directory
return
try:
attrs = {
'mtime': filepath.getModificationTime(),
Expand Down Expand Up @@ -85,10 +88,14 @@ def _check_result(self, backup_required, path):

class TransferQueue(BackupQueue):
def consume(self, filepath):
if filepath.isdir():
# we can't transfer a directory
return
try:
mtime = filepath.getModificationTime()
size = filepath.getsize()
except (OSError, IOError):
# returning will allow the queue to move on
return
d = Deferred()
self.client.broker.put_file(filepath.path, mtime, size).addCallback(
Expand All @@ -106,8 +113,11 @@ def _transfer(self, collector, filepath, d=None):
if d is not None:
pager.wait().chainDeferred(d)
print '%s, %d bytes' % (filepath.path, filepath.getsize())
except (OSError, IOError):
pass
except (OSError, IOError) as e:
print "Error:", e
if d is not None:
# make sure we callback, otherwise the queue will stick
d.errback(e)

class QueueManager(object):
def __init__(self, queues):
Expand Down

0 comments on commit 89f9ce3

Please sign in to comment.