/
request.go
99 lines (80 loc) · 1.96 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package prototest
import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"testing"
"github.com/segmentio/kafka-go/protocol"
)
// TestRequest is a test helper which serializes msg as a request of the given
// API version with protocol.WriteRequest, parses the resulting bytes back with
// protocol.ReadRequest, and reports an error for every header field or message
// value that did not survive the round trip.
//
// The checks run in a subtest named "v<version>". A failure to write or read
// the request aborts that subtest immediately.
func TestRequest(t *testing.T, version int16, msg protocol.Message) {
	// Fixed header values sent with the request and expected back unchanged.
	const (
		sentCorrelationID = 1234
		sentClientID      = "me"
	)

	// load and the reset function it returns are defined elsewhere in this
	// package; reset is invoked once msg has been written.
	reset := load(msg)
	name := fmt.Sprintf("v%d", version)

	t.Run(name, func(t *testing.T) {
		wire := new(bytes.Buffer)

		err := protocol.WriteRequest(wire, version, sentCorrelationID, sentClientID, msg)
		if err != nil {
			t.Fatal(err)
		}
		reset()

		// Dump the encoded request to make failures easier to diagnose.
		t.Logf("\n%s\n", hex.Dump(wire.Bytes()))

		gotVersion, gotCorrelationID, gotClientID, got, err := protocol.ReadRequest(wire)
		if err != nil {
			t.Fatal(err)
		}

		if gotVersion != version {
			t.Errorf("api version mismatch: %d != %d", gotVersion, version)
		}
		if gotCorrelationID != sentCorrelationID {
			t.Errorf("correlation id mismatch: %d != %d", gotCorrelationID, sentCorrelationID)
		}
		if gotClientID != sentClientID {
			t.Errorf("client id mismatch: %q != %q", gotClientID, sentClientID)
		}

		if deepEqual(msg, got) {
			return
		}
		t.Errorf("request message mismatch:")
		t.Logf("expected: %+v", msg)
		t.Logf("found: %+v", got)
	})
}
// BenchmarkRequest is a benchmark helper measuring how fast msg is decoded and
// encoded as a request of the given API version. It runs a sub-benchmark named
// "v<version>" which contains two nested sub-benchmarks:
//
//   - "read" serializes msg once with protocol.WriteRequest and then parses
//     those same bytes with protocol.ReadRequest b.N times.
//   - "write" serializes msg with protocol.WriteRequest b.N times.
//
// Both report throughput through b.SetBytes using the size of one serialized
// request. They share a single buffer, which each of them leaves empty.
func BenchmarkRequest(b *testing.B, version int16, msg protocol.Message) {
	// load and the reset function it returns are defined elsewhere in this
	// package; reset is invoked after every write of msg.
	reset := load(msg)

	b.Run(fmt.Sprintf("v%d", version), func(b *testing.B) {
		buffer := &bytes.Buffer{}
		buffer.Grow(1024)

		b.Run("read", func(b *testing.B) {
			// Deferred so the shared buffer is emptied even when b.Fatal
			// aborts this sub-benchmark; otherwise "write" would start with
			// leftover bytes in it.
			defer buffer.Reset()

			w := io.Writer(buffer)
			if err := protocol.WriteRequest(w, version, 1234, "client", msg); err != nil {
				b.Fatal(err)
			}
			reset()

			p := buffer.Bytes()
			x := bytes.NewReader(p)
			r := bufio.NewReader(x)

			// Everything above is fixture setup: keep its time and
			// allocations out of the reported measurements.
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				_, _, _, req, err := protocol.ReadRequest(r)
				if err != nil {
					b.Fatal(err)
				}
				closeMessage(req)

				// Rewind both readers so the next iteration parses the same
				// bytes again.
				x.Reset(p)
				r.Reset(x)
			}

			b.SetBytes(int64(len(p)))
		})

		b.Run("write", func(b *testing.B) {
			w := io.Writer(buffer)
			n := int64(0)

			for i := 0; i < b.N; i++ {
				if err := protocol.WriteRequest(w, version, 1234, "client", msg); err != nil {
					b.Fatal(err)
				}
				reset()

				// Record the encoded size before discarding the bytes so the
				// buffer does not grow across iterations.
				n = int64(buffer.Len())
				buffer.Reset()
			}

			b.SetBytes(n)
		})
	})
}