This repository has been archived by the owner on Aug 17, 2021. It is now read-only.
/
http.go
134 lines (112 loc) · 3.24 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package mhttp
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/vocdoni/multirpc/transports"
"go.vocdoni.io/dvote/log"
)
// HttpHandler is the HTTP transport handler. Request handlers registered on
// the proxy turn incoming requests into transports.Message values and queue
// them on an internal channel, from which Listen forwards them.
type HttpHandler struct {
	// Proxy is the proxy on which the HTTP request handlers are registered.
	Proxy *Proxy

	// internalReceiver carries messages from the request handlers to Listen.
	// It is created by Init.
	internalReceiver chan transports.Message
}
// HttpContext is the per-request context attached to every message produced
// by the HTTP handler. It holds what is needed to write the response back.
type HttpContext struct {
	// Writer is the response writer of the originating request.
	Writer http.ResponseWriter
	// Request is the originating request.
	Request *http.Request

	// sent is closed by Send to signal that the request has been answered.
	sent chan struct{}
}
// Init prepares the handler by creating the internal message queue, which
// buffers a single message. The connection argument is not used and the
// returned error is always nil.
func (h *HttpHandler) Init(c *transports.Connection) error {
	queue := make(chan transports.Message, 1)
	h.internalReceiver = queue
	return nil
}
// SetProxy stores p as the proxy on which this handler registers its HTTP
// request handlers.
func (h *HttpHandler) SetProxy(p *Proxy) {
	h.Proxy = p
}
// getHTTPhandler returns the net/http handler function serving path.
//
// For every request the handler reads the whole body, wraps it in a
// transports.Message whose Namespace is path and whose Context is a fresh
// *HttpContext, pushes the message to receiver and then blocks until the
// response has been written through HttpContext.Send.
//
// If the body cannot be read, the request is answered with 400 Bad Request.
// If the request is cancelled before the message could be queued (for
// example because the client disconnected while the receiver was busy), the
// handler returns without queueing it.
func getHTTPhandler(path string, receiver chan transports.Message) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		reqBody, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Warnf("HTTP connection closed: (%s)", err)
			// Best effort: if the connection is really gone this write is
			// simply dropped, otherwise the client learns the request failed
			// instead of getting an implicit empty 200.
			http.Error(w, "cannot read request body", http.StatusBadRequest)
			return
		}
		hc := &HttpContext{Request: r, Writer: w, sent: make(chan struct{})}
		msg := transports.Message{
			Data:      reqBody,
			TimeStamp: int32(time.Now().Unix()),
			Context:   hc,
			Namespace: path,
		}
		select {
		case receiver <- msg:
		case <-r.Context().Done():
			// The message was never handed over, so nobody will answer it;
			// waiting on hc.sent would block this goroutine forever.
			return
		}
		// The contract is that every handled request must send a
		// response, even when they fail or time out.
		<-hc.sent
	}
}
// AddProxyHandler registers, on the handler's Proxy, an HTTP request handler
// for path that feeds incoming requests into the internal message queue.
func (h *HttpHandler) AddProxyHandler(path string) {
	handler := getHTTPhandler(path, h.internalReceiver)
	h.Proxy.AddHandler(path, handler)
}
// ConnectionType reports the kind of connection behind this context, which is
// always "HTTP".
func (h *HttpContext) ConnectionType() string {
	const connType = "HTTP"
	return connType
}
// Send writes msg.Data, followed by a newline, as the JSON response of the
// request behind this context, and then releases the HTTP handler waiting for
// the response.
//
// It returns an error when the request context is already done (nothing is
// written in that case), when writing fails, or when a panic had to be
// recovered while sending, for instance because Send was called a second time
// for the same request.
func (h *HttpContext) Send(msg transports.Message) (err error) {
	// Deferred calls run last-in-first-out: the body is closed, then the
	// waiting handler is released, and finally any panic raised by the body
	// or by those two deferred calls is recovered here.
	defer func() {
		if r := recover(); r != nil {
			log.Warnf("recovered http send panic: %v", r)
			// Report the failure instead of pretending the response was sent.
			err = fmt.Errorf("recovered http send panic: %v", r)
		}
	}()
	defer close(h.sent)
	defer h.Request.Body.Close()

	if h.Request.Context().Err() != nil {
		// The connection was closed, so don't try to write to it.
		return fmt.Errorf("connection is closed")
	}

	// The declared length accounts for the trailing newline written below.
	header := h.Writer.Header()
	header.Set("Content-Length", fmt.Sprintf("%d", len(msg.Data)+1))
	header.Set("Content-Type", "application/json")
	if _, err := h.Writer.Write(msg.Data); err != nil {
		return err
	}
	// Ensure we end the response with a newline, to be nice.
	_, err = h.Writer.Write([]byte("\n"))
	return err
}
// ConnectionType reports the kind of transport this handler implements, which
// is always "HTTP".
func (h *HttpHandler) ConnectionType() string {
	const connType = "HTTP"
	return connType
}
// Listen forwards every message queued by the HTTP request handlers to
// receiver, one at a time and in arrival order. It never returns.
func (h *HttpHandler) Listen(receiver chan<- transports.Message) {
	for {
		receiver <- <-h.internalReceiver
	}
}
// SendUnicast behaves exactly like Send: HTTP is not a p2p transport, so the
// address argument is ignored and msg is answered through its own context.
func (h *HttpHandler) SendUnicast(address string, msg transports.Message) error {
	err := h.Send(msg)
	return err
}
// Send writes msg as the response of the HTTP request it belongs to, by
// delegating to the *HttpContext stored in msg.Context.
//
// It returns an error, rather than panicking, when msg.Context is nil or is
// not a *HttpContext, i.e. when the message did not originate from this
// transport.
func (h *HttpHandler) Send(msg transports.Message) error {
	// TODO(mvdan): this extra abstraction layer is probably useless
	hc, ok := msg.Context.(*HttpContext)
	if !ok || hc == nil {
		return fmt.Errorf("cannot send http response: unexpected message context %T", msg.Context)
	}
	return hc.Send(msg)
}
// SetBootnodes does nothing: the HTTP handler has no notion of bootnodes, so
// the given list is ignored.
func (h *HttpHandler) SetBootnodes(bootnodes []string) {
}
// AddPeer does nothing: the HTTP handler has no notion of peers, so peer is
// ignored and the returned error is always nil.
func (h *HttpHandler) AddPeer(peer string) error {
	return nil
}
// AddNamespace exposes namespace on the transport by registering a proxy
// handler for it. On HTTP a namespace is a URL path, so it must be non-empty
// and begin with '/'; otherwise an error is returned and nothing is
// registered.
func (h *HttpHandler) AddNamespace(namespace string) error {
	if valid := namespace != "" && namespace[0] == '/'; !valid {
		return fmt.Errorf("namespace on http must start with /")
	}
	h.AddProxyHandler(namespace)
	return nil
}
// Address returns the address of the handler, which is the same text that
// String produces.
func (h *HttpHandler) Address() string {
	addr := h.String()
	return addr
}
// String returns the textual form of the address of the Proxy this handler is
// attached to. The Proxy must have been set beforehand.
func (h *HttpHandler) String() string {
	proxy := h.Proxy
	return proxy.Addr.String()
}