forked from cloudfoundry-community/firehose-to-syslog
/
sync.go
144 lines (120 loc) · 3.85 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package consumer
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"strings"
"github.com/cloudfoundry/noaa"
noaa_errors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gogo/protobuf/proto"
)
// RecentLogs connects to traffic controller via its 'recentlogs' http(s)
// endpoint and returns a slice of recent messages. It does not guarantee any
// order of the messages; they are in the order returned by traffic controller.
//
// The noaa.SortRecent function is provided to sort the data returned by
// this method.
func (c *Consumer) RecentLogs(appGuid string, authToken string) ([]*events.LogMessage, error) {
messages := make([]*events.LogMessage, 0, 200)
callback := func(envelope *events.Envelope) error {
messages = append(messages, envelope.GetLogMessage())
return nil
}
err := c.readTC(appGuid, authToken, "recentlogs", callback)
if err != nil {
return nil, err
}
return messages, nil
}
// ContainerMetrics connects to traffic controller via its 'containermetrics'
// http(s) endpoint and returns the most recent messages for an app. The
// returned metrics will be sorted by InstanceIndex.
func (c *Consumer) ContainerMetrics(appGuid string, authToken string) ([]*events.ContainerMetric, error) {
messages := make([]*events.ContainerMetric, 0, 200)
callback := func(envelope *events.Envelope) error {
if envelope.GetEventType() == events.Envelope_LogMessage {
return errors.New(fmt.Sprintf("Upstream error: %s", envelope.GetLogMessage().GetMessage()))
}
messages = append(messages, envelope.GetContainerMetric())
return nil
}
err := c.readTC(appGuid, authToken, "containermetrics", callback)
if err != nil {
return nil, err
}
noaa.SortContainerMetrics(messages)
return messages, err
}
func (c *Consumer) readTC(appGuid string, authToken string, endpoint string, callback func(*events.Envelope) error) error {
trafficControllerUrl, err := url.ParseRequestURI(c.trafficControllerUrl)
if err != nil {
return err
}
scheme := "https"
if trafficControllerUrl.Scheme == "ws" {
scheme = "http"
}
recentPath := fmt.Sprintf("%s://%s/apps/%s/%s", scheme, trafficControllerUrl.Host, appGuid, endpoint)
req, _ := http.NewRequest("GET", recentPath, nil)
req.Header.Set("Authorization", authToken)
resp, err := c.client.Do(req)
if err != nil {
message := `Error dialing traffic controller server: %s.
Please ask your Cloud Foundry Operator to check the platform configuration (traffic controller endpoint is %s).`
return errors.New(fmt.Sprintf(message, err, c.trafficControllerUrl))
}
defer resp.Body.Close()
err = checkForErrors(resp)
if err != nil {
return err
}
reader, err := getMultipartReader(resp)
if err != nil {
return err
}
var buffer bytes.Buffer
for part, loopErr := reader.NextPart(); loopErr == nil; part, loopErr = reader.NextPart() {
buffer.Reset()
_, err = buffer.ReadFrom(part)
if err != nil {
break
}
envelope := new(events.Envelope)
proto.Unmarshal(buffer.Bytes(), envelope)
err = callback(envelope)
if err != nil {
return err
}
}
return nil
}
func checkForErrors(resp *http.Response) error {
if resp.StatusCode == http.StatusUnauthorized {
data, _ := ioutil.ReadAll(resp.Body)
return noaa_errors.NewUnauthorizedError(string(data))
}
if resp.StatusCode == http.StatusBadRequest {
return ErrBadRequest
}
if resp.StatusCode != http.StatusOK {
return ErrNotOK
}
return nil
}
func getMultipartReader(resp *http.Response) (*multipart.Reader, error) {
contentType := resp.Header.Get("Content-Type")
if len(strings.TrimSpace(contentType)) == 0 {
return nil, ErrBadResponse
}
matches := boundaryRegexp.FindStringSubmatch(contentType)
if len(matches) != 2 || len(strings.TrimSpace(matches[1])) == 0 {
return nil, ErrBadResponse
}
reader := multipart.NewReader(resp.Body, matches[1])
return reader, nil
}