git.CatPipe: more resilience against weird errors.
Notably, MemoryErrors thrown because the file we're trying to load is too
big to fit in memory all at once.  The MemoryError still gets thrown, but
the main program now has a chance to recover from it, because CatPipe at
least doesn't end up in an inconsistent state.
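
For example, a caller can now treat an oversized object as a per-object
failure instead of a fatal one.  A minimal sketch of that recovery (the
skip-and-continue policy and the names shas/process are hypothetical,
not part of this commit):

    from bup import git
    from bup.helpers import log

    cp = git.CatPipe()
    for sha in shas:
        try:
            it = cp.get(sha)
            type = it.next()
            data = ''.join(it)    # may raise MemoryError on a huge blob
        except MemoryError:
            # the abandoned iterator aborts the pipe via AbortableIter,
            # so the next get() just restarts git-cat-file
            log('%s: too big to load at once; skipping\n' % sha)
            continue
        process(type, data)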

Also we can recover nicely if some lamer kills our git-cat-file subprocess.
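
That works because _fast_get() now checks self.p.poll() before every
request and calls _restart() if the subprocess has died.  Roughly (a
demonstration sketch, assuming sha names an object that exists in the
repo):

    import os, signal, time
    from bup import git

    cp = git.CatPipe()
    first = ''.join(cp.get(sha))        # spawns git-cat-file on demand
    os.kill(cp.p.pid, signal.SIGKILL)   # the lamer strikes
    time.sleep(0.1)                     # give the kernel time to reap it
    second = ''.join(cp.get(sha))       # poll() != None, so _fast_get()
                                        # quietly restarts git-cat-file
    assert first == second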

The AutoFlushIter we were using for this purpose turns out not to have
been good enough, and it has never been used anywhere but in CatPipe, so
I've revised it further and renamed it to git.AbortableIter.
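
Its contract is easiest to see with a toy iterator: onabort fires when
iteration dies partway through (or is abandoned), but never after a
clean, complete run.  A quick sketch:

    from bup import git

    def numbers():
        yield 1
        yield 2
        raise IOError('pipe broke')

    state = {'aborted': 0}    # a dict, since Python 2 has no nonlocal

    it = git.AbortableIter(numbers(),
                           onabort = lambda: state.update(aborted=1))
    try:
        for x in it:
            pass
    except IOError:
        pass
    assert state['aborted']       # the failure ran the cleanup callback

    state['aborted'] = 0
    it = git.AbortableIter(iter([1, 2, 3]),
                           onabort = lambda: state.update(aborted=1))
    for x in it:
        pass
    assert not state['aborted']   # normal exhaustion never aborts
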
apenwarr committed Apr 23, 2010
1 parent: 46880ed · commit: 0ed56b2

Showing 2 changed files with 57 additions and 31 deletions.
lib/bup/git.py (57 additions, 13 deletions)
@@ -639,6 +639,35 @@ def _git_capture(argv):
     return r
 
 
+class AbortableIter:
+    def __init__(self, it, onabort = None):
+        self.it = it
+        self.onabort = onabort
+        self.done = None
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        try:
+            return self.it.next()
+        except StopIteration, e:
+            self.done = True
+            raise
+        except:
+            self.abort()
+            raise
+
+    def abort(self):
+        if not self.done:
+            self.done = True
+            if self.onabort:
+                self.onabort()
+
+    def __del__(self):
+        self.abort()
+
+
 _ver_warned = 0
 class CatPipe:
     def __init__(self):
@@ -651,14 +680,28 @@ def __init__(self):
             _ver_warned = 1
             self.get = self._slow_get
         else:
-            self.p = subprocess.Popen(['git', 'cat-file', '--batch'],
-                                      stdin=subprocess.PIPE,
-                                      stdout=subprocess.PIPE,
-                                      preexec_fn = _gitenv)
+            self.p = self.inprogress = None
             self.get = self._fast_get
-        self.inprogress = None
+
+    def _abort(self):
+        if self.p:
+            self.p.stdout.close()
+            self.p.stdin.close()
+        self.p = None
+        self.inprogress = None
+
+    def _restart(self):
+        self._abort()
+        self.p = subprocess.Popen(['git', 'cat-file', '--batch'],
+                                  stdin=subprocess.PIPE,
+                                  stdout=subprocess.PIPE,
+                                  preexec_fn = _gitenv)
 
     def _fast_get(self, id):
+        if not self.p or self.p.poll() != None:
+            self._restart()
+        assert(self.p)
+        assert(self.p.poll() == None)
         if self.inprogress:
             log('_fast_get: opening %r while %r is open'
                 % (id, self.inprogress))
@@ -676,16 +719,17 @@ def _fast_get(self, id):
             raise GitError('expected blob, got %r' % spl)
         (hex, type, size) = spl
 
-        def ondone():
-            assert(self.p.stdout.readline() == '\n')
-            self.inprogress = None
-
-        it = AutoFlushIter(chunkyreader(self.p.stdout, int(spl[2])),
-                           ondone = ondone)
-        yield type
-        for blob in it:
-            yield blob
-        del it
+        it = AbortableIter(chunkyreader(self.p.stdout, int(spl[2])),
+                           onabort = self._abort)
+        try:
+            yield type
+            for blob in it:
+                yield blob
+            assert(self.p.stdout.readline() == '\n')
+            self.inprogress = None
+        except Exception, e:
+            it.abort()
+            raise
 
     def _slow_get(self, id):
         assert(id.find('\n') < 0)

lib/bup/helpers.py (0 additions, 18 deletions)
@@ -200,24 +200,6 @@ def chunkyreader(f, count = None):
             yield b
 
 
-class AutoFlushIter:
-    def __init__(self, it, ondone = None):
-        self.it = it
-        self.ondone = ondone
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        return self.it.next()
-
-    def __del__(self):
-        for i in self.it:
-            pass
-        if self.ondone:
-            self.ondone()
-
-
 def slashappend(s):
     if s and not s.endswith('/'):
         return s + '/'