/
main.go
131 lines (115 loc) · 2.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"strings"
"time"
"github.com/klev-dev/klev-api-go"
"github.com/klev-dev/klev-api-go/logs"
"github.com/klev-dev/klev-api-go/messages"
"github.com/klev-dev/kleverr"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)
// main wires a journal reader to a batching publisher through a buffered
// channel and runs both until one of them fails, the journal stream ends, or
// the process receives an interrupt signal.
func main() {
	sigCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Hand-off queue between the reader and the publisher goroutines.
	queue := make(chan klev.PublishMessage, 32)

	// groupCtx is cancelled as soon as either goroutine returns an error,
	// or when sigCtx itself is cancelled by an interrupt.
	group, groupCtx := errgroup.WithContext(sigCtx)
	group.Go(func() error {
		return tailJournal(groupCtx, queue)
	})
	group.Go(func() error {
		return publishBatched(groupCtx, queue)
	})

	err := group.Wait()
	if err == nil {
		return
	}
	panic(err.Error())
}
// tailJournal follows the system journal by running
// `/usr/bin/journalctl --system -f -o json` and forwards every output line,
// with surrounding whitespace trimmed, to msgs as a publish message.
//
// msgs is closed when the function returns, so the receiver can detect the end
// of the stream. The function returns nil when the journalctl output ends
// (io.EOF) or when ctx is cancelled while a message is waiting to be handed
// off; any other failure to set up, start, or read from the command is
// returned wrapped with kleverr.Ret.
func tailJournal(ctx context.Context, msgs chan<- klev.PublishMessage) error {
	defer close(msgs)

	cmd := exec.CommandContext(ctx, "/usr/bin/journalctl", "--system", "-f", "-o", "json")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return kleverr.Ret(err)
	}
	if err := cmd.Start(); err != nil {
		return kleverr.Ret(err)
	}
	// Once started, the child must be waited on to release its resources.
	// Kill first so Wait cannot block on a follower that is still running
	// when we bail out early (read error or cancellation).
	defer func() {
		// Errors are deliberately ignored: Kill fails harmlessly when the
		// process has already exited, and a killed follower always reports a
		// non-zero status that says nothing about the data already forwarded.
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
	}()

	reader := bufio.NewReader(stdout)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				// The command closed its output; treat as a normal end of stream.
				return nil
			}
			return kleverr.Ret(err)
		}

		// Never block indefinitely on the hand-off: if the consumer has gone
		// away the context is cancelled and nobody will drain the channel.
		select {
		case msgs <- klev.NewPublishMessageValue(strings.TrimSpace(line)):
		case <-ctx.Done():
			// Shutdown was requested elsewhere; whoever triggered it reports
			// the cause, so this mirrors the nil returned on end of stream.
			return nil
		}
	}
}
// publishBatched creates a new log and publishes every message received on
// msgs to it, grouping messages into batches.
//
// The client is configured from the KLEV_TOKEN_DEMO environment variable.
// While messages keep arriving they are accumulated and sent in chunks of
// batchSize once the backlog exceeds that size; whenever the channel is
// momentarily empty, everything accumulated so far is sent at once. When msgs
// is closed the remaining backlog is flushed and the function returns.
//
// Any error from creating the log or publishing is returned wrapped with
// kleverr.Ret.
func publishBatched(ctx context.Context, msgs <-chan klev.PublishMessage) error {
	const (
		// batchSize is how many messages go into one request while the
		// backlog is growing faster than it is drained.
		batchSize = 24
		// finalFlushTimeout bounds the last publish performed during shutdown.
		finalFlushTimeout = 10 * time.Second
	)

	cfg := klev.NewConfig(os.Getenv("KLEV_TOKEN_DEMO"))
	logsClient := logs.New(cfg)
	messagesClient := messages.New(cfg)

	target, err := logsClient.Create(ctx, klev.LogCreateParams{
		Metadata: fmt.Sprintf(`{"source": "journal", "unit": "system", "start": %d}`, time.Now().Unix()),
	})
	if err != nil {
		return kleverr.Ret(err)
	}

	var pending []klev.PublishMessage

	// flushAll publishes the whole backlog, if any, and reports whether a
	// publish actually happened.
	flushAll := func(ctx context.Context) (bool, error) {
		if len(pending) == 0 {
			return false, nil
		}
		if _, err := messagesClient.Publish(ctx, target.LogID, pending); err != nil {
			return false, kleverr.Ret(err)
		}
		pending = nil
		return true, nil
	}

	// flushFullBatch publishes the oldest batchSize messages once the backlog
	// has grown beyond batchSize; otherwise it does nothing.
	flushFullBatch := func() error {
		if len(pending) <= batchSize {
			return nil
		}
		if _, err := messagesClient.Publish(ctx, target.LogID, pending[0:batchSize]); err != nil {
			return kleverr.Ret(err)
		}
		pending = slices.Delete(pending, 0, batchSize)
		return nil
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// The producer is done. By now ctx is usually already
				// cancelled (interrupt or sibling failure), which would make
				// this last publish fail and drop the backlog, so flush under
				// a detached context with its own deadline instead.
				flushCtx, cancel := context.WithTimeout(context.Background(), finalFlushTimeout)
				defer cancel()
				_, err := flushAll(flushCtx)
				return err
			}
			// Received a new message: append and publish if a batch is full.
			pending = append(pending, msg)
			if err := flushFullBatch(); err != nil {
				return err
			}
		default:
			// Nothing is immediately available: publish whatever is pending.
			published, err := flushAll(ctx)
			if err != nil {
				return err
			}
			if published {
				continue
			}
			// Nothing was pending either, so block until something changes.
			msg, ok := <-msgs
			if !ok {
				return nil
			}
			pending = append(pending, msg)
		}
	}
}