-
Notifications
You must be signed in to change notification settings - Fork 3
/
http-subscriber.go
143 lines (121 loc) · 3.33 KB
/
http-subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package subscribers
import (
"bytes"
"encoding/json"
"io"
"net/http"
"strings"
"sync"
"github.com/julienschmidt/httprouter"
"github.com/larisgo/laravel-echo-server/express"
"github.com/larisgo/laravel-echo-server/options"
"github.com/larisgo/laravel-echo-server/types"
"github.com/zishang520/engine.io/utils"
)
// HttpSubscriber receives events posted over HTTP and hands them to a
// broadcast callback. Instances are created with NewHttpSubscriber.
type HttpSubscriber struct {
// express is the server on which Subscribe registers the event route.
express *express.Express
// options holds the configurable server options; handleData reads DevMode
// from it to decide whether to log each event.
options *options.Config
// _close is set to true by UnSubscribe; while it is true the event route
// replies 404 instead of handling the request.
_close bool
// mu guards _close.
mu sync.RWMutex
}
// NewHttpSubscriber builds an HttpSubscriber bound to the given server and
// options. The returned subscriber starts out not closed.
func NewHttpSubscriber(express *express.Express, _options *options.Config) Subscriber {
	return &HttpSubscriber{
		express: express,
		options: _options,
		_close:  false,
	}
}
// Subscribe registers the POST /apps/:appId/events route, wrapped in the
// server's request authorization, and forwards each accepted event to
// callback. After UnSubscribe has been called the route replies with
// 404 Not Found instead.
func (sub *HttpSubscriber) Subscribe(callback Broadcast) {
	handler := func(w http.ResponseWriter, r *http.Request, router httprouter.Params) {
		// A closed subscriber answers 404 rather than handling the event.
		if sub.unSubscribed() {
			w.WriteHeader(http.StatusNotFound)
			w.Write(nil)
			return
		}
		sub.handleData(w, r, router, callback)
	}

	// Broadcast a message to a channel.
	sub.express.Route().POST("/apps/:appId/events", sub.express.AuthorizeRequests(handler))

	utils.Log().Success("Listening for http events...")
}
// UnSubscribe marks the subscriber as closed. The flag is written while
// holding the write lock.
func (sub *HttpSubscriber) UnSubscribe() {
	sub.mu.Lock()
	sub._close = true
	sub.mu.Unlock()
}
// unSubscribed reports the current value of the closed flag, read while
// holding the read lock.
func (sub *HttpSubscriber) unSubscribed() bool {
	sub.mu.RLock()
	closed := sub._close
	sub.mu.RUnlock()
	return closed
}
// handleData reads a JSON event from the request body and broadcasts it.
//
// The body is decoded into HttpSubscriberData. A valid event names at least
// one channel (Channels or Channel), has a non-empty Name, and carries a
// non-empty Data string that is itself valid JSON. Any failure (missing body,
// read error, malformed JSON, missing fields) is answered through
// badResponse. On success broadcast is called once per channel, synchronously
// and in order, and `{"message":"ok"}` is written to w.
//
// When both Channels and Channel are set, Channels takes precedence.
// router is part of the handler signature and is not used here.
//
// NOTE(review): the body is read without a size limit; consider wrapping it
// with http.MaxBytesReader if this route is reachable by untrusted clients.
func (sub *HttpSubscriber) handleData(w http.ResponseWriter, r *http.Request, router httprouter.Params, broadcast Broadcast) {
	if r.Body == nil {
		sub.badResponse(w, r, `Event must include channel, event name and data`)
		return
	}
	defer r.Body.Close()

	var raw bytes.Buffer
	if _, err := raw.ReadFrom(r.Body); err != nil {
		// A failed or truncated read must not be treated as a complete event.
		sub.badResponse(w, r, err.Error())
		return
	}

	var body HttpSubscriberData
	if err := json.NewDecoder(&raw).Decode(&body); err != nil {
		sub.badResponse(w, r, err.Error())
		return
	}

	hasChannel := len(body.Channels) > 0 || body.Channel != ""
	if !hasChannel || body.Name == "" || body.Data == "" {
		sub.badResponse(w, r, `Event must include channel, event name and data`)
		return
	}

	// Data arrives as a JSON-encoded string; decode it before broadcasting.
	var payload any
	if err := json.Unmarshal([]byte(body.Data), &payload); err != nil {
		sub.badResponse(w, r, err.Error())
		return
	}

	message := &types.Data{
		Event:  body.Name,
		Data:   payload,
		Socket: body.SocketId,
	}

	// Fall back to the single Channel only when no Channels list was given.
	channels := []string{body.Channel}
	if len(body.Channels) > 0 {
		channels = body.Channels
	}

	if sub.options.DevMode {
		utils.Log().Info("Channel: " + sub.join(channels, ", "))
		utils.Log().Info("Event: " + message.Event)
	}

	for _, channel := range channels {
		// sync
		broadcast(channel, message)
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	io.WriteString(w, `{"message":"ok"}`)
}
// badResponse replies with HTTP 400 Bad Request and a JSON object whose
// "error" field carries message. r is not used.
func (sub *HttpSubscriber) badResponse(w http.ResponseWriter, r *http.Request, message string) {
	payload, _ := json.Marshal(map[string]any{"error": message})

	header := w.Header()
	header.Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(http.StatusBadRequest)
	w.Write(payload)
}
// join concatenates the elements of v, placing splite between each pair of
// adjacent elements. A nil or empty slice yields "".
func (sub *HttpSubscriber) join(v []string, splite string) string {
	// strings.Join always emits len(v)-1 separators, including after empty
	// leading elements, which a "builder is non-empty" check would skip.
	return strings.Join(v, splite)
}