/
apns_sender.go
106 lines (97 loc) · 2.47 KB
/
apns_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package apns
import (
"errors"
"github.com/jpillora/backoff"
"github.com/sideshow/apns2"
"github.com/smancke/guble/server/connector"
"net"
"time"
)
const (
// deviceIDKey is the key name set on the route params to identify the application
deviceIDKey = "device_token"
userIDKey = "user_id"
)
var (
errPusherInvalidParams = errors.New("Invalid parameters of APNS Pusher")
ErrRetryFailed = errors.New("Retry failed")
)
type sender struct {
client Pusher
appTopic string
}
func NewSender(config Config) (connector.Sender, error) {
pusher, err := newPusher(config)
if err != nil {
logger.WithField("error", err.Error()).Error("APNS Pusher creation error")
return nil, err
}
return NewSenderUsingPusher(pusher, *config.AppTopic)
}
func NewSenderUsingPusher(pusher Pusher, appTopic string) (connector.Sender, error) {
if pusher == nil || appTopic == "" {
return nil, errPusherInvalidParams
}
return &sender{
client: pusher,
appTopic: appTopic,
}, nil
}
func (s sender) Send(request connector.Request) (interface{}, error) {
deviceToken := request.Subscriber().Route().Get(deviceIDKey)
logger.WithField("deviceToken", deviceToken).Info("Trying to push a message to APNS")
push := func() (interface{}, error) {
return s.client.Push(&apns2.Notification{
Priority: apns2.PriorityHigh,
Topic: s.appTopic,
DeviceToken: deviceToken,
Payload: request.Message().Body,
})
}
withRetry := &retryable{
Backoff: backoff.Backoff{
Min: 1 * time.Second,
Max: 10 * time.Second,
Factor: 2,
Jitter: true,
},
maxTries: 3,
}
result, err := withRetry.execute(push)
if err != nil && err == ErrRetryFailed {
if closable, ok := s.client.(closable); ok {
logger.Warn("Close TLS and retry again")
mTotalSendRetryCloseTLS.Add(1)
closable.CloseTLS()
return push()
} else {
mTotalSendRetryUnrecoverable.Add(1)
logger.Error("Cannot Close TLS. Unrecoverable state")
}
}
return result, err
}
type retryable struct {
backoff.Backoff
maxTries int
}
func (r *retryable) execute(op func() (interface{}, error)) (interface{}, error) {
tryCounter := 0
for {
tryCounter++
result, opError := op()
// retry on network errors
if _, ok := opError.(net.Error); ok {
mTotalSendNetworkErrors.Add(1)
if tryCounter >= r.maxTries {
return "", ErrRetryFailed
}
d := r.Duration()
logger.WithField("error", opError.Error()).Warn("Retry in ", d)
time.Sleep(d)
continue
} else {
return result, opError
}
}
}