Skip to content

Commit

Permalink
Merge pull request #102 from pubnub/develop
Browse files Browse the repository at this point in the history
Make publish/grant workers per instance
  • Loading branch information
crimsonred committed Feb 6, 2020
2 parents e406ec8 + 8fb1309 commit feb2559
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 147 deletions.
13 changes: 10 additions & 3 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
---
changelog:
-
changes:
-
text: "Make publish/grant workers per instance"
type: bug
date: Feb 5, 20
version: v4.6.4
-
changes:
-
Expand Down Expand Up @@ -505,12 +512,12 @@ supported-platforms:
- "1.9.7"
- "1.10.8"
- "1.11.13"
- "1.12.9"
- "1.13.4"
- "1.12.16"
- "1.13.7"
platforms:
- "FreeBSD 8-STABLE or later, amd64, 386"
- "Linux 2.6 or later, amd64, 386."
- "Mac OS X 10.8 or later, amd64"
- "Windows 7 or later, amd64, 386"
version: "PubNub Go SDK"
version: v4.6.3
version: v4.6.4
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go:
- 1.9.7
- 1.10.8
- 1.11.13
- 1.12.9
- 1.13.5
- 1.12.16
- 1.13.7
- master
- tip

Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## [v4.6.4](https://github.com/pubnub/go/tree/v4.6.4)
February-5-2020

## [v4.6.3](https://github.com/pubnub/go/tree/v4.6.3)
January-28-2020

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# PubNub 4.6.3 client for Go
# PubNub 4.6.4 client for Go
* Go (1.9+)

# Please direct all Support Questions and Concerns to Support@PubNub.com
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.6.3
4.6.4
4 changes: 3 additions & 1 deletion examples/cli/cli_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const outputSuffix = "\x1b[32;2m Example <<<< \x1b[0m"

func main() {
connect()
// go pubnub.NewPubNub(pubnub.NewConfig())
// pubnub.NewPubNub(pubnub.NewConfig())
}

func connect() {
Expand Down Expand Up @@ -56,7 +58,7 @@ func connect() {
config.PublishKey = "demo"
config.SubscribeKey = "demo"

config.FilterExpression = "name="
//config.FilterExpression = "name="

config.CipherKey = "enigma"

Expand Down
6 changes: 3 additions & 3 deletions pubnub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Default constants
const (
// Version :the version of the SDK
Version = "4.6.3"
Version = "4.6.4"
// MaxSequence for publish messages
MaxSequence = 65535
)
Expand Down Expand Up @@ -599,8 +599,8 @@ func (pn *PubNub) newNonSubQueueProcessor(maxWorkers int, ctx Context) *RequestW
pn.Config.Log.Printf("Init RequestWorkers: workers %d", maxWorkers)

p := &RequestWorkers{
Workers: workers,
MaxWorkers: maxWorkers,
WorkersChannel: workers,
MaxWorkers: maxWorkers,
}
p.Start(pn, ctx)
return p
Expand Down
5 changes: 5 additions & 0 deletions pubnub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func TestDemoInitializer(t *testing.T) {
assert.Equal("demo", demo.Config.SubscribeKey)
assert.Equal("demo", demo.Config.SecretKey)
}

func TestMultipleConcurrentInit(t *testing.T) {
go NewPubNub(NewConfig())
NewPubNub(NewConfig())
}
37 changes: 18 additions & 19 deletions request_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,25 @@ type JobQItem struct {
}

type RequestWorkers struct {
Workers chan chan *JobQItem
MaxWorkers int
Sem chan bool
Workers []Worker
WorkersChannel chan chan *JobQItem
MaxWorkers int
Sem chan bool
}

type Worker struct {
Workers chan chan *JobQItem
JobChannel chan *JobQItem
ctx Context
id int
WorkersChannel chan chan *JobQItem
JobChannel chan *JobQItem
ctx Context
id int
}

var workers []Worker

func newRequestWorkers(workers chan chan *JobQItem, id int, ctx Context) Worker {
return Worker{
Workers: workers,
JobChannel: make(chan *JobQItem),
ctx: ctx,
id: id,
WorkersChannel: workers,
JobChannel: make(chan *JobQItem),
ctx: ctx,
id: id,
}
}

Expand All @@ -50,7 +49,7 @@ func (pw Worker) Process(pubnub *PubNub) {
ProcessLabel:
for {
select {
case pw.Workers <- pw.JobChannel:
case pw.WorkersChannel <- pw.JobChannel:
job := <-pw.JobChannel
if job != nil {
res, err := job.Client.Do(job.Req)
Expand All @@ -75,12 +74,12 @@ func (pw Worker) Process(pubnub *PubNub) {
// Start starts the workers
func (p *RequestWorkers) Start(pubnub *PubNub, ctx Context) {
pubnub.Config.Log.Println("Start: Running with workers ", p.MaxWorkers)
workers = make([]Worker, p.MaxWorkers)
p.Workers = make([]Worker, p.MaxWorkers)
for i := 0; i < p.MaxWorkers; i++ {
pubnub.Config.Log.Println("Start: StartNonSubWorker ", i)
worker := newRequestWorkers(p.Workers, i, ctx)
worker := newRequestWorkers(p.WorkersChannel, i, ctx)
worker.Process(pubnub)
workers[i] = worker
p.Workers[i] = worker
}
go p.ReadQueue(pubnub)
}
Expand All @@ -90,7 +89,7 @@ func (p *RequestWorkers) ReadQueue(pubnub *PubNub) {
for job := range pubnub.jobQueue {
pubnub.Config.Log.Println("ReadQueue: Got job for channel ", job.Req)
go func(job *JobQItem) {
jobChannel := <-p.Workers
jobChannel := <-p.WorkersChannel
jobChannel <- job
}(job)
}
Expand All @@ -100,7 +99,7 @@ func (p *RequestWorkers) ReadQueue(pubnub *PubNub) {
// Close closes the workers
func (p *RequestWorkers) Close() {

for _, w := range workers {
for _, w := range p.Workers {
close(w.JobChannel)
w.ctx.Done()
}
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/grant_token_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package e2e

import (
"fmt"
//"fmt"

pubnub "github.com/pubnub/go"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestGrantToken(t *testing.T) {
if err == nil {
chResources := pubnub.ParseGrantResources(cborObject.Resources, token, cborObject.Timestamp, cborObject.TTL)

fmt.Println(chResources)
//fmt.Println(chResources)

// assert.Equal(ch[ch1].Read, chResources.Channels[ch1].Permissions.Read)
// assert.Equal(ch[ch1].Write, chResources.Channels[ch1].Permissions.Write)
Expand Down
4 changes: 3 additions & 1 deletion tests/e2e/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func TestHistoryCallWithAllParams(t *testing.T) {

pn := pubnub.NewPubNub(configCopy())
pn.SetClient(interceptor.GetClient())
pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
if enableDebuggingInTests {
pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
}

res, _, err := pn.History().
Channel("ch").
Expand Down
18 changes: 9 additions & 9 deletions tests/e2e/message_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func MessageActionsListenersCommon(t *testing.T, encrypted, withMeta, withMessag
case pubnub.PNConnectedCategory:
doneConnected <- true
default:
fmt.Println(" --- status: ", status)
//fmt.Println(" --- status: ", status)
}
case messageActionsEvent := <-listener.MessageActionsEvent:
if enableDebuggingInTests {
Expand Down Expand Up @@ -156,7 +156,7 @@ func MessageActionsListenersCommon(t *testing.T, encrypted, withMeta, withMessag
// read tt,
if resPub != nil {
messageTimetoken = strconv.FormatInt(resPub.Timestamp, 10)
fmt.Println("messageTimetoken", messageTimetoken)
//fmt.Println("messageTimetoken", messageTimetoken)

// add action,
ma := pubnub.MessageAction{
Expand Down Expand Up @@ -190,14 +190,14 @@ func MessageActionsListenersCommon(t *testing.T, encrypted, withMeta, withMessag
limit := 1

recActionTimetokenM1 := recActionTimetoken
fmt.Println("recActionTimetoken", recActionTimetoken)
//fmt.Println("recActionTimetoken", recActionTimetoken)

n, err := strconv.ParseInt(recActionTimetoken, 10, 64)
if err == nil {
n = n + 1
recActionTimetokenM1 = strconv.FormatInt(n, 10)
}
fmt.Println("recActionTimetokenM1", recActionTimetokenM1, limit)
//fmt.Println("recActionTimetokenM1", recActionTimetokenM1, limit)

resGetMA1, _, errGetMA1 := pnMA.GetMessageActions().Channel(chMA).Execute()
assert.Nil(errGetMA1)
Expand Down Expand Up @@ -307,7 +307,7 @@ func MessageActionsListenersCommon(t *testing.T, encrypted, withMeta, withMessag
func MatchHistoryMessageWithMAResp(assert *assert.Assertions, resp *pubnub.HistoryResponse, chMA, message string, messageTimetoken int64, meta interface{}, withMeta bool) {
if resp != nil {
messages := resp.Messages
fmt.Println("====> history messages:", messages)
//fmt.Println("====> history messages:", messages)
if messages != nil {
assert.Equal(message, messages[0].Message)
assert.Equal(messageTimetoken, messages[0].Timetoken)
Expand All @@ -316,7 +316,7 @@ func MatchHistoryMessageWithMAResp(assert *assert.Assertions, resp *pubnub.Histo
meta := messages[0].Meta.(map[string]interface{})
assert.Equal("n1", meta["m1"])
assert.Equal("n2", meta["m2"])
fmt.Println("meta:", meta)
//fmt.Println("meta:", meta)
} else {
assert.Fail("meta nil")
}
Expand All @@ -332,7 +332,7 @@ func MatchHistoryMessageWithMAResp(assert *assert.Assertions, resp *pubnub.Histo
func MatchFetchMessageWithMAResp(assert *assert.Assertions, resp *pubnub.FetchResponse, chMA, message string, messageTimetoken, recActionTimetokenM1 int64, UUID string, ma pubnub.MessageAction, meta interface{}, withMeta, withMessageActions bool) {
if resp != nil {
messages := resp.Messages
fmt.Println("messages:", messages)
//fmt.Println("messages:", messages)
m0 := messages[chMA]
if m0 != nil {
assert.Equal(message, m0[0].Message)
Expand All @@ -342,7 +342,7 @@ func MatchFetchMessageWithMAResp(assert *assert.Assertions, resp *pubnub.FetchRe
meta := m0[0].Meta.(map[string]interface{})
assert.Equal("n1", meta["m1"])
assert.Equal("n2", meta["m2"])
fmt.Println("meta:", meta)
//fmt.Println("meta:", meta)
} else {
assert.Fail("meta nil")
}
Expand All @@ -356,7 +356,7 @@ func MatchFetchMessageWithMAResp(assert *assert.Assertions, resp *pubnub.FetchRe
if r00 != nil {
assert.Equal(UUID, r00[0].UUID)
assert.Equal(strconv.FormatInt(recActionTimetokenM1, 10), r00[0].ActionTimetoken)
fmt.Println("action val:", r00[0].UUID, r00[0].ActionTimetoken)
//fmt.Println("action val:", r00[0].UUID, r00[0].ActionTimetoken)
} else {
assert.Fail("r0 nil")
}
Expand Down
Loading

0 comments on commit feb2559

Please sign in to comment.