forked from juju/juju
/
logstream.go
190 lines (165 loc) · 5.51 KB
/
logstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package logstream
import (
"io"
"sync"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/version"
"gopkg.in/juju/names.v2"
"github.com/juju/juju/api/base"
"github.com/juju/juju/api/common/stream"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/logfwd"
)
// jsonReadCloser is the part of a streaming connection that LogStream
// relies on: reading JSON-serialized values from the connection and
// closing it.
type jsonReadCloser interface {
// Close (from io.Closer) shuts the connection down.
io.Closer
// ReadJSON decodes the next JSON value from the connection and
// sets the value at the provided pointer to that newly decoded one.
ReadJSON(interface{}) error
}
// LogStream streams log entries of the /logstream API endpoint over
// a websocket connection.
//
// A LogStream is obtained from Open, read with Next and released
// with Close.
type LogStream struct {
// mu guards stream. Both next and Close hold it for as long as they
// use the field.
mu sync.Mutex
// stream is the underlying connection records are read from. A nil
// stream means the LogStream has been closed; next refuses to read
// in that state.
stream jsonReadCloser
// controllerUUID is handed to recordsFromAPI by Next when wire
// records are converted into logfwd records.
controllerUUID string
}
// Open connects to the API's /logstream endpoint through conn and wraps
// the resulting streaming connection in a LogStream.
//
// cfg is handed to stream.Open together with the endpoint path.
// controllerUUID is remembered on the returned LogStream and used when
// the records it reads are converted. If the connection cannot be
// opened the (traced) error is returned and the stream is nil.
func Open(conn base.StreamConnector, cfg params.LogStreamConfig, controllerUUID string) (*LogStream, error) {
	jsonStream, err := stream.Open(conn, "/logstream", &cfg)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return &LogStream{
		stream:         jsonStream,
		controllerUUID: controllerUUID,
	}, nil
}
// Next returns the next batch of log records from the server, converted
// from the wire format into logfwd.Record. The first record returned is
// the one following the last record that was successfully sent; if
// nothing has been sent yet, it is the oldest log record.
//
// A non-nil error means one of three things: the streaming connection
// is closed, the connection failed, or the data read from it is
// corrupted. In every one of those cases the stream should be
// re-opened. The new stream starts at the record after the one marked
// as successfully sent, so the record on which Next() failed is
// streamed again.
//
// Re-streaming is only a problem if the same record keeps arriving as
// invalid data. That can happen only if the record was already invalid
// when it was stored in the DB, or if the DB's on-disk storage for the
// record becomes corrupted. Both scenarios are highly unlikely and the
// respective systems are managed such that neither should happen.
func (ls *LogStream) Next() ([]logfwd.Record, error) {
	wireBatch, err := ls.next()
	if err != nil {
		return nil, errors.Trace(err)
	}

	converted, err := recordsFromAPI(wireBatch, ls.controllerUUID)
	if err != nil {
		// Reaching this point should mean the data was corrupted on
		// its way over the network. Any other cause is a bug in the
		// code that produced the bad data and must be fixed there.
		// When that code sits between the DB on-disk storage and the
		// server-side stream, the fix must neither block forever on a
		// consistently invalid record nor throw a record away: the
		// log stream needs to maintain a high level of reliable
		// delivery.
		return nil, errors.Trace(err)
	}
	return converted, nil
}
// next reads one batch of records, still in wire format, from the
// underlying stream.
//
// ls.mu is held for the whole call, including the ReadJSON call, so
// a concurrent Close (which takes the same mutex) waits until the read
// has returned. Reading after the stream has been closed is reported
// as an error. On error the returned batch is whatever had been
// decoded so far and should not be relied upon.
func (ls *LogStream) next() (params.LogStreamRecords, error) {
	var batch params.LogStreamRecords

	ls.mu.Lock()
	defer ls.mu.Unlock()

	if ls.stream == nil {
		return batch, errors.Errorf("cannot read from closed stream")
	}
	if err := ls.stream.ReadJSON(&batch); err != nil {
		return batch, errors.Trace(err)
	}
	return batch, nil
}
// Close closes the underlying stream and marks the LogStream as closed.
//
// Close is idempotent: once it has been called, further calls return
// nil without touching the underlying connection again, and Next
// reports "cannot read from closed stream". This holds even when
// closing the underlying connection fails; that failure is returned
// (traced) from the call that triggered it.
func (ls *LogStream) Close() error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	if ls.stream == nil {
		return nil
	}

	// Drop the reference before closing. io.Closer leaves the behavior
	// of a second Close undefined, so the underlying Close must run at
	// most once whether or not it succeeds, and a connection whose
	// close failed must not be read from afterwards.
	closing := ls.stream
	ls.stream = nil

	if err := closing.Close(); err != nil {
		return errors.Trace(err)
	}
	return nil
}
// recordsFromAPI converts a batch of wire-format records into
// logfwd records, keeping them in the order they were received.
// controllerUUID is passed through to the per-record conversion.
//
// The conversion is all-or-nothing: the first record that fails to
// convert aborts the batch and its (traced) error is returned with a
// nil slice.
//
// See the counterpart in apiserver/logstream.go.
func recordsFromAPI(apiRecords params.LogStreamRecords, controllerUUID string) ([]logfwd.Record, error) {
	converted := make([]logfwd.Record, 0, len(apiRecords.Records))
	for _, wireRec := range apiRecords.Records {
		rec, err := recordFromAPI(wireRec, controllerUUID)
		if err != nil {
			return nil, errors.Trace(err)
		}
		converted = append(converted, rec)
	}
	return converted, nil
}
// recordFromAPI converts a single wire-format record into a
// logfwd.Record.
//
// ID, timestamp and message are copied as they are. The origin is
// derived by originFromAPI, the source location is parsed from the
// record's module and location strings, and the level is parsed from
// its textual form. The assembled record is validated before it is
// returned.
//
// The steps run in that order and the first failure is returned. On
// error the returned record holds only the fields filled in so far and
// should not be used.
func recordFromAPI(apiRec params.LogStreamRecord, controllerUUID string) (logfwd.Record, error) {
	var rec logfwd.Record
	rec.ID = apiRec.ID
	rec.Timestamp = apiRec.Timestamp
	rec.Message = apiRec.Message

	source, err := originFromAPI(apiRec, controllerUUID)
	if err != nil {
		return rec, errors.Trace(err)
	}
	rec.Origin = source

	where, err := logfwd.ParseLocation(apiRec.Module, apiRec.Location)
	if err != nil {
		return rec, errors.Trace(err)
	}
	rec.Location = where

	severity, known := loggo.ParseLevel(apiRec.Level)
	if !known {
		return rec, errors.Errorf("unrecognized log level %q", apiRec.Level)
	}
	rec.Level = severity

	// Only a record that passes validation is handed back without an
	// error.
	err = rec.Validate()
	if err != nil {
		return rec, errors.Trace(err)
	}
	return rec, nil
}
// originFromAPI works out the logfwd.Origin of a wire-format record
// from its entity tag, model UUID and version, together with the
// given controller UUID.
//
// The entity tag is parsed first, then the version; a failure of
// either is returned annotated, with a zero Origin. Machine and unit
// tags get a machine-agent or unit-agent origin respectively. Every
// other kind of tag is handed to logfwd.OriginForJuju, which may
// itself fail.
func originFromAPI(apiRec params.LogStreamRecord, controllerUUID string) (logfwd.Origin, error) {
	var none logfwd.Origin

	entity, err := names.ParseTag(apiRec.Entity)
	if err != nil {
		return none, errors.Annotate(err, "invalid entity")
	}
	agentVersion, err := version.Parse(apiRec.Version)
	if err != nil {
		return none, errors.Annotatef(err, "invalid version %q", apiRec.Version)
	}

	modelUUID := apiRec.ModelUUID
	switch agent := entity.(type) {
	case names.MachineTag:
		return logfwd.OriginForMachineAgent(agent, controllerUUID, modelUUID, agentVersion), nil
	case names.UnitTag:
		return logfwd.OriginForUnitAgent(agent, controllerUUID, modelUUID, agentVersion), nil
	}

	// Neither a machine nor a unit: fall back to the generic
	// constructor, passing the tag as parsed.
	generic, err := logfwd.OriginForJuju(entity, controllerUUID, modelUUID, agentVersion)
	if err != nil {
		return generic, errors.Annotate(err, "could not extract origin")
	}
	return generic, nil
}