Skip to content

Commit

Permalink
Merge pull request #48 from pubnub/CE-3250-req-msg-count
Browse files Browse the repository at this point in the history
MessageQueueOverflowCount and PNRequestMessageCountExceededCategory
  • Loading branch information
crimsonred committed Jun 21, 2018
2 parents d09a6fb + f7e9056 commit 1977d45
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 43 deletions.
22 changes: 17 additions & 5 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
---
changelog:
-
changes:
-
text: "MessageQueueOverflowCount and PNRequestMessageCountExceededCategory"
type: improvement
-
text: "subscribeMessageWorker optimization"
type: improvement
-
text: "integration tests optimizations"
type: improvement
version: v4.1.1
-
changes:
-
Expand Down Expand Up @@ -220,17 +232,17 @@ features:
others:
- PN-OTHER-PROCESSING
- TELEMETRY
notify:
- REQUEST-MESSAGE-COUNT-EXCEEDED
name: go
schema: 1
scm: github.com/pubnub/go
supported-platforms:
-
editors:
- "1.7.1"
- "1.7.3"
- "1.7.4"
- 1.8
- 1.9
- "1.7.6"
- "1.8.7"
- "1.9.7"
- "1.10.3"
platforms:
Expand All @@ -239,4 +251,4 @@ supported-platforms:
- "Mac OS X 10.8 or later, amd64"
- "Windows 7 or later, amd64, 386"
version: "PubNub Go SDK"
version: v4.1.0
version: v4.1.1
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ language: go

go:
- 1.7.1
- 1.7.3
- 1.7.4
- 1.8
- 1.9
- 1.7.6
- 1.8.7
- 1.9.7
- 1.10.3
- tip
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@

# PubNub 4.1.0 client for Go
# PubNub 4.1.1 client for Go
* Go (1.7.1+)

# Please direct all Support Questions and Concerns to Support@PubNub.com

[![GoDoc](https://godoc.org/github.com/pubnub/go?status.svg)](https://godoc.org/github.com/pubnub/go)
[![Build Status](https://travis-ci.org/pubnub/go.svg?branch=CE-3033-Go-v4)](https://travis-ci.org/pubnub/go)
[![codecov.io](https://codecov.io/github/pubnub/go/coverage.svg?branch=CE-3033-Go-v4)](https://codecov.io/github/pubnub/go?branch=CE-3033-Go-v4)
[![Go Report Card](https://goreportcard.com/badge/github.com/pubnub/go)](https://goreportcard.com/report/github.com/pubnub/go)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.1.0
4.1.1
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
SuppressLeaveEvents bool // When true the SDK doesn't send out the leave requests.
DisablePNOtherProcessing bool // PNOther processing looks for pn_other in the JSON on the recevied message
UseHTTP2 bool // HTTP2 Flag
MessageQueueOverflowCount int // When the limit is exceeded by the number of messages received in a single subscribe request, a status event PNRequestMessageCountExceededCategory is fired.
}

// NewDemoConfig initiates the config with demo keys, for tests only.
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewConfig() *Config {
SuppressLeaveEvents: false,
DisablePNOtherProcessing: false,
PNReconnectionPolicy: PNNonePolicy,
MessageQueueOverflowCount: 100,
}

return &c
Expand Down
2 changes: 2 additions & 0 deletions enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const (
// Applicable on for PNLinearPolicy and PNExponentialPolicy.
// Reconnection attempts are set in the config: MaximumReconnectionRetries.
PNReconnectionAttemptsExhausted
// PNRequestMessageCountExceededCategory is fired when the MessageQueueOverflowCount limit is exceeded by the number of messages received in a single subscribe request
PNRequestMessageCountExceededCategory
)

const (
Expand Down
13 changes: 3 additions & 10 deletions examples/cli/cli_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ func connect() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
config = pubnub.NewConfig()
//config.Origin = "ssp.pubnub.com"
config.UseHTTP2 = false

config.PNReconnectionPolicy = pubnub.PNExponentialPolicy
//config.AuthKey = "a"
//config.EnableLogging = false

var infoLogger *log.Logger

Expand All @@ -58,13 +55,9 @@ func connect() {
//config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
config.Log = infoLogger
config.Log.SetPrefix("PubNub :-> ")
//config.SuppressLeaveEvents = true

//config.PublishKey = "pub-c-afeb2ec5-45e9-449f-9a8d-c4940a9c7836"
//config.SubscribeKey = "sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64"
config.PublishKey = "pub-c-7e5c6521-91d0-4e60-9656-4bed419a769b"
config.SubscribeKey = "sub-c-b9ab9508-43cf-11e8-9967-869954283fb4"
config.SecretKey = "sec-c-MjRhODgwMTgtY2RmMS00ZWNmLTgzNTUtYjI3MzZhOThlNTY0"
config.PublishKey = "demo"
config.SubscribeKey = "demo"
config.SecretKey = "demo"

config.CipherKey = "enigma"
pn = pubnub.NewPubNub(config)
Expand Down
2 changes: 1 addition & 1 deletion fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type fetchOpts struct {
Start int64
End int64

// defualt: 100
// default: 100
Count int

// default: false
Expand Down
2 changes: 1 addition & 1 deletion history_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type historyOpts struct {
Start int64
End int64

// defualt: 100
// default: 100
Count int

// default: false
Expand Down
2 changes: 1 addition & 1 deletion pubnub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Default constants
const (
// Version :the version of the SDK
Version = "4.1.0"
Version = "4.1.1"
// MaxSequence for publish messages
MaxSequence = 65535
)
Expand Down
2 changes: 1 addition & 1 deletion subscribe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (b *subscribeBuilder) ChannelGroups(groups []string) *subscribeBuilder {
return b
}

// Timetoken sets the timetoken to subscribe. Susbcribe will start to fetch the messages from this timetoken onwards.
// Timetoken sets the timetoken to subscribe. Subscribe will start to fetch the messages from this timetoken onwards.
func (b *subscribeBuilder) Timetoken(tt int64) *subscribeBuilder {
b.operation.Timetoken = tt

Expand Down
25 changes: 22 additions & 3 deletions subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
// SubscriptionManager Events:
// - ConnectedCategory - after connection established
// - DisconnectedCategory - after subscription loop stops for any reason (no
// channels left or error happend)
// channels left or error happened)

// Unsubscribe.
// When you unsubscirbe from channel or channel group the following events
// When you unsubscribe from channel or channel group the following events
// happens:
// - LoopStopCategory - immediately when no more channels or channel groups left
// to subscribe
Expand Down Expand Up @@ -397,7 +397,19 @@ func (m *SubscriptionManager) startSubscribeLoop() {

m.listenerManager.announceStatus(pnStatus)
}
if len(envelope.Messages) > 0 {
messageCount := len(envelope.Messages)
if messageCount > 0 {
if messageCount > m.pubnub.Config.MessageQueueOverflowCount {
pnStatus := &PNStatus{
Error: false,
AffectedChannels: combinedChannels,
AffectedChannelGroups: combinedGroups,
Category: PNRequestMessageCountExceededCategory,
}
m.pubnub.Config.Log.Println("Status: ", pnStatus)

m.listenerManager.announceStatus(pnStatus)
}
for _, message := range envelope.Messages {
m.messages <- message
}
Expand Down Expand Up @@ -595,6 +607,13 @@ func subscribeMessageWorker(m *SubscriptionManager) {
m.exitSubscriptionManager = make(chan bool)
for m.exitSubscriptionManager != nil {
m.pubnub.Config.Log.Println("subscribeMessageWorker looping...")
combinedChannels := m.stateManager.prepareChannelList(true)
combinedGroups := m.stateManager.prepareGroupList(true)

if len(combinedChannels) == 0 && len(combinedGroups) == 0 {
m.pubnub.Config.Log.Println("subscribeMessageWorker all channels unsubscribed")
break
}
select {
case <-m.exitSubscriptionManager:
m.pubnub.Config.Log.Println("subscribeMessageWorker context done")
Expand Down
77 changes: 67 additions & 10 deletions tests/e2e/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,67 @@ import (
"github.com/stretchr/testify/assert"
)

import _ "net/http/pprof"
import "net/http"
//import _ "net/http/pprof"
//import "net/http"

var timeout = 1000

func TestRequestMesssageOverflow(t *testing.T) {
assert := assert.New(t)
doneSubscribe := make(chan bool)
errChan := make(chan string)
ch := randomized("sub-message-overflow")

pn := pubnub.NewPubNub(configCopy())
pn.Config.MessageQueueOverflowCount = 2
pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)

timestamp1 := GetTimetoken(pn)
for i := 0; i < 3; i++ {
message := fmt.Sprintf("message %d", i)
pn.Publish().Channel(ch).Message(message).Execute()
}

listener := pubnub.NewListener()

go func() {
for {
select {
case status := <-listener.Status:
switch status.Category {
case pubnub.PNConnectedCategory:
continue
case pubnub.PNRequestMessageCountExceededCategory:
doneSubscribe <- true
break
default:
errChan <- fmt.Sprintf("error ===> %v", status)
break
}
case <-listener.Message:
errChan <- "Got message while awaiting for a status event"
break
case <-listener.Presence:
errChan <- "Got presence while awaiting for a status event"
break
}
}
}()

pn.AddListener(listener)

pn.Subscribe().Channels([]string{ch}).Timetoken(timestamp1).Execute()
tic := time.NewTicker(time.Duration(timeout) * time.Second)
select {
case <-doneSubscribe:
case err := <-errChan:
assert.Fail(err)
case <-tic.C:
tic.Stop()
assert.Fail("timeout")
}
}

/////////////////////////////
/////////////////////////////
// Structure
Expand All @@ -40,9 +96,9 @@ func TestSubscribeUnsubscribe(t *testing.T) {
doneUnsubscribe := make(chan bool)
errChan := make(chan string)
ch := randomized("sub-u-ch")
go func() {
log.Println(http.ListenAndServe("localhost:6063", nil))
}()
// go func() {
// log.Println(http.ListenAndServe("localhost:6063", nil))
// }()

interceptor := stubs.NewInterceptor()
interceptor.AddStub(&stubs.Stub{
Expand Down Expand Up @@ -112,7 +168,7 @@ func TestSubscribeUnsubscribe(t *testing.T) {
Channels([]string{ch}).
Execute()

tic := time.NewTicker(time.Duration(timeout) * time.Second)
tic := time.NewTicker(time.Duration(timeout) * time.Second * 2)
select {
case <-doneUnsubscribe:
//fmt.Println("doneUnsubscribe...")
Expand Down Expand Up @@ -1230,6 +1286,7 @@ func TestSubscribeUnsubscribeGroup(t *testing.T) {
cg := randomized("sub-sug-cg")

pn := pubnub.NewPubNub(configCopy())
pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)

listener := pubnub.NewListener()

Expand Down Expand Up @@ -1289,7 +1346,7 @@ func TestSubscribeUnsubscribeGroup(t *testing.T) {
ChannelGroups([]string{cg}).
Execute()

tic := time.NewTicker(time.Duration(timeout) * time.Second)
tic := time.NewTicker(time.Duration(timeout) * time.Second * 2)
select {
case <-doneUnsubscribe:
case err := <-errChan:
Expand All @@ -1310,9 +1367,9 @@ func TestSubscribeUnsubscribeGroup(t *testing.T) {
}

func TestSubscribePublishUnsubscribeAllGroup(t *testing.T) {
go func() {
log.Println(http.ListenAndServe("localhost:6061", nil))
}()
// go func() {
// log.Println(http.ListenAndServe("localhost:6061", nil))
// }()

assert := assert.New(t)
pn := pubnub.NewPubNub(configCopy())
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/url_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func UrlsEqual(expectedString, actualString string,
return true, nil
}

// mixedPositions - a postion of items which can contain unsorted items like
// PathsEqual mixedPositions - a position of items which can contain unsorted items like
// multiple unsorted channels. If no such positions expected use an empty slice.
// Like in arrays, the first position is 0.
func PathsEqual(expectedString, actualString string,
Expand Down
2 changes: 1 addition & 1 deletion time_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (b *timeBuilder) Transport(tr http.RoundTripper) *timeBuilder {
return b
}

// Excecute runs the Time request and fetches the time from the server.
// Execute runs the Time request and fetches the time from the server.
func (b *timeBuilder) Execute() (*TimeResponse, StatusResponse, error) {
rawJSON, status, err := executeRequest(b.opts)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion utils/string_utils_1_5.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// +build go1.5

package utils

2 changes: 0 additions & 2 deletions utils/string_utils_1_7.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build go1.7

package utils


0 comments on commit 1977d45

Please sign in to comment.