-
Notifications
You must be signed in to change notification settings - Fork 53
/
remoteread.go
88 lines (68 loc) · 2.47 KB
/
remoteread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package agent
import (
"bytes"
"context"
"fmt"
// TODO: github.com/golang/protobuf/proto is needed here instead of google.golang.org/protobuf/proto because the Prometheus messages are built with it.
"github.com/golang/protobuf/proto"
"io"
"net/http"
"github.com/golang/snappy"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/prompb"
)
// RemoteReader issues Prometheus remote read requests against an HTTP endpoint.
type RemoteReader interface {
// Read sends request to endpoint and returns the decoded read response, or
// an error if the exchange could not be completed.
Read(ctx context.Context, endpoint string, request *prompb.ReadRequest) (*prompb.ReadResponse, error)
}
// NewRemoteReader builds a RemoteReader that sends its HTTP requests through
// prometheusClient.
func NewRemoteReader(prometheusClient *http.Client) RemoteReader {
	reader := new(remoteReader)
	reader.prometheusClient = prometheusClient
	return reader
}
// remoteReader is the RemoteReader implementation returned by NewRemoteReader.
type remoteReader struct {
// prometheusClient is the HTTP client used to send remote read requests.
prometheusClient *http.Client
}
// Read sends readRequest to the remote read endpoint and returns the decoded
// response.
//
// The request is marshalled to protobuf, snappy-compressed and POSTed to
// endpoint through the configured HTTP client, with ctx attached to the HTTP
// request. An error is returned when the request cannot be built or sent, when
// the endpoint answers with a non-2xx status code, or when the response body
// cannot be read, decompressed or unmarshalled. Underlying causes are wrapped
// with %w where one exists.
func (r *remoteReader) Read(ctx context.Context, endpoint string, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) {
	uncompressedData, err := proto.Marshal(readRequest)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal remote read readRequest: %w", err)
	}

	compressedData := snappy.Encode(nil, uncompressedData)

	request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(compressedData))
	if err != nil {
		return nil, fmt.Errorf("unable to create remote read http readRequest: %w", err)
	}

	// NOTE(review): User-Agent carries a placeholder version ("xx") while the
	// remote-read version header carries the Prometheus build version; confirm
	// this is what the receiving endpoint expects.
	request.Header.Set("Content-Encoding", "snappy")
	request.Header.Set("Accept-Encoding", "snappy")
	request.Header.Set("Content-Type", "application/x-protobuf")
	request.Header.Set("User-Agent", "Prometheus/xx")
	request.Header.Set("X-Prometheus-Remote-Read-Version", fmt.Sprintf("Prometheus/%s", version.Version))

	response, err := r.prometheusClient.Do(request)
	if err != nil {
		return nil, fmt.Errorf("could not get response from remote read: %w", err)
	}
	defer func() {
		// Drain anything left unread before closing so the transport can reuse
		// the connection; failures here are not actionable for the caller.
		_, _ = io.Copy(io.Discard, response.Body)
		_ = response.Body.Close()
	}()

	// Check the status before reading the body so that a failed request is
	// always reported by its status code rather than by a body read error.
	if response.StatusCode/100 != 2 {
		return nil, fmt.Errorf("endpoint '%s' responded with status code '%d'", endpoint, response.StatusCode)
	}

	// Read straight from the body so that transport errors are surfaced
	// instead of showing up later as a decompression failure.
	compressedData, err = io.ReadAll(response.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading http response: %w", err)
	}

	uncompressedData, err = snappy.Decode(nil, compressedData)
	if err != nil {
		return nil, fmt.Errorf("unable to uncompress response: %w", err)
	}

	var readResponse prompb.ReadResponse
	if err := proto.Unmarshal(uncompressedData, &readResponse); err != nil {
		return nil, fmt.Errorf("could not unmarshal remote read response: %w", err)
	}

	return &readResponse, nil
}