/
logsender.go
106 lines (89 loc) · 2.49 KB
/
logsender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package logsender
import (
"io"
"net/url"
"github.com/juju/errors"
"github.com/juju/juju/api/base"
"github.com/juju/juju/rpc/params"
)
// LogWriter is the interface that allows sending log
// messages to the server for storage.
//
// A LogWriter is obtained from API.LogWriter and must be closed
// when the caller has finished with it.
type LogWriter interface {
// WriteLog writes the given log record.
// It returns a non-nil error if the record could not be sent.
WriteLog(*params.LogRecord) error
// Close (from io.Closer) releases the writer; the implementation in
// this file closes the underlying stream connection.
io.Closer
}
// API provides access to the LogSender API.
// Its only state is the stream connector used to reach the server.
type API struct {
// connector opens the stream that LogWriter connects to "/logsink".
connector base.StreamConnector
}
// NewAPI creates a new client-side logsender API that opens its
// streams through the supplied connector.
func NewAPI(connector base.StreamConnector) *API {
	client := new(API)
	client.connector = connector
	return client
}
// LogWriter opens a stream to the "/logsink" endpoint and returns a
// LogWriter backed by it. The returned value must be closed when
// finished with. If the stream cannot be opened, the connection error
// is returned annotated and the LogWriter is nil.
func (api *API) LogWriter() (LogWriter, error) {
	// Version 1 does ping/pong handling.
	query := url.Values{"version": []string{"1"}}

	stream, err := api.connector.ConnectStream("/logsink", query)
	if err != nil {
		return nil, errors.Annotatef(err, "cannot connect to /logsink")
	}
	return newWriter(stream), nil
}
// writer is the LogWriter implementation created by newWriter. It sends
// records over a stream connection and remembers the read error that
// ended readLoop so WriteLog can add it to a later write failure.
type writer struct {
// conn is the stream that records are written to and that readLoop reads from.
conn base.Stream
// readErrs holds the error that terminated readLoop, if any. newWriter
// creates it with capacity 1; readLoop sends to it without blocking and
// WriteLog receives from it without blocking.
readErrs chan error
}
// newWriter wraps conn in a writer and starts the background readLoop
// goroutine for it. The read-error channel is given a buffer of one so
// that readLoop can record its terminating error without needing a
// receiver to be waiting.
func newWriter(conn base.Stream) *writer {
	lw := new(writer)
	lw.conn = conn
	lw.readErrs = make(chan error, 1)

	go lw.readLoop()
	return lw
}
// readLoop is necessary for the client to process websocket control messages.
// It keeps asking the connection for the next reader until that fails.
// The failure is then enqueued (if there is room) so that, should a
// subsequent call to WriteLog fail due to our closure of the socket, the
// resulting error can be enhanced with the read error. Finally the
// connection is closed and the loop ends.
// Close() is safe to call concurrently.
func (w *writer) readLoop() {
	for {
		_, _, readErr := w.conn.NextReader()
		if readErr == nil {
			continue
		}

		// Non-blocking send: if an error is already queued, this one
		// is dropped rather than stalling the goroutine.
		select {
		case w.readErrs <- readErr:
		default:
		}

		// The close result is deliberately ignored; the read error is
		// the one worth reporting.
		_ = w.conn.Close()
		return
	}
}
// WriteLog streams the log record as JSON to the logsink endpoint.
// It returns nil when the write succeeds. When the write fails, any
// read error queued by readLoop is folded into the write error, and the
// result is returned annotated with "sending log message".
func (w *writer) WriteLog(m *params.LogRecord) error {
	// Note: due to the fire-and-forget nature of the logsink API,
	// it is possible that when the connection dies, any logs that
	// were "in-flight" will not be recorded on the server side.
	writeErr := w.conn.WriteJSON(m)
	if writeErr == nil {
		return nil
	}

	// Pick up a pending read error without blocking; if none is
	// queued the write error is reported on its own.
	select {
	case readErr := <-w.readErrs:
		if readErr != nil {
			writeErr = errors.Annotate(writeErr, readErr.Error())
		}
	default:
	}
	return errors.Annotate(writeErr, "sending log message")
}
// Close closes the underlying stream connection and returns the error,
// if any, reported by the connection.
//
// It uses a pointer receiver so that every method of writer shares the
// same receiver kind (readLoop and WriteLog are declared on *writer).
// readLoop may already have closed the same connection after a read
// failure, so the connection can receive more than one Close call.
func (w *writer) Close() error {
	return w.conn.Close()
}