This repository has been archived by the owner on Feb 27, 2020. It is now read-only.
/
fetchasstream.go
107 lines (87 loc) · 2.57 KB
/
fetchasstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package fetcher
import (
"context"
"errors"
"io"
"sync"
)
// ErrStreamReset is returned from the io.Reader when the fetch process is
// reset by the fetcher.
var ErrStreamReset = errors.New("stream was reset by fetcher")

// A StreamHandler is a function that handles a stream; the stream may be
// aborted, in which case the io.Reader will return ErrStreamReset and the
// Context will be canceled.
type StreamHandler func(context.Context, io.Reader) error
// FetchAsStream gets a reference as a stream.
//
// Notice that target StreamHandler may be invoked multiple times, if the
// connection breaks while fetching it might be retried. In which case the
// Context passed to the target StreamHandler will be canceled.
func FetchAsStream(ctx Context, reference Reference, target StreamHandler) error {
	// NOTE(review): the parameter was previously named "context", which
	// shadowed the imported context package inside this function; renamed
	// to the conventional "ctx" (backward compatible — callers bind
	// positionally).
	s := &streamReseter{handler: target}
	s.Reset() // initialize the pipe and start the first handler invocation

	// Fetch writes the content into s. Notice that Fetch() may invoke
	// s.Reset() to restart the stream after a broken connection.
	ferr := reference.Fetch(ctx, s)
	// Always close the write side so the handler goroutine terminates and
	// its error (if any) can be collected.
	cerr := s.CloseWithError(ferr)
	// The fetch error takes precedence over the handler error.
	if ferr != nil {
		return ferr
	}
	return cerr
}
// streamReseter adapts a StreamHandler to an io.Writer whose stream can be
// reset mid-fetch: Reset aborts the current handler invocation (its reader
// yields ErrStreamReset, its Context is canceled) and starts a fresh one.
type streamReseter struct {
	m       sync.Mutex     // guards all fields below against concurrent Write/Reset/Close
	handler StreamHandler  // invoked once per generation with a fresh ctx and reader
	reader  *io.PipeReader // read side consumed by the current handler call
	writer  *io.PipeWriter // write side fed by Write
	ctx     context.Context
	cancel  func() // cancels ctx; non-nil once Reset has run at least once
	wg      sync.WaitGroup // tracks the in-flight handler goroutine
	err     error  // error reported by the last non-aborted handler call
}
// Write implements io.Writer by forwarding p into the current pipe. The
// lock is held for the duration of the write so Reset cannot swap the
// writer out from underneath an in-progress call.
func (s *streamReseter) Write(p []byte) (int, error) {
	s.m.Lock()
	defer s.m.Unlock()
	return s.writer.Write(p)
}
// Reset aborts the in-flight handler invocation, if any — its io.Reader
// returns ErrStreamReset and its Context is canceled — then starts a new
// invocation with a fresh pipe and Context. It always returns nil.
func (s *streamReseter) Reset() error {
	s.m.Lock()
	defer s.m.Unlock()
	// Discard current state if any (cancel is nil only before the first
	// Reset).
	if s.cancel != nil {
		s.cancel()
		s.writer.CloseWithError(ErrStreamReset)
	}
	// Wait for the previous handler call to be done, so we never call the
	// handler concurrently. Even if calling it concurrently wouldn't be a
	// problem here, skipping this wait is a tiny optimization that only
	// affects the case where a broken connection forces Fetch() to reset
	// the download — and it would make it a lot harder to write a proper
	// StreamHandler, as the handler would then need to deal with that
	// concurrency itself.
	s.wg.Wait()
	ctx, cancel := context.WithCancel(context.Background())
	reader, writer := io.Pipe()
	s.reader = reader
	s.writer = writer
	s.ctx = ctx
	s.cancel = cancel
	s.err = nil
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer cancel()
		err := s.handler(ctx, reader)
		// Unblock any pending Write with the handler's result.
		reader.CloseWithError(err)
		// Record the error only if this generation wasn't aborted by a
		// later Reset; the wg.Wait above (and in CloseWithError) orders
		// this write before any read of s.err.
		if ctx.Err() == nil {
			s.err = err
		}
	}()
	return nil
}
// CloseWithError shuts down the write side of the stream with err (nil for
// a clean EOF), waits for the handler goroutine to finish, and returns the
// error the handler reported, if any.
func (s *streamReseter) CloseWithError(err error) error {
	s.m.Lock()
	defer s.m.Unlock()

	// Closing an already-closed pipe is harmless, so the result is
	// deliberately discarded.
	_ = s.writer.CloseWithError(err)

	// Block until the handler goroutine has finished and recorded s.err.
	s.wg.Wait()
	return s.err
}