/
input_file.go
79 lines (62 loc) · 1.38 KB
/
input_file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package stream
import (
"bufio"
"context"
"os"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/encoding/json"
"github.com/justtrackio/gosoline/pkg/log"
)
type FileSettings struct {
Filename string `cfg:"filename"`
Blocking bool `cfg:"blocking"`
}
type fileInput struct {
logger log.Logger
settings FileSettings
channel chan *Message
stopped bool
}
func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input {
return NewFileInputWithInterfaces(logger, settings)
}
func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input {
return &fileInput{
logger: logger,
settings: settings,
channel: make(chan *Message),
}
}
func (i *fileInput) Data() <-chan *Message {
return i.channel
}
func (i *fileInput) Run(ctx context.Context) error {
defer func() {
if !i.settings.Blocking {
close(i.channel)
}
}()
file, err := os.Open(i.settings.Filename)
if err != nil {
i.logger.Error("can not open file: %w", err)
return err
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
if i.stopped {
break
}
rawMessage := scanner.Text()
msg := Message{}
err = json.Unmarshal([]byte(rawMessage), &msg)
if err != nil {
i.logger.Error("could not unmarshal message: %w", err)
continue
}
i.channel <- &msg
}
return nil
}
func (i *fileInput) Stop() {
i.stopped = true
}