Skip to content
12 changes: 9 additions & 3 deletions pkg/config/polling_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func WithInitialDatafile(datafile []byte) OptionFunc {

// SyncConfig gets current datafile and updates projectConfig
func (cm *PollingProjectConfigManager) SyncConfig(datafile []byte) {
initDatafile := datafile
var e error
var code int
var respHeaders http.Header
Expand Down Expand Up @@ -163,10 +164,10 @@ func (cm *PollingProjectConfigManager) SyncConfig(datafile []byte) {
}
closeMutex(nil)

if cm.notificationCenter != nil {
if cm.notificationCenter != nil && len(initDatafile) == 0 {
projectConfigUpdateNotification := notification.ProjectConfigUpdateNotification{
Type: notification.ProjectConfigUpdate,
Revision: cm.projectConfig.GetRevision(),
Revision: projectConfig.GetRevision(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI. race condition fix

}
if err = cm.notificationCenter.Send(notification.ProjectConfigUpdate, projectConfigUpdateNotification); err != nil {
cmLogger.Warning("Problem with sending notification")
Expand All @@ -177,6 +178,8 @@ func (cm *PollingProjectConfigManager) SyncConfig(datafile []byte) {
// Start starts the polling
func (cm *PollingProjectConfigManager) Start(ctx context.Context) {
cmLogger.Debug("Polling Config Manager Initiated")
cm.SyncConfig([]byte{})

t := time.NewTicker(cm.pollingInterval)
for {
select {
Expand Down Expand Up @@ -205,7 +208,10 @@ func NewPollingProjectConfigManager(sdkKey string, pollingMangerOptions ...Optio
}

initDatafile := pollingProjectConfigManager.initDatafile
pollingProjectConfigManager.SyncConfig(initDatafile) // initial poll
if len(initDatafile) > 0 {
pollingProjectConfigManager.SyncConfig(initDatafile) // initial poll
}

return &pollingProjectConfigManager
}

Expand Down
125 changes: 115 additions & 10 deletions pkg/config/polling_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config
import (
"context"
"net/http"
"sync"
"testing"
"time"

Expand All @@ -44,6 +45,14 @@ func newExecGroup() *utils.ExecGroup {
return utils.NewExecGroup(context.Background())
}

// assertion method to wait for config or err for a specified period of time.
func waitForConfigOrCancelTimeout(t *testing.T, configManager ProjectConfigManager, checkError bool) {
assert.Eventually(t, func() bool {
_, err := configManager.GetConfig()
return (err != nil) == checkError
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestNewPollingProjectConfigManagerWithOptions(t *testing.T) {

mockDatafile := []byte(`{"revision":"42"}`)
Expand All @@ -57,6 +66,8 @@ func TestNewPollingProjectConfigManagerWithOptions(t *testing.T) {
eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, false)
mockRequester.AssertExpectations(t)

actual, err := configManager.GetConfig()
Expand All @@ -78,10 +89,11 @@ func TestNewPollingProjectConfigManagerWithNull(t *testing.T) {
eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, true)

mockRequester.AssertExpectations(t)

_, err := configManager.GetConfig()
assert.NotNil(t, err)
}

func TestNewPollingProjectConfigManagerWithSimilarDatafileRevisions(t *testing.T) {
Expand All @@ -96,6 +108,9 @@ func TestNewPollingProjectConfigManagerWithSimilarDatafileRevisions(t *testing.T
eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, false)

mockRequester.AssertExpectations(t)

actual, err := configManager.GetConfig()
Expand Down Expand Up @@ -125,6 +140,8 @@ func TestNewPollingProjectConfigManagerWithLastModifiedDates(t *testing.T) {
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, false)

// Fetch valid config
actual, err := configManager.GetConfig()
assert.Nil(t, err)
Expand Down Expand Up @@ -154,6 +171,9 @@ func TestNewPollingProjectConfigManagerWithDifferentDatafileRevisions(t *testing
eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, false)

mockRequester.AssertExpectations(t)

actual, err := configManager.GetConfig()
Expand All @@ -170,15 +190,14 @@ func TestPollingGetOptimizelyConfig(t *testing.T) {
mockDatafile1 := []byte(`{"revision":"42","botFiltering":true}`)
mockDatafile2 := []byte(`{"revision":"43","botFiltering":false}`)
mockRequester := new(MockRequester)
mockRequester.On("Get", []utils.Header(nil)).Return(mockDatafile1, http.Header{}, http.StatusOK, nil)
mockRequester.On("Get", []utils.Header(nil)).Return([]byte(mockDatafile2), http.Header{}, http.StatusOK, nil)

// Test we fetch using requester
sdkKey := "test_sdk_key"

eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester), WithInitialDatafile(mockDatafile1))
eg.Go(configManager.Start)
mockRequester.AssertExpectations(t)

assert.Nil(t, configManager.optimizelyConfig)

Expand All @@ -189,10 +208,14 @@ func TestPollingGetOptimizelyConfig(t *testing.T) {

assert.Equal(t, "42", optimizelyConfig.Revision)

configManager.SyncConfig(mockDatafile2)
assert.Eventually(t, func() bool {
config, _ := configManager.GetConfig()
return config.GetRevision() == "43"
}, time.Second, 10*time.Millisecond)

optimizelyConfig = configManager.GetOptimizelyConfig()
assert.Equal(t, "43", optimizelyConfig.Revision)

mockRequester.AssertExpectations(t)
}

func TestNewPollingProjectConfigManagerWithErrorHandling(t *testing.T) {
Expand All @@ -210,6 +233,9 @@ func TestNewPollingProjectConfigManagerWithErrorHandling(t *testing.T) {
eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

waitForConfigOrCancelTimeout(t, configManager, true)

mockRequester.AssertExpectations(t)

actual, err := configManager.GetConfig() // polling for bad file
Expand All @@ -228,7 +254,7 @@ func TestNewPollingProjectConfigManagerWithErrorHandling(t *testing.T) {
assert.Equal(t, projectConfig2, actual)
}

func TestNewPollingProjectConfigManagerOnDecision(t *testing.T) {
func TestNewPollingProjectConfigManagerOnConfigUpdate(t *testing.T) {
mockDatafile1 := []byte(`{"revision":"42","botFiltering":true}`)
mockDatafile2 := []byte(`{"revision":"43","botFiltering":false}`)

Expand All @@ -240,13 +266,18 @@ func TestNewPollingProjectConfigManagerOnDecision(t *testing.T) {

eg := newExecGroup()
configManager := NewPollingProjectConfigManager(sdkKey, WithRequester(mockRequester))
eg.Go(configManager.Start)

m := sync.RWMutex{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be surprised if the ++ operator was not already atomic, but there is an atomic increment as part of the sync package https://gobyexample.com/atomic-counters that you should be able to use instead. Since you're just confirming that the notification was triggered, you can also use a sync.WaitGroup and block on completion of the group. I found the WaitGroup approach to be less flaky when testing with goroutines.

var numberOfCalls = 0
callback := func(notification notification.ProjectConfigUpdateNotification) {
m.Lock()
defer m.Unlock()
numberOfCalls++
}
id, _ := configManager.OnProjectConfigUpdate(callback)

eg.Go(configManager.Start)
waitForConfigOrCancelTimeout(t, configManager, false)

mockRequester.AssertExpectations(t)

actual, err := configManager.GetConfig()
Expand All @@ -259,7 +290,10 @@ func TestNewPollingProjectConfigManagerOnDecision(t *testing.T) {
assert.NotNil(t, actual)

assert.NotEqual(t, id, 0)

m.Lock()
assert.Equal(t, numberOfCalls, 1)
m.Unlock()

err = configManager.RemoveOnProjectConfigUpdate(id)
assert.Nil(t, err)
Expand All @@ -268,6 +302,77 @@ func TestNewPollingProjectConfigManagerOnDecision(t *testing.T) {
assert.Nil(t, err)
}

func TestNewPollingProjectConfigManagerHardcodedDatafile(t *testing.T) {
mockDatafile1 := []byte(`{"revision":"42"}`)
mockDatafile2 := []byte(`{"revision":"43"}`)
sdkKey := "test_sdk_key"

mockRequester := new(MockRequester)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mockRequester isn't passed as an option to NewPollingProjectConfigManager via WithRequester, so the assertion on 316 isn't useful, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjc1283 for pointing out, i have added requester there.

mockRequester.On("Get", []utils.Header(nil)).Return(mockDatafile2, http.Header{}, http.StatusOK, nil)

configManager := NewPollingProjectConfigManager(sdkKey, WithInitialDatafile(mockDatafile1), WithRequester(mockRequester))
config, err := configManager.GetConfig()

mockRequester.AssertNotCalled(t, "Get")
assert.Nil(t, err)
assert.NotNil(t, config)
assert.Equal(t, "42", config.GetRevision())
}

func TestNewPollingProjectConfigManagerPullImmediatelyOnStart(t *testing.T) {
m := sync.RWMutex{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here about using the Mutex as opposed to other synchronization options.

mockDatafile1 := []byte(`{"revision":"44"}`) // remote
mockDatafile2 := []byte(`{"revision":"43"}`) // hardcoded

mockRequester := new(MockRequester)
mockRequester.On("Get", []utils.Header(nil)).Return(mockDatafile1, http.Header{}, http.StatusOK, nil)

// Test we fetch using requester
sdkKey := "test_sdk_key"

configManager := NewPollingProjectConfigManager(sdkKey,
WithRequester(mockRequester),
WithInitialDatafile(mockDatafile2),
// want to make sure regardless of any polling interval, syncconfig should be called immediately
WithPollingInterval(10*time.Second))

config, err := configManager.GetConfig()

numberOfCalls := 0

// hardcoded datafile assertion
assert.Nil(t, err)
assert.NotNil(t, config)
assert.Equal(t, "43", config.GetRevision())
mockRequester.AssertNotCalled(t, "Get")

callback := func(notification notification.ProjectConfigUpdateNotification) {
m.Lock()
defer m.Unlock()
numberOfCalls++
}

configManager.OnProjectConfigUpdate(callback)

eg := newExecGroup()
eg.Go(configManager.Start)

assert.Eventually(t, func() bool {
m.Lock()
defer m.Unlock()
return numberOfCalls == 1
}, 1500*time.Millisecond, 10*time.Millisecond)

mockRequester.AssertExpectations(t)

remoteConfig, err := configManager.GetConfig()
assert.Nil(t, err)
assert.NotNil(t, remoteConfig)
assert.Equal(t, "44", remoteConfig.GetRevision())

eg.TerminateAndWait() // just sending signal and improving coverage
}

func TestPollingInterval(t *testing.T) {

sdkKey := "test_sdk_key"
Expand Down
8 changes: 8 additions & 0 deletions pkg/notification/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package notification

import (
"fmt"
"sync"
"sync/atomic"

"github.com/optimizely/go-sdk/pkg/logging"
Expand All @@ -37,6 +38,7 @@ type Manager interface {
type AtomicManager struct {
handlers map[uint32]func(interface{})
counter uint32
lock sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a RWMutex?

}

// NewAtomicManager creates a new instance of the atomic manager
Expand All @@ -49,13 +51,17 @@ func NewAtomicManager() *AtomicManager {
// Add adds the given handler
func (am *AtomicManager) Add(newHandler func(interface{})) (int, error) {
atomic.AddUint32(&am.counter, 1)
am.lock.Lock()
defer am.lock.Unlock()
am.handlers[am.counter] = newHandler
return int(am.counter), nil
}

// Remove removes handler with the given id
func (am *AtomicManager) Remove(id int) {
handlerID := uint32(id)
am.lock.Lock()
defer am.lock.Unlock()
if _, ok := am.handlers[handlerID]; ok {
delete(am.handlers, handlerID)
return
Expand All @@ -66,6 +72,8 @@ func (am *AtomicManager) Remove(id int) {

// Send sends the notification to the registered handlers
func (am *AtomicManager) Send(notification interface{}) {
am.lock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user calls Remove in their notification callback, does that cause deadlock?

Copy link
Contributor Author

@msohailhussain msohailhussain Dec 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try a sample code, and can check.

defer am.lock.Unlock()
for _, handler := range am.handlers {
handler(notification)
}
Comment on lines 77 to 79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per our group discussion, we can first create an array of handlers, outside of the map, then trigger the handlers. That way we are not potentially modifying the Map while we're iterating.

Expand Down