/
journal.go
245 lines (194 loc) · 6.53 KB
/
journal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
package journal
import (
"encoding/json"
"fmt"
"io"
"os"
"sort"
"sync"
"time"
"github.com/vaitekunas/journal/logrpc"
)
// Config holds the settings that New uses to create a local logging facility.
type Config struct {
Service string // Service name
Instance string // Instance name
Folder string // Folder to store logfiles in (can be empty if logging to stdout only); New checks it with canWrite for file-based output
Filename string // Base name of the logfiles, without date suffix and file extension (can be empty if logging to stdout only)
Rotation int // Logfile rotation frequency; New accepts values from ROT_NONE to ROT_ANNUALLY
Out int // Logger output type; New accepts values from OUT_FILE to OUT_FILE_AND_STDOUT
Headers bool // Should the logfile contain column headers?
JSON bool // Should each entry be written as a JSON-formatted string?
Compress bool // Should old logfiles be compressed?
Columns []int64 // Columns to log, each between COL_DATE_YYMMDD and COL_LINE; when empty, New falls back to defaultCols
}
// killswitch is a send-only bool channel used to stop a goroutine started by
// the logger; Quit sends true on every registered killswitch.
type killswitch chan<- bool
// New creates a new logging facility from config.
//
// It validates the rotation, output and column settings, falls back to a copy
// of the default columns when config.Columns is empty and, for file-based
// output, checks that config.Folder is writable. On success the file rotation
// and the log writer are started and their killswitches are registered with
// the returned Logger, which should be shut down with Quit.
//
// A nil config or any invalid setting yields a nil Logger and an error.
func New(config *Config) (*Logger, error) {
	// A nil config would otherwise panic on the first field access below
	if config == nil {
		return nil, fmt.Errorf("New: config must not be nil")
	}

	// Validate options
	if config.Rotation < ROT_NONE || config.Rotation > ROT_ANNUALLY {
		return nil, fmt.Errorf("New: invalid roll option '%d'", config.Rotation)
	}
	if config.Out < OUT_FILE || config.Out > OUT_FILE_AND_STDOUT {
		return nil, fmt.Errorf("New: invalid output option '%d'", config.Out)
	}
	if len(config.Columns) == 0 {
		// Hand out a copy: the caller's config must not alias the shared
		// package-level slice, or a later write through config.Columns would
		// silently change the defaults.
		config.Columns = append([]int64(nil), defaultCols...)
	} else {
		for _, col := range config.Columns {
			if col < COL_DATE_YYMMDD || col > COL_LINE {
				return nil, fmt.Errorf("New: invalid column '%d'", col)
			}
		}
	}

	// Check permissions
	if config.Out == OUT_FILE || config.Out == OUT_FILE_AND_STDOUT {
		if !canWrite(config.Folder) {
			return nil, fmt.Errorf("New: cannot write to '%s'", config.Folder)
		}
	}

	// Initiate log instance
	logger := &Logger{
		mu:            &sync.Mutex{},
		wg:            &sync.WaitGroup{},
		active:        true,
		config:        config,
		codes:         defaultCodes,
		ledger:        make(chan logEntry, 1000),
		remoteWriters: map[string]io.Writer{},
		killswitches:  []killswitch{},
	}

	// Start file rotation (async)
	logger.killswitches = append(logger.killswitches, logger.rotateFile())

	// Start log writer
	logger.killswitches = append(logger.killswitches, logger.write())

	return logger, nil
}
// Logger is the main logger struct. Instances are created with New and shut
// down with Quit.
type Logger struct {
mu *sync.Mutex // Protects logfile changes; also held while the remote destinations are read or modified
wg *sync.WaitGroup // Protects ledger processing: RawEntry adds to it for every queued entry and Quit waits on it
active bool // Logger activity switch; Quit sets it to false, after which RawEntry drops entries
config *Config // Main config
codes map[int]Code // Mapping of integer message codes to their Code values
ledger chan logEntry // Ledger of unprocessed log entries
killswitches []killswitch // Killswitches of all goroutines spawned by the logger
// Log writers
logfile *os.File // Local logfile's file descriptor (nil while no logfile is open)
stdout *os.File // Local stdout
remoteWriters map[string]io.Writer // Remote log writers (grpc, kafka, etc), keyed by destination name
// gRPC-related
gRPC *logrpc.RemoteLoggerClient // gRPC client
gRPCTimeout time.Duration // gRPC timeout duration
}
// UseCustomCodes replaces the logger's default message codes with custom ones.
//
// Only codes strictly between 1 and 999 are taken over; every other key in
// codes is ignored. The logger's code table is rebuilt as a fresh map instead
// of being edited in place: New seeds it with the package-level defaultCodes
// map, so an in-place write would change the defaults seen by every other
// Logger as well.
func (l *Logger) UseCustomCodes(codes map[int]Code) {
	merged := make(map[int]Code, len(l.codes)+len(codes))

	// Start from the codes this logger currently uses
	for code, lCode := range l.codes {
		merged[code] = lCode
	}

	// Overlay the accepted custom codes
	for code, lCode := range codes {
		if code > 1 && code < 999 {
			merged[code] = lCode
		}
	}

	l.codes = merged
}
// Log logs a simple message for the given caller and message code. msg and the
// optional format arguments are handed to pushToLedger unchanged, and the
// error returned is whatever pushToLedger returns (per the original
// documentation: nil or an error, depending on the code).
// NOTE(review): the leading 2 is presumably a call-stack depth used to locate
// the calling line - confirm against pushToLedger before restructuring.
func (l *Logger) Log(caller string, code int, msg string, format ...interface{}) error {
return l.pushToLedger(2, caller, code, msg, format...)
}
// LogFields encodes msg (the message only, not the whole log entry) as JSON
// and logs the result for the given caller and code.
//
// The JSON text is handed to pushToLedger as the argument of a "%s" format
// rather than as the format string itself, so a '%' inside a key or value is
// never interpreted as a formatting verb.
//
// If msg cannot be marshalled, the failure is logged instead under the
// "system" caller with code 1, and the result of that call is returned.
func (l *Logger) LogFields(caller string, code int, msg map[string]interface{}) error {
	jsoned, err := json.Marshal(msg)
	if err != nil {
		return l.pushToLedger(2, "system", 1, "LogFields: could not marshal log entry to JSON: %s", err.Error())
	}

	return l.pushToLedger(2, caller, code, "%s", string(jsoned))
}
// NewCaller returns a logging function that is permanently bound to caller.
// The returned function takes the message code, the message and optional
// format arguments and forwards them to pushToLedger, the same way Logger.Log
// does.
func (l *Logger) NewCaller(caller string) func(int, string, ...interface{}) error {
	bound := func(code int, msg string, args ...interface{}) error {
		return l.pushToLedger(2, caller, code, msg, args...)
	}
	return bound
}
// NewCallerCode returns a logging function that is permanently bound to both
// caller and code. The returned function only takes the message and optional
// format arguments and forwards them to pushToLedger.
func (l *Logger) NewCallerCode(caller string, code int) func(string, ...interface{}) error {
	bound := func(msg string, args ...interface{}) error {
		return l.pushToLedger(2, caller, code, msg, args...)
	}
	return bound
}
// RawEntry queues a pre-assembled log entry (column id -> value) on the ledger.
//
// The entry must provide a value for every column listed in defaultCols;
// otherwise an error naming the first missing column is returned and nothing
// is queued. If the logger is no longer active the entry is dropped and nil is
// returned. The send to the ledger happens in its own goroutine, so the call
// itself does not wait for the ledger to accept the entry.
func (l *Logger) RawEntry(entry map[int64]string) error {
	// Every default column has to be present
	for i := range defaultCols {
		column := defaultCols[i]
		if _, present := entry[column]; !present {
			return fmt.Errorf("RawEntry: missing column '%d'", column)
		}
	}

	// An inactive logger discards the entry without reporting an error
	if !l.active {
		return nil
	}

	// Register the pending entry, then hand it to the ledger asynchronously
	l.wg.Add(1)
	go func() {
		l.ledger <- entry
	}()

	return nil
}
// AddDestination registers writer as a (remote) destination under name.
// An error is returned when a destination with that name already exists; the
// existing writer is left in place in that case.
func (l *Logger) AddDestination(name string, writer io.Writer) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	_, taken := l.remoteWriters[name]
	if taken {
		return fmt.Errorf("AddDestination: destination %s already present", name)
	}

	l.remoteWriters[name] = writer
	return nil
}
// RemoveDestination unregisters the (remote) destination stored under name.
// An error is returned when no destination with that name is registered.
func (l *Logger) RemoveDestination(name string) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	_, known := l.remoteWriters[name]
	if !known {
		return fmt.Errorf("RemoveDestination: unknown destination '%s'", name)
	}

	delete(l.remoteWriters, name)
	return nil
}
// ListDestinations returns the names of all log destinations: first the local
// ones selected by the configured output type ("stdout" and/or the name of the
// current logfile), then the names of the remote destinations in increasing
// order.
//
// The logfile entry is left out while no logfile is open, instead of
// dereferencing a nil file handle.
func (l *Logger) ListDestinations() []string {
	l.mu.Lock()
	defer l.mu.Unlock()

	out := l.config.Out

	// Local destinations, stdout first
	var localDst []string
	if out == OUT_STDOUT || out == OUT_FILE_AND_STDOUT {
		localDst = append(localDst, "stdout")
	}
	if (out == OUT_FILE || out == OUT_FILE_AND_STDOUT) && l.logfile != nil {
		localDst = append(localDst, l.logfile.Name())
	}

	// Remote destinations; map iteration order is random, so sort the names
	remoteDst := make([]string, 0, len(l.remoteWriters))
	for endpoint := range l.remoteWriters {
		remoteDst = append(remoteDst, endpoint)
	}
	sort.Strings(remoteDst)

	return append(localDst, remoteDst...)
}
// Quit stops all goroutines registered with the logger and closes the logfile.
//
// It marks the logger inactive, waits until the pending ledger entries have
// been processed and then, holding the logger's mutex, sends true on every
// registered killswitch and closes the open logfile, if any.
//
// Every killswitch is signalled only once: the list is cleared afterwards, so
// a repeated call to Quit does not try to signal goroutines that have already
// been told to stop.
func (l *Logger) Quit() {
	// Deactivate ledger
	l.active = false

	// Wait for the ledger processing to finish
	l.wg.Wait()

	// Lock any writing or file rotation activity
	l.mu.Lock()
	defer l.mu.Unlock()

	// Stop all registered goroutines, then forget their killswitches
	for _, stop := range l.killswitches {
		stop <- true
	}
	l.killswitches = nil

	// Close active log
	if l.logfile != nil {
		// Quit has no error result to report a failed Close through, so the
		// error is deliberately discarded.
		_ = l.logfile.Close()
	}
}