-
-
Notifications
You must be signed in to change notification settings - Fork 69
/
connection.go
183 lines (159 loc) · 5.24 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2020 Matthew Holt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package layer4
import (
"bytes"
"context"
"io"
"net"
"sync"
"github.com/caddyserver/caddy/v2"
)
// WrapConnection turns a plain net.Conn into a layer4 Connection, which
// supports recording and rewinding and carries a context holding a
// replacer and a variable table. It is meant to be called at the start
// of a connection handler chain, where the underlying connection is not
// yet a layer4 Connection value.
//
// The replacer is seeded with the "l4.conn.remote_addr" and
// "l4.conn.local_addr" placeholders taken from underlying, and buf
// becomes the buffer into which recordings are stored.
func WrapConnection(underlying net.Conn, buf *bytes.Buffer) *Connection {
	replacer := caddy.NewReplacer()
	replacer.Set("l4.conn.remote_addr", underlying.RemoteAddr())
	replacer.Set("l4.conn.local_addr", underlying.LocalAddr())

	// layer the (initially empty) variable table and the replacer
	// onto a fresh background context
	vars := make(map[string]interface{})
	connCtx := context.WithValue(
		context.WithValue(context.Background(), VarsCtxKey, vars),
		ReplacerCtxKey, replacer,
	)

	wrapped := new(Connection)
	wrapped.Conn = underlying
	wrapped.Context = connCtx
	wrapped.buf = buf
	return wrapped
}
// Connection carries a network connection through a chain of
// handlers together with its context, and it is able to record
// what is read and to rewind so the recording can be read again.
//
// Because it embeds a net.Conn, a Connection can itself be used
// as a net.Conn. When wrapping the underlying connection, you
// usually want to replace only the embedded Conn rather than
// this entire Connection value.
//
// Connection structs are NOT safe for concurrent use.
type Connection struct {
	// The underlying connection.
	net.Conn

	// The context for the connection.
	Context context.Context

	buf       *bytes.Buffer // holds the recorded bytes
	bufReader io.Reader     // replays buf; nil once reads go straight to Conn
	recording bool          // whether bytes read from Conn are copied into buf

	bytesRead    uint64 // running total of bytes read from Conn
	bytesWritten uint64 // running total of bytes written to Conn
}
// Read implements io.Reader in such a way that reads first
// deplete any associated buffer from the prior recording,
// and once depleted (or if there isn't one), it continues
// reading from the underlying connection.
//
// While recording, every byte obtained from the underlying
// connection is also appended to cx.buf. Only bytes read from
// the underlying connection are added to cx.bytesRead.
//
// It returns the number of bytes placed in p and any error
// reported by whichever source served the read; the buffer
// running dry is never reported as an error.
func (cx *Connection) Read(p []byte) (n int, err error) {
	// if there is a buffer we should read from, start
	// with that; we only read from the underlying conn
	// after the buffer has been "depleted"
	if cx.bufReader != nil {
		n, err = cx.bufReader.Read(p)
		if err != io.EOF {
			return n, err
		}

		// the buffer is exhausted: stop consulting it, and hide its
		// EOF, which does not mean the connection itself has ended
		cx.bufReader = nil
		if n > 0 {
			return n, nil
		}

		// The buffer had nothing left to give (for example, recording
		// started while cx.buf was still empty). Returning (0, nil)
		// is discouraged by the io.Reader contract and can make
		// callers spin or give up with io.ErrNoProgress, so fall
		// through and serve this call from the underlying connection.
	}

	// buffer has been "depleted" so read from
	// underlying connection
	n, err = cx.Conn.Read(p)
	cx.bytesRead += uint64(n)

	// while recording, anything that was read needs to be
	// written to the buffer, even if there was an error
	if cx.recording && n > 0 {
		// bytes.Buffer.Write is documented to always return a nil
		// error, so this branch is purely defensive
		if nw, errw := cx.buf.Write(p[:n]); errw != nil {
			return nw, errw
		}
	}
	return n, err
}
// Write implements io.Writer by passing p to the underlying
// connection. However many bytes the underlying connection
// reports as written are added to cx.bytesWritten, including
// on a partial write that also returns an error.
func (cx *Connection) Write(p []byte) (n int, err error) {
	written, writeErr := cx.Conn.Write(p)
	cx.bytesWritten += uint64(written)
	return written, writeErr
}
// Wrap returns a new Connection whose underlying connection is
// conn and whose remaining state is taken from cx: the context,
// the recording buffer and its reader, the recording flag, and
// the byte counters as they stand at the time of the call. This
// is useful after a connection is wrapped by a package that does
// not support our Connection type (for example, `tls.Server()`).
//
// cx itself is left unmodified.
func (cx *Connection) Wrap(conn net.Conn) *Connection {
	// start from a shallow copy so every field carries over,
	// then swap in the new underlying connection
	wrapped := *cx
	wrapped.Conn = conn
	return &wrapped
}
// record turns recording on, so that bytes subsequently read from
// the underlying connection are also stored in cx.buf. It also sets
// up a reader over what cx.buf currently holds; reading through it
// does not remove anything from the buffer.
func (cx *Connection) record() {
	cx.recording = true

	// bytes.NewReader walks the slice without consuming the
	// buffer, so the recorded bytes stay available for a rewind
	recorded := cx.buf.Bytes()
	cx.bufReader = bytes.NewReader(recorded)
}
// rewind turns recording off and makes the buffer itself the
// source for upcoming reads, so that the next reads from this
// Connection come from the buffer first and then continue with
// the underlying connection.
func (cx *Connection) rewind() {
	// reading from the buffer directly consumes its bytes
	cx.bufReader = cx.buf
	cx.recording = false
}
// SetVar stores value under key in the variable table held by
// the connection's context, replacing whatever was previously
// stored under the same key. If the context does not carry a
// variable table, the call does nothing.
func (cx Connection) SetVar(key string, value interface{}) {
	if table, found := cx.Context.Value(VarsCtxKey).(map[string]interface{}); found {
		table[key] = value
	}
}
// GetVar looks up key in the variable table held by the
// connection's context and returns the value stored there.
// It returns nil when the key is absent or when the context
// does not carry a variable table, so a stored nil value
// cannot be told apart from a missing one.
func (cx Connection) GetVar(key string) interface{} {
	// a failed assertion leaves table as a nil map, and indexing
	// a nil map yields the zero value: a nil interface
	table, _ := cx.Context.Value(VarsCtxKey).(map[string]interface{})
	return table[key]
}
// Keys under which per-connection values are stored in a
// Connection's context.
var (
	// VarsCtxKey is the context key for the variables table
	// of a Connection.
	VarsCtxKey = caddy.CtxKey("vars")

	// ReplacerCtxKey is the context key for the replacer.
	ReplacerCtxKey = caddy.CtxKey("replacer")
)
// bufPool is a pool of reusable byte buffers. When the pool has
// nothing to hand out, it allocates a new, empty bytes.Buffer.
// NOTE(review): nothing in this file draws from the pool;
// presumably callers elsewhere in the package do — confirm.
var bufPool = sync.Pool{
	New: func() interface{} {
		var fresh bytes.Buffer
		return &fresh
	},
}