Skip to content

Commit

Permalink
Merge 64cc950 into 2c315db
Browse files Browse the repository at this point in the history
  • Loading branch information
jtolio committed Sep 6, 2018
2 parents 2c315db + 64cc950 commit bfe8e24
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions pkg/eestream/stripe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package eestream

import (
"fmt"
"io"
"sort"
"strings"
"sync"

"github.com/vivint/infectious"
Expand All @@ -17,7 +20,7 @@ type StripeReader struct {
bufs map[int]*PieceBuffer
inbufs [][]byte
inmap map[int][]byte
errmap map[int]bool
errmap map[int]error
}

// NewStripeReader creates a new StripeReader from the given readers, erasure
Expand All @@ -35,7 +38,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
bufs: make(map[int]*PieceBuffer, es.TotalCount()),
inbufs: make([][]byte, es.TotalCount()),
inmap: make(map[int][]byte, es.TotalCount()),
errmap: make(map[int]bool, es.TotalCount()),
errmap: make(map[int]error, es.TotalCount()),
}

for i := 0; i < es.TotalCount(); i++ {
Expand Down Expand Up @@ -86,7 +89,7 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
r.cond.L.Lock()
defer r.cond.L.Unlock()

for {
for r.pendingReaders() {
for r.readAvailableShares(num) == 0 {
r.cond.Wait()
}
Expand All @@ -101,20 +104,22 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
return out, nil
}
}
// could not read enough shares to attempt a decode
return nil, r.combineErrs()
}

// readAvailableShares reads the available num-th erasure shares from the piece
// buffers without blocking. The return value n is the number of erasure shares
// read.
func (r *StripeReader) readAvailableShares(num int64) (n int) {
for i := 0; i < len(r.bufs); i++ {
if r.inmap[i] != nil || r.errmap[i] {
if r.inmap[i] != nil || r.errmap[i] != nil {
continue
}
if r.bufs[i].HasShare(num) {
err := r.bufs[i].ReadShare(num, r.inbufs[i])
if err != nil {
r.errmap[i] = true
r.errmap[i] = err
} else {
r.inmap[i] = r.inbufs[i]
}
Expand All @@ -124,11 +129,16 @@ func (r *StripeReader) readAvailableShares(num int64) (n int) {
return n
}

// pendingReaders checks if there are any pending readers to get a share from.
func (r *StripeReader) pendingReaders() bool {
return len(r.inmap)+len(r.errmap) < r.scheme.TotalCount()
}

// hasEnoughShares check if there are enough erasure shares read to attempt
// a decode.
func (r *StripeReader) hasEnoughShares() bool {
return len(r.inmap) >= r.scheme.RequiredCount()+1 ||
len(r.inmap)+len(r.errmap) >= r.scheme.TotalCount()
(len(r.inmap) >= r.scheme.RequiredCount() && !r.pendingReaders())
}

// shouldWaitForMore checks the returned decode error if it makes sense to wait
Expand All @@ -140,5 +150,21 @@ func (r *StripeReader) shouldWaitForMore(err error) bool {
return false
}
// check if there are more input buffers to wait for
return len(r.inmap)+len(r.errmap) < r.scheme.TotalCount()
return r.pendingReaders()
}

// combineErrs makes a useful error message from the errors in errmap.
// combineErrs always returns an error.
func (r *StripeReader) combineErrs() error {
if len(r.errmap) == 0 {
return Error.New("programmer error: no errors to combine")
}
errstrings := make([]string, 0, len(r.errmap))
for i, err := range r.errmap {
errstrings = append(errstrings,
fmt.Sprintf("\nerror retrieving piece %02d: %v", i, err))
}
sort.Strings(errstrings)
return Error.New("failed to download stripe: %s",
strings.Join(errstrings, ""))
}

0 comments on commit bfe8e24

Please sign in to comment.