-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bufferedcopier.go
138 lines (130 loc) · 3.5 KB
/
bufferedcopier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package interruptible_websocket_proxy
import (
"fmt"
"io"
"log"
"time"
)
type writeErr struct {
error
CopyDirection
}
type readErr struct {
error
CopyDirection
}
type CopyDirection int
const (
CopyToBackend CopyDirection = iota + 1
CopyFromBacked
)
// copyBuffer is the actual implementation of Copy and CopyBuffer.
// if buf is nil, one is allocated.
// TODO: Should mutex writes to backend error
func (pep *PersistentPipe) copyBuffer(cd CopyDirection, errChan chan error) {
var src func() io.Reader
var dst func() io.Writer
var written int64
var err error
if cd == CopyToBackend {
src = func() io.Reader { return pep.ClientConn }
dst = func() io.Writer {
return pep.BackendConn
}
} else {
src = func() io.Reader {
return pep.BackendConn
}
dst = func() io.Writer {
return pep.ClientConn
}
}
size := 32 * 1024
if l, ok := src().(*io.LimitedReader); ok && int64(size) > l.N {
if l.N < 1 {
size = 1
} else {
size = int(l.N)
}
}
buf := make([]byte, size)
for {
if cd == CopyFromBacked && pep.BackendErr != nil {
if len(pep.backendBuffer) > pep.bufferByteLimit {
err = writeErr{error: fmt.Errorf("backend buffer reached max limit, exiting")}
log.Println(err)
break
}
time.Sleep(2 * time.Second)
continue
}
nr, srcReadErr := src().Read(buf)
if nr > 0 {
if cd == CopyToBackend && pep.BackendErr != nil {
if len(pep.backendBuffer)+len(buf[0:nr]) > pep.bufferByteLimit {
err = writeErr{error: fmt.Errorf("backend buffer reached max limit, exiting")}
log.Println(err)
break
}
pep.backendBuffer = append(pep.backendBuffer, buf[0:nr]...)
continue
} else if cd == CopyToBackend && pep.BackendErr == nil && len(pep.backendBuffer) > 0 {
// TODO: this one is quick hack, can implement to write chunks if writes are failing with large buffer size
buf = append(pep.backendBuffer, buf[0:nr]...)
nr = len(pep.backendBuffer)
}
nw, ew := dst().Write(buf[0:nr])
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
ew = fmt.Errorf("invalid write error")
}
}
written += int64(nw)
if ew != nil {
err = writeErr{error: ew, CopyDirection: cd}
if cd == CopyToBackend {
//pep.BackendErr = err
pep.backendBuffer = append(pep.backendBuffer, buf[0:nr]...)
//errChan <- err
continue
}
//errChan <- err
break
}
if nr != nw {
invalidWriteErr := fmt.Errorf("invalid write error: %s", io.ErrShortWrite)
err = writeErr{error: invalidWriteErr, CopyDirection: cd}
log.Println(err)
if cd == CopyToBackend {
//pep.BackendErr = err
pep.backendBuffer = append(pep.backendBuffer, buf[0:nr]...)
//errChan <- err
continue
}
//errChan <- err
break
} else if len(pep.backendBuffer) > 0 {
pep.backendBuffer = pep.backendBuffer[:0]
}
}
// In case of reading from client connection is an error, stop
if cd == CopyToBackend && srcReadErr != nil {
log.Printf("WARN: read from client connection failed with err: %s", srcReadErr)
if srcReadErr != io.EOF {
err = readErr{error: srcReadErr, CopyDirection: cd}
}
break
} else if cd == CopyFromBacked && srcReadErr != nil {
log.Printf("WARN: backend connection failed with err: %s", srcReadErr)
if pep.BackendErr == nil {
pep.BackendErr = readErr{error: srcReadErr, CopyDirection: cd}
errChan <- pep.BackendErr
}
if len(pep.backendBuffer) > pep.bufferByteLimit {
err = writeErr{error: fmt.Errorf("backend buffer reached max limit, exiting")}
break
}
}
}
}