// Source file: broker.go (forked from micro/micro).

package handler
import (
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/cmd"
)
// Timing and size limits applied to every websocket connection.
const (
	// readDeadline is how long a read may wait before it fails; readLoop
	// pushes the deadline forward each time a pong is received.
	readDeadline = 60 * time.Second
	// writeDeadline bounds each individual write to the socket.
	writeDeadline = 10 * time.Second
	// pingTime is the interval between pings, set to nine tenths of
	// readDeadline so a ping goes out before the read deadline can lapse.
	pingTime = 9 * readDeadline / 10
	// readLimit is the value handed to SetReadLimit on the connection.
	readLimit = 16384
)
// conn pairs an upgraded websocket connection with the broker topic it is
// bridged to.
type conn struct {
// topic is the broker topic that messages read from the socket are
// published to and that the write side subscribes to.
topic string
// ws is the websocket connection to the client.
ws *websocket.Conn
}
var (
// once guards the one-time Init and Connect of the default broker, which
// the Broker handler runs after its first successful websocket upgrade.
once sync.Once
// upgrader turns incoming HTTP requests into websocket connections, using
// 1024-byte read and write buffers.
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)
// readLoop pumps messages from the websocket to the broker.
//
// Every message read from the socket is published on c.topic as the body of
// a broker.Message. The loop ends on the first read error (which includes an
// expired read deadline), and the socket is closed on the way out.
//
// A failed publish is logged rather than dropped silently; it does not end
// the loop, so one bad publish does not disconnect the client.
func (c *conn) readLoop() {
	defer c.ws.Close()

	c.ws.SetReadLimit(readLimit)
	c.ws.SetReadDeadline(time.Now().Add(readDeadline))
	// Each pong from the peer extends the read deadline, so a connection that
	// keeps answering pings is not timed out while it is otherwise quiet.
	c.ws.SetPongHandler(func(string) error {
		c.ws.SetReadDeadline(time.Now().Add(readDeadline))
		return nil
	})

	for {
		_, message, err := c.ws.ReadMessage()
		if err != nil {
			return
		}

		if err := (*cmd.DefaultOptions().Broker).Publish(c.topic, &broker.Message{Body: message}); err != nil {
			// Publishing is best-effort: report the failure and keep reading.
			log.Print(err.Error())
		}
	}
}
// write sends a single websocket message of type mType carrying data.
// The write deadline is reset to writeDeadline from now before sending, and
// the result of the send is returned to the caller.
func (c *conn) write(mType int, data []byte) error {
	deadline := time.Now().Add(writeDeadline)
	c.ws.SetWriteDeadline(deadline)

	err := c.ws.WriteMessage(mType, data)
	return err
}
// writeLoop pumps messages from the broker to the websocket and keeps the
// connection alive with periodic pings.
//
// It subscribes to c.topic; every publication is JSON-encoded and sent to the
// client as a text message, and a ping is sent every pingTime. The loop ends
// on the first failed write. On exit the subscription is cancelled, the
// ticker is stopped and the socket is closed.
//
// All socket writes are performed by this goroutine. The subscription handler
// runs on a goroutine owned by the broker, and gorilla/websocket allows only
// one concurrent writer, so the handler hands encoded messages over a channel
// instead of writing to the socket itself. As a consequence the handler
// reports success once a message has been handed over, not once it has been
// written.
//
// If subscribing fails, the error is logged and the socket is closed without
// touching the (absent) subscriber.
func (c *conn) writeLoop() {
	defer c.ws.Close()

	// outbound carries encoded messages from the handler to this goroutine.
	outbound := make(chan []byte)
	// done is closed when this loop is finished so that a handler blocked on
	// outbound is released instead of waiting forever.
	done := make(chan struct{})
	// exitErr is the write error that ended the loop. It is only written by
	// this goroutine before done is closed and only read by the handler after
	// done is closed, so no further synchronisation is needed.
	var exitErr error

	subscriber, err := (*cmd.DefaultOptions().Broker).Subscribe(c.topic, func(p broker.Publication) error {
		b, err := json.Marshal(p.Message())
		if err != nil {
			// A message that cannot be encoded is dropped; it is not reported
			// to the broker as a handler failure.
			log.Print(err.Error())
			return nil
		}

		select {
		case outbound <- b:
			return nil
		case <-done:
			return exitErr
		}
	})
	if err != nil {
		close(done)
		log.Print(err.Error())
		return
	}

	ticker := time.NewTicker(pingTime)
	// Deferred calls run last-in-first-out: done is closed first so that no
	// handler is still blocked on outbound while Unsubscribe runs.
	defer ticker.Stop()
	defer subscriber.Unsubscribe()
	defer close(done)

	for {
		select {
		case b := <-outbound:
			exitErr = c.write(websocket.TextMessage, b)
		case <-ticker.C:
			exitErr = c.write(websocket.PingMessage, []byte{})
		}
		if exitErr != nil {
			return
		}
	}
}
// Broker is an HTTP handler that bridges a websocket client to a broker topic.
//
// Only GET requests are accepted (405 otherwise) and the request must carry a
// non-empty "topic" form value (400 otherwise). The request is then upgraded
// to a websocket; an upgrade failure is logged and ends the request. After the
// first successful upgrade the default broker is initialised and connected
// once, and any error from either step is logged.
//
// The write side runs in its own goroutine while the read side runs on the
// calling goroutine, so Broker does not return until readLoop does.
func Broker(w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// The parse error is deliberately ignored: whatever could be parsed is
	// still available in r.Form, and a missing topic is rejected just below.
	_ = r.ParseForm()
	topic := r.Form.Get("topic")
	if len(topic) == 0 {
		http.Error(w, "Topic not specified", http.StatusBadRequest)
		return
	}

	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Print(err.Error())
		return
	}

	once.Do(func() {
		// The connection is already upgraded at this point, so a broker
		// failure can only be reported in the log, not as an HTTP status.
		if err := (*cmd.DefaultOptions().Broker).Init(); err != nil {
			log.Print(err.Error())
		}
		if err := (*cmd.DefaultOptions().Broker).Connect(); err != nil {
			log.Print(err.Error())
		}
	})

	c := &conn{
		topic: topic,
		ws:    ws,
	}

	go c.writeLoop()
	c.readLoop()
}