/
logplexhandler.go
95 lines (76 loc) · 2.79 KB
/
logplexhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
nr "github.com/newrelic/go-agent"
"net/http"
"strconv"
)
// newLogplexHandler returns an HTTP handler that accepts a Heroku Logplex
// drain request, parses each framed message in the body and records the
// recognised ones as custom events on app.
//
// The Heroku app name is taken from the basic-auth username of the request and
// stored on every event under "sourceAppName".
//
// Responses:
//   - 500 when the basic-auth username is missing, or when the number of
//     parsed frames differs from the Logplex-Msg-Count header.
//   - 400 when the body cannot be split into frames or a frame is not valid
//     Logplex. Events recorded from earlier frames of the same request are
//     not rolled back.
//   - 200 otherwise. Frames whose payload cannot be parsed as key/value
//     pairs, or that match no known event type, are skipped silently.
func newLogplexHandler(app nr.Application) http.HandlerFunc {
	return func(rr http.ResponseWriter, rq *http.Request) {
		rawMsgs, err := parseFrames(rq.Body)
		appName, _, _ := rq.BasicAuth()
		logDrainToken := rq.Header.Get("Logplex-Drain-Token")

		if appName == "" {
			// Unfortunately, the Logplex message does **NOT** contain the Heroku
			// app's name whatsoever, so we must set it as the username when adding a
			// HTTPs log drain
			logger.Errorf("Basic auth's username, used for Heroku app name, was not set for drain token `%s` thus we cannot record metric", logDrainToken)
			rr.WriteHeader(http.StatusInternalServerError)
			return
		}

		if err != nil {
			logger.Errorf("Error parsing frames: %s", err.Error())
			rr.WriteHeader(http.StatusBadRequest)
			return
		}

		// The conversion error is deliberately ignored: a missing or malformed
		// header yields 0, which is only accepted when no frames were parsed.
		msgCount, _ := strconv.Atoi(rq.Header.Get("Logplex-Msg-Count"))
		if msgCount != len(rawMsgs) {
			// TODO record error to Bugsnag
			logger.Error("Frame count does not match Logplex-Msg-Count header")
			rr.WriteHeader(http.StatusInternalServerError)
			return
		}

		for _, rawMsg := range rawMsgs {
			logger.Debugf("Parsing raw msg: `%s`", rawMsg)

			msg, err := parseLogplex(rawMsg)
			if err != nil {
				// TODO: Record error to Bugsnag
				logger.Warnf("Malformed Logplex format")
				rr.WriteHeader(http.StatusBadRequest)
				return
			}
			logger.Debugf("Logplex: %+v", msg)

			var payload map[string]interface{}
			payload, err = parseKvp(msg.Msg)
			if err != nil {
				logger.Debugf("Error parsing payload: %s", err.Error())
				continue
			}

			// NewRelic only receives either seconds or milliseconds.
			// See
			// https://docs.newrelic.com/docs/insights/insights-data-sources/custom-data/insert-custom-events-insights-api#timestamps
			payload["timestamp"] = msg.Timestamp.UnixNano() / 1000000

			// We cannot use `appName` since it's reserved to the current app's name
			payload["sourceAppName"] = appName

			// Pick the event type; the first matching case wins, so add-on
			// metrics take precedence over an explicit event_name.
			var eventType string
			switch {
			case msg.ProcID == "heroku-postgres":
				eventType = "PostgresMetric"
			case msg.ProcID == "heroku-redis":
				eventType = "RedisMetric"
			case payload["event_name"] != nil:
				name, ok := payload["event_name"].(string)
				if !ok {
					// A non-string event_name used to panic on the type
					// assertion; skip just this message instead.
					logger.Warnf("Ignoring message with non-string event_name: %v", payload["event_name"])
					continue
				}
				eventType = name
			case msg.Appname == "heroku" && msg.ProcID == "router" && isRouterError(payload):
				eventType = "HerokuError"
			case msg.Appname == "heroku" && isDynoMetric(payload):
				eventType = "DynoMetric"
			default:
				// Not a message we report on.
				continue
			}

			// A rejected event is logged but does not fail the request, so the
			// remaining messages of the batch are still processed.
			if err := app.RecordCustomEvent(eventType, payload); err != nil {
				logger.Warnf("Error recording custom event `%s`: %s", eventType, err.Error())
			}
		}

		rr.WriteHeader(http.StatusOK)
	}
}
// isDynoMetric reports whether payload holds a non-nil value under
// "load_avg_1m" or "memory_total_MB".
func isDynoMetric(payload map[string]interface{}) bool {
	if payload["load_avg_1m"] != nil {
		return true
	}
	return payload["memory_total_MB"] != nil
}
// isRouterError reports whether payload holds non-nil values under both "at"
// and "code".
func isRouterError(payload map[string]interface{}) bool {
	if payload["at"] == nil {
		return false
	}
	return payload["code"] != nil
}