package crawler

import (
	"fmt"
	"os"

	"github.com/elastic/beats/filebeat/config"
	"github.com/elastic/beats/filebeat/input"
	"github.com/elastic/beats/libbeat/logp"
)
/*
The hierarchy of the crawler objects is as follows:

Crawler: Filebeat has a single crawler. The crawler is the single point of
control and stores the state. The state is persisted through the registrar.

Prospector: For every ProspectorConfig, the crawler starts a prospector.

Harvester: For every file found inside a ProspectorConfig, the prospector
starts a harvester. Each harvester sends its events to the spooler, the
spooler sends the events to the publisher, and the publisher writes the state
down through the registrar.
*/
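
// For orientation, the pieces above are typically wired together roughly like
// this (a minimal sketch; the NewRegistrar arguments, the cfg fields, and the
// channel size are assumptions, not part of this file):
//
//	registrar, err := NewRegistrar(cfg.RegistryFile) // hypothetical constructor call
//	if err != nil { ... }
//	crawler := &Crawler{Registrar: registrar}
//	eventChan := make(chan *input.FileEvent, 16)
//	crawler.Start(cfg.Prospectors, eventChan) // one prospector per config entry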

type Crawler struct {
	// Registrar object to persist the state
	Registrar *Registrar
	running   bool
}

func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *input.FileEvent) {

	pendingProspectorCnt := 0
	crawler.running = true

	// Prospect the globs/paths given in the configuration and launch harvesters
	for _, fileconfig := range files {
		logp.Debug("prospector", "File Configs: %v", fileconfig.Paths)

		prospector := &Prospector{
			ProspectorConfig: fileconfig,
			registrar:        crawler.Registrar,
		}

		err := prospector.Init()
		if err != nil {
			logp.Critical("Error initializing prospector: %s", err)
			fmt.Printf("Error initializing prospector: %s\n", err)
			os.Exit(1)
		}

		go prospector.Run(eventChan)
		pendingProspectorCnt++
	}

	// Now determine which states we need to persist by pulling the events from
	// the prospectors. When we receive an event with an empty source, a
	// prospector has finished its initial scan, so we decrease the number of
	// pending prospectors.
	logp.Debug("prospector", "Waiting for %d prospectors to initialise", pendingProspectorCnt)

	for event := range crawler.Registrar.Persist {
		if event.Source == "" {
			pendingProspectorCnt--
			if pendingProspectorCnt == 0 {
				logp.Debug("prospector", "No pending prospectors. Finishing setup")
				break
			}
			continue
		}

		crawler.Registrar.state[event.Source] = event
		logp.Debug("prospector", "Registrar will re-save state for %s", event.Source)

		if !crawler.running {
			break
		}
	}

	logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.getStateCopy()))
}
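
// The loop in Start relies on each prospector sending an event with an empty
// Source on the registrar's Persist channel once its initial scan is done.
// The producer side looks roughly like this (a sketch, assuming Persist
// carries *input.FileEvent values as the loop above suggests; the real
// prospector code lives elsewhere, and signalInitDone is a hypothetical
// helper):
//
//	func (p *Prospector) signalInitDone(persist chan *input.FileEvent) {
//		persist <- &input.FileEvent{Source: ""} // empty Source marks this prospector as initialised
//	}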

func (crawler *Crawler) Stop() {
	// TODO: Properly stop prospectors and harvesters
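	//
	// One possible shape, as a sketch only (the done channel and wait group
	// below are assumptions, not existing fields of Crawler):
	//
	//	crawler.running = false
	//	close(crawler.done) // hypothetical: broadcast shutdown to prospectors/harvesters
	//	crawler.wg.Wait()   // hypothetical: wait for their goroutines to return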
}