-
Notifications
You must be signed in to change notification settings - Fork 26
/
main.go
85 lines (77 loc) · 2.13 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main
import (
"fmt"
"io/ioutil"
"l2met/receiver"
"l2met/store"
"l2met/utils"
"log"
"net/http"
"os"
"runtime"
"time"
)
// init sets the scheduler's processor limit to the number of logical CPUs
// reported by the runtime before main starts.
func init() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
}
// main builds the Redis-backed store and the receiver from environment
// settings, registers the /health and /logs handlers on the default mux,
// and serves HTTP on PORT (default "8000"). It exits with status 1 if the
// Redis URL cannot be parsed or the HTTP server stops.
func main() {
	// The number of partitions that our backends support.
	numPartitions := utils.EnvUint64("NUM_OUTLET_PARTITIONS", 1)

	// Connection limit handed to the Redis store: OUTLET_C plus 100.
	// NOTE(review): OUTLET_C defaults to 2 here but to 100 when it is read
	// again for recv.NumOutlets below — confirm the differing defaults are
	// intentional.
	maxRedisConn := utils.EnvInt("OUTLET_C", 2) + 100

	// We use the store to Put buckets into redis.
	server, pass, err := utils.ParseRedisUrl()
	if err != nil {
		log.Fatal(err)
	}
	rs := store.NewRedisStore(server, pass, numPartitions, maxRedisConn)

	// REQUEST_BUFFER is passed for both of the receiver's sizing arguments.
	reqBuf := utils.EnvInt("REQUEST_BUFFER", 1000)
	recv := receiver.NewReceiver(reqBuf, reqBuf)
	recv.FlushInterval = time.Millisecond * 200
	recv.NumOutlets = utils.EnvInt("OUTLET_C", 100)
	recv.NumAcceptors = utils.EnvInt("ACCEPT_C", 100)
	recv.Store = rs
	recv.Start()

	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		healthCheck(w, r, rs)
	})
	http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
		recvLogs(w, r, recv)
	})

	port := utils.EnvString("PORT", "8000")

	// Announce start-up before serving: ListenAndServe blocks for the life
	// of the server and always returns a non-nil error, so a log statement
	// placed after it would never run.
	fmt.Printf("at=l2met-initialized port=%s\n", port)

	err = http.ListenAndServe(":"+port, nil)
	if err != nil {
		fmt.Printf("error=%s msg=%q\n", err, "Unable to start http server.")
		os.Exit(1)
	}
}
// healthCheck asks the store for its health. When the store reports
// unhealthy, it logs the failure to stdout and answers with a 500 carrying
// the same message; otherwise it writes nothing to w.
func healthCheck(w http.ResponseWriter, r *http.Request, s store.Store) {
	if s.Health() {
		return
	}

	const unavailable = "Redis is unavailable."
	fmt.Printf("error=%q\n", unavailable)
	http.Error(w, unavailable, http.StatusInternalServerError)
}
// recvLogs reads the whole body of a POST request and hands it, together
// with the token parsed from the request and the URL query parameters, to
// recv.Receive. The payload itself is not validated here.
//
// A 400 "Invalid Request" is returned when the method is not POST, when the
// token cannot be parsed, or when reading the body fails. On the success
// path nothing is written to w explicitly.
//
// The duration of every call is reported through utils.MeasureT under the
// name "http-receiver".
func recvLogs(w http.ResponseWriter, r *http.Request, recv *receiver.Receiver) {
	started := time.Now()
	defer utils.MeasureT("http-receiver", started)

	// Every rejection uses the same status and message.
	reject := func() {
		http.Error(w, "Invalid Request", http.StatusBadRequest)
	}

	if r.Method != "POST" {
		reject()
		return
	}

	token, tokenErr := utils.ParseToken(r)
	if tokenErr != nil {
		reject()
		return
	}

	// Close the body as soon as it has been read, whether or not the read
	// succeeded.
	payload, readErr := ioutil.ReadAll(r.Body)
	r.Body.Close()
	if readErr != nil {
		reject()
		return
	}

	recv.Receive(token, payload, r.URL.Query())
}