-
Notifications
You must be signed in to change notification settings - Fork 48
/
stream_reader.go
108 lines (93 loc) · 2.35 KB
/
stream_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package io
import (
"bytes"
"context"
"encoding/base64"
"io"
"strings"
"time"
)
// StreamReader reads the contents of an io stream chunk by chunk and exposes
// them through the standard library's io.ReadCloser interface.
//
// Use NewStreamReader to create one. A zero-value StreamReader is not usable:
// Read and Close invoke next and close without nil checks.
type StreamReader struct {
// next fetches a chunk of the stream. Read calls it with the current
// position (pos) and the maximum number of bytes to request (size).
next func(pos, size int) (*ReadReply, error)
// close is invoked by Close to close the stream.
close func() error
// r holds the not yet consumed part of the most recently fetched chunk.
// It is nil while no chunk is buffered.
r io.Reader
// pos counts the bytes handed out to callers so far and is passed to
// next as the read offset.
pos int
// eof mirrors the EOF flag of the most recently fetched chunk.
eof bool
}
// Compile-time check that *StreamReader implements io.ReadCloser.
var _ io.ReadCloser = (*StreamReader)(nil)
// NewStreamReader wraps the stream identified by handle in a StreamReader so
// that it can be consumed through the standard library's io.Reader interface.
//
// ctx is passed to every Read call issued on ioClient. Closing the reader does
// not use ctx: Close is issued on ioClient with a fresh context derived from
// context.Background and limited to 10 seconds.
func NewStreamReader(ctx context.Context, ioClient interface {
	Read(context.Context, *ReadArgs) (*ReadReply, error)
	Close(context.Context, *CloseArgs) error
}, handle StreamHandle,
) *StreamReader {
	// A single ReadArgs value is shared by all requests; its offset and
	// size are overwritten before each one.
	readArgs := NewReadArgs(handle)

	fetchChunk := func(pos, size int) (*ReadReply, error) {
		readArgs.SetOffset(pos).SetSize(size)
		return ioClient.Read(ctx, readArgs)
	}

	closeStream := func() error {
		// TODO(mafredri): We should ideally allow the user to define a timeout here.
		closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		return ioClient.Close(closeCtx, NewCloseArgs(handle))
	}

	return &StreamReader{next: fetchChunk, close: closeStream}
}
// read copies data from the buffered chunk r.r into p and advances r.pos by
// the number of bytes copied.
//
// When the chunk reports io.EOF while the stream itself has not ended (r.eof
// is false), the EOF is swallowed and r.r is cleared so that a new chunk gets
// fetched. Once r.eof is set, io.EOF is passed on to the caller and r.r is
// kept. Any other error is returned unchanged.
func (r *StreamReader) read(p []byte) (n int, err error) {
	n, err = r.r.Read(p)
	r.pos += n

	if err != io.EOF || r.eof {
		// Either the chunk is not exhausted (or failed), or this is the
		// end of the whole stream: report the result as is.
		return n, err
	}

	// The chunk is exhausted mid-stream: drop it and hide its EOF.
	r.r = nil
	return n, nil
}
// Read implements io.Reader. It fills p with the next bytes of the stream and
// returns the number of bytes written to p.
//
// Data left over from a previously fetched chunk is served first. When no
// buffered data remains, one chunk is requested through r.next at offset
// r.pos; base64 encoded chunks are decoded before being handed out. io.EOF is
// returned once the chunk flagged as EOF has been consumed. Errors from r.next
// and from base64 decoding are returned unchanged.
//
// io.Reader discourages returning (0, nil) for a non-empty p. Therefore, when
// the buffered chunk turns out to be drained, the next chunk is fetched within
// the same call, and an empty final base64 chunk is reported as io.EOF right
// away. A fetched chunk that is empty without being flagged as EOF still
// results in (0, nil).
func (r *StreamReader) Read(p []byte) (n int, err error) {
	if r.r != nil {
		// Continue reading from buffer.
		n, err = r.read(p)
		if n > 0 || err != nil || r.r != nil {
			return n, err
		}
		// The buffer was already drained (read cleared r.r) and the
		// stream has not ended: fall through and fetch the next chunk
		// instead of returning (0, nil).
	}
	if r.eof {
		return 0, io.EOF
	}
	if len(p) == 0 {
		return 0, nil
	}

	// Chrome might have an off-by-one when deciding the maximum
	// size (at least for base64 encoded data), usually it will
	// overflow. We subtract one to make sure it fits into p.
	size := len(p) - 1
	if size < 1 {
		// Safety-check to avoid crashing Chrome (e.g. via SetSize(-1)).
		size = 1
	}

	reply, err := r.next(r.pos, size)
	if err != nil {
		return 0, err
	}
	r.eof = reply.EOF

	switch {
	case reply.Base64Encoded != nil && *reply.Base64Encoded:
		encoded := []byte(reply.Data)
		// Fast path: decode straight into p when it is guaranteed to be
		// large enough, which avoids panics in Decode.
		if len(p) >= base64.StdEncoding.DecodedLen(len(encoded)) {
			n, err = base64.StdEncoding.Decode(p, encoded)
			r.pos += n
			if n == 0 && err == nil && r.eof {
				// Empty final chunk: the end of the stream is
				// already known, report it now.
				return 0, io.EOF
			}
			return n, err
		}
		// p is too small for the whole chunk; decode incrementally.
		r.r = base64.NewDecoder(base64.StdEncoding, bytes.NewReader(encoded))
	default:
		r.r = strings.NewReader(reply.Data)
	}

	return r.read(p)
}
// Close closes the stream by invoking the reader's close callback and returns
// that callback's error, implementing io.Closer. For readers created by
// NewStreamReader the callback issues Close on the IO client for the stream
// handle.
func (r *StreamReader) Close() error {
	closeStream := r.close
	return closeStream()
}