forked from uber/kraken
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logger.go
249 lines (216 loc) · 7.29 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package torrentlog
import (
"errors"
"fmt"
"os"
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/utils/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Sentinel errors describing invalid received-piece-count input.
// NOTE(review): neither value is referenced in the visible part of this
// file — confirm they are still used elsewhere in the package.
var (
// errEmptyReceivedPieces signals that a set of received piece counts was empty.
errEmptyReceivedPieces = errors.New("empty received piece counts")
// errNegativeReceivedPieces signals that a received piece count was negative.
errNegativeReceivedPieces = errors.New("negative value in received piece counts")
)
// Logger wraps structured log entries for important torrent events. These events
// are intended to be consumed at the cluster level via ELK, and are distinct from
// the verbose stdout logs of the agent. In particular, Logger bridges host-agnostic
// metrics to individual hostnames.
//
// For example, if there is a spike in download times, an engineer can cross-reference
// the spike with the torrent logs in ELK and zero in on a single host. From there,
// the engineer can inspect the stdout logs of the host for more detailed information
// as to why the download took so long.
type Logger struct {
// zap is the underlying structured logger that every event method writes to.
zap *zap.Logger
}
// New creates a new Logger. Every entry it emits carries the local hostname
// plus the zone, cluster and peer id taken from pctx.
//
// It returns an error if the hostname cannot be determined or if the
// underlying logger cannot be built from config. The cause is wrapped, so
// callers may inspect it with errors.Is / errors.As.
func New(config log.Config, pctx core.PeerContext) (*Logger, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return nil, fmt.Errorf("hostname: %w", err)
	}

	logger, err := log.New(config, map[string]interface{}{
		"hostname": hostname,
		"zone":     pctx.Zone,
		"cluster":  pctx.Cluster,
		"peer_id":  pctx.PeerID.String(),
	})
	if err != nil {
		return nil, fmt.Errorf("config: %w", err)
	}

	return &Logger{zap: logger}, nil
}
// NewNopLogger builds a Logger backed by zap's no-op logger. It is intended
// for tests.
func NewNopLogger() *Logger {
	nop := zap.NewNop()
	return &Logger{zap: nop}
}
// OutgoingConnectionAccept emits a debug-level entry for an accepted outgoing
// connection, tagged with the digest (hex), info hash and remote peer id.
func (l *Logger) OutgoingConnectionAccept(d core.Digest, infoHash core.InfoHash, remotePeerID core.PeerID) {
	name := d.Hex()
	hash := infoHash.String()
	peer := remotePeerID.String()

	l.zap.Debug("Outgoing connection accept",
		zap.String("name", name),
		zap.String("info_hash", hash),
		zap.String("remote_peer_id", peer))
}
// OutgoingConnectionReject emits a debug-level entry for a rejected outgoing
// connection, tagged with the digest (hex), info hash, remote peer id and the
// error describing the rejection.
func (l *Logger) OutgoingConnectionReject(
	d core.Digest, infoHash core.InfoHash, remotePeerID core.PeerID, err error,
) {
	name := d.Hex()
	hash := infoHash.String()
	peer := remotePeerID.String()

	l.zap.Debug("Outgoing connection reject",
		zap.String("name", name),
		zap.String("info_hash", hash),
		zap.String("remote_peer_id", peer),
		zap.Error(err))
}
// IncomingConnectionAccept emits a debug-level entry for an accepted incoming
// connection, tagged with the digest (hex), info hash and remote peer id.
func (l *Logger) IncomingConnectionAccept(d core.Digest, infoHash core.InfoHash, remotePeerID core.PeerID) {
	name := d.Hex()
	hash := infoHash.String()
	peer := remotePeerID.String()

	l.zap.Debug("Incoming connection accept",
		zap.String("name", name),
		zap.String("info_hash", hash),
		zap.String("remote_peer_id", peer))
}
// IncomingConnectionReject emits a debug-level entry for a rejected incoming
// connection, tagged with the digest (hex), info hash, remote peer id and the
// error describing the rejection.
func (l *Logger) IncomingConnectionReject(
	d core.Digest, infoHash core.InfoHash, remotePeerID core.PeerID, err error,
) {
	name := d.Hex()
	hash := infoHash.String()
	peer := remotePeerID.String()

	l.zap.Debug("Incoming connection reject",
		zap.String("name", name),
		zap.String("info_hash", hash),
		zap.String("remote_peer_id", peer),
		zap.Error(err))
}
// SeedTimeout emits a debug-level entry noting that a seeding torrent is being
// torn down due to timeout, tagged with the digest (hex) and info hash.
func (l *Logger) SeedTimeout(d core.Digest, infoHash core.InfoHash) {
	name := d.Hex()
	hash := infoHash.String()

	l.zap.Debug("Seed timeout",
		zap.String("name", name),
		zap.String("info_hash", hash))
}
// LeechTimeout emits a debug-level entry noting that a leeching torrent is
// being torn down due to timeout, tagged with the digest (hex) and info hash.
func (l *Logger) LeechTimeout(d core.Digest, infoHash core.InfoHash) {
	name := d.Hex()
	hash := infoHash.String()

	l.zap.Debug("Leech timeout",
		zap.String("name", name),
		zap.String("info_hash", hash))
}
// DownloadSuccess emits an info-level entry for a completed download, tagged
// with the namespace, digest (hex), size and the time the download took.
func (l *Logger) DownloadSuccess(namespace string, d core.Digest, size int64, downloadTime time.Duration) {
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("name", d.Hex()),
		zap.Int64("size", size),
		zap.Duration("download_time", downloadTime),
	}
	l.zap.Info("Download success", fields...)
}
// DownloadFailure emits an error-level entry for a failed download, tagged
// with the namespace, digest (hex), size and the error that caused the failure.
func (l *Logger) DownloadFailure(namespace string, d core.Digest, size int64, err error) {
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("name", d.Hex()),
		zap.Int64("size", size),
		zap.Error(err),
	}
	l.zap.Error("Download failure", fields...)
}
// SeederSummaries emits a debug-level entry summarising the pieces requested
// and received from peers for a torrent. The summaries are written as an array
// under the "seeder_summaries" key.
//
// The returned error is always nil.
func (l *Logger) SeederSummaries(d core.Digest, infoHash core.InfoHash, summaries SeederSummaries) error {
	fields := []zap.Field{
		zap.String("name", d.Hex()),
		zap.String("info_hash", infoHash.String()),
		zap.Array("seeder_summaries", summaries),
	}
	l.zap.Debug("Seeder summaries", fields...)
	return nil
}
// LeecherSummaries emits a debug-level entry summarising the pieces requested
// by and sent to peers for a torrent. The summaries are written as an array
// under the "leecher_summaries" key.
//
// The returned error is always nil.
func (l *Logger) LeecherSummaries(d core.Digest, infoHash core.InfoHash, summaries LeecherSummaries) error {
	fields := []zap.Field{
		zap.String("name", d.Hex()),
		zap.String("info_hash", infoHash.String()),
		zap.Array("leecher_summaries", summaries),
	}
	l.zap.Debug("Leecher summaries", fields...)
	return nil
}
// Sync flushes the log by calling Sync on the underlying zap logger.
//
// Flushing is best-effort: Sync has no error return, so any error reported by
// zap is deliberately discarded rather than silently dropped.
func (l *Logger) Sync() {
	// Explicit discard: there is no caller to hand the error to.
	_ = l.zap.Sync()
}
// SeederSummary contains information about piece requests to and pieces received from a peer.
type SeederSummary struct {
// PeerID is the peer this summary describes; logged as "peer_id".
PeerID core.PeerID
// RequestsSent is the number of piece requests sent to the peer; logged as "requests_sent".
RequestsSent int
// GoodPiecesReceived is logged as "good_pieces_received".
// NOTE(review): presumably counts received pieces that were accepted — confirm against the caller.
GoodPiecesReceived int
// DuplicatePiecesReceived is logged as "duplicate_pieces_received".
// NOTE(review): presumably counts received pieces that were already held — confirm against the caller.
DuplicatePiecesReceived int
}
// MarshalLogObject writes the summary's fields to enc under the keys
// "peer_id", "requests_sent", "good_pieces_received" and
// "duplicate_pieces_received", in that order. It always returns nil.
func (s SeederSummary) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	peerID := s.PeerID.String()

	enc.AddString("peer_id", peerID)
	enc.AddInt("requests_sent", s.RequestsSent)
	enc.AddInt("good_pieces_received", s.GoodPiecesReceived)
	enc.AddInt("duplicate_pieces_received", s.DuplicatePiecesReceived)

	return nil
}
// SeederSummaries represents a slice of type SeederSummary
// that can be marshalled for logging.
type SeederSummaries []SeederSummary

// MarshalLogArray appends each SeederSummary in ss to enc as an object, in
// slice order. It stops at, and returns, the first error reported by the
// encoder; it returns nil if every element was appended.
func (ss SeederSummaries) MarshalLogArray(enc zapcore.ArrayEncoder) error {
	for _, summary := range ss {
		// AppendObject reports marshalling failures; do not drop them.
		if err := enc.AppendObject(summary); err != nil {
			return err
		}
	}
	return nil
}
// LeecherSummary contains information about piece requests from and pieces sent to a peer.
type LeecherSummary struct {
// PeerID is the peer this summary describes; logged as "peer_id".
PeerID core.PeerID
// RequestsReceived is the number of piece requests received from the peer; logged as "requests_received".
RequestsReceived int
// PiecesSent is the number of pieces sent to the peer; logged as "pieces_sent".
PiecesSent int
}
// MarshalLogObject writes the summary's fields to enc under the keys
// "peer_id", "requests_received" and "pieces_sent", in that order. It always
// returns nil.
func (s LeecherSummary) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	peerID := s.PeerID.String()

	enc.AddString("peer_id", peerID)
	enc.AddInt("requests_received", s.RequestsReceived)
	enc.AddInt("pieces_sent", s.PiecesSent)

	return nil
}
// LeecherSummaries represents a slice of type LeecherSummary
// that can be marshalled for logging.
type LeecherSummaries []LeecherSummary

// MarshalLogArray appends each LeecherSummary in ls to enc as an object, in
// slice order. It stops at, and returns, the first error reported by the
// encoder; it returns nil if every element was appended.
func (ls LeecherSummaries) MarshalLogArray(enc zapcore.ArrayEncoder) error {
	for _, summary := range ls {
		// AppendObject reports marshalling failures; do not drop them.
		if err := enc.AppendObject(summary); err != nil {
			return err
		}
	}
	return nil
}