Skip to content

Commit

Permalink
inject a timestamp field in feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed Feb 14, 2017
1 parent d411a07 commit 072ac5b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
16 changes: 11 additions & 5 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type Notification struct {
// ResponseWithMetadata is a enriched Response with a Metadata field
type ResponseWithMetadata struct {
push.Response
Metadata map[string]interface{} `json:"metadata,omitempty"`
Timestamp int64 `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}

// APNSMessageHandler implements the messagehandler interface
Expand Down Expand Up @@ -190,12 +191,15 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error {
}
statsReporterHandleNotificationSent(a.StatsReporters)
a.PushQueue.Push(n.DeviceToken, h, payload)
a.inflightMessagesMetadataLock.Lock()
if n.Metadata != nil {
a.inflightMessagesMetadataLock.Lock()

a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata
a.requestsHeap.AddRequest(n.DeviceToken)
n.Metadata["timestamp"] = time.Now().Unix()
a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata
a.requestsHeap.AddRequest(n.DeviceToken)

a.inflightMessagesMetadataLock.Unlock()
a.inflightMessagesMetadataLock.Unlock()
}
a.sentMessages++
return nil
}
Expand Down Expand Up @@ -261,6 +265,8 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
a.inflightMessagesMetadataLock.Lock()
if val, ok := a.InflightMessagesMetadata[res.DeviceToken]; ok {
responseWithMetadata.Metadata = val.(map[string]interface{})
responseWithMetadata.Timestamp = responseWithMetadata.Metadata["timestamp"].(int64)
delete(responseWithMetadata.Metadata, "timestamp")
delete(a.InflightMessagesMetadata, res.DeviceToken)
}
a.inflightMessagesMetadataLock.Unlock()
Expand Down
28 changes: 25 additions & 3 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,29 @@ var _ = Describe("APNS Message Handler", func() {
Expect(err).NotTo(HaveOccurred())
})

It("should include a timestamp in feedback root", func() {
timestampNow := time.Now().Unix()
metadata := map[string]interface{}{
"some": "metadata",
"timestamp": timestampNow,
}
handler.InflightMessagesMetadata["testToken1"] = metadata
res := push.Response{
DeviceToken: "testToken1",
ID: "idTest1",
}
go handler.handleAPNSResponse(res)

fromKafka := &ResponseWithMetadata{}
msg := <-mockKafkaProducerClient.ProduceChannel()
json.Unmarshal(msg.Value, fromKafka)
Expect(fromKafka.Timestamp).To(Equal(timestampNow))
})

It("should send feedback if success and metadata is present", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["testToken1"] = metadata
res := push.Response{
Expand Down Expand Up @@ -547,7 +567,8 @@ var _ = Describe("APNS Message Handler", func() {

It("should send feedback if error and metadata is present and token should be deleted", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["testToken1"] = metadata
res := push.Response{
Expand All @@ -571,7 +592,8 @@ var _ = Describe("APNS Message Handler", func() {

It("should send feedback if error and metadata is present and token should not be deleted", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["testToken1"] = metadata
res := push.Response{
Expand Down
6 changes: 5 additions & 1 deletion extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type KafkaGCMMessage struct {
// CCSMessageWithMetadata is a enriched CCSMessage with a metadata field
type CCSMessageWithMetadata struct {
gcm.CCSMessage
Metadata map[string]interface{} `json:"metadata,omitempty"`
Timestamp int64 `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}

// GCMMessageHandler implements the messagehandler interface
Expand Down Expand Up @@ -206,6 +207,8 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error {
g.inflightMessagesMetadataLock.Lock()
if val, ok := g.InflightMessagesMetadata[cm.MessageID]; ok {
ccsMessageWithMetadata.Metadata = val.(map[string]interface{})
ccsMessageWithMetadata.Timestamp = ccsMessageWithMetadata.Metadata["timestamp"].(int64)
delete(ccsMessageWithMetadata.Metadata, "timestamp")
delete(g.InflightMessagesMetadata, cm.MessageID)
}
g.inflightMessagesMetadataLock.Unlock()
Expand Down Expand Up @@ -297,6 +300,7 @@ func (g *GCMMessageHandler) sendMessage(message []byte) error {
if km.Metadata != nil && len(km.Metadata) > 0 {
g.inflightMessagesMetadataLock.Lock()

km.Metadata["timestamp"] = time.Now().Unix()
g.InflightMessagesMetadata[messageID] = km.Metadata
g.requestsHeap.AddRequest(messageID)

Expand Down
33 changes: 29 additions & 4 deletions extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ var _ = Describe("GCM Message Handler", func() {
It("should send xmpp message with metadata", func() {
ttl := uint(0)
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
msg := &KafkaGCMMessage{
gcm.XMPPMessage{
Expand Down Expand Up @@ -446,9 +447,31 @@ var _ = Describe("GCM Message Handler", func() {

})

It("should include a timestamp in feedback root", func() {
timestampNow := time.Now().Unix()
metadata := map[string]interface{}{
"some": "metadata",
"timestamp": timestampNow,
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := gcm.CCSMessage{
From: "testToken1",
MessageID: "idTest1",
MessageType: "ack",
Category: "testCategory",
}
go handler.handleGCMResponse(res)

fromKafka := &CCSMessageWithMetadata{}
msg := <-mockKafkaProducerClient.ProduceChannel()
json.Unmarshal(msg.Value, fromKafka)
Expect(fromKafka.Timestamp).To(Equal(timestampNow))
})

It("should send feedback if success and metadata is present", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := gcm.CCSMessage{
Expand Down Expand Up @@ -490,7 +513,8 @@ var _ = Describe("GCM Message Handler", func() {

It("should send feedback if error and metadata is present and token should be deleted", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := gcm.CCSMessage{
Expand All @@ -516,7 +540,8 @@ var _ = Describe("GCM Message Handler", func() {

It("should send feedback if error and metadata is present and token should not be deleted", func() {
metadata := map[string]interface{}{
"some": "metadata",
"some": "metadata",
"timestamp": time.Now().Unix(),
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := gcm.CCSMessage{
Expand Down
2 changes: 1 addition & 1 deletion util/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@
package util

//Version is the current version of pusher
var Version = "1.0.0"
var Version = "1.1.0"

0 comments on commit 072ac5b

Please sign in to comment.