Skip to content

Commit

Permalink
comment, stream reset error
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed May 2, 2020
1 parent b557ab2 commit a00fec9
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func (s *segmentedBuffer) GrowTo(max uint64, force bool) (bool, uint32) {

currentWindow := atomic.LoadUint64(&s.len) + atomic.LoadUint64(&s.cap) + s.pending
if currentWindow > max {
// Not an error, but rather an in-flight reservation hasn't hit.
// somewhat counter-intuitively not an error.
// note that len+cap is the 'window' that shouldn't exceed max or a reservation
// would fail, triggering an error.
// We pre-count 'pending' data where we've read a header and are working on
// reading it into available data here, so that we don't undercount the remaining
// window size, but that can mean this sum ends up larger than max.
return false, 0
}
delta := max - currentWindow
Expand All @@ -88,7 +93,7 @@ func (s *segmentedBuffer) TryReserve(space uint32) bool {
if atomic.LoadUint64(&s.cap) < s.pending+uint64(space) {
return false
}
s.pending = s.pending + uint64(space)
s.pending += uint64(space)
return true
}

Expand Down Expand Up @@ -121,7 +126,11 @@ func (s *segmentedBuffer) Append(input io.Reader, length int) error {
switch err {
case nil:
case io.EOF:
err = nil
if n == length {
err = nil
} else {
err = ErrStreamReset
}
fallthrough
default:
s.bm.Lock()
Expand Down

0 comments on commit a00fec9

Please sign in to comment.