forked from coredns/coredns
/
io.go
146 lines (128 loc) · 2.97 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package dnstapio
import (
"net"
"sync/atomic"
"time"
clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
)
// log is the package-level logger, created through the CoreDNS plugin log
// helper with the plugin name "dnstap".
var log = clog.NewWithPlugin("dnstap")
// Tunables for the dnstap I/O routine.
const (
// tcpWriteBufSize is the size passed to SetWriteBuffer on TCP connections.
tcpWriteBufSize = 1024 * 1024
// tcpTimeout is the dial timeout used when connecting to the endpoint.
tcpTimeout = 4 * time.Second
// flushTimeout is the interval between periodic flushes in serve.
flushTimeout = 1 * time.Second
// queueSize is the capacity of the payload queue channel.
queueSize = 10000
)
// dnstapIO is the DnstapIO implementation returned by New. It queues dnstap
// payloads and hands them to an encoder that writes to a network connection.
type dnstapIO struct {
// endpoint is the address handed to the dialer.
endpoint string
// socket selects a unix socket dial when true, TCP otherwise.
socket bool
// conn is the current connection; it is nil after closeConnection.
conn net.Conn
// enc encodes queued payloads and is re-pointed at conn on every (re)connect.
enc *dnstapEncoder
// queue carries payloads from Dnstap to the serve loop.
queue chan tap.Dnstap
// dropped counts discarded payloads; it is only accessed through sync/atomic.
dropped uint32
// quit is closed by Close to tell the serve loop to stop.
quit chan struct{}
}
// New builds a DnstapIO for the given endpoint without connecting to it.
//
// endpoint is the address to dial later; socket selects a unix socket instead
// of TCP. The encoder is configured for bidirectional framing with the
// "protobuf:dnstap.Dnstap" content type, and the payload queue is created with
// room for queueSize entries.
func New(endpoint string, socket bool) DnstapIO {
	encoder := newDnstapEncoder(&fs.EncoderOptions{
		ContentType:   []byte("protobuf:dnstap.Dnstap"),
		Bidirectional: true,
	})

	dio := new(dnstapIO)
	dio.endpoint = endpoint
	dio.socket = socket
	dio.enc = encoder
	dio.queue = make(chan tap.Dnstap, queueSize)
	dio.quit = make(chan struct{})
	return dio
}
// DnstapIO is the interface used to ship dnstap payloads to an endpoint.
// New returns the implementation defined in this file; the method notes
// below describe that implementation.
type DnstapIO interface {
// Connect attempts the initial connection and starts the I/O routine.
Connect()
// Dnstap queues payload for writing without blocking the caller.
Dnstap(payload tap.Dnstap)
// Close signals the I/O routine to stop.
Close()
}
// newConnect dials the configured endpoint and points the encoder at the new
// connection.
//
// A unix socket is dialed when dio.socket is set, TCP otherwise; both dials
// are bounded by tcpTimeout so a stuck endpoint cannot block the caller
// indefinitely. dio.conn is only assigned once the whole setup has succeeded:
// if the encoder cannot be attached to the new connection, that connection is
// closed right away instead of being left open, and dio.conn is left
// unchanged.
//
// It returns the dial error or the error reported by the encoder's
// resetWriter, and nil on success.
func (dio *dnstapIO) newConnect() error {
	network := "tcp"
	if dio.socket {
		network = "unix"
	}

	conn, err := net.DialTimeout(network, dio.endpoint, tcpTimeout)
	if err != nil {
		return err
	}

	if tcpConn, ok := conn.(*net.TCPConn); ok {
		// Best-effort socket tuning: a failure here does not make the
		// connection unusable, so the errors are deliberately ignored.
		tcpConn.SetWriteBuffer(tcpWriteBufSize)
		tcpConn.SetNoDelay(false)
	}

	if err := dio.enc.resetWriter(conn); err != nil {
		// Do not leak the freshly opened connection when the encoder
		// could not be set up on it.
		conn.Close()
		return err
	}

	dio.conn = conn
	return nil
}
// Connect attempts to connect to the dnstap endpoint and starts the I/O
// routine.
//
// A failed initial connection is logged together with its cause and is not
// fatal: the serve goroutine is started regardless, and it retries the
// connection whenever it flushes.
func (dio *dnstapIO) Connect() {
	if err := dio.newConnect(); err != nil {
		log.Errorf("No connection to dnstap endpoint: %s", err)
	}
	go dio.serve()
}
// Dnstap offers payload to the queue without blocking. When the queue has no
// free slot the payload is discarded and the dropped counter is incremented.
func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
	select {
	case dio.queue <- payload:
		return
	default:
	}
	// Queue is full: account for the lost message instead of blocking.
	atomic.AddUint32(&dio.dropped, 1)
}
// closeConnection closes the encoder and then, if a connection is present,
// closes it and clears dio.conn so that the next flush will redial.
func (dio *dnstapIO) closeConnection() {
	dio.enc.close()
	if dio.conn == nil {
		return
	}
	dio.conn.Close()
	dio.conn = nil
}
// Close signals the I/O routine to stop by closing the quit channel. It
// returns immediately and does not wait: the serve loop started by Connect
// performs the final flush and closes the connection when it observes the
// signal. Close must be called at most once, since closing quit a second time
// panics.
func (dio *dnstapIO) Close() {
close(dio.quit)
}
// flushBuffer flushes the encoder's buffer, reconnecting when necessary.
//
// With no connection present it first tries to redial; a failure at that
// point returns without logging. If the flush itself fails, the connection is
// considered lost: it is closed and one reconnect attempt is made, whose
// outcome is logged.
func (dio *dnstapIO) flushBuffer() {
	if dio.conn == nil {
		if dio.newConnect() != nil {
			return
		}
		log.Info("Reconnected to dnstap")
	}

	flushErr := dio.enc.flushBuffer()
	if flushErr == nil {
		return
	}

	log.Warningf("Connection lost: %s", flushErr)
	dio.closeConnection()

	if dialErr := dio.newConnect(); dialErr != nil {
		log.Errorf("Cannot connect to dnstap: %s", dialErr)
		return
	}
	log.Info("Reconnected to dnstap")
}
// write hands payload to the encoder; a payload the encoder rejects is
// counted as dropped.
func (dio *dnstapIO) write(payload *tap.Dnstap) {
	if dio.enc.writeMsg(payload) == nil {
		return
	}
	atomic.AddUint32(&dio.dropped, 1)
}
// serve is the I/O loop started by Connect. It runs until quit is closed and
// reacts to three events:
//
//   - quit closed: flush once more, close the connection and return;
//   - a payload on the queue: pass it to the encoder;
//   - the flush timer firing: report and reset the dropped counter, flush,
//     and re-arm the timer.
//
// A single timer is reused for every flush cycle and stopped on return,
// rather than allocating a new one per cycle and leaving the last one
// pending after shutdown. The timer is re-armed only after the flush has
// finished, so the interval is measured from the end of one flush to the
// start of the next.
func (dio *dnstapIO) serve() {
	timer := time.NewTimer(flushTimeout)
	defer timer.Stop()

	for {
		select {
		case <-dio.quit:
			dio.flushBuffer()
			dio.closeConnection()
			return
		case payload := <-dio.queue:
			dio.write(&payload)
		case <-timer.C:
			if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {
				log.Warningf("Dropped dnstap messages: %d", dropped)
			}
			dio.flushBuffer()
			// The timer has fired and its channel was just drained, so
			// Reset can be called directly.
			timer.Reset(flushTimeout)
		}
	}
}