-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.go
121 lines (106 loc) · 2.92 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
import (
"encoding/base64"
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"github.com/cloudfoundry-community/cfenv"
"github.com/cloudfoundry/sonde-go/events"
)
var mixPanelChanel = make(chan *[]byte, 50)
//TODO Gross, get the mixpanel sending stuff into an object.
var mixPanelToken string
//SetMixPanelToken to use when talking to mixpanel
func SetMixPanelToken(token string) {
mixPanelToken = token
}
//GetMixPanelChan returns the channel to send events to MixPanel, used as a
//test hook
func GetMixPanelChan() chan *[]byte {
return mixPanelChanel
}
//Sender interface is what you must implmenet to send something to mixpanel
type Sender interface {
Send(bytes []byte) error
}
//MixPanelSender sends to MixPanel
type MixPanelSender struct {
URL string
}
//Send to MixPanel
func (m MixPanelSender) Send(bytes []byte) error {
encodedString := base64.StdEncoding.EncodeToString(bytes)
log.Printf("Sending data to %s", m.URL)
r, err := http.PostForm(m.URL, url.Values{"data": {encodedString}})
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
log.Printf("Mixpanel returned %s", body)
if nil != err && r.StatusCode != http.StatusOK {
return errors.New("Server returned status:" + string(r.StatusCode))
}
if '1' != body[0] {
return errors.New("Track was not successful")
}
return nil
}
//SendEventsToMixPanel does batch posts of firehose events to mix channel
func SendEventsToMixPanel(mixPanel *cfenv.Service, msgChan chan *events.Envelope) {
mixPanelToken = mixPanel.Credentials["token"].(string)
log.Println("Using Mixpanel Token " + mixPanelToken)
for i := 0; i < 3; i++ {
go MixPanelWorker(strconv.Itoa(i),
MixPanelSender{URL: mixPanel.Credentials["uri"].(string)})
}
for msg := range msgChan {
mixPanelChanel <- EventToJSON(msg)
}
}
//EventToJSON turns a firehose event into a json representation
func EventToJSON(event *events.Envelope) *[]byte {
props := map[string]interface{}{
"time": event.GetTimestamp() / 1000000000,
"origin": event.GetOrigin(),
"deployment": event.GetDeployment(),
"job": event.GetJob(),
"index": event.GetIndex(),
"ip": event.GetIp(),
"token": mixPanelToken,
}
data := map[string]interface{}{
"event": event.GetEventType().String(),
"properties": props,
}
j, err := json.Marshal(data)
if nil != err {
log.Print("Failed to marshal event")
log.Print(data)
}
return &j
}
//Collect gathers 50 events from the channel and returns
//them as a batch
func Collect(channel chan *[]byte) []byte {
events := "["
count := 0
for {
event := <-channel
events += string(*event)
count++
if 50 == count {
events += "]"
return []byte(events)
}
events += ","
}
}
//MixPanelWorker collects events to send to mixpanel in batches of 50
func MixPanelWorker(id string, sender Sender) {
log.Println("Created a sender with id " + id)
for {
sender.Send(Collect(mixPanelChanel))
}
}