/
forwarder.go
119 lines (101 loc) · 2.75 KB
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package server
import (
"context"
"sync"
"time"
log "github.com/go-pkgz/lgr"
"github.com/pkg/errors"
"github.com/umputun/dkll/app/core"
)
// Forwarder tails syslog messages, parses entries and pushes to Publisher (store) and file logger(s)
type Forwarder struct {
Publisher Publisher
Syslog SyslogBackgroundReader
FileWriter FileWriter
}
// Publisher to store
type Publisher interface {
Publish(records []core.LogEntry) (err error)
LastPublished() (entry core.LogEntry, err error)
}
// SyslogBackgroundReader provides aysnc runner returning the channel for incoming messages
type SyslogBackgroundReader interface {
Go(ctx context.Context) (<-chan string, error)
}
// FileWriter writes entry to all log files
type FileWriter interface {
Write(rec core.LogEntry) error
}
// Run executes forwarder in endless (blocking) loop
func (f *Forwarder) Run(ctx context.Context) error {
log.Print("[INFO] run forwarder from syslog")
messages := make(chan core.LogEntry, 10000)
writerWg := f.backgroundWriter(ctx, messages)
if pe, err := f.Publisher.LastPublished(); err == nil {
log.Printf("[DEBUG] last published [%s : %s]", pe.ID, pe)
}
syslogCh, err := f.Syslog.Go(ctx)
if err != nil {
return errors.Wrap(err, "forwarder failed to run")
}
for {
select {
case <-ctx.Done():
writerWg.Wait() // wait for backgroundWriter completion
<-syslogCh // wait for syslog close
return ctx.Err()
case line, ok := <-syslogCh:
if !ok {
continue
}
ent, err := core.NewEntry(line, time.Local)
if err != nil {
log.Printf("[WARN] failed to make entry from %q, %v", line, err)
continue
}
messages <- ent
}
}
}
func (f *Forwarder) backgroundWriter(ctx context.Context, messages <-chan core.LogEntry) *sync.WaitGroup {
log.Print("[INFO] forwarder's writer activated")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
buffer := make([]core.LogEntry, 0, 1001)
// send buffer to publisher and file logger
writeBuff := func() {
if len(buffer) == 0 {
return
}
if err := f.Publisher.Publish(buffer); err != nil {
log.Printf("[WARN] failed to publish, error=%s", err)
}
for _, r := range buffer {
if err := f.FileWriter.Write(r); err != nil {
log.Printf("[WARN] failed to write to logs, %v", err)
}
}
log.Printf("[DEBUG] wrote %d entries", len(buffer))
buffer = buffer[0:0]
}
ticks := time.NewTicker(time.Millisecond * 500)
for {
select {
case <-ctx.Done():
writeBuff()
log.Print("[DEBUG] background writer terminated")
return
case msg := <-messages:
buffer = append(buffer, msg)
if len(buffer) >= 1000 { // forced flush every 1000
writeBuff()
}
case <-ticks.C: // flush every 1/2 second
writeBuff()
}
}
}()
return &wg
}