forked from VKCOM/kittenhouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunks.go
127 lines (99 loc) · 2.21 KB
/
chunks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package clickhouse
import (
"errors"
"io"
"log"
"sync"
"sync/atomic"
"time"
)
// readResult is a single item delivered to readers of a pending request:
// a chunk of bytes together with the error (if any) attached to it.
type readResult struct {
	result []byte
	err    error
}

// pendingRequest holds the state of one response that is being handed
// out to readers chunk by chunk.
type pendingRequest struct {
	// id is the key of this request in the pending requests map.
	id int32

	// Request parameters.
	cancelFunc func()        // has to be invoked so that context resources are released
	bodyReader io.ReadCloser // has to be closed once the request is finished

	// resCh is the channel the results are streamed to.
	// A closed channel signals that there is nothing more to read.
	resCh chan *readResult

	// Internal state.
	buf []byte
}
// bigRequestID is the counter request ids are derived from; it is
// incremented atomically by addPendingRequest.
var bigRequestID int32

// pendingRequests is the mutex-guarded registry of in-flight requests,
// keyed by request id.
var pendingRequests = struct {
	sync.Mutex
	m map[int32]*pendingRequest
}{
	m: map[int32]*pendingRequest{},
}

// errNoSuchQuery is returned by getNextChunk when the requested id is not
// present in pendingRequests.
var errNoSuchQuery = errors.New("No such query pending (perhaps timed out)")
// addPendingRequest registers a new pending request and returns its id.
//
// cancelFunc and bodyReader are stored on the request so that its loop
// goroutine can release them; buf is stored as the request's initial
// buffer. The request is put into pendingRequests under the lock and its
// loop goroutine is started before the lock is released.
func addPendingRequest(cancelFunc func(), bodyReader io.ReadCloser, buf []byte) int32 {
	id := atomic.AddInt32(&bigRequestID, 1)

	pending := &pendingRequest{
		id:         id,
		cancelFunc: cancelFunc,
		bodyReader: bodyReader,
		buf:        buf,
		// One slot of buffering lets the loop goroutine queue a chunk
		// without a reader being present yet.
		resCh: make(chan *readResult, 1),
	}

	pendingRequests.Lock()
	pendingRequests.m[id] = pending
	go pending.loop()
	pendingRequests.Unlock()

	return id
}
// getNextChunk receives the next result of the pending request with the
// given id.
//
// It returns errNoSuchQuery when the id is not registered. When the
// request's result channel is closed it returns eof == true with no data.
// Otherwise it returns the received chunk along with the error that was
// attached to it.
func getNextChunk(id int32) (buf []byte, eof bool, err error) {
	pendingRequests.Lock()
	pending, found := pendingRequests.m[id]
	pendingRequests.Unlock()

	if !found {
		return nil, false, errNoSuchQuery
	}

	// The receive happens outside of the lock, so a blocked reader does
	// not hold up other requests.
	chunk, open := <-pending.resCh
	if open {
		return chunk.result, false, chunk.err
	}
	return nil, true, nil
}
// loop is the goroutine body of a pending request. It streams the response
// to readers through r.resCh and removes the request from pendingRequests
// when it is finished.
//
// When streaming ended normally (end of body or a read error that was
// delivered to a reader) the request stays registered for another
// readTimeout so that readers can still fetch the queued results and
// observe the closed channel. When readers stopped consuming, the request
// is unregistered right away.
func (r *pendingRequest) loop() {
	const readTimeout = time.Second * 10

	defer func() {
		pendingRequests.Lock()
		delete(pendingRequests.m, r.id)
		pendingRequests.Unlock()
	}()

	if r.streamChunks(readTimeout) {
		// let readers fetch all the results
		time.Sleep(readTimeout)
	}
}

// streamChunks sends the initial buffer and then the body, in pieces of at
// most maxResultSize bytes, to r.resCh. It reports false when a reader did
// not pick up a chunk within readTimeout and true otherwise.
//
// On return the channel is closed and the context and the body are
// released. Every chunk handed to the channel lives in its own buffer, so
// the body does not have to stay open while readers drain the channel.
func (r *pendingRequest) streamChunks(readTimeout time.Duration) (delivered bool) {
	defer func() {
		close(r.resCh)
		r.cancelFunc()
		// The data has already been read at this point; a Close error
		// would not be actionable, so it is ignored as before.
		r.bodyReader.Close()
	}()

	// resCh is created with capacity 1 in addPendingRequest, so this
	// first send does not wait for a reader.
	r.resCh <- &readResult{result: r.buf}
	r.buf = nil

	for {
		buf := make([]byte, maxResultSize)
		n, err := io.ReadFull(r.bodyReader, buf)
		if err == io.EOF {
			// Nothing was read at all: the body is exhausted.
			return true
		}

		res := &readResult{result: buf[0:n], err: err}
		if err == io.ErrUnexpectedEOF {
			// A short final chunk is not an error for the reader.
			res.err = nil
		}

		// A stoppable timer is used instead of time.After so that the
		// timer is released as soon as the chunk has been handed over.
		timer := time.NewTimer(readTimeout)
		select {
		case <-timer.C:
			log.Printf("Request %d timed out while waiting for readers", r.id)
			return false
		case r.resCh <- res:
			timer.Stop()
		}

		if err != nil {
			// Either the short final chunk or a read error was
			// delivered; there is nothing more to read.
			return true
		}
	}
}