forked from st3v/go-plugins
/
mercury.go
98 lines (88 loc) · 1.9 KB
/
mercury.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package mercury
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
// mercuryCodec is the codec implementation returned by NewCodec. It moves
// protobuf-encoded payloads over a single io.ReadWriteCloser.
type mercuryCodec struct {
	// Embedded mutex; its Lock and Unlock methods are promoted onto the
	// codec. None of the methods visible in this file acquire it.
	sync.Mutex

	// rwc is the underlying stream: Write sends encoded bodies to it,
	// ReadHeader drains it and Close closes it.
	rwc io.ReadWriteCloser
	// buf holds the bytes ReadHeader copied from rwc until ReadBody decodes
	// them.
	buf *bytes.Buffer
	// mt is the message type recorded by the most recent ReadHeader call and
	// consulted by ReadBody.
	mt codec.MessageType
}
// Close discards any buffered payload and then closes the underlying stream,
// returning whatever error the stream's Close reports.
func (c *mercuryCodec) Close() error {
	c.buf.Reset()
	closeErr := c.rwc.Close()
	return closeErr
}
// String returns the name of this codec, "mercury".
func (c *mercuryCodec) String() string {
	const name = "mercury"
	return name
}
// Write protobuf-encodes b, writes the encoded bytes to the underlying stream
// and fills in the headers that go with the message type:
//
//   - codec.Request: after the body is written, sets Content-Encoding to
//     "request", Service to m.Target and Endpoint to m.Method.
//   - codec.Response: sets Content-Encoding to "response", then writes the body.
//   - codec.Publication: writes the body only.
//
// It returns an error if m.Type is none of the above, if b does not implement
// proto.Message, if marshalling fails, or if the write to the stream fails.
func (c *mercuryCodec) Write(m *codec.Message, b interface{}) error {
	// send marshals b and pushes the bytes to the stream, reporting the first
	// failure instead of panicking on a bad body or dropping the write error.
	send := func() error {
		msg, ok := b.(proto.Message)
		if !ok {
			return fmt.Errorf("mercury: cannot encode %T: not a proto.Message", b)
		}
		data, err := proto.Marshal(msg)
		if err != nil {
			return err
		}
		_, err = c.rwc.Write(data)
		return err
	}

	switch m.Type {
	case codec.Request:
		if err := send(); err != nil {
			return err
		}
		// Assigning into a nil map panics, so allocate one on demand.
		if m.Header == nil {
			m.Header = make(map[string]string)
		}
		m.Header["Content-Encoding"] = "request"
		m.Header["Service"] = m.Target
		m.Header["Endpoint"] = m.Method
		return nil
	case codec.Response:
		// Assigning into a nil map panics, so allocate one on demand.
		if m.Header == nil {
			m.Header = make(map[string]string)
		}
		m.Header["Content-Encoding"] = "response"
		return send()
	case codec.Publication:
		return send()
	default:
		return fmt.Errorf("Unrecognised message type: %v", m.Type)
	}
}
// ReadHeader prepares the codec for a new incoming message of type mt. It
// clears the internal buffer, records mt for the following ReadBody call and
// copies everything readable from the underlying stream into the buffer. For
// codec.Request it also sets m.Method from the "Endpoint" header.
//
// It returns an error if mt is not codec.Request, codec.Response or
// codec.Publication, or if reading from the stream fails.
func (c *mercuryCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
	c.buf.Reset()
	c.mt = mt

	switch mt {
	case codec.Request:
		m.Method = m.Header["Endpoint"]
	case codec.Response, codec.Publication:
		// No header fields to extract for these types.
	default:
		return fmt.Errorf("Unrecognised message type: %v", mt)
	}

	// Surface read failures so ReadBody never decodes a silently truncated
	// payload. io.Copy reports a clean EOF as a nil error.
	_, err := io.Copy(c.buf, c.rwc)
	return err
}
// ReadBody decodes the payload buffered by the preceding ReadHeader call into
// b. A nil b skips decoding and returns nil.
//
// It returns an error if the recorded message type is not codec.Request,
// codec.Response or codec.Publication, if b does not implement proto.Message,
// or if the buffered bytes cannot be unmarshalled.
func (c *mercuryCodec) ReadBody(b interface{}) error {
	switch c.mt {
	case codec.Request, codec.Response, codec.Publication:
		// All supported types carry the body in the buffer.
	default:
		return fmt.Errorf("Unrecognised message type: %v", c.mt)
	}

	if b == nil {
		return nil
	}

	// Report an unsupported target as an error rather than panicking on the
	// type assertion.
	msg, ok := b.(proto.Message)
	if !ok {
		return fmt.Errorf("mercury: cannot decode into %T: not a proto.Message", b)
	}
	return proto.Unmarshal(c.buf.Bytes(), msg)
}
// NewCodec returns a codec.Codec that reads from and writes to rwc, starting
// with an empty internal buffer.
func NewCodec(rwc io.ReadWriteCloser) codec.Codec {
	c := new(mercuryCodec)
	c.rwc = rwc
	c.buf = new(bytes.Buffer)
	return c
}