Skip to content

Commit 58e8fea

Browse files
thepaulStorj Robot
authored andcommitted
satellite/repair: decouple piece decoding from piece uploading
We have a situation in the repairer where slow uploads cause the error "internal: quiescence" to be returned from _all_ ongoing uploads, failing a repair entirely. The quiescence machinery is meant to catch inactive/stalled _downloads_, but our downloading has already finished by the time this happens. It appears that what is happening is this: the piece decoder (the part that recreates the original segment content) is connected directly to the piece encoder (the part that encodes new pieces). When the repairer is doing an upload, it is calling into both of those parts by way of io.Copy. The quiescence machinery is built into the decoder. When uploads are especially slow, the piece decoder can't read into the temporary-file piece contents very far because it can't get too far ahead (limited buffer size, backpressure, yada yada). This causes the decoder to complain that it hasn't read anything for 5 seconds, the threshold for the quiescence error. It looks like a bit of a complicated job to make an option that disables quiescence checking in uplink. Instead, we will decouple the decoder and the encoder in the repairer. Writing the whole segment contents to a local tempfile through the decoder should make sure that the decoder doesn't ever observe a 5 second stall (barring some crazy disk problem). Then the encoder and the uploader can continue from the point of the reconstructed segment, and it can take as long as it needs to take. Once it is possible to suppress or avoid the quiescence error in eestream.decodedReader, we can remove this tempfile step. Change-Id: I71c68b3460fc4129320364a0514f893e1c47876e
1 parent 29302bc commit 58e8fea

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

satellite/repair/repairer/segments.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"strconv"
1313
"time"
1414

15+
"github.com/calebcase/tmpfile"
1516
"github.com/zeebo/errs"
1617
"go.uber.org/zap"
1718
"golang.org/x/exp/maps"
@@ -568,6 +569,38 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment queue.
568569
}
569570
defer func() { err = errs.Combine(err, segmentReader.Close()) }()
570571

572+
// Reconstruct the segment from the pieces. This should ideally happen in
573+
// tandem with the new piece uploads (have Repair() read directly from
574+
// segmentReader), but this causes a situation where slow uploads cause
575+
// reads from the piece files to block due to backpressure, but then the
576+
// slow reads are marked as inactive (a "internal: quiescence" error is
577+
// returned). This causes all uploads to fail. Instead, for now, we will
578+
// write the reconstructed segment to a tempfile and then upload the pieces
579+
// from that.
580+
//
581+
// Once it is possible to suppress or avoid the quiescence error in
582+
// eestream.decodedReader, we can remove this tempfile step.
583+
if !repairer.ec.inmemoryDownload {
584+
tempfile, err := tmpfile.New("", "repaired-segment-*")
585+
if err != nil {
586+
return true, repairReconstructError.New("could not open tempfile: %v", err)
587+
}
588+
_, err = io.Copy(tempfile, segmentReader)
589+
if err != nil {
590+
return true, repairReconstructError.New("could not reconstruct segment: %v", err)
591+
}
592+
_, err = tempfile.Seek(0, io.SeekStart)
593+
if err != nil {
594+
return true, repairReconstructError.New("could not seek to beginning of tempfile: %v", err)
595+
}
596+
err = segmentReader.Close()
597+
// assign to tempfile before returning, because we've already defer-closed segmentReader
598+
segmentReader = tempfile
599+
if err != nil {
600+
return true, repairReconstructError.New("could not close segmentReader: %v", err)
601+
}
602+
}
603+
571604
// only report audit result when segment can be successfully downloaded
572605
cachedNodesReputation := make(map[storj.NodeID]overlay.ReputationStatus, len(cachedNodesInfo))
573606
for id, info := range cachedNodesInfo {

0 commit comments

Comments
 (0)