/
http2_client.go
171 lines (153 loc) · 3.81 KB
/
http2_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package jsoffnet
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/superisaac/jsoff"
"golang.org/x/net/http2"
"io"
"net"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
)
// Http2Client is a streaming client that talks to the server over HTTP/2.
// It embeds StreamingClient and owns the *http.Client used to open the stream.
type Http2Client struct {
StreamingClient
// httpClient is built on the first call to HTTPClient and reused afterwards.
httpClient *http.Client
// clientOnce guards the one-time construction of httpClient.
clientOnce sync.Once
// UseHttp2C selects cleartext HTTP/2 (h2c): when true, HTTPClient dials
// plain TCP instead of negotiating TLS.
UseHttp2C bool
}
// h2Transport holds the state of one streaming HTTP/2 request/response
// exchange on behalf of an Http2Client.
type h2Transport struct {
// client is the owning Http2Client; it supplies the HTTP client and the logger.
client *Http2Client
// resp is the response of the open request; it is nil while not connected.
resp *http.Response
// decoder reads JSON messages from resp.Body; Connect creates it.
decoder *json.Decoder
// writer is the write end of the pipe that feeds the request body.
writer io.Writer
// flusher is reset by Close; no code visible in this file assigns it a
// non-nil value.
flusher http.Flusher
}
// NewHttp2Client builds an Http2Client that streams to serverUrl.
//
// The URL is re-parsed into a private copy before its scheme is touched, so
// the caller's value is not modified. Scheme "h2" is rewritten to "https";
// scheme "h2c" is rewritten to "http" and switches the client to cleartext
// HTTP/2. "https" and "http" are accepted unchanged. A URL that cannot be
// re-parsed, or any other scheme, is reported with log.Panicf.
func NewHttp2Client(serverUrl *url.URL) *Http2Client {
	target, err := url.Parse(serverUrl.String())
	if err != nil {
		log.Panicf("copy url error %s", err)
	}

	cleartext := false
	switch target.Scheme {
	case "h2":
		target.Scheme = "https"
	case "h2c":
		target.Scheme = "http"
		cleartext = true
	case "https", "http":
		// Already a plain HTTP(S) URL; nothing to rewrite.
	default:
		log.Panicf("server url %s is not http2", serverUrl)
	}

	client := &Http2Client{UseHttp2C: cleartext}
	client.InitStreaming(target, &h2Transport{client: client})
	return client
}
// HTTPClient returns the *http.Client used to open the stream. The client is
// constructed on the first call and the same instance is returned afterwards.
//
// Both variants use an http2.Transport with AllowHTTP enabled. With UseHttp2C
// set, the transport dials plain TCP; otherwise it uses the TLS configuration
// returned by ClientTLSConfig.
func (self *Http2Client) HTTPClient() *http.Client {
	self.clientOnce.Do(func() {
		h2 := &http2.Transport{AllowHTTP: true}
		if self.UseHttp2C {
			// refer to https://www.mailgun.com/blog/http-2-cleartext-h2c-client-example-go/
			// The transport believes it is dialing a TLS endpoint; the
			// tls.Config it passes in is ignored and a plain connection is
			// returned instead.
			h2.DialTLS = func(network, addr string, cfg *tls.Config) (net.Conn, error) {
				return net.Dial(network, addr)
			}
		} else {
			h2.TLSClientConfig = self.ClientTLSConfig()
		}
		self.httpClient = &http.Client{Transport: h2}
	})
	return self.httpClient
}
// String returns a short label for the client that includes its server URL.
func (self *Http2Client) String() string {
	const format = "http2 client %s"
	return fmt.Sprintf(format, self.serverUrl)
}
// http2 transport methods
// Close ends the current exchange, if there is one: it closes the response
// body and clears the response, writer and flusher so that Connected reports
// false. The decoder is not reset here. Calling Close while not connected is
// a no-op.
func (self *h2Transport) Close() {
	if self.resp == nil {
		return
	}
	self.resp.Body.Close()
	self.resp, self.writer, self.flusher = nil, nil, nil
}
// Connected reports whether a response is currently held, i.e. whether
// Connect has succeeded and Close has not been called since.
//
// The receiver is a pointer, like every other h2Transport method, so the call
// inspects the live transport instead of a copy of the whole struct.
func (self *h2Transport) Connected() bool {
	return self.resp != nil
}
// Connect opens the streaming exchange with the server.
//
// It sends a request with method "PRI" to serverUrl carrying header, using
// the read end of an in-memory pipe as the request body. On success the write
// end of the pipe becomes the transport's writer, the response is retained,
// and a JSON decoder is attached to the response body.
//
// rootCtx is accepted but not consulted by this method; the request is sent
// without an attached context.
//
// A failure to send the request is translated by handleHttp2Error.
func (self *h2Transport) Connect(rootCtx context.Context, serverUrl *url.URL, header http.Header) error {
	bodyIn, bodyOut := io.Pipe()

	httpClient := self.client.HTTPClient()
	resp, err := httpClient.Do(&http.Request{
		Method: "PRI",
		URL:    serverUrl,
		Header: header,
		Body:   bodyIn,
	})
	if err != nil {
		return self.handleHttp2Error(err)
	}

	self.resp = resp
	self.decoder = json.NewDecoder(resp.Body)
	self.writer = bodyOut
	return nil
}
// handleHttp2Error maps a transport-level error to the error returned to the
// caller.
//
// An error that is (or wraps) io.EOF, or that is a *url.Error, is logged at
// debug level and reported as TransportClosed. Anything else is logged as a
// warning and returned wrapped with the name of this function.
func (self *h2Transport) handleHttp2Error(err error) error {
	logger := self.client.Log()

	var urlErr *url.Error
	switch {
	case errors.Is(err, io.EOF):
		logger.Debugf("h2 conn failed")
		return TransportClosed
	case errors.As(err, &urlErr):
		logger.Debugf("h2 conn url.Error")
		return TransportClosed
	}

	logger.Warnf("transport error %s %s", reflect.TypeOf(err), err)
	return errors.Wrap(err, "h2transport.handleHttp2Error")
}
// WriteMessage serializes msg and writes it, followed by a newline, to the
// request body of the open exchange.
//
// It returns the serialization error unchanged if msg cannot be marshaled,
// TransportClosed if the transport has no writer (Connect has not succeeded
// yet, or Close has already run), and otherwise whatever handleHttp2Error
// makes of a failed write.
func (self *h2Transport) WriteMessage(msg jsoff.Message) error {
	marshaled, err := jsoff.MessageBytes(msg)
	if err != nil {
		return err
	}

	// Close resets writer to nil; calling Write on the nil interface would
	// panic, so report the closed transport instead.
	if self.writer == nil {
		return TransportClosed
	}

	// Messages are newline-delimited on the wire.
	marshaled = append(marshaled, '\n')
	if _, err := self.writer.Write(marshaled); err != nil {
		return self.handleHttp2Error(err)
	}
	return nil
}
// ReadMessage decodes the next message from the response stream.
//
// On success it returns the message and true. It returns TransportClosed
// when no decoder exists yet (Connect has not succeeded), when the stream
// ends with io.EOF, or when the error text mentions a closed pipe. Any other
// decoding error is logged as a warning, together with the decoder's input
// offset, and returned as-is.
func (self *h2Transport) ReadMessage() (jsoff.Message, bool, error) {
	// Without a decoder there is nothing to read from, and the error path
	// below would dereference it through InputOffset.
	if self.decoder == nil {
		return nil, false, TransportClosed
	}

	msg, err := jsoff.DecodeMessage(self.decoder)
	if err == nil {
		return msg, true, nil
	}

	// The closed-pipe case is matched by text, as the original code did; the
	// error may reach this point in a form errors.Is cannot see through.
	if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "read/write on closed pipe") {
		return nil, false, TransportClosed
	}

	self.client.Log().Warnf(
		"bad jsonrpc message %s %s, at pos %d",
		reflect.TypeOf(err), err, self.decoder.InputOffset())
	return nil, false, err
}