diff --git a/.pubnub.yml b/.pubnub.yml index abfbed31..5126a4d4 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,21 @@ --- changelog: + - + changes: + - + text: "QueryParams in all API calls" + type: feature + - + text: "d in grant" + type: feature + - + text: "maxIdleConnsPerHost setting in config" + type: feature + - + text: "Max concurrent workers for Publish and Grant requests" + type: improvement + date: Oct 16, 18 + version: v4.1.4 - changes: - @@ -269,10 +285,11 @@ supported-platforms: - "1.8.7" - "1.9.7" - "1.10.3" + - "1.11.1" platforms: - "FreeBSD 8-STABLE or later, amd64, 386" - "Linux 2.6 or later, amd64, 386." - "Mac OS X 10.8 or later, amd64" - "Windows 7 or later, amd64, 386" version: "PubNub Go SDK" -version: v4.1.3 +version: v4.1.4 diff --git a/.travis.yml b/.travis.yml index fb262e08..f7cb476a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ go: - 1.8.7 - 1.9.7 - 1.10.3 + - 1.11.1 - tip matrix: diff --git a/README.md b/README.md index 5c53ae78..e65e0ad0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# PubNub 4.1.3 client for Go +# PubNub 4.1.4 client for Go * Go (1.7.6+) # Please direct all Support Questions and Concerns to Support@PubNub.com diff --git a/VERSION b/VERSION index de197cc3..a95f2884 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.3 +4.1.4 diff --git a/add_channel_channel_group_request.go b/add_channel_channel_group_request.go index df1c6fd6..b1f7fa8b 100644 --- a/add_channel_channel_group_request.go +++ b/add_channel_channel_group_request.go @@ -66,6 +66,13 @@ func (b *addChannelToChannelGroupBuilder) Transport( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *addChannelToChannelGroupBuilder) QueryParam(queryParam map[string]string) *addChannelToChannelGroupBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs AddChannelToChannelGroup request func (b *addChannelToChannelGroupBuilder) Execute() ( *AddChannelToChannelGroupResponse, StatusResponse, error) { @@ -78,15 +85,12 @@ func (b *addChannelToChannelGroupBuilder) Execute() ( } type addChannelOpts struct { - pubnub *PubNub - - Channels []string - + pubnub *PubNub + Channels []string ChannelGroup string - - Transport http.RoundTripper - - ctx Context + QueryParam map[string]string + Transport http.RoundTripper + ctx Context } func (o *addChannelOpts) config() Config { @@ -133,10 +137,15 @@ func (o *addChannelOpts) buildQuery() (*url.Values, error) { } q.Set("add", strings.Join(channels, ",")) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *addChannelOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *addChannelOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/add_channel_channel_group_request_test.go b/add_channel_channel_group_request_test.go index 6d256ca0..f7393b81 100644 --- a/add_channel_channel_group_request_test.go +++ b/add_channel_channel_group_request_test.go @@ -32,6 +32,47 @@ func TestAddChannelRequestBasic(t *testing.T) { expected := &url.Values{} expected.Set("add", "ch1,ch2,ch3") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + assert.Nil(err) + + assert.Equal([]byte{}, body) +} + +func TestAddChannelRequestBasicQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &addChannelOpts{ + Channels: []string{"ch1", "ch2", "ch3"}, + ChannelGroup: "cg", + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + opts.QueryParam = queryParam + + path, err := opts.buildPath() + assert.Nil(err) + u := &url.URL{ + Path: path, + } + + h.AssertPathsEqual(t, + fmt.Sprintf("/v1/channel-registration/sub-key/sub_key/channel-group/cg"), + u.EscapedPath(), []int{}) + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + expected.Set("add", "ch1,ch2,ch3") + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) body, err := opts.buildBody() diff --git a/add_channels_to_push_request.go b/add_channels_to_push_request.go index 70f7c07d..d5d1f80e 100644 --- a/add_channels_to_push_request.go +++ b/add_channels_to_push_request.go @@ -61,6 +61,13 @@ func (b *addPushNotificationsOnChannelsBuilder) DeviceIDForPush( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *addPushNotificationsOnChannelsBuilder) QueryParam(queryParam map[string]string) *addPushNotificationsOnChannelsBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs add Push Notifications on channels request func (b *addPushNotificationsOnChannelsBuilder) Execute() ( *AddPushNotificationsOnChannelsResponse, StatusResponse, error) { @@ -73,17 +80,13 @@ func (b *addPushNotificationsOnChannelsBuilder) Execute() ( } type addChannelsToPushOpts struct { - pubnub *PubNub - - Channels []string - - PushType PNPushType - + pubnub *PubNub + Channels []string + PushType PNPushType DeviceIDForPush string - - Transport http.RoundTripper - - ctx Context + QueryParam map[string]string + Transport http.RoundTripper + ctx Context } func (o *addChannelsToPushOpts) config() Config { @@ -138,10 +141,15 @@ func (o *addChannelsToPushOpts) buildQuery() (*url.Values, error) { q.Set("add", strings.Join(channels, ",")) q.Set("type", o.PushType.String()) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *addChannelsToPushOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *addChannelsToPushOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/add_channels_to_push_request_test.go b/add_channels_to_push_request_test.go index 976d607a..565bed04 100644 --- a/add_channels_to_push_request_test.go +++ b/add_channels_to_push_request_test.go @@ -81,6 +81,29 @@ func TestAddChannelsToPushOptsBuildQuery(t *testing.T) { assert.Nil(err) } +func TestAddChannelsToPushOptsBuildQueryParams(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &addChannelsToPushOpts{ + Channels: []string{"ch1", "ch2", "ch3"}, + DeviceIDForPush: "deviceId", + PushType: PNPushTypeAPNS, + pubnub: pubnub, + QueryParam: queryParam, + } + + u, err := opts.buildQuery() + assert.Equal("ch1,ch2,ch3", u.Get("add")) + assert.Equal("apns", u.Get("type")) + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + assert.Nil(err) +} + func TestAddChannelsToPushOptsBuildBody(t *testing.T) { assert := assert.New(t) diff --git a/clients.go b/clients.go index 4d2b9d0f..60539c7d 100644 --- a/clients.go +++ b/clients.go @@ -8,13 +8,13 @@ import ( ) // NewHTTP1Client creates a new HTTP 1 client with a new transport initialized with connect and read timeout -func NewHTTP1Client(connectTimeout int, responseReadTimeout int) *http.Client { +func NewHTTP1Client(connectTimeout, responseReadTimeout, maxIdleConnsPerHost int) *http.Client { transport := &http.Transport{ - // MaxIdleConns: 30, Dial: (&net.Dialer{ // Covers establishing a new TCP connection Timeout: time.Duration(connectTimeout) * time.Second, }).Dial, + MaxIdleConnsPerHost: maxIdleConnsPerHost, } client := &http.Client{ diff --git a/config.go b/config.go index 8ed45262..04ddacab 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,8 @@ type Config struct { DisablePNOtherProcessing bool // PNOther processing looks for pn_other in the JSON on the recevied message UseHTTP2 bool // HTTP2 Flag MessageQueueOverflowCount int // When the limit is exceeded by the number of messages received in a single subscribe request, a status event PNRequestMessageCountExceededCategory is fired. + MaxIdleConnsPerHost int // Used to set the value of HTTP Transport's MaxIdleConnsPerHost. + MaxWorkers int // Number of max workers for Publish and Grant requests } // NewDemoConfig initiates the config with demo keys, for tests only. @@ -65,6 +67,8 @@ func NewConfig() *Config { DisablePNOtherProcessing: false, PNReconnectionPolicy: PNNonePolicy, MessageQueueOverflowCount: 100, + MaxIdleConnsPerHost: 30, + MaxWorkers: 20, } return &c diff --git a/delete_channel_group_request.go b/delete_channel_group_request.go index ab422b08..d9d44232 100644 --- a/delete_channel_group_request.go +++ b/delete_channel_group_request.go @@ -45,6 +45,13 @@ func (b *deleteChannelGroupBuilder) ChannelGroup( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *deleteChannelGroupBuilder) QueryParam(queryParam map[string]string) *deleteChannelGroupBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the DeleteChannelGroup request. func (b *deleteChannelGroupBuilder) Execute() ( *DeleteChannelGroupResponse, StatusResponse, error) { @@ -58,13 +65,11 @@ func (b *deleteChannelGroupBuilder) Execute() ( } type deleteChannelGroupOpts struct { - pubnub *PubNub - + pubnub *PubNub ChannelGroup string - - Transport http.RoundTripper - - ctx Context + Transport http.RoundTripper + QueryParam map[string]string + ctx Context } func (o *deleteChannelGroupOpts) config() Config { @@ -102,10 +107,14 @@ func (o *deleteChannelGroupOpts) buildPath() (string, error) { func (o *deleteChannelGroupOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *deleteChannelGroupOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *deleteChannelGroupOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/delete_channel_group_request_test.go b/delete_channel_group_request_test.go index cc376567..c7bb39a1 100644 --- a/delete_channel_group_request_test.go +++ b/delete_channel_group_request_test.go @@ -38,6 +38,43 @@ func TestDeleteChannelGroupRequestBasic(t *testing.T) { assert.Equal([]byte{}, body) } +func TestDeleteChannelGroupRequestBasicQueryParam(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &deleteChannelGroupOpts{ + ChannelGroup: "cg", + pubnub: pubnub, + QueryParam: queryParam, + } + + path, err := opts.buildPath() + assert.Nil(err) + u := &url.URL{ + Path: path, + } + h.AssertPathsEqual(t, + fmt.Sprintf("/v1/channel-registration/sub-key/sub_key/channel-group/cg/remove"), + u.EscapedPath(), []int{}) + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + assert.Nil(err) + + assert.Equal([]byte{}, body) +} + func TestNewDeleteChannelGroupBuilder(t *testing.T) { assert := assert.New(t) o := newDeleteChannelGroupBuilder(pubnub) diff --git a/endpoints.go b/endpoints.go index d0dfb33f..3c01ccfa 100644 --- a/endpoints.go +++ b/endpoints.go @@ -12,20 +12,27 @@ import ( ) type endpointOpts interface { + jobQueue() chan *JobQItem config() Config client() *http.Client context() Context validate() error - buildPath() (string, error) buildQuery() (*url.Values, error) buildBody() ([]byte, error) - httpMethod() string operationType() OperationType telemetryManager() *TelemetryManager } +func SetQueryParam(q *url.Values, queryParam map[string]string) { + if queryParam != nil { + for key, value := range queryParam { + q.Set(key, value) + } + } +} + func defaultQuery(uuid string, telemetryManager *TelemetryManager) *url.Values { v := &url.Values{} @@ -76,6 +83,7 @@ func buildURL(o endpointOpts) (*url.URL, error) { } signedInput += utils.PreparePamParams(query) + //o.config().Log.Println("signedInput:", signedInput) signature = utils.GetHmacSha256(o.config().SecretKey, signedInput) } diff --git a/endpoints_test.go b/endpoints_test.go index f97b6b90..2edf1026 100644 --- a/endpoints_test.go +++ b/endpoints_test.go @@ -30,6 +30,10 @@ func (o *fakeEndpointOpts) buildBody() ([]byte, error) { return []byte("myBody"), nil } +func (o *fakeEndpointOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *fakeEndpointOpts) config() Config { return *o.pubnub.Config } diff --git a/examples/cli/cli_demo.go b/examples/cli/cli_demo.go index be28d0ad..c7d2b45e 100644 --- a/examples/cli/cli_demo.go +++ b/examples/cli/cli_demo.go @@ -58,10 +58,12 @@ func connect() { config.PublishKey = "demo" config.SubscribeKey = "demo" config.SecretKey = "demo" + config.AuthKey = "akey" config.CipherKey = "enigma" pn = pubnub.NewPubNub(config) + // for subscribe event listener := pubnub.NewListener() @@ -954,6 +956,10 @@ func publishRequest(args []string) { } channels := strings.Split(args[4], ",") + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } for _, ch := range channels { fmt.Println(fmt.Sprintf("%s Publishing to channel: %s", outputPrefix, ch)) @@ -962,7 +968,7 @@ func publishRequest(args []string) { Message(res). UsePost(usePost). ShouldStore(store). - DoNotReplicate(repl). + DoNotReplicate(repl).QueryParam(queryParam). Execute() if err != nil { @@ -1043,6 +1049,10 @@ func subscribeRequest(args []string) { } withPresence, _ := strconv.ParseBool(args[0]) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } channels := strings.Split(args[1], ",") if (len(args)) > 3 { @@ -1084,6 +1094,7 @@ func subscribeRequest(args []string) { pn.Subscribe(). Channels(channels). WithPresence(withPresence). + QueryParam(queryParam). Execute() } diff --git a/fetch_request.go b/fetch_request.go index 4c1fe2f9..bd2032be 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -77,6 +77,13 @@ func (b *fetchBuilder) Reverse(r bool) *fetchBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *fetchBuilder) QueryParam(queryParam map[string]string) *fetchBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Transport sets the Transport for the Fetch request. func (b *fetchBuilder) Transport(tr http.RoundTripper) *fetchBuilder { b.opts.Transport = tr @@ -109,6 +116,7 @@ type fetchOpts struct { // default: false IncludeTimetoken bool + QueryParam map[string]string // nil hacks setStart bool @@ -169,10 +177,15 @@ func (o *fetchOpts) buildQuery() (*url.Values, error) { } q.Set("reverse", strconv.FormatBool(o.Reverse)) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *fetchOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *fetchOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/fetch_request_test.go b/fetch_request_test.go index 32764e45..7dad4d84 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -32,6 +32,34 @@ func AssertSuccessFetchGet(t *testing.T, expectedString string, channels []strin assert.Empty(body) } +func TestFetchQueryParam(t *testing.T) { + channels := []string{"test1", "test2"} + AssertSuccessFetchGetQueryParam(t, "%22test%22?max=25&reverse=false", channels) +} + +func AssertSuccessFetchGetQueryParam(t *testing.T, expectedString string, channels []string) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &fetchOpts{ + Channels: channels, + Reverse: false, + IncludeTimetoken: true, + pubnub: pubnub, + QueryParam: queryParam, + } + + u, err := opts.buildQuery() + assert.Nil(err) + + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + +} + func AssertSuccessFetchQuery(t *testing.T, expectedString string, channels []string) { opts := &fetchOpts{ Channels: channels, diff --git a/fire_request.go b/fire_request.go index 1a6421e2..635a9a83 100644 --- a/fire_request.go +++ b/fire_request.go @@ -24,6 +24,7 @@ type fireOpts struct { DoNotReplicate bool Transport http.RoundTripper ctx Context + QueryParam map[string]string // nil hacks setTTL bool setShouldStore bool @@ -105,6 +106,13 @@ func (b *fireBuilder) Transport(tr http.RoundTripper) *fireBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *fireBuilder) QueryParam(queryParam map[string]string) *fireBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the Fire request. func (b *fireBuilder) Execute() (*PublishResponse, StatusResponse, error) { b.opts.ShouldStore = false @@ -202,10 +210,15 @@ func (o *fireOpts) buildQuery() (*url.Values, error) { } q.Set("seqn", strconv.Itoa(o.pubnub.getPublishSequence())) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *fireOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *fireOpts) buildBody() ([]byte, error) { if o.UsePost { var msg []byte diff --git a/fire_request_test.go b/fire_request_test.go index b389d588..c25be913 100644 --- a/fire_request_test.go +++ b/fire_request_test.go @@ -230,6 +230,29 @@ func TestFireDoNotSerializePost(t *testing.T) { assert.NotEmpty(body) } +func TestFireDoNotSerializeQueryParam(t *testing.T) { + assert := assert.New(t) + + message := "{\"one\":\"hey\"}" + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &fireOpts{ + Channel: "ch", + Message: message, + pubnub: pubnub, + QueryParam: queryParam, + } + + b, err := opts.buildQuery() + assert.Nil(err) + assert.Equal("v1", b.Get("q1")) + assert.Equal("v2", b.Get("q2")) + +} + func TestValidatePublishKey(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig()) diff --git a/get_state_request.go b/get_state_request.go index 57018491..415e1f3b 100644 --- a/get_state_request.go +++ b/get_state_request.go @@ -58,6 +58,13 @@ func (b *getStateBuilder) ChannelGroups(cg []string) *getStateBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *getStateBuilder) QueryParam(queryParam map[string]string) *getStateBuilder { + b.opts.QueryParam = queryParam + + return b +} + // UUID sets the UUID for the Get State request. func (b *getStateBuilder) UUID(uuid string) *getStateBuilder { b.opts.UUID = uuid @@ -86,13 +93,11 @@ func (b *getStateBuilder) Execute() ( } type getStateOpts struct { - pubnub *PubNub - - Channels []string - + pubnub *PubNub + Channels []string ChannelGroups []string - - UUID string + UUID string + QueryParam map[string]string Transport http.RoundTripper @@ -146,10 +151,15 @@ func (o *getStateOpts) buildQuery() (*url.Values, error) { } q.Set("channel-group", strings.Join(groups, ",")) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *getStateOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *getStateOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/get_state_request_test.go b/get_state_request_test.go index 5e9c9060..837181fc 100644 --- a/get_state_request_test.go +++ b/get_state_request_test.go @@ -127,6 +127,48 @@ func TestNewGetStateBuilder(t *testing.T) { assert.Equal([]byte{}, body) } +func TestNewGetStateBuilderQueryParam(t *testing.T) { + assert := assert.New(t) + + pubnub.Config.UUID = "my-custom-uuid" + + o := newGetStateBuilder(pubnub) + o.Channels([]string{"ch"}) + o.ChannelGroups([]string{"cg"}) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + path, err := o.opts.buildPath() + o.opts.QueryParam = queryParam + + assert.Nil(err) + u := &url.URL{ + Path: path, + } + h.AssertPathsEqual(t, + "/v2/presence/sub-key/sub_key/channel/ch/uuid/my-custom-uuid", + u.EscapedPath(), []int{}) + + query, err := o.opts.buildQuery() + assert.Equal("v1", query.Get("q1")) + assert.Equal("v2", query.Get("q2")) + + assert.Nil(err) + + expected := &url.Values{} + expected.Set("channel-group", "cg") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := o.opts.buildBody() + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestNewGetStateBuilderContext(t *testing.T) { assert := assert.New(t) diff --git a/grant_request.go b/grant_request.go index d04bdd7d..2fa09a6c 100644 --- a/grant_request.go +++ b/grant_request.go @@ -14,7 +14,20 @@ import ( "github.com/pubnub/go/pnerr" ) +// PNGrantType grant types +type PNGrantType int + const grantPath = "/v1/auth/grant/sub-key/%s" +const ( + // PNReadEnabled Read Enabled + PNReadEnabled PNGrantType = 1 + iota + // PNWriteEnabled Write Enabled + PNWriteEnabled + // PNManageEnabled Manage Enabled + PNManageEnabled + // PNDeleteEnabled Delete Enabled + PNDeleteEnabled +) var emptyGrantResponse *GrantResponse @@ -61,6 +74,12 @@ func (b *grantBuilder) Manage(manage bool) *grantBuilder { return b } +func (b *grantBuilder) Delete(del bool) *grantBuilder { + b.opts.Delete = del + + return b +} + // TTL in minutes for which granted permissions are valid. // // Min: 1 @@ -96,6 +115,13 @@ func (b *grantBuilder) ChannelGroups(groups []string) *grantBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *grantBuilder) QueryParam(queryParam map[string]string) *grantBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the Grant request. func (b *grantBuilder) Execute() (*GrantResponse, StatusResponse, error) { rawJSON, status, err := executeRequest(b.opts) @@ -113,13 +139,14 @@ type grantOpts struct { AuthKeys []string Channels []string ChannelGroups []string + QueryParam map[string]string // Stringified permissions // Setting 'true' or 'false' will apply permissions to level Read bool Write bool Manage bool - + Delete bool // Max: 525600 // Min: 1 // Default: 1440 @@ -183,6 +210,12 @@ func (o *grantOpts) buildQuery() (*url.Values, error) { q.Set("m", "0") } + if o.Delete { + q.Set("d", "1") + } else { + q.Set("d", "0") + } + if len(o.AuthKeys) > 0 { q.Set("auth", strings.Join(o.AuthKeys, ",")) } @@ -203,10 +236,15 @@ func (o *grantOpts) buildQuery() (*url.Values, error) { timestamp := time.Now().Unix() q.Set("timestamp", strconv.Itoa(int(timestamp))) + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *grantOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *grantOpts) buildBody() ([]byte, error) { return []byte{}, nil } @@ -248,6 +286,7 @@ type GrantResponse struct { ReadEnabled bool WriteEnabled bool ManageEnabled bool + DeleteEnabled bool } // PNPAMEntityData is the struct containing the access details of the channels. @@ -257,6 +296,7 @@ type PNPAMEntityData struct { ReadEnabled bool WriteEnabled bool ManageEnabled bool + DeleteEnabled bool TTL int } @@ -265,6 +305,7 @@ type PNAccessManagerKeyData struct { ReadEnabled bool WriteEnabled bool ManageEnabled bool + DeleteEnabled bool TTL int } @@ -299,37 +340,7 @@ func newGrantResponse(jsonBytes []byte, status StatusResponse) ( } for key, value := range channelMap { - valueMap := value.(map[string]interface{}) - keyData := &PNAccessManagerKeyData{} - - if val, ok := valueMap["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ReadEnabled = true - } else { - keyData.ReadEnabled = false - } - } - - if val, ok := valueMap["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.WriteEnabled = true - } else { - keyData.WriteEnabled = false - } - } - - if val, ok := valueMap["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ManageEnabled = true - } else { - keyData.ManageEnabled = false - } - } - - auths[key] = keyData + auths[key] = createPNAccessManagerKeyData(value, entityData, false) } entityData.AuthKeys = auths @@ -340,59 +351,23 @@ func newGrantResponse(jsonBytes []byte, status StatusResponse) ( if val, ok := parsedPayload["channel-groups"]; ok { groupName, _ := val.(string) constructedAuthKey := make(map[string]*PNAccessManagerKeyData) - entityData := PNPAMEntityData{ + entityData := &PNPAMEntityData{ Name: groupName, } if _, ok := val.(string); ok { for authKeyName, value := range auths { - auth, _ := value.(map[string]interface{}) - - managerKeyData := &PNAccessManagerKeyData{} - - if val, ok := auth["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - managerKeyData.ReadEnabled = true - } else { - managerKeyData.ReadEnabled = false - } - } - - if val, ok := auth["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - managerKeyData.WriteEnabled = true - } else { - managerKeyData.WriteEnabled = false - } - } - - if val, ok := auth["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - managerKeyData.ManageEnabled = true - } else { - managerKeyData.ManageEnabled = false - } - } - - if val, ok := auth["ttl"]; ok { - parsedVal, _ := val.(int) - entityData.TTL = parsedVal - } - - constructedAuthKey[authKeyName] = managerKeyData + constructedAuthKey[authKeyName] = createPNAccessManagerKeyData(value, entityData, false) } entityData.AuthKeys = constructedAuthKey - constructedGroups[groupName] = &entityData + constructedGroups[groupName] = entityData } if groupMap, ok := val.(map[string]interface{}); ok { groupName, _ := val.(string) constructedAuthKey := make(map[string]*PNAccessManagerKeyData) - entityData := PNPAMEntityData{ + entityData := &PNPAMEntityData{ Name: groupName, } @@ -401,68 +376,13 @@ func newGrantResponse(jsonBytes []byte, status StatusResponse) ( if keys, ok := valueMap["auths"]; ok { parsedKeys, _ := keys.(map[string]interface{}) - keyData := &PNAccessManagerKeyData{} for keyName, value := range parsedKeys { - valueMap, _ := value.(map[string]interface{}) - - if val, ok := valueMap["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ReadEnabled = true - } else { - keyData.ReadEnabled = false - } - } - - if val, ok := valueMap["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.WriteEnabled = true - } else { - keyData.WriteEnabled = false - } - } - - if val, ok := valueMap["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ManageEnabled = true - } else { - keyData.ManageEnabled = false - } - } - - constructedAuthKey[keyName] = keyData + constructedAuthKey[keyName] = createPNAccessManagerKeyData(value, entityData, false) } } - if val, ok := valueMap["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.ReadEnabled = true - } else { - entityData.ReadEnabled = false - } - } - - if val, ok := valueMap["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.WriteEnabled = true - } else { - entityData.WriteEnabled = false - } - } - - if val, ok := valueMap["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.ManageEnabled = true - } else { - entityData.ManageEnabled = false - } - } + createPNAccessManagerKeyData(valueMap, entityData, true) if val, ok := parsedPayload["ttl"]; ok { parsedVal, _ := val.(float64) @@ -470,7 +390,7 @@ func newGrantResponse(jsonBytes []byte, status StatusResponse) ( } entityData.AuthKeys = constructedAuthKey - constructedGroups[groupName] = &entityData + constructedGroups[groupName] = entityData } } } @@ -519,6 +439,15 @@ func newGrantResponse(jsonBytes []byte, status StatusResponse) ( } } + if r, ok := parsedPayload["d"]; ok { + parsedValue, _ := r.(float64) + if parsedValue == float64(1) { + resp.DeleteEnabled = true + } else { + resp.DeleteEnabled = false + } + } + if r, ok := parsedPayload["ttl"]; ok { parsedValue, _ := r.(float64) resp.TTL = int(parsedValue) @@ -541,73 +470,76 @@ func fetchChannel(channelName string, parsedValue := val.(map[string]interface{}) for key, value := range parsedValue { - valueMap := value.(map[string]interface{}) - keyData := &PNAccessManagerKeyData{} - - if val, ok := valueMap["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ReadEnabled = true - } else { - keyData.ReadEnabled = false - } - } + auths[key] = createPNAccessManagerKeyData(value, entityData, false) + } + } - if val, ok := valueMap["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.WriteEnabled = true - } else { - keyData.WriteEnabled = false - } - } + createPNAccessManagerKeyData(value, entityData, true) - if val, ok := valueMap["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - keyData.ManageEnabled = true - } else { - keyData.ManageEnabled = false - } - } + if val, ok := parsedPayload["ttl"]; ok { + parsedVal, _ := val.(float64) + entityData.TTL = int(parsedVal) + } + + entityData.AuthKeys = auths - auths[key] = keyData + return entityData +} + +func readKeyData(val interface{}, keyData *PNAccessManagerKeyData, entityData *PNPAMEntityData, writeToEntityData bool, grantType PNGrantType) { + parsedValue, _ := val.(float64) + readValue := false + if parsedValue == float64(1) { + readValue = true + } + if writeToEntityData { + switch grantType { + case PNReadEnabled: + entityData.ReadEnabled = readValue + case PNWriteEnabled: + entityData.WriteEnabled = readValue + case PNManageEnabled: + entityData.ManageEnabled = readValue + case PNDeleteEnabled: + entityData.DeleteEnabled = readValue + } + } else { + switch grantType { + case PNReadEnabled: + keyData.ReadEnabled = readValue + case PNWriteEnabled: + keyData.WriteEnabled = readValue + case PNManageEnabled: + keyData.ManageEnabled = readValue + case PNDeleteEnabled: + keyData.DeleteEnabled = readValue } } +} + +func createPNAccessManagerKeyData(value interface{}, entityData *PNPAMEntityData, writeToEntityData bool) *PNAccessManagerKeyData { + valueMap := value.(map[string]interface{}) + keyData := &PNAccessManagerKeyData{} if val, ok := valueMap["r"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.ReadEnabled = true - } else { - entityData.ReadEnabled = false - } + readKeyData(val, keyData, entityData, writeToEntityData, PNReadEnabled) } if val, ok := valueMap["w"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.WriteEnabled = true - } else { - entityData.WriteEnabled = false - } + readKeyData(val, keyData, entityData, writeToEntityData, PNWriteEnabled) } if val, ok := valueMap["m"]; ok { - parsedValue, _ := val.(float64) - if parsedValue == float64(1) { - entityData.ManageEnabled = true - } else { - entityData.ManageEnabled = false - } + readKeyData(val, keyData, entityData, writeToEntityData, PNManageEnabled) } - if val, ok := parsedPayload["ttl"]; ok { - parsedVal, _ := val.(float64) - entityData.TTL = int(parsedVal) + if val, ok := valueMap["d"]; ok { + readKeyData(val, keyData, entityData, writeToEntityData, PNDeleteEnabled) } - entityData.AuthKeys = auths - - return entityData + if val, ok := valueMap["ttl"]; ok { + parsedVal, _ := val.(int) + entityData.TTL = parsedVal + } + return keyData } diff --git a/grant_request_test.go b/grant_request_test.go index 616268fe..153004e6 100644 --- a/grant_request_test.go +++ b/grant_request_test.go @@ -44,6 +44,7 @@ func TestGrantRequestBasic(t *testing.T) { expected.Set("r", "1") expected.Set("w", "1") expected.Set("m", "1") + expected.Set("d", "0") expected.Set("ttl", "5000") h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid", "timestamp"}, []string{}) @@ -54,6 +55,53 @@ func TestGrantRequestBasic(t *testing.T) { assert.Equal([]byte{}, body) } +func TestGrantRequestBasicQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &grantOpts{ + AuthKeys: []string{"my-auth-key"}, + Channels: []string{"ch"}, + ChannelGroups: []string{"cg"}, + Read: true, + Write: true, + Manage: true, + TTL: 5000, + setTTL: true, + pubnub: pubnub, + } + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("auth", "my-auth-key") + expected.Set("channel", "ch") + expected.Set("channel-group", "cg") + expected.Set("r", "1") + expected.Set("w", "1") + expected.Set("m", "1") + expected.Set("d", "0") + expected.Set("ttl", "5000") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, + []string{"pnsdk", "uuid", "timestamp"}, []string{}) + + body, err := opts.buildBody() + + assert.Nil(err) + assert.Equal([]byte{}, body) + +} + func TestNewGrantBuilder(t *testing.T) { assert := assert.New(t) o := newGrantBuilder(pubnub) @@ -63,6 +111,50 @@ func TestNewGrantBuilder(t *testing.T) { o.Read(true) o.Write(true) o.Manage(true) + o.Delete(true) + o.TTL(5000) + + path, err := o.opts.buildPath() + assert.Nil(err) + u := &url.URL{ + Path: path, + } + + h.AssertPathsEqual(t, + fmt.Sprintf("/v1/auth/grant/sub-key/%s", o.opts.pubnub.Config.SubscribeKey), + u.EscapedPath(), []int{}) + + query, err := o.opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("auth", "my-auth-key") + expected.Set("channel", "ch") + expected.Set("channel-group", "cg") + expected.Set("r", "1") + expected.Set("w", "1") + expected.Set("m", "1") + expected.Set("d", "1") + expected.Set("ttl", "5000") + h.AssertQueriesEqual(t, expected, query, + []string{"pnsdk", "uuid", "timestamp"}, []string{}) + + body, err := o.opts.buildBody() + + assert.Nil(err) + assert.Equal([]byte{}, body) +} + +func TestNewGrantBuilderDelFalse(t *testing.T) { + assert := assert.New(t) + o := newGrantBuilder(pubnub) + o.AuthKeys([]string{"my-auth-key"}) + o.Channels([]string{"ch"}) + o.ChannelGroups([]string{"cg"}) + o.Read(true) + o.Write(true) + o.Manage(true) + o.Delete(false) o.TTL(5000) path, err := o.opts.buildPath() @@ -85,6 +177,7 @@ func TestNewGrantBuilder(t *testing.T) { expected.Set("r", "1") expected.Set("w", "1") expected.Set("m", "1") + expected.Set("d", "0") expected.Set("ttl", "5000") h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid", "timestamp"}, []string{}) @@ -105,6 +198,11 @@ func TestNewGrantBuilderContext(t *testing.T) { o.Write(true) o.Manage(true) o.TTL(5000) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + o.QueryParam(queryParam) path, err := o.opts.buildPath() assert.Nil(err) @@ -126,7 +224,10 @@ func TestNewGrantBuilderContext(t *testing.T) { expected.Set("r", "1") expected.Set("w", "1") expected.Set("m", "1") + expected.Set("d", "0") expected.Set("ttl", "5000") + expected.Set("q1", "v1") + expected.Set("q2", "v2") h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid", "timestamp"}, []string{}) diff --git a/heartbeat_request.go b/heartbeat_request.go index 38a72f56..33e567f0 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -143,6 +143,10 @@ func (o *heartbeatOpts) buildQuery() (*url.Values, error) { return q, nil } +func (o *heartbeatOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *heartbeatOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/here_now_request.go b/here_now_request.go index 24531f45..0df69613 100644 --- a/here_now_request.go +++ b/here_now_request.go @@ -73,6 +73,13 @@ func (b *hereNowBuilder) IncludeUUIDs(uuid bool) *hereNowBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *hereNowBuilder) QueryParam(queryParam map[string]string) *hereNowBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the HereNow request. func (b *hereNowBuilder) Execute() (*HereNowResponse, StatusResponse, error) { rawJSON, status, err := executeRequest(b.opts) @@ -86,14 +93,13 @@ func (b *hereNowBuilder) Execute() (*HereNowResponse, StatusResponse, error) { type hereNowOpts struct { pubnub *PubNub - Channels []string - ChannelGroups []string - - IncludeUUIDs bool - IncludeState bool - + Channels []string + ChannelGroups []string + IncludeUUIDs bool + IncludeState bool SetIncludeState bool SetIncludeUUIDs bool + QueryParam map[string]string Transport http.RoundTripper @@ -156,9 +162,15 @@ func (o *hereNowOpts) buildQuery() (*url.Values, error) { q.Set("disable-uuids", "0") } + SetQueryParam(q, o.QueryParam) + return q, nil } +func (o *hereNowOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *hereNowOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/here_now_request_test.go b/here_now_request_test.go index 2ba812db..1cbaa9f2 100644 --- a/here_now_request_test.go +++ b/here_now_request_test.go @@ -113,6 +113,42 @@ func TestHereNowMultipleWithOpts(t *testing.T) { assert.Equal([]byte{}, body) } +func TestHereNowMultipleWithOptsQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &hereNowOpts{ + Channels: []string{"ch1", "ch2", "ch3"}, + ChannelGroups: []string{"cg1", "cg2", "cg3"}, + IncludeUUIDs: false, + IncludeState: true, + SetIncludeState: true, + SetIncludeUUIDs: true, + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("channel-group", "cg1,cg2,cg3") + expected.Set("state", "1") + expected.Set("disable-uuids", "1") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestHereNowGlobal(t *testing.T) { assert := assert.New(t) diff --git a/history_delete_request.go b/history_delete_request.go index 651a1d21..85b48376 100644 --- a/history_delete_request.go +++ b/history_delete_request.go @@ -59,6 +59,13 @@ func (b *historyDeleteBuilder) End(end int64) *historyDeleteBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *historyDeleteBuilder) QueryParam(queryParam map[string]string) *historyDeleteBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Transport sets the Transport for the DeleteMessages request. func (b *historyDeleteBuilder) Transport(tr http.RoundTripper) *historyDeleteBuilder { b.opts.Transport = tr @@ -78,10 +85,10 @@ func (b *historyDeleteBuilder) Execute() (*HistoryDeleteResponse, StatusResponse type historyDeleteOpts struct { pubnub *PubNub - Channel string - - Start int64 - End int64 + Channel string + Start int64 + End int64 + QueryParam map[string]string SetStart bool SetEnd bool @@ -119,6 +126,10 @@ func (o *historyDeleteOpts) validate() error { return nil } +func (o *historyDeleteOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *historyDeleteOpts) buildPath() (string, error) { return fmt.Sprintf(historyDeletePath, o.pubnub.Config.SubscribeKey, @@ -136,6 +147,8 @@ func (o *historyDeleteOpts) buildQuery() (*url.Values, error) { q.Set("end", strconv.FormatInt(o.End, 10)) } + SetQueryParam(q, o.QueryParam) + return q, nil } diff --git a/history_delete_request_test.go b/history_delete_request_test.go index fb1b9559..a287920f 100644 --- a/history_delete_request_test.go +++ b/history_delete_request_test.go @@ -44,6 +44,42 @@ func TestHistoryDeleteRequestAllParams(t *testing.T) { assert.Equal([]byte{}, body) } +func TestHistoryDeleteRequestQueryParams(t *testing.T) { + assert := assert.New(t) + + opts := &historyDeleteOpts{ + Channel: "ch", + SetStart: true, + SetEnd: true, + Start: int64(123), + End: int64(456), + pubnub: pubnub, + } + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("start", "123") + expected.Set("end", "456") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestNewHistoryDeleteBuilder(t *testing.T) { assert := assert.New(t) diff --git a/history_request.go b/history_request.go index 0d032641..6baad636 100644 --- a/history_request.go +++ b/history_request.go @@ -83,6 +83,13 @@ func (b *historyBuilder) IncludeTimetoken(i bool) *historyBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *historyBuilder) QueryParam(queryParam map[string]string) *historyBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Transport sets the Transport for the History request. func (b *historyBuilder) Transport(tr http.RoundTripper) *historyBuilder { b.opts.Transport = tr @@ -104,8 +111,9 @@ type historyOpts struct { Channel string - Start int64 - End int64 + Start int64 + End int64 + QueryParam map[string]string // default: 100 Count int @@ -175,9 +183,15 @@ func (o *historyOpts) buildQuery() (*url.Values, error) { q.Set("reverse", strconv.FormatBool(o.Reverse)) q.Set("include_token", strconv.FormatBool(o.IncludeTimetoken)) + SetQueryParam(q, o.QueryParam) + return q, nil } +func (o *historyOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *historyOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/leave_request.go b/leave_request.go index 5fcb9eec..583b3c99 100644 --- a/leave_request.go +++ b/leave_request.go @@ -47,6 +47,13 @@ func (b *leaveBuilder) ChannelGroups(groups []string) *leaveBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *leaveBuilder) QueryParam(queryParam map[string]string) *leaveBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the Leave request. func (b *leaveBuilder) Execute() (StatusResponse, error) { _, status, err := executeRequest(b.opts) @@ -60,6 +67,7 @@ func (b *leaveBuilder) Execute() (StatusResponse, error) { type leaveOpts struct { Channels []string ChannelGroups []string + QueryParam map[string]string pubnub *PubNub ctx Context @@ -85,6 +93,10 @@ func (o *leaveOpts) buildPath() (string, error) { channels), nil } +func (o *leaveOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *leaveOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) @@ -92,7 +104,7 @@ func (o *leaveOpts) buildQuery() (*url.Values, error) { channelGroup := utils.JoinChannels(o.ChannelGroups) q.Set("channel-group", string(channelGroup)) } - + SetQueryParam(q, o.QueryParam) return q, nil } diff --git a/leave_request_test.go b/leave_request_test.go index 3df6fdd4..5c6ed8b7 100644 --- a/leave_request_test.go +++ b/leave_request_test.go @@ -63,6 +63,32 @@ func TestLeaveRequestSingleChannelGroup(t *testing.T) { h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) } +func TestLeaveRequestSingleChannelGroupQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &leaveOpts{ + ChannelGroups: []string{"cg"}, + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + expected.Set("channel-group", "cg") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) +} + func TestLeaveRequestMultipleChannelGroups(t *testing.T) { assert := assert.New(t) diff --git a/list_all_channel_group_request.go b/list_all_channel_group_request.go index 38901e72..e3030efb 100644 --- a/list_all_channel_group_request.go +++ b/list_all_channel_group_request.go @@ -49,6 +49,13 @@ func (b *allChannelGroupBuilder) ChannelGroup( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *allChannelGroupBuilder) QueryParam(queryParam map[string]string) *allChannelGroupBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the ListChannelsInChannelGroup request. func (b *allChannelGroupBuilder) Execute() ( *AllChannelGroupResponse, StatusResponse, error) { @@ -64,8 +71,8 @@ type allChannelGroupOpts struct { pubnub *PubNub ChannelGroup string - - Transport http.RoundTripper + QueryParam map[string]string + Transport http.RoundTripper ctx Context } @@ -102,10 +109,14 @@ func (o *allChannelGroupOpts) buildPath() (string, error) { func (o *allChannelGroupOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *allChannelGroupOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *allChannelGroupOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/list_all_channel_group_request_test.go b/list_all_channel_group_request_test.go index f2c2591b..ddd6c872 100644 --- a/list_all_channel_group_request_test.go +++ b/list_all_channel_group_request_test.go @@ -38,6 +38,34 @@ func TestListAllChannelGroupRequestBasic(t *testing.T) { assert.Equal([]byte{}, body) } +func TestListAllChannelGroupRequestBasicQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &allChannelGroupOpts{ + ChannelGroup: "cg", + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestNewAllChannelGroupBuilder(t *testing.T) { assert := assert.New(t) o := newAllChannelGroupBuilder(pubnub) diff --git a/list_push_provisions_request.go b/list_push_provisions_request.go index 923dabc2..ff67d336 100644 --- a/list_push_provisions_request.go +++ b/list_push_provisions_request.go @@ -56,6 +56,13 @@ func (b *listPushProvisionsRequestBuilder) DeviceIDForPush( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *listPushProvisionsRequestBuilder) QueryParam(queryParam map[string]string) *listPushProvisionsRequestBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the List Push Provisions request. func (b *listPushProvisionsRequestBuilder) Execute() ( *ListPushProvisionsRequestResponse, StatusResponse, error) { @@ -98,8 +105,8 @@ type listPushProvisionsRequestOpts struct { PushType PNPushType DeviceIDForPush string - - Transport http.RoundTripper + QueryParam map[string]string + Transport http.RoundTripper ctx Context } @@ -146,10 +153,14 @@ func (o *listPushProvisionsRequestOpts) buildPath() (string, error) { func (o *listPushProvisionsRequestOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) q.Set("type", o.PushType.String()) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *listPushProvisionsRequestOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *listPushProvisionsRequestOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/list_push_provisions_request_test.go b/list_push_provisions_request_test.go index bfd7b149..63565231 100644 --- a/list_push_provisions_request_test.go +++ b/list_push_provisions_request_test.go @@ -91,6 +91,29 @@ func TestListPushProvisionsRequestBuildQuery(t *testing.T) { assert.Nil(err) } +func TestListPushProvisionsRequestBuildQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &listPushProvisionsRequestOpts{ + DeviceIDForPush: "deviceId", + PushType: PNPushTypeAPNS, + pubnub: pubnub, + } + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + u, err := opts.buildQuery() + assert.Equal("apns", u.Get("type")) + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + assert.Nil(err) +} + func TestListPushProvisionsRequestBuildBody(t *testing.T) { assert := assert.New(t) diff --git a/publish_request.go b/publish_request.go index 0d55af18..875a3f05 100644 --- a/publish_request.go +++ b/publish_request.go @@ -32,6 +32,7 @@ type publishOpts struct { ShouldStore bool Serialize bool DoNotReplicate bool + QueryParam map[string]string Transport http.RoundTripper @@ -171,6 +172,13 @@ func (b *publishBuilder) Transport(tr http.RoundTripper) *publishBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *publishBuilder) QueryParam(queryParam map[string]string) *publishBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the Publish request. func (b *publishBuilder) Execute() (*PublishResponse, StatusResponse, error) { rawJSON, status, err := executeRequest(b.opts) @@ -337,13 +345,20 @@ func (o *publishOpts) buildQuery() (*url.Values, error) { o.pubnub.Config.Log.Println("seqn:", seqn) q.Set("seqn", seqn) + SetQueryParam(q, o.QueryParam) + if o.DoNotReplicate == true { q.Set("norep", "true") } + o.pubnub.Config.Log.Println(q) return q, nil } +func (o *publishOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *publishOpts) buildBody() ([]byte, error) { if o.UsePost { if cipherKey := o.pubnub.Config.CipherKey; cipherKey != "" { diff --git a/publish_request_test.go b/publish_request_test.go index cac055fb..b1316593 100644 --- a/publish_request_test.go +++ b/publish_request_test.go @@ -102,6 +102,43 @@ func AssertSuccessPublishGet2(t *testing.T, expectedString string, message inter } +func AssertSuccessPublishGet3(t *testing.T, expectedString string, message interface{}) { + assert := assert.New(t) + + pn := NewPubNub(NewDemoConfig()) + pn.Config.AuthKey = "a" + + o := newPublishBuilder(pn) + o.Channel("ch") + o.Message(message) + o.TTL(10) + o.ShouldStore(false) + o.DoNotReplicate(true) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + o.QueryParam(queryParam) + + query, err := o.opts.buildQuery() + log.Println(query) + + assert.Nil(err) + expected := &url.Values{} + expected.Set("seqn", "1") + expected.Set("uuid", pn.Config.UUID) + expected.Set("ttl", "10") + expected.Set("pnsdk", Version) + expected.Set("norep", "true") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, + []string{"seqn", "pnsdk", "uuid"}, []string{}) + +} + func AssertSuccessPublishGetAuth(t *testing.T, expectedString string, message interface{}) { assert := assert.New(t) @@ -219,6 +256,7 @@ func TestPublishMixedGet(t *testing.T) { msgMap) AssertSuccessPublishGet2(t, "12", 12) + AssertSuccessPublishGet3(t, "12", 12) AssertSuccessPublishGet2(t, "%22hey%22", "hey") AssertSuccessPublishGet2(t, "true", true) AssertSuccessPublishGet2(t, "%5B%22hey1%22%2C%22hey2%22%2C%22hey3%22%5D", diff --git a/pubnub.go b/pubnub.go index b7ae3548..afd7cbb4 100644 --- a/pubnub.go +++ b/pubnub.go @@ -5,13 +5,14 @@ import ( "io/ioutil" "log" "net/http" + "runtime" "sync" ) // Default constants const ( // Version :the version of the SDK - Version = "4.1.3" + Version = "4.1.4" // MaxSequence for publish messages MaxSequence = 65535 ) @@ -49,6 +50,8 @@ type PubNub struct { telemetryManager *TelemetryManager client *http.Client subscribeClient *http.Client + requestWorkers *RequestWorkers + jobQueue chan *JobQItem ctx Context cancel func() } @@ -156,7 +159,8 @@ func (pn *PubNub) GetClient() *http.Client { pn.Config.NonSubscribeRequestTimeout) } else { pn.client = NewHTTP1Client(pn.Config.ConnectTimeout, - pn.Config.NonSubscribeRequestTimeout) + pn.Config.NonSubscribeRequestTimeout, + pn.Config.MaxIdleConnsPerHost) } } @@ -180,7 +184,7 @@ func (pn *PubNub) GetSubscribeClient() *http.Client { pn.Config.SubscribeRequestTimeout) } else { pn.subscribeClient = NewHTTP1Client(pn.Config.ConnectTimeout, - pn.Config.SubscribeRequestTimeout) + pn.Config.SubscribeRequestTimeout, pn.Config.MaxIdleConnsPerHost) } } @@ -318,7 +322,10 @@ func (pn *PubNub) Destroy() { pn.Config.Log.Println("calling RemoveAllListeners") pn.subscriptionManager.RemoveAllListeners() pn.Config.Log.Println("after RemoveAllListeners") - if pn.telemetryManager.ExitTelemetryManager != nil { + pn.telemetryManager.RLock() + telManagerRunning := pn.telemetryManager.IsRunning + pn.telemetryManager.RUnlock() + if (pn.telemetryManager.ExitTelemetryManager != nil) && (telManagerRunning) { pn.Config.Log.Println("calling exitTelemetryManager") pn.telemetryManager.ExitTelemetryManager <- true pn.Config.Log.Println("after exitTelemetryManager") @@ -347,7 +354,7 @@ func NewPubNub(pnconf *Config) *PubNub { if pnconf.Log == nil { pnconf.Log = log.New(ioutil.Discard, "", log.Ldate|log.Ltime|log.Lshortfile) } - pnconf.Log.Println(fmt.Sprintf("PubNub Go v4 SDK: %s\npnconf: %v", Version, pnconf)) + pnconf.Log.Println(fmt.Sprintf("PubNub Go v4 SDK: %s\npnconf: %v\n%s\n%s\n%s", Version, pnconf, runtime.Version(), runtime.GOARCH, runtime.GOOS)) pn := &PubNub{ Config: pnconf, @@ -357,12 +364,26 @@ func NewPubNub(pnconf *Config) *PubNub { } pn.subscriptionManager = newSubscriptionManager(pn, ctx) - pn.telemetryManager = newTelemetryManager( - pnconf.MaximumLatencyDataAge, ctx) + pn.telemetryManager = newTelemetryManager(pnconf.MaximumLatencyDataAge, ctx) + pn.jobQueue = make(chan *JobQItem) + pn.requestWorkers = pn.newNonSubQueueProcessor(pnconf.MaxWorkers) return pn } +func (pn *PubNub) newNonSubQueueProcessor(maxWorkers int) *RequestWorkers { + workers := make(chan chan *JobQItem, maxWorkers) + + pn.Config.Log.Printf("Init RequestWorkers: workers %d", maxWorkers) + + p := &RequestWorkers{ + Workers: workers, + MaxWorkers: maxWorkers, + } + p.Start(pn) + return p +} + func NewPubNubDemo() *PubNub { return NewPubNub(NewDemoConfig()) } diff --git a/remove_all_push_channels_request.go b/remove_all_push_channels_request.go index b60c37a9..05bc6a4d 100644 --- a/remove_all_push_channels_request.go +++ b/remove_all_push_channels_request.go @@ -52,6 +52,13 @@ func (b *removeAllPushChannelsForDeviceBuilder) DeviceIDForPush( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *removeAllPushChannelsForDeviceBuilder) QueryParam(queryParam map[string]string) *removeAllPushChannelsForDeviceBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the RemoveAllPushNotifications request. func (b *removeAllPushChannelsForDeviceBuilder) Execute() ( *RemoveAllPushChannelsForDeviceResponse, StatusResponse, error) { @@ -66,8 +73,8 @@ func (b *removeAllPushChannelsForDeviceBuilder) Execute() ( type removeAllPushChannelsForDeviceOpts struct { pubnub *PubNub - PushType PNPushType - + PushType PNPushType + QueryParam map[string]string DeviceIDForPush string Transport http.RoundTripper @@ -115,10 +122,14 @@ func (o *removeAllPushChannelsForDeviceOpts) buildPath() (string, error) { func (o *removeAllPushChannelsForDeviceOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) q.Set("type", o.PushType.String()) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *removeAllPushChannelsForDeviceOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *removeAllPushChannelsForDeviceOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/remove_all_push_channels_request_test.go b/remove_all_push_channels_request_test.go index 8e17e4fe..c107cbfa 100644 --- a/remove_all_push_channels_request_test.go +++ b/remove_all_push_channels_request_test.go @@ -77,6 +77,28 @@ func TestRemoveAllPushNotificationsBuildPath(t *testing.T) { } +func TestRemoveAllPushNotificationsBuildQueryParam(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &removeAllPushChannelsForDeviceOpts{ + DeviceIDForPush: "deviceId", + PushType: PNPushTypeAPNS, + pubnub: pubnub, + QueryParam: queryParam, + } + + u, err := opts.buildQuery() + assert.Equal("apns", u.Get("type")) + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + + assert.Nil(err) +} + func TestRemoveAllPushNotificationsBuildQuery(t *testing.T) { assert := assert.New(t) @@ -88,6 +110,7 @@ func TestRemoveAllPushNotificationsBuildQuery(t *testing.T) { u, err := opts.buildQuery() assert.Equal("apns", u.Get("type")) + assert.Nil(err) } diff --git a/remove_channel_channel_group_request.go b/remove_channel_channel_group_request.go index 5ec047cd..a9177b50 100644 --- a/remove_channel_channel_group_request.go +++ b/remove_channel_channel_group_request.go @@ -54,6 +54,13 @@ func (b *removeChannelFromChannelGroupBuilder) ChannelGroup( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *removeChannelFromChannelGroupBuilder) QueryParam(queryParam map[string]string) *removeChannelFromChannelGroupBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs RemoveChannelFromChannelGroup request func (b *removeChannelFromChannelGroupBuilder) Execute() ( *RemoveChannelFromChannelGroupResponse, StatusResponse, error) { @@ -68,8 +75,8 @@ func (b *removeChannelFromChannelGroupBuilder) Execute() ( type removeChannelOpts struct { pubnub *PubNub - Channels []string - + Channels []string + QueryParam map[string]string ChannelGroup string Transport http.RoundTripper @@ -121,10 +128,14 @@ func (o *removeChannelOpts) buildQuery() (*url.Values, error) { } q.Set("remove", strings.Join(channels, ",")) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *removeChannelOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *removeChannelOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/remove_channel_channel_group_request_test.go b/remove_channel_channel_group_request_test.go index 9fc07398..d0a65a3e 100644 --- a/remove_channel_channel_group_request_test.go +++ b/remove_channel_channel_group_request_test.go @@ -50,6 +50,36 @@ func TestRemoveChannelRequestBasic(t *testing.T) { assert.Equal([]byte{}, body) } +func TestRemoveChannelRequestBasicQueryParam(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &removeChannelOpts{ + Channels: []string{"ch1", "ch2", "ch3"}, + ChannelGroup: "cg", + pubnub: pubnub, + QueryParam: queryParam, + } + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("remove", "ch1,ch2,ch3") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestNewRemoveChannelFromChannelGroupBuilder(t *testing.T) { assert := assert.New(t) o := newRemoveChannelFromChannelGroupBuilder(pubnub) diff --git a/remove_channels_from_push_request.go b/remove_channels_from_push_request.go index b424b497..9f84e5c0 100644 --- a/remove_channels_from_push_request.go +++ b/remove_channels_from_push_request.go @@ -60,6 +60,13 @@ func (b *removeChannelsFromPushBuilder) DeviceIDForPush( return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *removeChannelsFromPushBuilder) QueryParam(queryParam map[string]string) *removeChannelsFromPushBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the RemovePushNotificationsFromChannels request. func (b *removeChannelsFromPushBuilder) Execute() ( *RemoveChannelsFromPushResponse, StatusResponse, error) { @@ -74,10 +81,9 @@ func (b *removeChannelsFromPushBuilder) Execute() ( type removeChannelsFromPushOpts struct { pubnub *PubNub - Channels []string - - PushType PNPushType - + Channels []string + QueryParam map[string]string + PushType PNPushType DeviceIDForPush string Transport http.RoundTripper @@ -136,10 +142,14 @@ func (o *removeChannelsFromPushOpts) buildQuery() (*url.Values, error) { } q.Set("remove", strings.Join(channels, ",")) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *removeChannelsFromPushOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *removeChannelsFromPushOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/remove_channels_from_push_request_test.go b/remove_channels_from_push_request_test.go index 6d65461f..2f61279a 100644 --- a/remove_channels_from_push_request_test.go +++ b/remove_channels_from_push_request_test.go @@ -64,6 +64,30 @@ func TestRemoveChannelsFromPushRequestBuildPath(t *testing.T) { } +func TestRemoveChannelsFromPushRequestBuildQueryParam(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &removeChannelsFromPushOpts{ + Channels: []string{"ch1", "ch2", "ch3"}, + DeviceIDForPush: "deviceId", + PushType: PNPushTypeAPNS, + pubnub: pubnub, + QueryParam: queryParam, + } + + u, err := opts.buildQuery() + assert.Equal("ch1,ch2,ch3", u.Get("remove")) + assert.Equal("apns", u.Get("type")) + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + + assert.Nil(err) +} + func TestRemoveChannelsFromPushRequestBuildQuery(t *testing.T) { assert := assert.New(t) @@ -77,6 +101,7 @@ func TestRemoveChannelsFromPushRequestBuildQuery(t *testing.T) { u, err := opts.buildQuery() assert.Equal("ch1,ch2,ch3", u.Get("remove")) assert.Equal("apns", u.Get("type")) + assert.Nil(err) } diff --git a/request.go b/request.go index 30f6ff0e..21fe7f3d 100644 --- a/request.go +++ b/request.go @@ -2,6 +2,7 @@ package pubnub import ( "bytes" + "fmt" "github.com/pubnub/go/pnerr" "io" "io/ioutil" @@ -37,6 +38,15 @@ type ResponseInfo struct { OriginalResponse *http.Response } +func addToJobQ(req *http.Request, client *http.Client, opts endpointOpts, j chan *JobQResponse) { + jqi := &JobQItem{ + Req: req, + Client: client, + JobResponse: j, + } + opts.jobQueue() <- jqi +} + func executeRequest(opts endpointOpts) ([]byte, StatusResponse, error) { err := opts.validate() @@ -56,8 +66,7 @@ func executeRequest(opts endpointOpts) ([]byte, StatusResponse, error) { err } - opts.config().Log.Println(url) - opts.config().Log.Println(opts.httpMethod()) + opts.config().Log.Println(fmt.Sprintf("url:%s\nmethod:%s", url, opts.httpMethod())) var req *http.Request @@ -95,7 +104,26 @@ func executeRequest(opts endpointOpts) ([]byte, StatusResponse, error) { client := opts.client() startTimestamp := time.Now() - res, err := client.Do(req) + + var res *http.Response + runRequestWorker := false + + switch opts.operationType() { + case PNPublishOperation, PNAccessManagerGrant: + runRequestWorker = true + } + + if runRequestWorker && opts.config().MaxWorkers > 0 { + j := make(chan *JobQResponse) + go addToJobQ(req, client, opts, j) + jr := <-j + close(j) + res = jr.Resp + err = jr.Error + } else { + res, err = client.Do(req) + } + // Host lookup failed if err != nil { opts.config().Log.Println("err.Error()", err.Error()) diff --git a/request_workers.go b/request_workers.go new file mode 100644 index 00000000..36b9ced2 --- /dev/null +++ b/request_workers.go @@ -0,0 +1,81 @@ +package pubnub + +import "net/http" + +type nonSubMsgType int + +const ( + messageTypePublish nonSubMsgType = 1 << iota + messageTypePAM +) + +type JobQResponse struct { + Resp *http.Response + Error error +} + +type JobQItem struct { + Req *http.Request + Client *http.Client + JobResponse chan *JobQResponse +} + +type RequestWorkers struct { + Workers chan chan *JobQItem + MaxWorkers int + Sem chan bool +} + +type Worker struct { + Workers chan chan *JobQItem + JobChannel chan *JobQItem + id int +} + +func NewRequestWorkers(workers chan chan *JobQItem, id int) Worker { + return Worker{ + Workers: workers, + JobChannel: make(chan *JobQItem), + id: id, + } +} + +func (pw Worker) Process(pubnub *PubNub) { + go func() { + for { + pw.Workers <- pw.JobChannel + job := <-pw.JobChannel + res, err := job.Client.Do(job.Req) + jqr := &JobQResponse{ + Error: err, + Resp: res, + } + job.JobResponse <- jqr + } + }() +} + +func (p *RequestWorkers) Start(pubnub *PubNub) { + pubnub.Config.Log.Println("Start: Running with workers ", p.MaxWorkers) + for i := 0; i < p.MaxWorkers; i++ { + pubnub.Config.Log.Println("Start: StartNonSubWorker ", i) + worker := NewRequestWorkers(p.Workers, i) + worker.Process(pubnub) + } + go p.ReadQueue(pubnub) +} + +func (p *RequestWorkers) ReadQueue(pubnub *PubNub) { + for job := range pubnub.jobQueue { + pubnub.Config.Log.Println("ReadQueue: Got job for channel ", job.Req) + go func(job *JobQItem) { + jobChannel := <-p.Workers + jobChannel <- job + }(job) + } + pubnub.Config.Log.Println("ReadQueue: Exit") +} + +func (p *RequestWorkers) Close() { + close(p.Workers) +} diff --git a/set_state_request.go b/set_state_request.go index dc6a2d9f..1b577a8a 100644 --- a/set_state_request.go +++ b/set_state_request.go @@ -59,6 +59,13 @@ func (b *setStateBuilder) ChannelGroups(groups []string) *setStateBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *setStateBuilder) QueryParam(queryParam map[string]string) *setStateBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the the Set State request and returns the SetStateResponse func (b *setStateBuilder) Execute() (*SetStateResponse, StatusResponse, error) { stateOperation := StateOperation{} @@ -80,10 +87,10 @@ type setStateOpts struct { State map[string]interface{} Channels []string ChannelGroups []string - - pubnub *PubNub - stringState string - ctx Context + QueryParam map[string]string + pubnub *PubNub + stringState string + ctx Context } func (o *setStateOpts) config() Config { @@ -144,10 +151,14 @@ func (o *setStateOpts) buildQuery() (*url.Values, error) { if len(o.ChannelGroups) > 0 { q.Set("channel-group", string(groups)) } - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *setStateOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *setStateOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/set_state_request_test.go b/set_state_request_test.go index 46fcb72f..0acf00ec 100644 --- a/set_state_request_test.go +++ b/set_state_request_test.go @@ -142,6 +142,31 @@ func TestSetStateMultipleChannelGroups(t *testing.T) { h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) } +func TestSetStateMultipleChannelGroupsQueryParam(t *testing.T) { + assert := assert.New(t) + + opts := &setStateOpts{ + ChannelGroups: []string{"cg1", "cg2", "cg3"}, + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("channel-group", "cg1,cg2,cg3") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) +} + func TestSetStateValidateSubscribeKey(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig()) diff --git a/subscribe_request.go b/subscribe_request.go index f48afa0d..749411e7 100644 --- a/subscribe_request.go +++ b/subscribe_request.go @@ -15,9 +15,9 @@ const subscribePath = "/v2/subscribe/%s/%s/0" type subscribeOpts struct { pubnub *PubNub - Channels []string - ChannelGroups []string - + Channels []string + ChannelGroups []string + QueryParam map[string]string Heartbeat int Region string Timetoken int64 @@ -86,6 +86,13 @@ func (b *subscribeBuilder) State(state map[string]interface{}) *subscribeBuilder return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *subscribeBuilder) QueryParam(queryParam map[string]string) *subscribeBuilder { + b.operation.QueryParam = queryParam + + return b +} + // Execute runs the Subscribe operation. func (b *subscribeBuilder) Execute() { b.opts.pubnub.subscriptionManager.adaptSubscribe(b.operation) @@ -164,10 +171,15 @@ func (o *subscribeOpts) buildQuery() (*url.Values, error) { if o.stringState != "" { q.Set("state", o.stringState) } + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *subscribeOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *subscribeOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/subscribe_request_test.go b/subscribe_request_test.go index 9a20cabc..7ff23bf6 100644 --- a/subscribe_request_test.go +++ b/subscribe_request_test.go @@ -101,6 +101,39 @@ func TestSubscribeMixedParams(t *testing.T) { []string{"pnsdk", "uuid"}, []string{}) } +func TestSubscribeMixedQueryParams(t *testing.T) { + assert := assert.New(t) + + opts := &subscribeOpts{ + Channels: []string{"ch"}, + ChannelGroups: []string{"cg"}, + Region: "us-east-1", + Timetoken: 123, + FilterExpression: "abc", + pubnub: pubnub, + } + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts.QueryParam = queryParam + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("channel-group", "cg") + expected.Set("tr", "us-east-1") + expected.Set("filter-expr", "abc") + expected.Set("tt", "123") + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, + []string{"pnsdk", "uuid"}, []string{}) +} + func TestSubscribeValidateSubscribeKey(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig()) diff --git a/subscription_manager.go b/subscription_manager.go index 291c6113..f14f04fb 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -71,6 +71,8 @@ type SubscriptionManager struct { heartbeatStopCalled bool exitSubscriptionManagerMutex sync.Mutex exitSubscriptionManager chan bool + queryParam map[string]string + channelsOpen bool } // SubscribeOperation @@ -81,11 +83,13 @@ type SubscribeOperation struct { Timetoken int64 FilterExpression string State map[string]interface{} + QueryParam map[string]string } type UnsubscribeOperation struct { Channels []string ChannelGroups []string + QueryParam map[string]string } type StateOperation struct { @@ -109,7 +113,7 @@ func newSubscriptionManager(pubnub *PubNub, ctx Context) *SubscriptionManager { manager.ctx, manager.subscribeCancel = contextWithCancel(backgroundContext) manager.messages = make(chan subscribeMessage, 1000) manager.reconnectionManager = newReconnectionManager(pubnub) - + manager.channelsOpen = true manager.Unlock() if manager.pubnub.Config.PNReconnectionPolicy != PNNonePolicy { @@ -166,14 +170,19 @@ func newSubscriptionManager(pubnub *PubNub, ctx Context) *SubscriptionManager { func (m *SubscriptionManager) Destroy() { m.subscribeCancel() - if m.exitSubscriptionManager != nil { - close(m.exitSubscriptionManager) - } - if m.listenerManager.exitListener != nil { - close(m.listenerManager.exitListener) - } - if m.reconnectionManager.exitReconnectionManager != nil { - close(m.reconnectionManager.exitReconnectionManager) + if m.channelsOpen { + m.RLock() + m.channelsOpen = false + m.RUnlock() + if m.exitSubscriptionManager != nil { + close(m.exitSubscriptionManager) + } + if m.listenerManager.exitListener != nil { + close(m.listenerManager.exitListener) + } + if m.reconnectionManager.exitReconnectionManager != nil { + close(m.reconnectionManager.exitReconnectionManager) + } } } @@ -190,6 +199,7 @@ func (m *SubscriptionManager) adaptSubscribe( m.Lock() m.subscriptionStateAnnounced = false + m.queryParam = subscribeOperation.QueryParam if subscribeOperation.Timetoken != 0 { m.timetoken = subscribeOperation.Timetoken @@ -220,7 +230,7 @@ func (m *SubscriptionManager) adaptUnsubscribe( announceAck := false if !m.pubnub.Config.SuppressLeaveEvents { _, err := m.pubnub.Leave().Channels(unsubscribeOperation.Channels). - ChannelGroups(unsubscribeOperation.ChannelGroups).Execute() + ChannelGroups(unsubscribeOperation.ChannelGroups).QueryParam(unsubscribeOperation.QueryParam).Execute() if err != nil { pnStatus := &PNStatus{ @@ -305,6 +315,7 @@ func (m *SubscriptionManager) startSubscribeLoop() { Heartbeat: m.pubnub.Config.PresenceTimeout, FilterExpression: m.pubnub.Config.FilterExpression, ctx: ctx, + QueryParam: m.queryParam, } if s := m.stateManager.createStatePayload(); len(s) > 0 { diff --git a/telemetry_manager.go b/telemetry_manager.go index 07d9c607..80f4265e 100644 --- a/telemetry_manager.go +++ b/telemetry_manager.go @@ -34,6 +34,7 @@ type TelemetryManager struct { maxLatencyDataAge int ExitTelemetryManager chan bool + IsRunning bool } func newTelemetryManager(maxLatencyDataAge int, ctx Context) *TelemetryManager { @@ -118,16 +119,24 @@ func (m *TelemetryManager) startCleanUpTimer() { go func() { for { - m.RLock() + m.Lock() + m.IsRunning = true + m.Unlock() timerCh := m.cleanUpTimer.C - m.RUnlock() + select { case <-timerCh: m.CleanUpTelemetryData() case <-m.ctx.Done(): + m.Lock() + m.IsRunning = false + m.Unlock() m.cleanUpTimer.Stop() return case <-m.ExitTelemetryManager: + m.Lock() + m.IsRunning = false + m.Unlock() m.cleanUpTimer.Stop() return } diff --git a/tests/e2e/add_channel_channel_group_test.go b/tests/e2e/add_channel_channel_group_test.go index ffb59b77..64a713ad 100644 --- a/tests/e2e/add_channel_channel_group_test.go +++ b/tests/e2e/add_channel_channel_group_test.go @@ -78,9 +78,15 @@ func TestAddChannelToChannelGroupSuperCall(t *testing.T) { pn := pubnub.NewPubNub(config) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + _, _, err := pn.AddChannelToChannelGroup(). Channels([]string{channelCharacters}). ChannelGroup(validCharacters). + QueryParam(queryParam). Execute() //fmt.Println(err.Error()) assert.Nil(err) @@ -89,12 +95,13 @@ func TestAddChannelToChannelGroupSuperCall(t *testing.T) { func TestAddChannelToChannelGroupSuccessAdded(t *testing.T) { assert := assert.New(t) pn := pubnub.NewPubNub(configCopy()) + //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) interceptor := stubs.NewInterceptor() interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/channel-registration/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64/channel-group/my-unique-group", - Query: "add=my-channel", + Query: "add=my-channel&q1=v1&q2=v2", ResponseBody: "{\"status\": 200, \"message\": \"OK\", \"service\": \"channel-registry\", \"error\": \"false\"}", IgnoreQueryKeys: []string{"uuid", "pnsdk", "l_cg"}, ResponseStatusCode: 200, @@ -102,7 +109,7 @@ func TestAddChannelToChannelGroupSuccessAdded(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/channel-registration/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64/channel-group/my-unique-group", - Query: "", + Query: "q1=v1&q2=v2", ResponseBody: "{\"status\": \"200\", \"payload\": {\"channels\": [\"my-channel\"], \"group\": \"my-unique-group\"}, \"service\": \"channel-registry\", \"error\": \"false\"}", IgnoreQueryKeys: []string{"uuid", "pnsdk", "l_cg"}, ResponseStatusCode: 200, @@ -111,18 +118,22 @@ func TestAddChannelToChannelGroupSuccessAdded(t *testing.T) { myChannel := "my-channel" myGroup := "my-unique-group" - //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + pn.SetClient(interceptor.GetClient()) _, _, err := pn.AddChannelToChannelGroup(). Channels([]string{myChannel}). - ChannelGroup(myGroup). + ChannelGroup(myGroup).QueryParam(queryParam). Execute() - //fmt.Println(err.Error()) + assert.Nil(err) res, _, err := pn.ListChannelsInChannelGroup(). - ChannelGroup(myGroup). + ChannelGroup(myGroup).QueryParam(queryParam). Execute() assert.Nil(err) diff --git a/tests/e2e/add_channels_to_push_request_test.go b/tests/e2e/add_channels_to_push_request_test.go index b9f71e4d..d7f97ee2 100644 --- a/tests/e2e/add_channels_to_push_request_test.go +++ b/tests/e2e/add_channels_to_push_request_test.go @@ -29,12 +29,34 @@ func TestAddChannelToPushNotStubbedContext(t *testing.T) { assert := assert.New(t) pn := pubnub.NewPubNub(configCopy()) + + //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) + + _, _, err := pn.AddPushNotificationsOnChannelsWithContext(backgroundContext). + Channels([]string{"ch1"}). + DeviceIDForPush("cg1"). + PushType(pubnub.PNPushTypeGCM). + Execute() + //fmt.Println(err.Error()) + assert.Nil(err) +} + +func TestAddChannelToPushNotStubbedContextWithQueryParam(t *testing.T) { + assert := assert.New(t) + + pn := pubnub.NewPubNub(configCopy()) + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) _, _, err := pn.AddPushNotificationsOnChannelsWithContext(backgroundContext). Channels([]string{"ch1"}). DeviceIDForPush("cg1"). PushType(pubnub.PNPushTypeGCM). + QueryParam(queryParam). Execute() //fmt.Println(err.Error()) assert.Nil(err) diff --git a/tests/e2e/delete_channel_group_test.go b/tests/e2e/delete_channel_group_test.go index d7ac9419..99acd0c0 100644 --- a/tests/e2e/delete_channel_group_test.go +++ b/tests/e2e/delete_channel_group_test.go @@ -58,8 +58,14 @@ func TestRemoveChannelGroupSuperCall(t *testing.T) { pn := pubnub.NewPubNub(config) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + _, _, err := pn.DeleteChannelGroup(). ChannelGroup(validCharacters). + QueryParam(queryParam). Execute() assert.Nil(err) @@ -82,7 +88,7 @@ func TestRemoveChannelGroupSuccessRemoved(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/channel-registration/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64/channel-group/my-unique-group-remove", - Query: "remove=my-channel-remove", + Query: "remove=my-channel-remove&q1=v1&q2=v2", ResponseBody: `{"status": 200, "message": "OK", "service": "channel-registry", "error": false}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "l_cg"}, ResponseStatusCode: 200, @@ -106,9 +112,15 @@ func TestRemoveChannelGroupSuccessRemoved(t *testing.T) { assert.Nil(err) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + _, _, err = pn.RemoveChannelFromChannelGroup(). Channels([]string{myChannel}). ChannelGroup(myGroup). + QueryParam(queryParam). Execute() assert.Nil(err) diff --git a/tests/e2e/fetch_test.go b/tests/e2e/fetch_test.go index cdc09f37..24f1c84d 100644 --- a/tests/e2e/fetch_test.go +++ b/tests/e2e/fetch_test.go @@ -58,6 +58,11 @@ func TestFetch(t *testing.T) { Execute() assert.Nil(err) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + MatchFetchMessages(ret, 0, ch1, ch2, assert) ret1, _, err1 := pn.FetchWithContext(backgroundContext). @@ -66,6 +71,7 @@ func TestFetch(t *testing.T) { Reverse(reverse). Start(timestamp1). End(timestamp2). + QueryParam(queryParam). Execute() assert.Nil(err1) diff --git a/tests/e2e/get_state_test.go b/tests/e2e/get_state_test.go index cf07916d..f3be4d49 100644 --- a/tests/e2e/get_state_test.go +++ b/tests/e2e/get_state_test.go @@ -108,3 +108,60 @@ func TestGetStateSucess(t *testing.T) { assert.Fail(fmt.Sprintf("!map[string]interface{} ")) } } + +func TestGetStateSucessQueryParam(t *testing.T) { + assert := assert.New(t) + + interceptor := stubs.NewInterceptor() + interceptor.AddStub(&stubs.Stub{ + Method: "GET", + Path: "/v2/presence/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64/channel/ch/uuid/" + config.UUID + "/data", + Query: "state=%7B%22age%22%3A%2220%22%2C%22name%22%3A%22John%20Doe%22%7D&q1=v1&q2=v2", + ResponseBody: `{"status": 200, "message": "OK", "payload": {"age": "20", "name": "John Doe"}, "service": "Presence"}`, + IgnoreQueryKeys: []string{"uuid", "pnsdk", "l_pres"}, + ResponseStatusCode: 200, + }) + interceptor.AddStub(&stubs.Stub{ + Method: "GET", + Path: "/v2/presence/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64/channel/ch/uuid/" + config.UUID, + Query: "q1=v1&q2=v2", + ResponseBody: `{"status": 200, "message": "OK", "payload": {"age": "20", "name": "John Doe"}, "uuid": "bb45300a-25fb-4b14-8de1-388393274a54", "channel": "ch", "service": "Presence"}`, + IgnoreQueryKeys: []string{"uuid", "pnsdk", "channel-group", "l_pres"}, + ResponseStatusCode: 200, + }) + + pn := pubnub.NewPubNub(config) + pn.SetClient(interceptor.GetClient()) + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + state := make(map[string]interface{}) + state["age"] = "20" + state["name"] = "John Doe" + + _, _, err := pn.SetState(). + State(state). + Channels([]string{"ch"}). + QueryParam(queryParam). + Execute() + + assert.Nil(err) + + res, _, err := pn.GetState(). + Channels([]string{"ch"}). + QueryParam(queryParam). + Execute() + + assert.Nil(err) + //fmt.Println(res.State) + if s, ok := res.State["ch"].(map[string]interface{}); ok { + assert.Equal("20", s["age"]) + assert.Equal("John Doe", s["name"]) + + } else { + assert.Fail(fmt.Sprintf("!map[string]interface{} ")) + } +} diff --git a/tests/e2e/grant_test.go b/tests/e2e/grant_test.go index 9447d5cf..159f78a0 100644 --- a/tests/e2e/grant_test.go +++ b/tests/e2e/grant_test.go @@ -110,7 +110,80 @@ func TestGrantSucccessNotStubbed(t *testing.T) { Execute() assert.Nil(err) + log.Println(res) assert.NotNil(res) + + assert.True(res.Channels["ch2"].AuthKeys["pam-key"].WriteEnabled) + assert.True(res.Channels["ch2"].AuthKeys["pam-key"].ReadEnabled) + assert.True(res.Channels["ch2"].AuthKeys["pam-key"].ManageEnabled) + assert.True(!res.Channels["ch2"].AuthKeys["pam-key"].DeleteEnabled) + +} + +func TestGrantSucccessAppLevelFalse(t *testing.T) { + assert := assert.New(t) + + pn := pubnub.NewPubNub(pamConfigCopy()) + + pn.Config.UUID = "asd,|//&aqwe" + + res, _, err := pn.Grant(). + Read(false).Write(false).Manage(false).Delete(false). + Execute() + + assert.Nil(err) + log.Println(res) + assert.NotNil(res) + + assert.True(!res.WriteEnabled) + assert.True(!res.ReadEnabled) + assert.True(!res.ManageEnabled) + assert.True(!res.DeleteEnabled) + +} + +func TestGrantSucccessAppLevelMixed(t *testing.T) { + assert := assert.New(t) + + pn := pubnub.NewPubNub(pamConfigCopy()) + + pn.Config.UUID = "asd,|//&aqwe" + + res, _, err := pn.Grant(). + Read(false).Write(true).Manage(false).Delete(true). + Execute() + + assert.Nil(err) + log.Println(res) + assert.NotNil(res) + + assert.True(res.WriteEnabled) + assert.True(!res.ReadEnabled) + assert.True(!res.ManageEnabled) + assert.True(res.DeleteEnabled) + +} + +func TestGrantSucccessAppLevelMixed2(t *testing.T) { + assert := assert.New(t) + + pn := pubnub.NewPubNub(pamConfigCopy()) + + pn.Config.UUID = "asd,|//&aqwe" + + res, _, err := pn.Grant(). + Read(true).Write(false).Manage(true).Delete(false). + Execute() + + assert.Nil(err) + log.Println(res) + assert.NotNil(res) + + assert.True(!res.WriteEnabled) + assert.True(res.ReadEnabled) + assert.True(res.ManageEnabled) + assert.True(!res.DeleteEnabled) + } func TestGrantSucccessNotStubbedContext(t *testing.T) { @@ -136,7 +209,7 @@ func TestGrantMultipleMixed(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "auth=my-auth-key-1%2Cmy-auth-key-2&channel=ch1%2Cch2%2Cch3&channel-group=cg1%2Ccg2%2Ccg3&r=1&m=1&w=1", + Query: "auth=my-auth-key-1%2Cmy-auth-key-2&channel=ch1%2Cch2%2Cch3&channel-group=cg1%2Ccg2%2Ccg3&r=1&m=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel-group+auth","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channels":{"ch1":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}},"ch2":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}},"ch3":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}}},"channel-groups":{"cg1":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}},"cg2":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}},"cg3":{"auths":{"my-auth-key-1":{"r":1,"w":1,"m":1,"d":0},"my-auth-key-2":{"r":1,"w":1,"m":1,"d":0}}}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "timestamp", "signature"}, ResponseStatusCode: 200, @@ -144,6 +217,7 @@ func TestGrantMultipleMixed(t *testing.T) { pn := pubnub.NewPubNub(pamConfigCopy()) pn.SetClient(interceptor.GetClient()) + pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) res, _, err := pn.Grant(). Read(true).Write(true).Manage(true). @@ -163,7 +237,7 @@ func TestGrantSingleChannel(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "channel=ch1&m=0&r=1&w=1", + Query: "channel=ch1&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channels":{"ch1":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -171,6 +245,7 @@ func TestGrantSingleChannel(t *testing.T) { pn := pubnub.NewPubNub(pamConfigCopy()) pn.SetClient(interceptor.GetClient()) + pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) res, _, err := pn.Grant(). Read(true).Write(true). @@ -191,7 +266,7 @@ func TestGrantSingleChannelWithAuth(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "auth=my-pam-key&channel=ch1&m=0&r=1&w=1", + Query: "auth=my-pam-key&channel=ch1&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"user","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channel":"ch1","auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -220,7 +295,7 @@ func TestGrantMultipleChannels(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "channel=ch1%2Cch2&m=0&r=1&w=1", + Query: "channel=ch1%2Cch2&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channels":{"ch1":{"r":1,"w":1,"m":0,"d":0},"ch2":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -253,7 +328,7 @@ func TestGrantMultipleChannelsWithAuth(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "auth=my-pam-key&channel=ch1%2Cch2&m=0&r=1&w=1", + Query: "auth=my-pam-key&channel=ch1%2Cch2&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"user","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channels":{"ch1":{"auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}},"ch2":{"auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -287,7 +362,7 @@ func TestGrantSingleGroup(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "channel-group=cg1&m=0&r=1&w=1", + Query: "channel-group=cg1&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel-group","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channel-groups":{"cg1":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -316,7 +391,7 @@ func TestGrantSingleGroupWithAuth(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "auth=my-pam-key&channel-group=cg1&m=0&r=1&w=1", + Query: "auth=my-pam-key&channel-group=cg1&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel-group+auth","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channel-groups":"cg1","auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -347,7 +422,7 @@ func TestGrantMultipleGroups(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "channel-group=cg1%2Ccg2&m=0&r=1&w=1", + Query: "channel-group=cg1%2Ccg2&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel-group","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channel-groups":{"cg1":{"r":1,"w":1,"m":0,"d":0},"cg2":{"r":1,"w":1,"m":0,"d":0}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, @@ -380,7 +455,7 @@ func TestGrantMultipleGroupsWithAuth(t *testing.T) { interceptor.AddStub(&stubs.Stub{ Method: "GET", Path: "/v1/auth/grant/sub-key/sub-c-b9ab9508-43cf-11e8-9967-869954283fb4", - Query: "auth=my-pam-key&channel-group=cg1%2Ccg2&m=0&r=1&w=1", + Query: "auth=my-pam-key&channel-group=cg1%2Ccg2&m=0&r=1&w=1&d=0", ResponseBody: `{"message":"Success","payload":{"level":"channel-group+auth","subscribe_key":"sub-c-b9ab9508-43cf-11e8-9967-869954283fb4","ttl":1440,"channel-groups":{"cg1":{"auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}},"cg2":{"auths":{"my-pam-key":{"r":1,"w":1,"m":0,"d":0}}}}},"service":"Access Manager","status":200}`, IgnoreQueryKeys: []string{"uuid", "pnsdk", "signature", "timestamp"}, ResponseStatusCode: 200, diff --git a/tests/e2e/heartbeat_test.go b/tests/e2e/heartbeat_test.go index 30f330e9..be111282 100644 --- a/tests/e2e/heartbeat_test.go +++ b/tests/e2e/heartbeat_test.go @@ -125,7 +125,7 @@ func HeartbeatTimeoutEvent(t *testing.T) { return } - cl := pubnub.NewHTTP1Client(15, 15) + cl := pubnub.NewHTTP1Client(15, 15, 20) cl.Transport = fakeTransport{ Status: "200 OK", StatusCode: 200, diff --git a/tests/e2e/subscribe_test.go b/tests/e2e/subscribe_test.go index 3406b5a3..af2182eb 100644 --- a/tests/e2e/subscribe_test.go +++ b/tests/e2e/subscribe_test.go @@ -3,7 +3,10 @@ package e2e import ( //"encoding/json" "fmt" + "io/ioutil" + "log" "math/rand" + "os" "sync" "testing" "time" @@ -13,11 +16,58 @@ import ( "github.com/stretchr/testify/assert" ) -//import _ "net/http/pprof" -//import "net/http" +import _ "net/http/pprof" +import "net/http" var timeout = 3 +func SubscribesLogsForQueryParams(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("localhost:6061", nil)) + }() + + assert := assert.New(t) + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + pn := pubnub.NewPubNub(configCopy()) + pn.Config.SecretKey = "sec-key" + pn.Config.AuthKey = "myAuthKey" + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) + pn.Subscribe(). + Channels([]string{"ch1", "ch2"}). + QueryParam(queryParam). + Execute() + + tic := time.NewTicker(time.Duration(timeout) * time.Second) + select { + case <-tic.C: + tic.Stop() + + } + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + + //fmt.Printf("Captured: %s", out) + + s := fmt.Sprintf("%s", out) + expected2 := fmt.Sprintf("q1=v1") + expected3 := fmt.Sprintf("q2=v2") + + assert.Contains(s, expected2) + assert.Contains(s, expected3) + + //https://ps.pndsn.com/v1/auth/grant/sub-key/sub-c-e41d50d4-43ce-11e8-a433-9e6b275e7b64?m=1&auth=authkey1,authkey2&channel=ch1,ch2×tamp=1535719219&pnsdk=PubNub-Go/4.1.3&uuid=pn-a83164fe-7ecf-42ab-ba14-d2d8e6eabd7a&r=1&w=1&signature=0SkyfvohAq8_0phVi0YhCL4c2ZRSPBVwCwQ9fANvPmM= + +} + func TestRequestMesssageOverflow(t *testing.T) { assert := assert.New(t) doneSubscribe := make(chan bool) @@ -1432,8 +1482,15 @@ func TestSubscribe403Error(t *testing.T) { doneSubscribe := make(chan bool) doneAccessDenied := make(chan bool) errChan := make(chan string) + ch := randomized("sub-403-ch") pn := pubnub.NewPubNub(pamConfigCopy()) + pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) + pamConfig := pamConfigCopy() + pamConfig.SecretKey = "" + pn2 := pubnub.NewPubNub(pamConfig) + + pn2.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) listener := pubnub.NewListener() go func() { @@ -1458,20 +1515,20 @@ func TestSubscribe403Error(t *testing.T) { } }() - pn.AddListener(listener) + pn2.AddListener(listener) pn.Grant(). Read(false). Write(false). Manage(false). - AuthKeys([]string{"pam-key"}). - Channels([]string{"ch"}). + TTL(10). Execute() - pn.Config.SecretKey = "" - - pn.Subscribe(). - Channels([]string{"ch"}). + fmt.Println("sleeping") + time.Sleep(5 * time.Second) + fmt.Println("after sleeping") + pn2.Subscribe(). + Channels([]string{ch}). Execute() tic := time.NewTicker(time.Duration(timeout) * time.Second) @@ -1487,8 +1544,8 @@ func TestSubscribe403Error(t *testing.T) { } - assert.Zero(len(pn.GetSubscribedChannels())) - assert.Zero(len(pn.GetSubscribedGroups())) + assert.Zero(len(pn2.GetSubscribedChannels())) + assert.Zero(len(pn2.GetSubscribedGroups())) } func TestSubscribeParseUserMeta(t *testing.T) { diff --git a/time_request.go b/time_request.go index b30ef343..2bfb6b81 100644 --- a/time_request.go +++ b/time_request.go @@ -45,6 +45,13 @@ func (b *timeBuilder) Transport(tr http.RoundTripper) *timeBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *timeBuilder) QueryParam(queryParam map[string]string) *timeBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the Time request and fetches the time from the server. func (b *timeBuilder) Execute() (*TimeResponse, StatusResponse, error) { rawJSON, status, err := executeRequest(b.opts) @@ -56,9 +63,9 @@ func (b *timeBuilder) Execute() (*TimeResponse, StatusResponse, error) { } type timeOpts struct { - pubnub *PubNub - - Transport http.RoundTripper + pubnub *PubNub + QueryParam map[string]string + Transport http.RoundTripper ctx Context } @@ -85,10 +92,14 @@ func (o *timeOpts) buildPath() (string, error) { func (o *timeOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *timeOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *timeOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/time_request_test.go b/time_request_test.go index 76b5b782..99b6cbcf 100644 --- a/time_request_test.go +++ b/time_request_test.go @@ -1,8 +1,10 @@ package pubnub import ( + "net/url" "testing" + h "github.com/pubnub/go/tests/helpers" "github.com/stretchr/testify/assert" ) @@ -33,6 +35,44 @@ func TestNewTimeResponseUnmarshalling(t *testing.T) { assert.Equal(a, []byte{}) } +func TestNewTimeResponseQueryParam(t *testing.T) { + assert := assert.New(t) + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + config := NewConfig() + pn := NewPubNub(config) + + opts := &timeOpts{} + opts.pubnub = pn + opts.QueryParam = queryParam + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + path, err := opts.buildPath() + u := &url.URL{ + Path: path, + } + assert.Nil(err) + + query, err := opts.buildQuery() + assert.Nil(err) + + h.AssertPathsEqual(t, + "/time/0", + u.EscapedPath(), []int{}) + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + a, err := opts.buildBody() + assert.Nil(err) + assert.Equal(a, []byte{}) +} + func TestNewTimeBuilder(t *testing.T) { assert := assert.New(t) diff --git a/unsubscribe_builder.go b/unsubscribe_builder.go index edce70cd..0f568a22 100644 --- a/unsubscribe_builder.go +++ b/unsubscribe_builder.go @@ -28,6 +28,13 @@ func (b *unsubscribeBuilder) ChannelGroups(groups []string) *unsubscribeBuilder return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *unsubscribeBuilder) QueryParam(queryParam map[string]string) *unsubscribeBuilder { + b.operation.QueryParam = queryParam + + return b +} + // Execute runs the Unsubscribe request and unsubscribes from the specified channels. func (b *unsubscribeBuilder) Execute() { b.pubnub.subscriptionManager.adaptUnsubscribe(b.operation) diff --git a/where_now_request.go b/where_now_request.go index b78fdd50..d670bd74 100644 --- a/where_now_request.go +++ b/where_now_request.go @@ -48,6 +48,13 @@ func (b *whereNowBuilder) UUID(uuid string) *whereNowBuilder { return b } +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *whereNowBuilder) QueryParam(queryParam map[string]string) *whereNowBuilder { + b.opts.QueryParam = queryParam + + return b +} + // Execute runs the WhereNow request. func (b *whereNowBuilder) Execute() (*WhereNowResponse, StatusResponse, error) { if len(b.opts.UUID) <= 0 { @@ -65,9 +72,9 @@ func (b *whereNowBuilder) Execute() (*WhereNowResponse, StatusResponse, error) { type whereNowOpts struct { pubnub *PubNub - UUID string - - Transport http.RoundTripper + UUID string + QueryParam map[string]string + Transport http.RoundTripper ctx Context } @@ -100,10 +107,14 @@ func (o *whereNowOpts) buildPath() (string, error) { func (o *whereNowOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) - + SetQueryParam(q, o.QueryParam) return q, nil } +func (o *whereNowOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + func (o *whereNowOpts) buildBody() ([]byte, error) { return []byte{}, nil } diff --git a/where_now_request_test.go b/where_now_request_test.go index 05335764..bba361c4 100644 --- a/where_now_request_test.go +++ b/where_now_request_test.go @@ -46,6 +46,42 @@ func TestWhereNowBasicRequest(t *testing.T) { assert.Equal([]byte{}, body) } +func TestWhereNowBasicRequestQueryParam(t *testing.T) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + opts := &whereNowOpts{ + UUID: "my-custom-uuid", + pubnub: pubnub, + } + opts.QueryParam = queryParam + path, err := opts.buildPath() + assert.Nil(err) + u := &url.URL{ + Path: path, + } + + h.AssertPathsEqual(t, + "/v2/presence/sub-key/sub_key/uuid/my-custom-uuid", + u.EscapedPath(), []int{}) + + query, err := opts.buildQuery() + assert.Nil(err) + + expected := &url.Values{} + expected.Set("q1", "v1") + expected.Set("q2", "v2") + + h.AssertQueriesEqual(t, expected, query, []string{"pnsdk", "uuid"}, []string{}) + + body, err := opts.buildBody() + + assert.Nil(err) + assert.Equal([]byte{}, body) +} + func TestNewWhereNowBuilder(t *testing.T) { assert := assert.New(t)