/
push.go
196 lines (163 loc) · 4.78 KB
/
push.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Package push contains interfaces to be implemented by push notification plugins.
package push
import (
"encoding/json"
"errors"
"time"
t "github.com/tinode/chat/server/store/types"
)
// Push actions
const (
// New message.
ActMsg = "msg"
// New subscription.
ActSub = "sub"
)
// MaxPayloadLength is the maximum length of push payload in multibyte characters.
const MaxPayloadLength = 128
// Recipient is a user targeted by the push.
type Recipient struct {
// Count of user's connections that were live when the packet was dispatched from the server
Delivered int `json:"delivered"`
// List of user's devices that the packet was delivered to (if known). Len(Devices) >= Delivered
Devices []string `json:"devices,omitempty"`
// Unread count to include in the push
Unread int `json:"unread"`
}
// Receipt is the push payload with a list of recipients.
type Receipt struct {
// List of individual recipients, including those who did not receive the message.
To map[t.Uid]Recipient `json:"to"`
// Push topic for group notifications.
Channel string `json:"channel"`
// Actual content to be delivered to the client.
Payload Payload `json:"payload"`
}
// ChannelReq is a request to subscribe/unsubscribe device ID(s) to channel(s) (FCM topic).
// - If DeviceID is provided, it's subscribed/unsubscribed to all user's channels.
// - If Channel is provided, then all user's devices are subscribed/unsubscribed from the channel.
type ChannelReq struct {
// Uid is the ID of the user making request.
Uid t.Uid
// DeviceID is the device-provided token in case a single device is being subscribed to all channels.
DeviceID string
// Channel to subscribe to or unsubscribe from.
Channel string
// Unsub is set to true to unsubscribe devices, otherwise subscribe them.
Unsub bool
}
// Payload is content of the push.
type Payload struct {
// Action type of the push: new message (msg), new subscription (sub), etc.
What string `json:"what"`
// If this is a silent push: perform action but do not show a notification to the user.
Silent bool `json:"silent"`
// Topic which was affected by the action.
Topic string `json:"topic"`
// Timestamp of the action.
Timestamp time.Time `json:"ts"`
// {data} notification.
// Message sender 'usrXXX'
From string `json:"from"`
// Sequential ID of the message.
SeqId int `json:"seq"`
// MIME-Type of the message content, text/x-drafty or text/plain
ContentType string `json:"mime"`
// Actual Data.Content of the message, if requested
Content interface{} `json:"content,omitempty"`
// New subscription notification
// Access mode when notifying of new subscriptions.
ModeWant t.AccessMode `json:"want,omitempty"`
ModeGiven t.AccessMode `json:"given,omitempty"`
}
// Handler is an interface which must be implemented by handlers.
type Handler interface {
// Init initializes the handler.
Init(jsonconf string) error
// IsReady сhecks if the handler is initialized.
IsReady() bool
// Push returns a channel that the server will use to send messages to.
// The message will be dropped if the channel blocks.
Push() chan<- *Receipt
// Subscribe/unsubscribe device from FCM topic (channel).
Channel() chan<- *ChannelReq
// Stop terminates the handler's worker and stops sending pushes.
Stop()
}
type configType struct {
Name string `json:"name"`
Config json.RawMessage `json:"config"`
}
var handlers map[string]Handler
// Register a push handler
func Register(name string, hnd Handler) {
if handlers == nil {
handlers = make(map[string]Handler)
}
if hnd == nil {
panic("Register: push handler is nil")
}
if _, dup := handlers[name]; dup {
panic("Register: called twice for handler " + name)
}
handlers[name] = hnd
}
// Init initializes registered handlers.
func Init(jsconfig string) error {
var config []configType
if err := json.Unmarshal([]byte(jsconfig), &config); err != nil {
return errors.New("failed to parse config: " + err.Error())
}
for _, cc := range config {
if hnd := handlers[cc.Name]; hnd != nil {
if err := hnd.Init(string(cc.Config)); err != nil {
return err
}
}
}
return nil
}
// Push a single message to devices.
func Push(msg *Receipt) {
if handlers == nil {
return
}
for _, hnd := range handlers {
if !hnd.IsReady() {
continue
}
// Push without delay or skip
select {
case hnd.Push() <- msg:
default:
}
}
}
// ChannelSub handles a channel (FCM topic) subscription/unsubscription request.
func ChannelSub(msg *ChannelReq) {
if handlers == nil {
return
}
for _, hnd := range handlers {
if !hnd.IsReady() {
continue
}
// Send without delay or skip.
select {
case hnd.Channel() <- msg:
default:
}
}
}
// Stop all pushes
func Stop() {
if handlers == nil {
return
}
for _, hnd := range handlers {
if hnd.IsReady() {
// Will potentially block
hnd.Stop()
}
}
}