/
ListDirectory.go
executable file
·195 lines (172 loc) · 5.32 KB
/
ListDirectory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// Package files provides file-related handlers such as ListDirectory, which lists the files found in a directory.
// The skeleton of this file was generated by the Handlergenerator tooling.
package files
import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"sync"
"time"
"github.com/percybolmer/go4data/handlers"
"github.com/percybolmer/go4data/metric"
"github.com/percybolmer/go4data/payload"
"github.com/percybolmer/go4data/property"
"github.com/percybolmer/go4data/pubsub"
"github.com/percybolmer/go4data/register"
)
// ListDirectory is a handler that lists all files (sub-directories are skipped) in a given path.
// Every file it reports is remembered together with the time it was seen, and is not
// reported again until that entry has become older than buffertime seconds.
type ListDirectory struct {
// Cfg holds the properties ("path" and "buffertime") needed to properly run the Handle func
Cfg *property.Configuration `json:"configs" yaml:"configs"`
// Name is the handler name returned by GetHandlerName
Name string `json:"handler" yaml:"handler_name"`
// path is the directory that is listed; ValidateConfiguration fills it from the "path" property
path string
// buffertime is how many seconds a listed file is remembered; ValidateConfiguration
// fills it from the "buffertime" property or from DefaultBufferTime
buffertime int64
// found maps the path of each already listed file to the Unix time (seconds) it was listed
found map[string]int64
// Mutex is locked by the ListDirectory method when it prunes expired entries from found
sync.Mutex `json:"-" yaml:"-"`
// subscriptionless is the value returned by Subscriptionless; the constructor sets it to true
subscriptionless bool
// errChan is where Handle sends the errors it runs into; it is exposed by GetErrorChannel
errChan chan error
// metrics is the provider used to register and increment the metrics below
metrics metric.Provider
// metricPrefix is the prefix given to SetMetricProvider
metricPrefix string
// MetricPayloadOut is the name of the metric counting how many payloads the handler has outputted
MetricPayloadOut string
// MetricPayloadIn is the name of the metric counting how many payloads the handler has inputted
MetricPayloadIn string
}
// DefaultBufferTime is the number of seconds a listed file is remembered (and
// therefore not listed again) when the "buffertime" property has no value.
var DefaultBufferTime int64 = 3600
// init registers the NewListDirectoryHandler constructor with the register
// package under the name "ListDirectory".
func init() {
register.Register("ListDirectory", NewListDirectoryHandler)
}
// NewListDirectoryHandler builds a ListDirectory handler with an empty
// configuration, an empty memory of found files, a buffered error channel and
// a Prometheus metric provider. It declares the two properties the handler
// understands: the required "path" and the optional "buffertime".
func NewListDirectoryHandler() handlers.Handler {
	cfg := &property.Configuration{
		Properties: make([]*property.Property, 0),
	}

	handler := &ListDirectory{
		Cfg:              cfg,
		Name:             "ListDirectory",
		found:            make(map[string]int64),
		subscriptionless: true,
		errChan:          make(chan error, 1000),
		metrics:          metric.NewPrometheusProvider(),
	}

	// Declare the properties on the same configuration the handler holds.
	cfg.AddProperty("path", "the path to search for", true)
	cfg.AddProperty("buffertime", "the time in seconds for how long a found file should be fulfillremembered and not relisted", false)

	return handler
}
// GetHandlerName returns the name of the handler as stored in the Name field
// (NewListDirectoryHandler sets it to "ListDirectory").
func (a *ListDirectory) GetHandlerName() string {
return a.Name
}
// Handle repeatedly lists the configured directory until ctx is cancelled and
// publishes a payload for every newly found file to the given topics.
//
// The incoming payload p is not used. Listing and publishing errors are
// delivered on the error channel (see GetErrorChannel) rather than returned;
// Handle itself always returns nil, also when ctx is cancelled.
//
// Every send on the error channel also watches ctx, so a full channel that
// nobody drains can no longer prevent the handler from being stopped.
//
// NOTE(review): the loop polls back-to-back with no pause between listings,
// which keeps a CPU core busy; consider a ticker if that latency is not needed.
func (a *ListDirectory) Handle(ctx context.Context, p payload.Payload, topics ...string) error {
	for {
		// Non-blocking cancellation check before each listing.
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		payloads, err := a.ListDirectory()
		if err != nil {
			if !a.reportError(ctx, err) {
				return nil
			}
			continue
		}
		if len(payloads) == 0 {
			continue
		}

		a.metrics.IncrementMetric(a.MetricPayloadOut, float64(len(payloads)))
		for _, err := range pubsub.PublishTopics(topics, payloads...) {
			if !a.reportError(ctx, err) {
				return nil
			}
		}
	}
}

// reportError sends err on the error channel, giving up when ctx is cancelled
// first. It reports whether the error was delivered.
func (a *ListDirectory) reportError(ctx context.Context, err error) bool {
	select {
	case a.errChan <- err:
		return true
	case <-ctx.Done():
		return false
	}
}
// ListDirectory reads the configured directory once and returns one payload
// per file that is not currently remembered. The payload body is the file
// path, built as the configured path, a "/" separator (unless the path already
// ends in one) and the file name. Sub-directories are skipped.
//
// Remembered files whose entry is older than buffertime seconds are forgotten
// first, so they are reported again if they still exist.
//
// It returns the error from reading the directory, in which case no payloads
// are returned and the memory of found files is left untouched.
//
// The mutex is held for the whole pruning and scanning phase, so every access
// to the found map is guarded and concurrent calls cannot race on it.
func (a *ListDirectory) ListDirectory() ([]payload.Payload, error) {
	files, err := ioutil.ReadDir(a.path)
	if err != nil {
		return nil, err
	}

	// The directory prefix is the same for every entry, so build it once.
	dir := a.path
	if !strings.HasSuffix(dir, "/") {
		dir += "/"
	}
	now := time.Now().Unix()

	a.Lock()
	defer a.Unlock()

	// Forget entries that are older than the configured buffer time.
	for name, seen := range a.found {
		if now-seen > a.buffertime {
			delete(a.found, name)
		}
	}

	var outputPayloads []payload.Payload
	for _, f := range files {
		if f.IsDir() {
			continue
		}
		fullPath := dir + filepath.Base(f.Name())
		if _, ok := a.found[fullPath]; ok {
			continue // still remembered, do not list it again
		}
		outputPayloads = append(outputPayloads, payload.NewBasePayload([]byte(fullPath), "ListDirectory", nil))
		a.found[fullPath] = now
	}
	return outputPayloads, nil
}
// ValidateConfiguration checks that the configuration holds what the handler
// needs and copies the values into the handler.
//
// The "path" property must exist. The "buffertime" property is optional: when
// it is absent or has no value DefaultBufferTime is used, otherwise it must be
// readable as an int64.
//
// It returns true and nil when the configuration is usable. Otherwise it
// returns false and the name of the offending property, and leaves the
// handler's path and buffertime unchanged.
func (a *ListDirectory) ValidateConfiguration() (bool, []string) {
	pathProp := a.Cfg.GetProperty("path")
	if pathProp == nil {
		return false, []string{"path"}
	}

	buffertime := DefaultBufferTime
	// The property itself may be missing (for example in a configuration that
	// was not created by NewListDirectoryHandler), so guard against nil before
	// looking at its value.
	if bufferProp := a.Cfg.GetProperty("buffertime"); bufferProp != nil && bufferProp.Value != nil {
		value, err := bufferProp.Int64()
		if err != nil {
			return false, []string{"buffertime"}
		}
		buffertime = value
	}

	a.buffertime = buffertime
	a.path = pathProp.String()
	return true, nil
}
// GetConfiguration returns the handler's configuration, i.e. the Cfg field.
// The pointer itself is returned, not a copy.
func (a *ListDirectory) GetConfiguration() *property.Configuration {
return a.Cfg
}
// Subscriptionless returns the handler's subscriptionless flag, which
// NewListDirectoryHandler sets to true.
// NOTE(review): presumably this tells the framework that the handler produces
// payloads on its own instead of consuming a subscription — confirm against handlers.Handler.
func (a *ListDirectory) Subscriptionless() bool {
return a.subscriptionless
}
// GetErrorChannel returns the channel onto which Handle sends the errors it
// encounters. NewListDirectoryHandler creates it with a buffer of 1000 errors.
func (a *ListDirectory) GetErrorChannel() chan error {
return a.errChan
}
// SetMetricProvider replaces the metric provider used by the handler and
// registers the handler's two metrics with it. The metric names are derived
// from prefix: "<prefix>_payloads_in" and "<prefix>_payloads_out".
//
// It returns the first error reported by the provider while registering the
// metrics; the outgoing metric is registered before the incoming one.
func (a *ListDirectory) SetMetricProvider(p metric.Provider, prefix string) error {
	a.metrics, a.metricPrefix = p, prefix
	a.MetricPayloadIn = fmt.Sprintf("%s_payloads_in", prefix)
	a.MetricPayloadOut = fmt.Sprintf("%s_payloads_out", prefix)

	outgoing := &metric.Metric{
		Name:        a.MetricPayloadOut,
		Description: "keeps track of how many payloads the handler has outputted",
	}
	if err := a.metrics.AddMetric(outgoing); err != nil {
		return err
	}

	incoming := &metric.Metric{
		Name:        a.MetricPayloadIn,
		Description: "keeps track of how many payloads the handler has ingested",
	}
	return a.metrics.AddMetric(incoming)
}