This repository has been archived by the owner on Jan 31, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
processor.go
109 lines (91 loc) · 2.98 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Package processor contains logic to process gzipped Mixpanel event data into SQL table schemas.
package processor
import (
"reflect"
"github.com/twitchscience/aws_utils/logger"
)
var (
// CriticalPercentage is the percentage of events in which a property must be seen for it to be considered part of the schema for an event.
// It is the value passed to NewEventAggregator whenever this file creates an aggregator.
// NOTE(review): whether the scale is 0-1 or 0-100 is decided by EventAggregator — confirm there.
CriticalPercentage = 0.0
// CriticalThreshold is the number of events of a specific event name that must be exceeded for the event to be output on flush.
// The comparison is strict (nRows > CriticalThreshold), so with the default of 2 at least 3 rows are required.
CriticalThreshold = 2
)
// EventProcessor receives the properties of events of a certain type and,
// when flushed, emits metadata about the schema of those events.
type EventProcessor interface {
	// Accept hands the properties of one event to the processor.
	Accept(propertyBag map[string]interface{})
	// Flush asks the processor to flush what it has received, labelled
	// with eventName.
	Flush(eventName string)
}
// Outputter outputs the property summary and row count computed for an event.
type Outputter interface {
	// Output receives the event name, the summaries of the event's
	// properties and the number of rows seen. It returns a non-nil error
	// when the output could not be produced.
	Output(eventName string, cols []PropertySummary, nRows int) error
}
// NonTrackedEventProcessor is an EventProcessor that aggregates the event properties it receives and, on flush, outputs a summary of them.
// Accept sends on In, Flush sends on F, and Listen consumes both channels.
type NonTrackedEventProcessor struct {
// Out receives the event name, the property summaries and the row count when a flush is processed.
Out Outputter
// Aggregator summarizes the properties for this event for the purposes of creating a SQL table.
// Listen creates it lazily when the first event arrives and replaces it with a fresh one after a flush.
Aggregator *EventAggregator
// In is the channel of event properties. Listen closes it when it processes a flush.
In chan map[string]interface{}
// F is a channel that receives the event name when we're done aggregating and want to compute the transformation.
F chan string
}
// PropertySummary gives information about a field contained in an event.
// A slice of these is what Outputter.Output receives for each flushed event.
type PropertySummary struct {
// Name of the property.
Name string
// OccurrenceProbability is an estimate of how often the field appears when the event is sent.
OccurrenceProbability float64
// T is the Go type of the property.
T reflect.Type
// Len gives an approximate length of the values for this property if it is a string.
Len int
}
// NewNonTrackedEventProcessor builds a NonTrackedEventProcessor whose
// Outputter is created from outputDir, and starts its Listen loop through
// logger.Go before returning it as an EventProcessor.
//
// The In channel is buffered to 100 property bags; the F channel is
// unbuffered. The Aggregator is left nil and is created later by Listen.
func NewNonTrackedEventProcessor(outputDir string) EventProcessor {
	proc := new(NonTrackedEventProcessor)
	proc.Out = NewOutputter(outputDir)
	proc.In = make(chan map[string]interface{}, 100)
	proc.F = make(chan string)

	// Start consuming events and flush requests.
	logger.Go(proc.Listen)
	return proc
}
// Listen consumes event properties from In and aggregates them until an
// event name arrives on F; it then performs a single flush and returns.
//
// On flush it closes In, aggregates whatever is still buffered, summarizes
// the aggregated properties and, when more than CriticalThreshold rows were
// seen, passes the summary to Out under the flushed event name. A failure
// reported by Out is logged, not returned. A fresh aggregator is installed
// before returning.
//
// Because In is closed here, an Accept that runs after (or concurrently
// with) the flush panics with a send on a closed channel; callers must
// finish all Accept calls before calling Flush.
func (e *NonTrackedEventProcessor) Listen() {
	// aggregate folds one property bag into the aggregator, creating the
	// aggregator on first use.
	aggregate := func(props map[string]interface{}) {
		if e.Aggregator == nil {
			e.Aggregator = NewEventAggregator(CriticalPercentage)
		}
		e.Aggregator.Aggregate(props)
	}

	for {
		select {
		case props := <-e.In:
			aggregate(props)
		case eventName := <-e.F:
			// Drain: closing In lets the range below terminate once the
			// buffered events have been consumed.
			close(e.In)
			for props := range e.In {
				aggregate(props)
			}

			// A flush that arrives before any event leaves Aggregator nil;
			// create an empty one so Summarize is never called on a nil
			// pointer.
			if e.Aggregator == nil {
				e.Aggregator = NewEventAggregator(CriticalPercentage)
			}

			nRows, cols := e.Aggregator.Summarize()
			if nRows > CriticalThreshold {
				if err := e.Out.Output(eventName, cols, nRows); err != nil {
					logger.WithError(err).Error("Outputter failed")
				}
			}
			e.Aggregator = NewEventAggregator(CriticalPercentage)
			return
		}
	}
}
// Accept an event's properties by sending propertyBag on the In channel for Listen to aggregate.
// The send blocks while In's buffer is full and nothing is receiving.
// Listen closes In when it processes a flush, so calling Accept after that panics (send on closed channel).
func (e *NonTrackedEventProcessor) Accept(propertyBag map[string]interface{}) {
e.In <- propertyBag
}
// Flush events received. Label the flush with a given name.
// The name is sent on the F channel; when F is unbuffered (as created by NewNonTrackedEventProcessor) the call blocks until Listen receives it.
// Listen returns after handling one flush, so a second Flush on the same processor would block
// (no other receiver of F is visible in this file — verify against callers).
func (e *NonTrackedEventProcessor) Flush(eventName string) {
e.F <- eventName
}