forked from gravitational/teleport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
log.go
110 lines (96 loc) · 2.67 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package state
import (
"io"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
)
var (
errNotSupported = trace.BadParameter("method not supported")
)
const (
// MaxQueueSize determines how many logging events to queue in-memory
// before start dropping them (probably because logging server is down)
MaxQueueSize = 10
)
// CachingAuditLog implements events.IAuditLog on the recording machine (SSH server)
// It captures the local recording and forwards it to the AuditLog network server
type CachingAuditLog struct {
server events.IAuditLog
queue chan msg
closeC chan int
closeOnce sync.Once
}
// msg structure is used to transfer logging calls from the calling thread into
// asynchronous queue
type msg struct {
eventType string
fields events.EventFields
sid session.ID
reader io.Reader
}
// MakeCachingAuditLog creaets a new & fully initialized instance of the alog
func MakeCachingAuditLog(logServer events.IAuditLog) *CachingAuditLog {
ll := &CachingAuditLog{
server: logServer,
closeC: make(chan int),
}
// start the queue:
if logServer != nil {
ll.queue = make(chan msg, MaxQueueSize+1)
go ll.run()
}
return ll
}
// run thread is picking up logging events and tries to forward them
// to the logging server
func (ll *CachingAuditLog) run() {
var err error
for ll.server != nil {
select {
case <-ll.closeC:
return
case msg := <-ll.queue:
if msg.fields != nil {
err = ll.server.EmitAuditEvent(msg.eventType, msg.fields)
} else if msg.reader != nil {
err = ll.server.PostSessionChunk(msg.sid, msg.reader)
}
if err != nil {
log.Error(err)
}
}
}
}
func (ll *CachingAuditLog) post(m msg) error {
select {
case ll.queue <- m:
default:
log.Warnf("Audit log cannot keep up. Dropping event '%v'", m.eventType)
}
return nil
}
func (ll *CachingAuditLog) Close() error {
ll.closeOnce.Do(func() {
close(ll.closeC)
})
return nil
}
func (ll *CachingAuditLog) EmitAuditEvent(eventType string, fields events.EventFields) error {
return ll.post(msg{eventType: eventType, fields: fields})
}
func (ll *CachingAuditLog) PostSessionChunk(sid session.ID, reader io.Reader) error {
return ll.post(msg{sid: sid, reader: reader})
}
func (ll *CachingAuditLog) GetSessionChunk(session.ID, int, int) ([]byte, error) {
return nil, errNotSupported
}
func (ll *CachingAuditLog) GetSessionEvents(session.ID, int) ([]events.EventFields, error) {
return nil, errNotSupported
}
func (ll *CachingAuditLog) SearchEvents(time.Time, time.Time, string) ([]events.EventFields, error) {
return nil, errNotSupported
}