-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
audit_logger.go
273 lines (232 loc) · 7.73 KB
/
audit_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package audit
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"time"
commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
const bufferCapacity = 2048
const webRequestTimeout = 10
type Data = map[string]any
type AuditLogger interface {
services.Service
Audit(eventID EventID, data Data)
}
type HTTPAuditLoggerInterface interface {
Do(req *http.Request) (*http.Response, error)
}
type AuditLoggerService struct {
logger logger.Logger // The standard logger configured in the node
enabled bool // Whether the audit logger is enabled or not
forwardToUrl commonconfig.URL // Location we are going to send logs to
headers []models.ServiceHeader // Headers to be sent along with logs for identification/authentication
jsonWrapperKey string // Wrap audit data as a map under this key if present
environmentName string // Decorate the environment this is coming from
hostname string // The self-reported hostname of the machine
localIP string // A non-loopback IP address as reported by the machine
loggingClient HTTPAuditLoggerInterface // Abstract type for sending logs onward
loggingChannel chan wrappedAuditLog
chStop services.StopChan
chDone chan struct{}
}
type wrappedAuditLog struct {
eventID EventID
data Data
}
var NoopLogger AuditLogger = &AuditLoggerService{}
// NewAuditLogger returns a buffer push system that ingests audit log events and
// asynchronously pushes them up to an HTTP log service.
// Parses and validates the AUDIT_LOGS_* environment values and returns an enabled
// AuditLogger instance. If the environment variables are not set, the logger
// is disabled and short circuits execution via enabled flag.
func NewAuditLogger(logger logger.Logger, config config.AuditLogger) (AuditLogger, error) {
// If the unverified config is nil, then we assume this came from the
// configuration system and return a nil logger.
if config == nil || !config.Enabled() {
return &AuditLoggerService{}, nil
}
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("initialization error - unable to get hostname: %w", err)
}
forwardToUrl, err := config.ForwardToUrl()
if err != nil {
return &AuditLoggerService{}, nil
}
headers, err := config.Headers()
if err != nil {
return &AuditLoggerService{}, nil
}
loggingChannel := make(chan wrappedAuditLog, bufferCapacity)
// Create new AuditLoggerService
auditLogger := AuditLoggerService{
logger: logger.Helper(1),
enabled: true,
forwardToUrl: forwardToUrl,
headers: headers,
jsonWrapperKey: config.JsonWrapperKey(),
environmentName: config.Environment(),
hostname: hostname,
localIP: getLocalIP(),
loggingClient: &http.Client{Timeout: time.Second * webRequestTimeout},
loggingChannel: loggingChannel,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
return &auditLogger, nil
}
func (l *AuditLoggerService) SetLoggingClient(newClient HTTPAuditLoggerInterface) {
l.loggingClient = newClient
}
// Entrypoint for new audit logs. This buffers all logs that come in they will
// sent out by the goroutine that was started when the AuditLoggerService was
// created. If this service was not enabled, this immeidately returns.
//
// This function never blocks.
func (l *AuditLoggerService) Audit(eventID EventID, data Data) {
if !l.enabled {
return
}
wrappedLog := wrappedAuditLog{
eventID: eventID,
data: data,
}
select {
case l.loggingChannel <- wrappedLog:
default:
l.logger.Errorf("buffer is full. Dropping log with eventID: %s", eventID)
}
}
// Start the audit logger and begin processing logs on the channel
func (l *AuditLoggerService) Start(context.Context) error {
if !l.enabled {
return errors.New("The audit logger is not enabled")
}
go l.runLoop()
return nil
}
// Stops the logger and will close the channel.
func (l *AuditLoggerService) Close() error {
if !l.enabled {
return errors.New("The audit logger is not enabled")
}
l.logger.Warnf("Disabled the audit logger service")
close(l.chStop)
<-l.chDone
return nil
}
func (l *AuditLoggerService) Name() string {
return l.logger.Name()
}
func (l *AuditLoggerService) HealthReport() map[string]error {
var err error
if !l.enabled {
err = errors.New("the audit logger is not enabled")
} else if len(l.loggingChannel) == bufferCapacity {
err = errors.New("buffer is full")
}
return map[string]error{l.Name(): err}
}
func (l *AuditLoggerService) Ready() error {
if !l.enabled {
return errors.New("the audit logger is not enabled")
}
return nil
}
// Entrypoint for our log handling goroutine. This waits on the channel and sends out
// logs as they come in.
//
// This function calls postLogToLogService which blocks.
func (l *AuditLoggerService) runLoop() {
defer close(l.chDone)
for {
select {
case <-l.chStop:
l.logger.Warn("The audit logger is shutting down")
return
case event := <-l.loggingChannel:
l.postLogToLogService(event.eventID, event.data)
}
}
}
// Takes an EventID and associated data and sends it to the configured logging
// endpoint. This function blocks on the send by timesout after a period of
// several seconds. This helps us prevent getting stuck on a single log
// due to transient network errors.
//
// This function blocks when called.
func (l *AuditLoggerService) postLogToLogService(eventID EventID, data Data) {
// Audit log JSON data
logItem := map[string]interface{}{
"eventID": eventID,
"hostname": l.hostname,
"localIP": l.localIP,
"env": l.environmentName,
"data": data,
}
// Optionally wrap audit log data into JSON object to help dynamically structure for an HTTP log service call
if l.jsonWrapperKey != "" {
logItem = map[string]interface{}{l.jsonWrapperKey: logItem}
}
serializedLog, err := json.Marshal(logItem)
if err != nil {
l.logger.Errorw("unable to serialize wrapped audit log item to JSON", "err", err, "logItem", logItem)
return
}
ctx, cancel := l.chStop.NewCtx()
defer cancel()
// Send to remote service
req, err := http.NewRequestWithContext(ctx, "POST", (*url.URL)(&l.forwardToUrl).String(), bytes.NewReader(serializedLog))
if err != nil {
l.logger.Error("failed to create request to remote logging service!")
}
for _, header := range l.headers {
req.Header.Add(header.Header, header.Value)
}
resp, err := l.loggingClient.Do(req)
if err != nil {
l.logger.Errorw("failed to send audit log to HTTP log service", "err", err, "logItem", logItem)
return
}
if resp.StatusCode != 200 {
if resp.Body == nil {
l.logger.Errorw("no body to read. Possibly an error occurred sending", "logItem", logItem)
return
}
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
l.logger.Errorw("error reading errored HTTP log service webhook response body", "err", err, "logItem", logItem)
return
}
l.logger.Errorw("error sending log to HTTP log service", "statusCode", resp.StatusCode, "bodyString", string(bodyBytes))
return
}
}
// getLocalIP returns the first non-loopback local IP of the host
func getLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return ""
}
for _, address := range addrs {
// filter and return address types for first non loopback address
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}