forked from cockroachdb/cockroach
/
http_post.go
159 lines (139 loc) · 4.68 KB
/
http_post.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball (spencer@cockroachlabs.com)
// Vivek Menezes (vivek@cockroachlabs.com)
package driver
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/cockroachdb/c-snappy"
"github.com/gogo/protobuf/proto"
"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/retry"
)
// StatusTooManyRequests is the HTTP status code (429) a server returns
// when it is handling too many requests; the client should retry.
const StatusTooManyRequests = 429
// postContext is the context passed into httpPost. It says where the
// request is sent, how the HTTP client is obtained, and how attempts
// are retried.
type postContext struct {
Server string // The host:port address of the Cockroach gateway node
// Endpoint is the URL path prefix; httpPost appends the method name to
// it to form the request path.
Endpoint string
Context *base.Context // The base context: needed for client setup.
// RetryOpts is passed to retry.Start to drive httpPost's retry loop.
RetryOpts retry.Options
}
// httpPost posts request using the HTTP client obtained from c.Context.
// The method's string form is appended to c.Endpoint and set as the URL
// path. The request is protobuf-serialized and written as the POST body.
// The content type is set to application/x-protobuf.
//
// On success, the response body is unmarshalled into response and nil is
// returned.
//
// Attempts are driven by a retry loop configured with c.RetryOpts. The
// retryable HTTP response codes (503, 504 and 429), errors sending the
// request, and errors reading or decoding the response body are all
// retried. We retry rather than report failure because the command may in
// fact go through and execute successfully; retrying with the same client
// command ID lets us eventually be given the cached response. Any other
// HTTP status, or a failure to build the request, is returned immediately.
//
// If the retry loop ends without a successful attempt, the error from the
// last attempt is returned.
func httpPost(c postContext, request, response proto.Message, method fmt.Stringer) error {
	// Marshal the args into a request body.
	body, err := proto.Marshal(request)
	if err != nil {
		return err
	}

	sharedClient, err := c.Context.GetHTTPClient()
	if err != nil {
		return err
	}

	// TODO(pmattis): Figure out the right thing to do here. The HTTP client we
	// get back from base.Context has a 3 second timeout. If a client operation
	// takes longer than that it will be retried. If we're not in an explicit
	// transaction the auto-transaction created on the server for the second
	// operation will be different than the still running first operation giving
	// them the potential to stomp on each other.
	//
	// Work on a copy so the longer timeout does not affect the shared client.
	client := *sharedClient
	client.Timeout = 60 * time.Second

	url := c.Context.HTTPRequestScheme() + "://" + c.Server + c.Endpoint + method.String()

	for r := retry.Start(c.RetryOpts); r.Next(); {
		var retryable bool
		if retryable, err = httpPostOnce(&client, url, body, response); !retryable {
			// Either success (err == nil) or an unrecoverable error.
			return err
		}
	}
	// The retry loop ended; err holds the failure of the last attempt.
	return err
}

// httpPostOnce performs a single POST of body to url and, on a 200
// response, unmarshals the reply into response. It reports whether a
// failed attempt is worth retrying. The response body is closed before
// returning, so repeated attempts do not accumulate open bodies.
func httpPostOnce(
	client *http.Client, url string, body []byte, response proto.Message,
) (bool, error) {
	req, err := http.NewRequest("POST", url, bytes.NewReader(body))
	if err != nil {
		return false, err
	}
	req.Header.Add(util.ContentTypeHeader, util.ProtoContentType)
	req.Header.Add(util.AcceptHeader, util.ProtoContentType)
	req.Header.Add(util.AcceptEncodingHeader, util.SnappyEncoding)

	resp, err := client.Do(req)
	if err != nil {
		log.Println(err)
		return true, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// We're cool.
	case http.StatusServiceUnavailable, http.StatusGatewayTimeout, StatusTooManyRequests:
		// Retry on service unavailable, gateway timeout and too many
		// requests.
		// TODO(spencer): consider respecting the Retry-After header for
		// backoff / retry duration.
		//
		// Drain the body so the transport can reuse the connection for the
		// next attempt. This is best effort: a failure here only costs a
		// new connection, so the error is deliberately ignored.
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		// Return the status as an error so that running out of retries is
		// not mistaken for success.
		return true, errors.New(resp.Status)
	default:
		// Can't recover from all other errors.
		return false, errors.New(resp.Status)
	}

	if resp.Header.Get(util.ContentEncodingHeader) == util.SnappyEncoding {
		resp.Body = &snappyReader{body: resp.Body}
	}
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println(err)
		return true, err
	}
	if err := proto.Unmarshal(b, response); err != nil {
		log.Println(err)
		return true, err
	}
	return false, nil
}
// snappyReader wraps a response body so that snappy.NewReader is called
// lazily, on the first call to Read, instead of when the wrapper is
// constructed. Close closes the wrapped body.
type snappyReader struct {
body io.ReadCloser // underlying Response.Body
sr io.Reader // lazily-initialized snappy reader
}
// Read reads from the snappy reader layered over the wrapped body. The
// snappy reader is created on the first call and reused afterwards.
func (s *snappyReader) Read(p []byte) (n int, err error) {
	decoder := s.sr
	if decoder == nil {
		// First use: build the snappy reader and remember it.
		decoder = snappy.NewReader(s.body)
		s.sr = decoder
	}
	return decoder.Read(p)
}
// Close closes the wrapped response body and returns its error, if any.
func (s *snappyReader) Close() error {
	closeErr := s.body.Close()
	return closeErr
}