-
Notifications
You must be signed in to change notification settings - Fork 46
/
iox.go
81 lines (74 loc) · 2.11 KB
/
iox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package webconnectivitylte
//
// Extensions for incrementally streaming a response body.
//
import (
"context"
"errors"
"io"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// StreamAllContext reads from [reader] until it reaches EOF, fails, or
// [ctx] is done, and returns all the bytes collected up to that point.
//
// The reads happen in a background goroutine. That goroutine checks the
// context only between reads, so it terminates once [ctx] is done (or this
// function has returned) and any in-flight Read has returned, e.g., because
// the caller closed [reader], where applicable.
//
// Like the standard library's ReadAll, an error for which
// errors.Is(err, io.EOF) holds is reported as a nil error. A context that
// ends with context.DeadlineExceeded also yields a nil error, together
// with the bytes read so far.
//
// Any other read or context error is wrapped using
// netxlite.NewTopLevelGenericErrWrapper. In that case the returned buffer
// may still be non-empty: it holds the portion of the body that was read
// before the error interrupted us.
func StreamAllContext(ctx context.Context, reader io.Reader) ([]byte, error) {
	// TODO(bassosimone): consider merging into the ./internal/netxlite/iox.go file
	// once this code has been used in testing for quite some time
	chunks := make(chan []byte)
	failures := make(chan error)

	// The derived context is cancelled when we return, which unblocks any
	// send the pump goroutine may be waiting on.
	ctx, stop := context.WithCancel(ctx)
	defer stop()

	// pump reads from the reader and forwards each chunk, and then the
	// terminal error, to the collecting loop below.
	pump := func() {
		for {
			// Implementation note: a fresh buffer MUST be allocated on every
			// iteration; reusing it would data-race with the consumer, which
			// may still be copying the previously sent chunk.
			chunk := make([]byte, 1<<13)
			n, readErr := reader.Read(chunk)
			if n > 0 {
				select {
				case chunks <- chunk[:n]:
					// delivered; now look at the error, if any
				case <-ctx.Done():
					return
				}
			}
			if readErr == nil {
				continue
			}
			// Deliver the error unless nobody is listening anymore; either
			// way this goroutine has nothing left to do.
			select {
			case failures <- readErr:
			case <-ctx.Done():
			}
			return
		}
	}
	go pump()

	body := make([]byte, 0, 1<<17)
	for {
		var cause error
		select {
		case chunk := <-chunks:
			// TODO(bassosimone): is there a more efficient way?
			body = append(body, chunk...)
			continue
		case cause = <-failures:
			if errors.Is(cause, io.EOF) {
				// see https://github.com/ooni/probe/issues/1965
				return body, nil
			}
		case <-ctx.Done():
			cause = ctx.Err()
			if errors.Is(cause, context.DeadlineExceeded) {
				// hitting the deadline is not a failure: hand back what we have
				return body, nil
			}
		}
		return body, netxlite.NewTopLevelGenericErrWrapper(cause)
	}
}