-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
140 lines (121 loc) · 3.17 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright 2012 SocialCode. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package gelf
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"
)
type Reader struct {
mu sync.Mutex
conn net.Conn
}
func NewReader(addr string) (*Reader, error) {
var err error
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
}
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return nil, fmt.Errorf("ListenUDP: %s", err)
}
r := new(Reader)
r.conn = conn
return r, nil
}
func (r *Reader) Addr() string {
return r.conn.LocalAddr().String()
}
// FIXME: this will discard data if p isn't big enough to hold the
// full message.
func (r *Reader) Read(p []byte) (int, error) {
msg, err := r.ReadMessage()
if err != nil {
return -1, err
}
var data string
if msg.Full == "" {
data = msg.Short
} else {
data = msg.Full
}
return strings.NewReader(data).Read(p)
}
func (r *Reader) ReadMessage() (*Message, error) {
cBuf := make([]byte, ChunkSize)
var (
err error
n, length int
cid, ocid []byte
seq, total uint8
cHead []byte
cReader io.Reader
chunks [][]byte
)
for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
if n, err = r.conn.Read(cBuf); err != nil {
return nil, fmt.Errorf("Read: %s", err)
}
cHead, cBuf = cBuf[:2], cBuf[:n]
if bytes.Equal(cHead, magicChunked) {
//fmt.Printf("chunked %v\n", cBuf[:14])
cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
if ocid != nil && !bytes.Equal(cid, ocid) {
return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
} else if ocid == nil {
ocid = cid
chunks = make([][]byte, total)
}
n = len(cBuf) - chunkedHeaderLen
//fmt.Printf("setting chunks[%d]: %d\n", seq, n)
chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
length += n
} else { //not chunked
if total > 0 {
return nil, fmt.Errorf("out-of-band message (not chunked)")
}
break
}
}
//fmt.Printf("\nchunks: %v\n", chunks)
if length > 0 {
if cap(cBuf) < length {
cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
}
cBuf = cBuf[:0]
for i := range chunks {
//fmt.Printf("appending %d %v\n", i, chunks[i])
cBuf = append(cBuf, chunks[i]...)
}
cHead = cBuf[:2]
}
// the data we get from the wire is compressed
if bytes.Equal(cHead, magicGzip) {
cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
} else if cHead[0] == magicZlib[0] &&
(int(cHead[0])*256+int(cHead[1]))%31 == 0 {
// zlib is slightly more complicated, but correct
cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
} else {
// compliance with https://github.com/Graylog2/graylog2-server
// treating all messages as uncompressed if they are not gzip, zlib or
// chunked
cReader = bytes.NewReader(cBuf)
}
if err != nil {
return nil, fmt.Errorf("NewReader: %s", err)
}
msg := new(Message)
if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return msg, nil
}