-
Notifications
You must be signed in to change notification settings - Fork 14
/
linefile.go
102 lines (91 loc) · 2.86 KB
/
linefile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/*
* skogul, linefile receiver
*
* Copyright (c) 2019 Telenor Norge AS
* Author(s):
* - Kristian Lyngstøl <kly@kly.no>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
package receiver
import (
"bufio"
"os"
"time"
"github.com/telenornms/skogul"
)
// lfLog is the package-level logger used by the line-oriented file
// receivers in this file (LineFile, File and Stdin). It is obtained from
// skogul.Logger with the arguments "receiver" and "linefile".
var lfLog = skogul.Logger("receiver", "linefile")
// LineFile will keep reading File over and over again, assuming one
// collection per line. Best suited for pointing at a FIFO, which will
// allow you to 'cat' stuff to Skogul.
//
// Every line read is handed to Handler. When Delay is non-zero, Start
// sleeps for that long between consecutive passes over File.
type LineFile struct {
// File is the path that is opened anew on every pass.
File string `doc:"Path to the fifo or file from which to read from repeatedly."`
// Handler receives the raw bytes of each line.
Handler skogul.HandlerRef `doc:"Handler used to parse and transform and send data."`
// Delay is the optional pause between passes; zero means no pause.
Delay skogul.Duration `doc:"Delay before re-opening the file, if any."`
}
// read opens lf.File, passes every line to the handler and closes the file
// again. It is the common routine behind LineFile, File and Stdin.
//
// A handler failure on a single line is logged and does not stop the scan.
// Lines are split by a default bufio.Scanner, so a line longer than
// bufio.MaxScanTokenSize ends the scan with an error.
//
// It returns the error from os.Open if the file cannot be opened, a
// skogul.Error if scanning fails, and nil once EOF is reached.
func (lf *LineFile) read() error {
	f, err := os.Open(lf.File)
	if err != nil {
		lfLog.WithError(err).WithField("file", lf.File).Error("Unable to open file")
		return err
	}
	// LineFile.Start calls read in an endless loop; without this every
	// pass would leak one file descriptor. The file is only read from, so
	// the error from Close carries no useful information and is dropped.
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		// The slice returned by Bytes is only valid until the next call
		// to Scan, so it is handed to the handler right away.
		line := scanner.Bytes()
		if err := lf.Handler.H.Handle(line); err != nil {
			lfLog.WithError(err).Error("Failed to send metric")
		}
	}
	if err := scanner.Err(); err != nil {
		lfLog.WithError(err).Error("Error reading file")
		return skogul.Error{Reason: "Error reading file"}
	}
	return nil
}
// Start reads File in an endless loop, pausing for Delay between passes
// when Delay is non-zero. It never returns.
func (lf *LineFile) Start() error {
	for {
		// read logs its own failures; the returned error is deliberately
		// discarded so that the loop keeps retrying.
		_ = lf.read()
		if pause := lf.Delay.Duration; pause != 0 {
			time.Sleep(pause)
		}
	}
}
// File reads from a FILE, a single JSON object per line, and
// exits at EOF.
//
// NOTE(review): the code only splits the input into lines and passes each
// one to Handler; the "JSON" part presumably depends on the parser the
// handler is configured with - confirm.
type File struct {
// File is the path that Start reads exactly once.
File string `doc:"Path to the file to read from once."`
// Handler receives the raw bytes of each line.
Handler skogul.HandlerRef `doc:"Handler used to parse, transform and send data."`
// lf is the internal LineFile that Start configures and reads through.
lf LineFile
}
// Start copies File and Handler onto the internal LineFile, performs a
// single read pass over the file and returns the outcome of that pass.
func (s *File) Start() error {
	reader := &s.lf
	reader.File, reader.Handler = s.File, s.Handler
	return reader.read()
}
// Stdin reads from /dev/stdin, passing each line to Handler.
type Stdin struct {
// Handler receives the raw bytes of each line.
Handler skogul.HandlerRef `doc:"Handler used to parse, transform and send data."`
// lf is the internal LineFile that Start configures and reads through.
lf LineFile
}
// Start points the internal LineFile at /dev/stdin, hands it the
// configured Handler and reads until EOF, returning the result of that
// single pass.
func (s *Stdin) Start() error {
	reader := &s.lf
	reader.Handler = s.Handler
	reader.File = "/dev/stdin"
	return reader.read()
}