-
Notifications
You must be signed in to change notification settings - Fork 28
/
framer.go
152 lines (136 loc) · 4.3 KB
/
framer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package framer provides the ability to wrap a network connection
// (or any io.ReadWriteCloser) with framing that consists of a 3-byte
// length followed by data of that length.
package framer
import (
"io"
"v.io/v23/flow"
)
// Framing parameters used by the framer implementation in this file.
const (
// sizeBytes is the length, in bytes, of the frame header that carries
// the encoded message size. It is the value returned by FrameHeaderSize.
sizeBytes = 3
// maxPacketSize is the largest message size accepted by PutSize and
// WriteMsg; it is the largest value representable in sizeBytes bytes.
maxPacketSize = 0xffffff
// copyBound is the length of the reusable write buffer allocated by New.
// WriteMsg copies a message into that buffer, and issues a single write,
// only when the message is smaller than copyBound - sizeBytes.
copyBound = 8192
)
// framer is the implementation of T returned by New. It layers framing on
// top of the embedded io.ReadWriteCloser.
//
// NOTE(review): the buffers below are reused across calls and no locking
// is visible in this file, so concurrent WriteMsg calls (or concurrent
// ReadMsg calls) presumably need external synchronization - confirm.
type framer struct {
// The underlying stream. Its Read, Write and Close methods are promoted
// and operate on the stream without any framing.
io.ReadWriteCloser
// readFrame receives the frame header read by ReadMsg2.
readFrame [sizeBytes]byte
// writeFrame holds the frame header that WriteMsg writes on its own for
// messages that are not copied into writeBuf.
writeFrame [sizeBytes]byte
// writeBuf is scratch space, allocated by New with length copyBound, in
// which WriteMsg assembles the header and payload of small messages.
writeBuf []byte
}
// T represents the interface to a framer; see New for a more complete
// explanation.
type T interface {
// MsgReadWriteCloser reads/writes frames to the underlying stream.
flow.MsgReadWriteCloser
// ReadWriter provides raw, unframed access to the underlying stream;
// that is, bytes are written to and read from it with no notion of a
// frame.
io.ReadWriter
// FrameHeaderSize returns the size, in bytes, of the frame header.
FrameHeaderSize() int
// PutSize encodes the message size into FrameHeaderSize bytes at
// the head of dst (i.e. dst[0:FrameHeaderSize()]). It returns an error
// if msgSize cannot be encoded in a header of that size.
PutSize(dst []byte, msgSize int) error
// GetSize retrieves the encoded message size from src[0:FrameHeaderSize()].
GetSize(src []byte) int
}
// New creates a new instance of T that implements 'framing' over a raw
// connection via the flow.MsgReadWriteCloser methods. It also provides
// direct access (without framing) via the io.ReadWriter methods, which,
// in conjunction with the FrameHeaderSize, PutSize and GetSize methods,
// can be used to send pre-framed messages and so reduce the number of
// system calls and the copying that is otherwise required.
//
// The framer may issue multiple writes to the underlying connection for a
// single message. For smaller messages it copies the data into a single
// buffer and issues a single write. This combined approach ensures that
// the framer has a fixed and known memory overhead: one buffer of
// copyBound bytes allocated here.
func New(c io.ReadWriteCloser) T {
	return &framer{
		ReadWriteCloser: c,
		writeBuf:        make([]byte, copyBound),
	}
}
// FrameHeaderSize implements T. It returns sizeBytes, the fixed number of
// bytes used to encode the message size at the start of each frame.
func (f *framer) FrameHeaderSize() int {
return sizeBytes
}
// PutSize implements T. It encodes msgSize into the first sizeBytes bytes
// of dst and returns nil. If msgSize is greater than maxPacketSize, dst is
// left untouched and an ErrLargerThan3ByteUInt error is returned.
//
// dst must hold at least sizeBytes bytes; a shorter slice causes an
// index-out-of-range panic in write3ByteUint.
func (f *framer) PutSize(dst []byte, msgSize int) error {
	if msgSize <= maxPacketSize {
		write3ByteUint(dst, msgSize)
		return nil
	}
	return ErrLargerThan3ByteUInt.Errorf(nil, "integer too large to represent in %v bytes", sizeBytes)
}
// GetSize implements T. It decodes and returns the message size stored in
// the first three bytes of src by calling read3ByteUint. src must hold at
// least three bytes; a shorter slice causes an index-out-of-range panic.
func (f *framer) GetSize(src []byte) int {
return read3ByteUint(src)
}
// WriteMsg implements flow.MsgReadWriteCloser. It frames the concatenation
// of data as one message: a sizeBytes header followed by the payload.
//
// Messages smaller than copyBound - sizeBytes are copied into the framer's
// internal buffer and sent with a single write. Larger messages are sent
// as one write for the header followed by one write per element of data.
//
// The returned count is the number of payload bytes accepted by the
// underlying connection. Header bytes are never included and the count is
// never negative, even when a write fails part-way through the header.
// If the message is larger than maxPacketSize nothing is written and an
// ErrLargerThan3ByteUInt error is returned.
func (f *framer) WriteMsg(data ...[]byte) (int, error) {
	// Compute the message size.
	msgSize := 0
	for _, b := range data {
		msgSize += len(b)
	}

	if msgSize < (copyBound - sizeBytes) {
		// Small message: assemble header and payload in writeBuf so that
		// only one write is issued to the underlying connection.
		write3ByteUint(f.writeBuf[:sizeBytes], msgSize)
		head := sizeBytes
		for _, d := range data {
			l := len(d)
			copy(f.writeBuf[head:head+l], d)
			head += l
		}
		n, err := f.Write(f.writeBuf[:head])
		// n counts header bytes too. A failed write may have stopped inside
		// the header, in which case no payload byte was written.
		written := n - sizeBytes
		if written < 0 {
			written = 0
		}
		return written, err
	}

	if msgSize > maxPacketSize {
		return 0, ErrLargerThan3ByteUInt.Errorf(nil, "integer too large to represent in %v bytes", sizeBytes)
	}

	// Large message: write the header on its own, then each slice in turn,
	// avoiding a copy of the payload.
	write3ByteUint(f.writeFrame[:], msgSize)
	if _, err := f.Write(f.writeFrame[:]); err != nil {
		// Only header bytes (if any) were written; no payload yet.
		return 0, err
	}
	written := 0
	for _, d := range data {
		n, err := f.Write(d)
		written += n
		if err != nil {
			return written, err
		}
	}
	return written, nil
}
// ReadMsg implements flow.MsgReadWriteCloser. It reads the next frame by
// calling ReadMsg2 with a nil buffer, so any non-empty message is returned
// in a newly allocated slice.
func (f *framer) ReadMsg() ([]byte, error) {
return f.ReadMsg2(nil)
}
// ReadMsg2 implements flow.MsgReadWriteCloser. It reads one frame from the
// underlying stream: first the sizeBytes header, then exactly as many
// payload bytes as the header announces.
//
// If len(msg) is at least the message size, the payload is read into msg
// and the returned slice aliases it; otherwise a new buffer of the exact
// message size is allocated. The returned slice always has length equal to
// the message size. On any read error the result is nil together with the
// error reported by io.ReadFull.
func (f *framer) ReadMsg2(msg []byte) ([]byte, error) {
	// Read and decode the frame header.
	header := f.readFrame[:]
	if _, err := io.ReadFull(f, header); err != nil {
		return nil, err
	}
	size := read3ByteUint(header)

	// Fall back to a fresh buffer when the caller's one is too short.
	if len(msg) < size {
		msg = make([]byte, size)
	}

	// Read the payload.
	payload := msg[:size]
	_, err := io.ReadFull(f, payload)
	if err != nil {
		return nil, err
	}
	return payload, nil
}
// write3ByteUint stores n in dst[0:3], most significant byte first. The
// value placed on the wire is maxPacketSize - n rather than n itself;
// read3ByteUint applies the same subtraction to recover n.
//
// No range check is done here: callers are expected to have verified that
// n fits in three bytes. dst must hold at least three bytes.
func write3ByteUint(dst []byte, n int) {
	encoded := maxPacketSize - n
	// Converting to byte keeps only the low 8 bits of each shifted value.
	dst[0] = byte(encoded >> 16)
	dst[1] = byte(encoded >> 8)
	dst[2] = byte(encoded)
}
// read3ByteUint decodes the value stored by write3ByteUint in src[0:3]:
// the three bytes are combined most significant first and the result is
// subtracted from maxPacketSize. src must hold at least three bytes.
func read3ByteUint(src []byte) int {
	hi, mid, lo := int(src[0]), int(src[1]), int(src[2])
	encoded := hi<<16 | mid<<8 | lo
	return maxPacketSize - encoded
}