This repository has been archived by the owner on Jan 31, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
route_events.go
184 lines (162 loc) · 4.71 KB
/
route_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package processor
import (
"compress/gzip"
"encoding/json"
"io"
"io/ioutil"
"os"
"path"
"strings"
"time"
"github.com/twitchscience/aws_utils/logger"
"github.com/twitchscience/blueprint/bpdb"
)
// EventRouter receives Mixpanel events and, for events that do not have a table yet, outputs
// files describing the table for that event.
//
// NOTE(review): no locking is visible on this type (Processors is mutated by Route and
// FlushRouters); it presumably must be driven from a single goroutine — confirm with callers.
type EventRouter struct {
	// CurrentTables holds the event names that currently have schemas in bpdb. It is refreshed
	// by UpdateCurrentTables, which NewRouter, ReadFile and FlushRouters all call.
	CurrentTables []string
	// Processors aggregate data about different event types, keyed by event name.
	Processors map[string]EventProcessor
	// ProcessorFactory creates a new Processor for a previously unseen event type. It is
	// called with OutputDir as its argument.
	ProcessorFactory func(string) EventProcessor
	// FlushTimer periodically signals that data about events should be flushed to the output
	// directory. ReadFile polls it without blocking after each file it reads.
	FlushTimer <-chan time.Time
	// bpSchemaBackend talks to blueprint's db to get the current tables.
	bpSchemaBackend bpdb.BpSchemaBackend
	// GzipReader is for reading files. Once created it is re-used (via Reset) by later
	// ReadFile calls.
	GzipReader *gzip.Reader
	// OutputDir is the directory in which output files are placed.
	OutputDir string
}
// NewRouter builds an EventRouter that writes its output into outputDir and looks up existing
// tables through bpSchemaBackend. Its FlushTimer ticks every flushInterval; ReadFile flushes
// the processors when a tick is pending. The list of existing tables is loaded once before the
// router is returned.
//
// NOTE(review): the underlying time.Ticker is never stopped, so it lives as long as the
// process; time.NewTicker panics if flushInterval is not positive.
func NewRouter(
	outputDir string,
	flushInterval time.Duration,
	bpSchemaBackend bpdb.BpSchemaBackend,
) *EventRouter {
	ticker := time.NewTicker(flushInterval)
	router := &EventRouter{
		OutputDir:        outputDir,
		bpSchemaBackend:  bpSchemaBackend,
		Processors:       make(map[string]EventProcessor),
		ProcessorFactory: NewNonTrackedEventProcessor,
		FlushTimer:       ticker.C,
	}
	router.UpdateCurrentTables()
	return router
}
// MPEvent is a Mixpanel event, as decoded from JSON by ReadFile. The struct has no JSON tags,
// so encoding/json matches the "event" and "properties" keys to these fields by name
// (case-insensitively).
type MPEvent struct {
	// Event is the event name; it selects the processor the properties are routed to.
	Event string
	// Properties holds the event's properties. Because ReadFile enables UseNumber on its
	// decoder, numeric values arrive as json.Number rather than float64.
	Properties map[string]interface{}
}
// ReadFile reads a gzip-compressed stream of JSON-encoded Mixpanel events from filename and
// routes each event to its event aggregator. It refreshes the list of existing tables from
// bpdb before reading. If the flush interval has expired, it flushes all event aggregators
// after the whole file has been read.
//
// It returns an error if the file cannot be opened, is not valid gzip, or contains an event
// that cannot be decoded. In the last case, events decoded before the bad one have already
// been routed, and no flush is attempted for this call.
func (e *EventRouter) ReadFile(filename string) error {
	e.UpdateCurrentTables()

	file, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := file.Close(); cerr != nil {
			logger.WithError(cerr).WithField("filename", filename).Error("Failed to close file")
		}
	}()

	// Re-use the gzip reader across files once it exists, resetting it onto the new file.
	if e.GzipReader == nil {
		e.GzipReader, err = gzip.NewReader(file)
	} else {
		err = e.GzipReader.Reset(file)
	}
	if err != nil {
		return err
	}
	defer func() {
		if cerr := e.GzipReader.Close(); cerr != nil {
			logger.WithError(cerr).Error("Failed to close gzip reader body")
		}
	}()

	d := json.NewDecoder(e.GzipReader)
	// Keep numeric properties as json.Number so large integers are not rounded to float64.
	d.UseNumber()
	for {
		var event MPEvent
		err := d.Decode(&event)
		if err == io.EOF {
			break
		}
		if err != nil {
			// Report malformed input to the caller instead of terminating the whole process.
			return err
		}
		e.Route(event.Event, event.Properties)
	}

	// Non-blocking poll: flush only if the ticker has delivered a tick; otherwise continue.
	select {
	case <-e.FlushTimer:
		e.FlushRouters()
	default:
	}
	return nil
}
// UpdateCurrentTables asks bpdb for every schema and replaces CurrentTables with their event
// names. If bpdb cannot be queried, the error is logged and CurrentTables is left unchanged.
func (e *EventRouter) UpdateCurrentTables() {
	configs, err := e.bpSchemaBackend.AllSchemas()
	if err != nil {
		logger.WithError(err).Error("Failed to fetch schemas from bpdb")
		return
	}
	tables := make([]string, 0, len(configs))
	for _, cfg := range configs {
		tables = append(tables, cfg.EventName)
	}
	e.CurrentTables = tables
}
// Route passes properties to the processor registered for eventName. It creates that processor
// with ProcessorFactory(OutputDir) the first time the event is seen. Events that already have a
// table in bpdb are dropped.
func (e *EventRouter) Route(eventName string, properties map[string]interface{}) {
	if e.EventCreated(eventName) {
		return
	}
	processor, exists := e.Processors[eventName]
	if !exists {
		processor = e.ProcessorFactory(e.OutputDir)
		e.Processors[eventName] = processor
	}
	processor.Accept(properties)
}
// FlushRouters flushes every event processor's schema description to the output directory and
// discards the processors. It then refreshes the list of tables from bpdb and deletes every
// "<name>.json" file in OutputDir whose <name> now has a table. Such files can appear when a
// table is created while its event is still being aggregated (a race condition).
//
// Failures to list the output directory or remove a file are logged rather than returned.
func (e *EventRouter) FlushRouters() {
	for event, processor := range e.Processors {
		processor.Flush(event)
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(e.Processors, event)
	}

	// Remove output for events that became tracked (at least limits the duration of the race).
	e.UpdateCurrentTables()
	infos, err := ioutil.ReadDir(e.OutputDir)
	if err != nil {
		logger.WithError(err).WithField("dir", e.OutputDir).Error("Failed to read output directory")
		return
	}
	for _, info := range infos {
		if info.IsDir() {
			continue
		}
		name := info.Name()
		if !strings.HasSuffix(name, ".json") {
			continue
		}
		if !e.EventCreated(strings.TrimSuffix(name, ".json")) {
			continue
		}
		fname := path.Join(e.OutputDir, name)
		if rerr := os.Remove(fname); rerr != nil {
			logger.WithError(rerr).WithField("filename", fname).Error("Failed to remove file")
		}
	}
}
// EventCreated reports whether eventName appears in CurrentTables, i.e. whether bpdb already
// had a table for it at the last refresh. The check is a linear scan.
func (e *EventRouter) EventCreated(eventName string) bool {
	found := false
	for i := 0; i < len(e.CurrentTables) && !found; i++ {
		found = e.CurrentTables[i] == eventName
	}
	return found
}