/
maprhandler.go
74 lines (65 loc) · 2.06 KB
/
maprhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package handlers
import (
"github.com/mimecast/dtail/internal/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
"strings"
)
// MaprHandler is the handler used on the client side for running mapreduce aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler {
return &MaprHandler{
baseHandler: baseHandler{
server: server,
shellStarted: false,
commands: make(chan string),
pong: make(chan struct{}, 1),
stop: make(chan struct{}),
pingTimeout: pingTimeout,
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
}
}
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
if b == '\n' {
if len(h.baseHandler.receiveBuf) == 0 {
continue
}
message := string(h.baseHandler.receiveBuf)
if h.baseHandler.receiveBuf[0] == 'A' {
h.handleAggregateMessage(strings.TrimSpace(message))
h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
continue
}
h.baseHandler.handleMessageType(message)
}
}
return len(p), nil
}
// Handle a message received from server including mapr aggregation
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
h.count++
parts := strings.Split(message, "|")
// Index 0 contains 'AGGREGATE', 1 contains server host.
// Aggregation data begins from index 2.
logger.Debug("Received aggregate data", h.server, h.count)
h.aggregate.Aggregate(parts[2:])
logger.Debug("Aggregated aggregate data", h.server, h.count)
}
// Stop stops the mapreduce client handler.
func (h *MaprHandler) Stop() {
logger.Debug("Stopping mapreduce handler", h.server)
h.aggregate.Stop()
h.baseHandler.Stop()
}