Skip to content

Commit

Permalink
Merge e309a3e into f958ea4
Browse files Browse the repository at this point in the history
  • Loading branch information
mbotarro committed Feb 6, 2019
2 parents f958ea4 + e309a3e commit 1ab223e
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 72 deletions.
27 changes: 0 additions & 27 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
chanSize: 1000
1 change: 1 addition & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
chanSize: 1000
6 changes: 4 additions & 2 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand All @@ -146,7 +147,8 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand Down
35 changes: 35 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package extensions
import (
"encoding/json"
"regexp"
"time"

log "github.com/sirupsen/logrus"
"github.com/topfreegames/pusher/errors"
"github.com/topfreegames/pusher/interfaces"
)
Expand All @@ -44,6 +46,39 @@ func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, t
}
}

func StopInvalidTokenHandlers(
logger *log.Logger,
invalidTokenHandlers []interfaces.InvalidTokenHandler,
timeout time.Duration,
) {
l := logger.WithField(
"method", "stopInvalidTokenHandlers",
)

for _, invalidTokenHandler := range invalidTokenHandlers {
done := make(chan struct{})
stopCheck := false
go func() {
defer close(done)
for invalidTokenHandler.HasJob() && !stopCheck {
time.Sleep(10 * time.Millisecond)
}
}()

select {
case <-done:
invalidTokenHandler.Stop()
return

case <-time.After(timeout):
invalidTokenHandler.Stop()
stopCheck = true
l.Error("timeout reached - invalidTokenHandler was closed with jobs in queue")
return
}
}
}

func getGameAndPlatformFromTopic(topic string) ParsedTopic {
res := topicRegex.FindStringSubmatch(topic)
return ParsedTopic{
Expand Down
55 changes: 49 additions & 6 deletions extensions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package extensions

import (
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/mocks"
testing "github.com/topfreegames/pusher/testing"
"github.com/topfreegames/pusher/util"
)

Expand Down Expand Up @@ -71,19 +73,60 @@ var _ = Describe("Common", func() {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs).To(HaveLen(2))
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token}))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))
})

It("should fail silently", func() {
token := uuid.NewV4().String()
db.Error = fmt.Errorf("pg: error")
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
Expect(db.Execs).To(HaveLen(2))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token}))
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))
})

Describe("should stop handler gracefully", func() {
It("if there's no more job to do", func() {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")

timeout := 1 * time.Second
StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout)
Consistently(func() []*logrus.Entry { return hook.Entries }, 2*timeout).
ShouldNot(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue"))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
query := "DELETE FROM test_apns WHERE token = ?0;"
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))
})

It("timeout reached", func() {
size := 1000
for i := 0; i < size; i++ {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
}

timeout := 1 * time.Nanosecond
StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout)
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue"))
})
})
})

Expand Down
44 changes: 23 additions & 21 deletions extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/topfreegames/go-gcm"
gcm "github.com/topfreegames/go-gcm"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/mocks"
. "github.com/topfreegames/pusher/testing"
Expand Down Expand Up @@ -133,7 +133,8 @@ var _ = Describe("GCM Message Handler", func() {
handler.handleGCMResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))
})

It("if response has error BAD_REGISTRATION", func() {
Expand All @@ -143,7 +144,8 @@ var _ = Describe("GCM Message Handler", func() {
handler.handleGCMResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))
})

It("if response has error INVALID_JSON", func() {
Expand Down Expand Up @@ -214,9 +216,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() + int64(1000000),
Expand Down Expand Up @@ -245,9 +247,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() - int64(100),
Expand Down Expand Up @@ -285,9 +287,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -315,9 +317,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() + int64(1000000),
Expand All @@ -341,9 +343,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -460,9 +462,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading

0 comments on commit 1ab223e

Please sign in to comment.