-
Notifications
You must be signed in to change notification settings - Fork 0
/
adapter.go
137 lines (114 loc) · 2.81 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package logger
import (
"io"
"os"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/pkg/plugingetter"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// pluginAdapter takes a plugin and implements the Logger interface for logger
// instances
type pluginAdapter struct {
driverName string
id string
plugin logPlugin
basePath string
fifoPath string
capabilities Capability
logInfo Info
// synchronize access to the log stream and shared buffer
mu sync.Mutex
enc logdriver.LogEntryEncoder
stream io.WriteCloser
// buf is shared for each `Log()` call to reduce allocations.
// buf must be protected by mutex
buf logdriver.LogEntry
}
func (a *pluginAdapter) Log(msg *Message) error {
a.mu.Lock()
a.buf.Line = msg.Line
a.buf.TimeNano = msg.Timestamp.UnixNano()
a.buf.Partial = msg.Partial
a.buf.Source = msg.Source
err := a.enc.Encode(&a.buf)
a.buf.Reset()
a.mu.Unlock()
PutMessage(msg)
return err
}
func (a *pluginAdapter) Name() string {
return a.driverName
}
func (a *pluginAdapter) Close() error {
a.mu.Lock()
defer a.mu.Unlock()
if err := a.plugin.StopLogging(strings.TrimPrefix(a.fifoPath, a.basePath)); err != nil {
return err
}
if err := a.stream.Close(); err != nil {
logrus.WithError(err).Error("error closing plugin fifo")
}
if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
logrus.WithError(err).Error("error cleaning up plugin fifo")
}
// may be nil, especially for unit tests
if pluginGetter != nil {
pluginGetter.Get(a.Name(), extName, plugingetter.Release)
}
return nil
}
type pluginAdapterWithRead struct {
*pluginAdapter
}
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
watcher := NewLogWatcher()
go func() {
defer close(watcher.Msg)
stream, err := a.plugin.ReadLogs(a.logInfo, config)
if err != nil {
watcher.Err <- errors.Wrap(err, "error getting log reader")
return
}
defer stream.Close()
dec := logdriver.NewLogEntryDecoder(stream)
for {
select {
case <-watcher.WatchClose():
return
default:
}
var buf logdriver.LogEntry
if err := dec.Decode(&buf); err != nil {
if err == io.EOF {
return
}
select {
case watcher.Err <- errors.Wrap(err, "error decoding log message"):
case <-watcher.WatchClose():
}
return
}
msg := &Message{
Timestamp: time.Unix(0, buf.TimeNano),
Line: buf.Line,
Source: buf.Source,
}
// plugin should handle this, but check just in case
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
continue
}
select {
case watcher.Msg <- msg:
case <-watcher.WatchClose():
// make sure the message we consumed is sent
watcher.Msg <- msg
return
}
}
}()
return watcher
}