Skip to content

Commit

Permalink
Merge d70fa46 into 5c6d65b
Browse files Browse the repository at this point in the history
  • Loading branch information
meejah committed Dec 21, 2023
2 parents 5c6d65b + d70fa46 commit aba83ca
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
1 change: 1 addition & 0 deletions newsfragments/4078.bugfix
@@ -0,0 +1 @@
fix race condition
15 changes: 8 additions & 7 deletions src/allmydata/immutable/downloader/fetcher.py
Expand Up @@ -63,13 +63,14 @@ def __init__(self, node, segnum, k, logparent):
self._running = True

def stop(self):
log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix,
level=log.NOISY, parent=self._lp, umid="LWyqpg")
self._cancel_all_requests()
self._running = False
# help GC ??? XXX
del self._shares, self._shares_from_server, self._active_share_map
del self._share_observers
if self._running:
log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix,
level=log.NOISY, parent=self._lp, umid="LWyqpg")
self._cancel_all_requests()
self._running = False
# help GC ???
del self._shares, self._shares_from_server, self._active_share_map
del self._share_observers


# called by our parent _Node
Expand Down
13 changes: 7 additions & 6 deletions src/allmydata/immutable/downloader/node.py
Expand Up @@ -132,8 +132,8 @@ def __repr__(self):
def stop(self):
# called by the Terminator at shutdown, mostly for tests
if self._active_segment:
self._active_segment.stop()
self._active_segment = None
seg, self._active_segment = self._active_segment, None
seg.stop()
self._sharefinder.stop()

# things called by outside callers, via CiphertextFileNode. get_segment()
Expand Down Expand Up @@ -410,11 +410,11 @@ def want_more_shares(self):

def fetch_failed(self, sf, f):
assert sf is self._active_segment
self._active_segment = None
# deliver error upwards
for (d,c,seg_ev) in self._extract_requests(sf.segnum):
seg_ev.error(now())
eventually(self._deliver, d, c, f)
self._active_segment = None
self._start_new_segment()

def process_blocks(self, segnum, blocks):
Expand All @@ -434,6 +434,7 @@ def _deliver(result):
eventually(self._deliver, d, c, result)
else:
(offset, segment, decodetime) = result
self._active_segment = None
for (d,c,seg_ev) in self._extract_requests(segnum):
# when we have two requests for the same segment, the
# second one will not be "activated" before the data is
Expand All @@ -446,7 +447,6 @@ def _deliver(result):
seg_ev.deliver(when, offset, len(segment), decodetime)
eventually(self._deliver, d, c, result)
self._download_status.add_misc_event("process_block", start, now())
self._active_segment = None
self._start_new_segment()
d.addBoth(_deliver)
d.addErrback(log.err, "unhandled error during process_blocks",
Expand Down Expand Up @@ -533,11 +533,12 @@ def _cancel_request(self, cancel):
self._segment_requests = [t for t in self._segment_requests
if t[2] != cancel]
segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]

# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
seg, self._active_segment = self._active_segment, None
seg.stop()
self._start_new_segment()

# called by ShareFinder to choose hashtree sizes in CommonShares, and by
Expand Down

0 comments on commit aba83ca

Please sign in to comment.