diff --git a/newsfragments/4078.bugfix b/newsfragments/4078.bugfix new file mode 100644 index 0000000000..12ca66bcfb --- /dev/null +++ b/newsfragments/4078.bugfix @@ -0,0 +1 @@ +Fix a race condition with SegmentFetcher \ No newline at end of file diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py index 4e8b7f9266..130bb93dc2 100644 --- a/src/allmydata/immutable/downloader/fetcher.py +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -63,13 +63,14 @@ def __init__(self, node, segnum, k, logparent): self._running = True def stop(self): - log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix, - level=log.NOISY, parent=self._lp, umid="LWyqpg") - self._cancel_all_requests() - self._running = False - # help GC ??? XXX - del self._shares, self._shares_from_server, self._active_share_map - del self._share_observers + if self._running: + log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix, + level=log.NOISY, parent=self._lp, umid="LWyqpg") + self._cancel_all_requests() + self._running = False + # help GC ??? + del self._shares, self._shares_from_server, self._active_share_map + del self._share_observers # called by our parent _Node diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index a1ef4b4853..102d29a0a2 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -132,8 +132,8 @@ def __repr__(self): def stop(self): # called by the Terminator at shutdown, mostly for tests if self._active_segment: - self._active_segment.stop() - self._active_segment = None + seg, self._active_segment = self._active_segment, None + seg.stop() self._sharefinder.stop() # things called by outside callers, via CiphertextFileNode. get_segment() @@ -410,11 +410,11 @@ def want_more_shares(self): def fetch_failed(self, sf, f): assert sf is self._active_segment + self._active_segment = None # deliver error upwards for (d,c,seg_ev) in self._extract_requests(sf.segnum): seg_ev.error(now()) eventually(self._deliver, d, c, f) - self._active_segment = None self._start_new_segment() def process_blocks(self, segnum, blocks): @@ -434,6 +434,7 @@ def _deliver(result): eventually(self._deliver, d, c, result) else: (offset, segment, decodetime) = result + self._active_segment = None for (d,c,seg_ev) in self._extract_requests(segnum): # when we have two requests for the same segment, the # second one will not be "activated" before the data is @@ -446,7 +447,6 @@ def _deliver(result): seg_ev.deliver(when, offset, len(segment), decodetime) eventually(self._deliver, d, c, result) self._download_status.add_misc_event("process_block", start, now()) - self._active_segment = None self._start_new_segment() d.addBoth(_deliver) d.addErrback(log.err, "unhandled error during process_blocks", @@ -533,11 +533,12 @@ def _cancel_request(self, cancel): self._segment_requests = [t for t in self._segment_requests if t[2] != cancel] segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests] + # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: - self._active_segment.stop() - self._active_segment = None + seg, self._active_segment = self._active_segment, None + seg.stop() self._start_new_segment() # called by ShareFinder to choose hashtree sizes in CommonShares, and by diff --git a/src/allmydata/test/cli/test_run.py b/src/allmydata/test/cli/test_run.py index ae0f92131c..c7ebcd8c0d 100644 --- a/src/allmydata/test/cli/test_run.py +++ b/src/allmydata/test/cli/test_run.py @@ -264,7 +264,7 @@ def test_non_numeric_pid(self): self.assertThat(runs, Equals([])) self.assertThat(result_code, Equals(1)) - good_file_content_re = re.compile(r"\s[0-9]*\s[0-9]*\s", re.M) + good_file_content_re = re.compile(r"\s*[0-9]*\s[0-9]*\s*", re.M) @given(text()) def test_pidfile_contents(self, content):