This repository has been archived by the owner on Aug 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
bufr_stream.go
109 lines (91 loc) · 2 KB
/
bufr_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package protocol
import (
"bufio"
"io"
"time"
)
var _ Stream = (*BufReaderStream)(nil)
type Stream interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
type closeWriter interface {
CloseWrite() error
}
type closeReader interface {
CloseRead() error
}
type reseter interface {
Reset() error
}
type BufReaderStream struct {
s Stream
Reader *bufio.Reader
}
func (bs *BufReaderStream) Read(p []byte) (int, error) {
return bs.Reader.Read(p)
}
func (bs *BufReaderStream) Write(p []byte) (n int, err error) {
return bs.s.Write(p)
}
func (bs *BufReaderStream) Close() error {
return bs.s.Close()
}
func (bs *BufReaderStream) Reset() error {
if s, ok := bs.s.(reseter); ok {
return s.Reset()
}
return bs.s.Close()
}
func (bs *BufReaderStream) CloseWrite() error {
if s, ok := bs.s.(closeWriter); ok {
return s.CloseWrite()
}
return bs.s.Close()
}
func (bs *BufReaderStream) CloseRead() error {
if s, ok := bs.s.(closeReader); ok {
return s.CloseRead()
}
return bs.s.Close()
}
func (bs *BufReaderStream) SetDeadline(t time.Time) error {
return bs.s.SetDeadline(t)
}
func (bs *BufReaderStream) SetReadDeadline(t time.Time) error {
return bs.s.SetReadDeadline(t)
}
func (bs *BufReaderStream) SetWriteDeadline(t time.Time) error {
return bs.s.SetWriteDeadline(t)
}
func NewBufReaderStream(s Stream) *BufReaderStream {
return &BufReaderStream{
s: s,
Reader: bufio.NewReader(s),
}
}
func tunneling(dst, src Stream) error {
errCh := make(chan error, 2)
go proxy(dst, src, errCh)
go proxy(src, dst, errCh)
// Wait
for i := 0; i < 2; i++ {
err := <-errCh
if err != nil {
// return from this function closes target (and conn).
return err
}
}
return nil
}
func proxy(dst io.Writer, src io.Reader, errCh chan error) {
_, err := io.Copy(dst, src)
if tcpConn, ok := dst.(closeWriter); ok {
tcpConn.CloseWrite()
}
errCh <- err
}