-
Notifications
You must be signed in to change notification settings - Fork 0
/
codec.go
164 lines (138 loc) · 4.86 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package protobuf
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/valyala/fasthttp"
fasthttp_transport "github.com/wencan/kit-plugins/transport/fasthttp"
)
var (
// IngoreContentType, when true, makes DecodeProtobufRequest and
// DecodeProtobufResponse skip their Content-Type check and unmarshal the
// body regardless of the header. The name is a misspelling of "Ignore";
// it is exported, so it is kept as is.
IngoreContentType bool
// ProtobufContentType is the Content-Type header value that marks a body as
// protocol buffer data. The encoders set it on outgoing messages and the
// decoders require it on incoming ones unless IngoreContentType is true.
ProtobufContentType = "application/x-protobuf"
// DefaultBufferSize is the size, in bytes, of the slice given to a newly
// created proto.Buffer when bufferPool has none to reuse.
DefaultBufferSize = 64
// bufferPool holds *proto.Buffer values for reuse across encode calls. It
// has no New function, so Get returns nil when the pool is empty.
bufferPool = sync.Pool{}
)
// acquireProtoBuffer returns an empty *proto.Buffer, taken from bufferPool when
// one is available and freshly allocated otherwise. Callers hand the buffer
// back with releaseProtoBuffer once they are done with it.
//
// A new buffer starts with length 0 and capacity DefaultBufferSize. Per its
// documentation, proto.NewBuffer adopts the slice it is given as the buffer's
// current contents, and Marshal appends to those contents; a slice of non-zero
// length would therefore put DefaultBufferSize zero bytes in front of the first
// message marshaled into a fresh buffer.
func acquireProtoBuffer() *proto.Buffer {
	if pooled := bufferPool.Get(); pooled != nil {
		return pooled.(*proto.Buffer)
	}
	return proto.NewBuffer(make([]byte, 0, DefaultBufferSize))
}
// releaseProtoBuffer hands buffer back to bufferPool for reuse. The buffer is
// reset before it is pooled, so the caller must not use it afterwards.
func releaseProtoBuffer(buffer *proto.Buffer) {
	buffer.Reset()
	bufferPool.Put(buffer)
}
// EncodeProtobufRequest is a fasthttp_transport.EncodeRequestFunc that serializes
// the request object as a protocol buffer message into the body of r. Many
// protobuf-over-HTTP services can use it as a sensible default.
//
// The Content-Type header is set to ProtobufContentType first. If request
// implements fasthttp_transport.Headerer, every header key it provides is then
// set on r, using the value its Get method returns for that key.
//
// An error is returned if request does not implement proto.Message or if
// marshaling fails; headers written before the failure stay on r.
func EncodeProtobufRequest(_ context.Context, r *fasthttp.Request, request interface{}) error {
	r.Header.SetContentType(ProtobufContentType)
	if headerer, ok := request.(fasthttp_transport.Headerer); ok {
		// Fetch the headers once rather than once per key.
		headers := headerer.Headers()
		for k := range headers {
			r.Header.Set(k, headers.Get(k))
		}
	}

	msg, ok := request.(proto.Message)
	if !ok {
		return errors.New("request does not implement proto.Message")
	}

	buffer := acquireProtoBuffer()
	// Return the buffer to the pool on every exit path. This relies on SetBody
	// having copied the bytes before the buffer is reset, exactly as the
	// previous release-right-after-SetBody code did.
	defer releaseProtoBuffer(buffer)

	if err := buffer.Marshal(msg); err != nil {
		return err
	}

	body := buffer.Bytes()
	r.Header.SetContentLength(len(body))
	r.SetBody(body)
	return nil
}
// DecodeProtobufRequest is a fasthttp_transport.DecodeRequestFunc that deserializes
// the body of r into the request object, which must implement proto.Message.
// Many protobuf-over-HTTP services can use it as a sensible default.
//
// Unless IngoreContentType is true, the media type of the Content-Type header
// must equal ProtobufContentType. Parameters after ';' and surrounding
// whitespace are ignored, and the comparison is case-insensitive.
//
// An error is returned if the Content-Type check fails, if request does not
// implement proto.Message, or if unmarshaling fails.
func DecodeProtobufRequest(_ context.Context, r *fasthttp.Request, request interface{}) error {
	if !IngoreContentType {
		// Keep only the media type: drop any ";parameter" suffix and the
		// optional whitespace around it.
		contentType := b2s(r.Header.ContentType())
		if i := strings.IndexByte(contentType, ';'); i >= 0 {
			contentType = contentType[:i]
		}
		contentType = strings.TrimSpace(contentType)
		// Media type names are case-insensitive, so do not compare with ==.
		if !strings.EqualFold(contentType, ProtobufContentType) {
			return fmt.Errorf("Content-Type not's %s", ProtobufContentType)
		}
	}

	msg, ok := request.(proto.Message)
	if !ok {
		return errors.New("request does not implement proto.Message")
	}
	return proto.Unmarshal(r.Body(), msg)
}
// EncodeProtobufResponse is a fasthttp_transport.EncodeResponseFunc that serializes
// the response object as a protocol buffer message into the body of resp. Many
// protobuf-over-HTTP services can use it as a sensible default.
//
// The Content-Type header is set to ProtobufContentType first. If response
// implements fasthttp_transport.Headerer, every value of every header it
// provides is added to resp. If response implements
// fasthttp_transport.StatusCoder, its StatusCode is used instead of 200.
//
// When the status code is 204 No Content the function returns right after
// setting the status: nothing is marshaled, and response does not need to
// implement proto.Message. Otherwise an error is returned if response does not
// implement proto.Message or if marshaling fails.
func EncodeProtobufResponse(_ context.Context, resp *fasthttp.Response, response interface{}) error {
	resp.Header.SetContentType(ProtobufContentType)
	if headerer, ok := response.(fasthttp_transport.Headerer); ok {
		for k, values := range headerer.Headers() {
			for _, v := range values {
				resp.Header.Add(k, v)
			}
		}
	}

	code := http.StatusOK
	if sc, ok := response.(fasthttp_transport.StatusCoder); ok {
		code = sc.StatusCode()
	}
	resp.SetStatusCode(code)
	if code == http.StatusNoContent {
		return nil
	}

	msg, ok := response.(proto.Message)
	if !ok {
		// The value being encoded here is the response, not the request.
		return errors.New("response does not implement proto.Message")
	}

	buffer := acquireProtoBuffer()
	// Return the buffer to the pool on every exit path. This relies on SetBody
	// having copied the bytes before the buffer is reset, exactly as the
	// previous release-right-after-SetBody code did.
	defer releaseProtoBuffer(buffer)

	if err := buffer.Marshal(msg); err != nil {
		return err
	}

	body := buffer.Bytes()
	resp.Header.SetContentLength(len(body))
	resp.SetBody(body)
	return nil
}
// DecodeProtobufResponse is a fasthttp_transport.DecodeResponseFunc that deserializes
// the body of resp into the response object, which must implement
// proto.Message. Many protobuf-over-HTTP services can use it as a sensible
// default.
//
// Unless IngoreContentType is true, the media type of the Content-Type header
// must equal ProtobufContentType. Parameters after ';' and surrounding
// whitespace are ignored, and the comparison is case-insensitive.
//
// An error is returned if the Content-Type check fails, if response does not
// implement proto.Message, or if unmarshaling fails.
func DecodeProtobufResponse(_ context.Context, resp *fasthttp.Response, response interface{}) error {
	if !IngoreContentType {
		// Keep only the media type: drop any ";parameter" suffix and the
		// optional whitespace around it.
		contentType := b2s(resp.Header.ContentType())
		if i := strings.IndexByte(contentType, ';'); i >= 0 {
			contentType = contentType[:i]
		}
		contentType = strings.TrimSpace(contentType)
		// Media type names are case-insensitive, so do not compare with ==.
		if !strings.EqualFold(contentType, ProtobufContentType) {
			return fmt.Errorf("Content-Type not's %s", ProtobufContentType)
		}
	}

	msg, ok := response.(proto.Message)
	if !ok {
		// The value being decoded into here is the response, not the request.
		return errors.New("response does not implement proto.Message")
	}

	// The fasthttp client always reads the whole body into memory before returning to the program.
	// https://github.com/valyala/fasthttp/issues/246
	return proto.Unmarshal(resp.Body(), msg)
}
// b2s reinterprets a byte slice as a string without copying or allocating.
// See https://groups.google.com/forum/#!msg/Golang-Nuts/ENgbUzYvCuU/90yGx7GUAgAJ .
//
// The returned string shares memory with b, so b must not be modified while
// the string is still in use.
//
// Note that this depends on the in-memory layout of string and slice headers
// and may break if that layout changes in a future Go version.
func b2s(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}