Skip to content

Commit

Permalink
Merge 4840eef into 6895d35
Browse files Browse the repository at this point in the history
  • Loading branch information
msohailhussain committed Jan 3, 2020
2 parents 6895d35 + 4840eef commit 3c13fed
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/notification/manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/****************************************************************************
* Copyright 2019, Optimizely, Inc. and contributors *
* Copyright 2019-2020, Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
Expand All @@ -19,6 +19,7 @@ package notification

import (
"fmt"
"sync"
"sync/atomic"

"github.com/optimizely/go-sdk/pkg/logging"
Expand All @@ -37,6 +38,7 @@ type Manager interface {
type AtomicManager struct {
handlers map[uint32]func(interface{})
counter uint32
lock sync.RWMutex
}

// NewAtomicManager creates a new instance of the atomic manager
Expand All @@ -48,13 +50,19 @@ func NewAtomicManager() *AtomicManager {

// Add adds the given handler
func (am *AtomicManager) Add(newHandler func(interface{})) (int, error) {
am.lock.Lock()
defer am.lock.Unlock()

atomic.AddUint32(&am.counter, 1)
am.handlers[am.counter] = newHandler
return int(am.counter), nil
}

// Remove removes handler with the given id
func (am *AtomicManager) Remove(id int) {
am.lock.Lock()
defer am.lock.Unlock()

handlerID := uint32(id)
if _, ok := am.handlers[handlerID]; ok {
delete(am.handlers, handlerID)
Expand All @@ -66,7 +74,20 @@ func (am *AtomicManager) Remove(id int) {

// Send sends the notification to the registered handlers
func (am *AtomicManager) Send(notification interface{}) {
for _, handler := range am.handlers {
// copying handler to avoid race condition
handlers := am.copyHandlers()
for _, handler := range handlers {
handler(notification)
}
}

// Copy handlers and return it.
func (am *AtomicManager) copyHandlers() map[uint32]func(interface{}) {
am.lock.RLock()
defer am.lock.RUnlock()
m := make(map[uint32]func(interface{}), len(am.handlers))
for k, v := range am.handlers {
m[k] = v
}
return m
}
53 changes: 53 additions & 0 deletions pkg/notification/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,56 @@ func TestAtomicManager(t *testing.T) {
// Sanity check by calling remove with a incorrect handler id
atomicManager.Remove(55)
}

func TestSendRaceCondition(t *testing.T) {
sync := make(chan interface{})
payload := map[string]interface{}{
"key": "test",
}
atomicManager := NewAtomicManager()
result1, result2 := 0, 0
listenerCalled := false

listener1 := func(interface{}) {
}

listener2 := func(interface{}) {
// Add listener2 internally to assert deadlock
result2, _ = atomicManager.Add(listener1)
// Remove all added listeners
atomicManager.Remove(result1)
atomicManager.Remove(result2)
listenerCalled = true
}
result1, _ = atomicManager.Add(listener2)

go func() {
atomicManager.Send(payload)
// notifying that notification is sent.
sync <- ""
}()

atomicManager.Add(listener1)
<-sync

assert.Equal(t, 1, result1)
assert.Equal(t, len(atomicManager.handlers), 1)
assert.Equal(t, true, listenerCalled)
}

func TestAddRaceCondition(t *testing.T) {
sync := make(chan interface{})
atomicManager := NewAtomicManager()

listener1 := func(interface{}) {

}
result1, _ := atomicManager.Add(listener1)
go func() {
atomicManager.Remove(result1)
sync <- ""
}()

<-sync
assert.Equal(t, len(atomicManager.handlers), 0)
}

0 comments on commit 3c13fed

Please sign in to comment.