-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.go
123 lines (104 loc) · 2.39 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package metrics
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/russellhaering/godep-timetools-bug/Godeps/_workspace/src/github.com/mailgun/timetools"
)
// sender is the small transport contract shared by the senders in this file:
// it accepts raw payloads and can be shut down.
type sender interface {
	// Close shuts the sender down and reports any error from doing so.
	Close() error
	// Write hands data to the sender and reports how many bytes it accepted.
	Write(data []byte) (int, error)
}
// UDPSender writes payloads to one fixed remote UDP address.
type UDPSender struct {
	// c is the UDP connection every datagram is written to.
	c *net.UDPConn
	// ra is the already-resolved destination address used for each write.
	ra *net.UDPAddr
}
// Write sends data as a single datagram to the sender's remote address.
//
// It returns the number of bytes written. A failed write is reported as
// (0, err); a write that succeeds but transfers zero bytes is reported as an
// error as well.
func (s *UDPSender) Write(data []byte) (int, error) {
	// No mutex is taken here: per the original author's note, writes are
	// already serialized by the connection's underlying network descriptor.
	sent, err := s.c.WriteToUDP(data, s.ra)
	switch {
	case err != nil:
		return 0, err
	case sent == 0:
		return sent, errors.New("Wrote no bytes")
	}
	return sent, nil
}
// Close closes the underlying UDP connection and returns the error, if any,
// reported by that close.
func (s *UDPSender) Close() error {
	closeErr := s.c.Close()
	return closeErr
}
// newUDPSender builds a sender that writes datagrams to addr from a UDP socket
// bound to a local port chosen by the system (":0").
//
// addr is resolved before the socket is opened, so an unresolvable address
// can no longer leave an open socket behind. If the socket turns out not to
// be a *net.UDPConn it is closed and an error is returned instead of
// panicking on the type assertion.
//
// It returns the resolution or listen error unchanged when either step fails.
func newUDPSender(addr string) (sender, error) {
	ra, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return nil, err
	}

	pc, err := net.ListenPacket("udp", ":0")
	if err != nil {
		return nil, err
	}

	c, ok := pc.(*net.UDPConn)
	if !ok {
		// Not expected for the "udp" network, but never leak the socket.
		pc.Close()
		return nil, fmt.Errorf("unexpected packet connection type %T", pc)
	}

	return &UDPSender{ra: ra, c: c}, nil
}
// bufSender accumulates newline-terminated payloads in memory and forwards
// them to another sender in batches.
type bufSender struct {
	// m is locked for the duration of every Write.
	m *sync.Mutex
	// s is the wrapped sender that receives the batched bytes.
	s sender
	// buf holds the payloads that have not been forwarded yet.
	buf *bytes.Buffer
	// lastFlush is the clock reading taken when buf was last forwarded.
	lastFlush time.Time
	// clock supplies the current time and the sleep used by the flush loop.
	clock timetools.TimeProvider
	// maxBytes is the upper bound on the size of one forwarded batch.
	maxBytes int
	// flushPeriod is how long buffered data may wait before it is forwarded.
	flushPeriod time.Duration
}
// flush is the periodic flush loop. Each iteration sleeps for flushPeriod on
// the sender's clock and then issues an empty Write, which lets Write forward
// buffered data whose flush period has elapsed.
//
// The loop has no exit condition: once started it runs for the life of the
// process.
func (s *bufSender) flush() {
	for {
		s.clock.Sleep(s.flushPeriod)
		// An empty payload adds nothing to the buffer; the result is
		// deliberately ignored.
		_, _ = s.Write(nil)
	}
}
// Close forwards any payloads still held in the buffer to the wrapped sender
// and then closes it.
//
// Without the final flush, data written since the last flush would be dropped
// when the sender is closed. The mutex is held throughout so a concurrent
// Write cannot touch the buffer while it is being drained and the wrapped
// sender closed.
//
// It returns the wrapped sender's Close error; if that close succeeds, it
// returns the error from the final flush, if any.
func (s *bufSender) Close() error {
	s.m.Lock()
	defer s.m.Unlock()

	var flushErr error
	if s.buf.Len() != 0 {
		s.lastFlush = s.clock.UtcNow()
		_, flushErr = s.buf.WriteTo(s.s)
		// Drop whatever could not be sent, matching what Write does after a
		// failed flush.
		s.buf.Truncate(0)
	}

	if err := s.s.Close(); err != nil {
		return err
	}
	return flushErr
}
// timeToFlush reports whether the clock's current UTC time is strictly later
// than lastFlush plus flushPeriod.
func (s *bufSender) timeToFlush() bool {
	now := s.clock.UtcNow()
	deadline := s.lastFlush.Add(s.flushPeriod)
	return now.After(deadline)
}
// Write appends data plus a trailing newline to the buffer, first forwarding
// the existing buffer to the wrapped sender when adding data would exceed
// maxBytes or when the flush period has elapsed.
//
// A payload that cannot fit in one batch even on its own is rejected with an
// error and nothing is buffered. An empty payload buffers nothing and only
// triggers the flush check. Errors from forwarding are discarded: this method
// reports success regardless, and the buffer is emptied either way.
func (s *bufSender) Write(data []byte) (int, error) {
	s.m.Lock()
	defer s.m.Unlock()

	// Every buffered payload costs its own length plus one newline byte.
	need := len(data) + 1
	if need > s.maxBytes {
		return 0, fmt.Errorf("datagram is too large")
	}

	// Only a non-empty buffer is ever forwarded.
	if pending := s.buf.Len(); pending != 0 {
		if pending+need > s.maxBytes || s.timeToFlush() {
			s.lastFlush = s.clock.UtcNow()
			_, _ = s.buf.WriteTo(s.s)
			// Empty the buffer even if forwarding failed.
			s.buf.Reset()
		}
	}

	if len(data) > 0 {
		s.buf.Write(data)
		s.buf.WriteByte('\n')
	}

	// Forwarding errors are intentionally not surfaced to the caller.
	return len(data), nil
}
// newBufSender wraps s in a buffering sender that batches payloads into
// chunks of at most maxBytes and forwards them at least every flushPeriod.
//
// The returned sender owns a background goroutine running the flush loop; it
// is started here, before the function returns.
//
// Parameters:
//   - s: the sender that receives the batched bytes.
//   - maxBytes: upper bound on one forwarded batch; must be positive.
//   - flushPeriod: maximum time buffered data waits; must be positive.
//
// It returns an error, and starts no goroutine, when maxBytes or flushPeriod
// is not positive. A negative maxBytes would otherwise panic inside
// bytes.Buffer.Grow. A non-positive flushPeriod would presumably make the
// flush loop spin, since its sleep would return immediately (verify against
// the clock implementation).
func newBufSender(s sender, maxBytes int, flushPeriod time.Duration) (sender, error) {
	if maxBytes <= 0 {
		return nil, fmt.Errorf("maxBytes must be positive, got %d", maxBytes)
	}
	if flushPeriod <= 0 {
		return nil, fmt.Errorf("flushPeriod must be positive, got %v", flushPeriod)
	}

	// Reserve the full batch size up front so buffering does not reallocate.
	b := &bytes.Buffer{}
	b.Grow(maxBytes)

	clock := &timetools.RealTime{}
	sdr := &bufSender{
		m:           &sync.Mutex{},
		s:           s,
		clock:       clock,
		buf:         b,
		maxBytes:    maxBytes,
		flushPeriod: flushPeriod,
		lastFlush:   clock.UtcNow(),
	}
	go sdr.flush()
	return sdr, nil
}