Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
workanator committed Sep 16, 2017
1 parent 856a517 commit 9efce4e
Show file tree
Hide file tree
Showing 8 changed files with 602 additions and 28 deletions.
45 changes: 24 additions & 21 deletions context_impl.go
Expand Up @@ -3,63 +3,66 @@ package floc
import (
"context"
"sync"
"sync/atomic"
)

type flowContext struct {
ctx atomic.Value
mu sync.Mutex
ctx context.Context
mu sync.RWMutex
}

func NewContext() Context {
ctx := &flowContext{
ctx: atomic.Value{},
mu: sync.Mutex{},
ctx: context.TODO(),
mu: sync.RWMutex{},
}

ctx.ctx.Store(context.TODO())

return ctx
}

// Release releases resources.
func (flowCtx flowContext) Release() {
func (flowCtx *flowContext) Release() {

}

// Ctx returns the underlying context.
func (flowCtx flowContext) Ctx() context.Context {
return flowCtx.ctx.Load().(context.Context)
func (flowCtx *flowContext) Ctx() context.Context {
flowCtx.mu.RLock()
defer flowCtx.mu.RUnlock()

return flowCtx.ctx
}

// UpdateCtx sets the new underlying context.
func (flowCtx flowContext) UpdateCtx(ctx context.Context) {
func (flowCtx *flowContext) UpdateCtx(ctx context.Context) {
flowCtx.mu.Lock()
defer flowCtx.mu.Unlock()

flowCtx.ctx.Store(ctx)
flowCtx.ctx = ctx
}

// Done returns a channel that's closed when the flow done.
// Successive calls to Done return the same value.
func (flowCtx flowContext) Done() <-chan struct{} {
ctx := flowCtx.ctx.Load().(context.Context)
return ctx.Done()
func (flowCtx *flowContext) Done() <-chan struct{} {
flowCtx.mu.RLock()
defer flowCtx.mu.RUnlock()

return flowCtx.ctx.Done()
}

// Value returns the value associated with this context for key,
// or nil if no value is associated with key.
func (flowCtx flowContext) Value(key interface{}) (value interface{}) {
ctx := flowCtx.ctx.Load().(context.Context)
func (flowCtx *flowContext) Value(key interface{}) (value interface{}) {
flowCtx.mu.RLock()
ctx := flowCtx.ctx
flowCtx.mu.RUnlock()

return ctx.Value(key)
}

// Create a new context with value and make it the current.
func (flowCtx flowContext) AddValue(key, value interface{}) {
func (flowCtx *flowContext) AddValue(key, value interface{}) {
flowCtx.mu.Lock()
defer flowCtx.mu.Unlock()

oldCtx := flowCtx.ctx.Load().(context.Context)
newCtx := context.WithValue(oldCtx, key, value)
flowCtx.ctx.Store(newCtx)
flowCtx.ctx = context.WithValue(flowCtx.ctx, key, value)
}
89 changes: 89 additions & 0 deletions context_impl_test.go
@@ -0,0 +1,89 @@
package floc

import (
"context"
"sync"
"testing"
)

func TestNewContext(t *testing.T) {
ctx := NewContext()
ctx.Release()
}

func TestFlowContext_Ctx(t *testing.T) {
const key = "KEY"
const value = "VALUE"

ctx := NewContext()
defer ctx.Release()

innerCtx := ctx.Ctx()
ctx.UpdateCtx(context.WithValue(innerCtx, key, value))

v := ctx.Ctx().Value(key)
if v == nil {
t.Fatalf("%s expects value to be not nil", t.Name())
}

if s, ok := v.(string); !ok {
t.Fatalf("%s expects value to be of type string", t.Name())
} else if s != value {
t.Fatalf("%s expects value to be %s but has %s", t.Name(), value, s)
}
}

func TestFlowContext_Value(t *testing.T) {
const max = 1000

ctx := NewContext()
defer ctx.Release()

var wg sync.WaitGroup
var n int
for n = 0; n < max; n++ {
wg.Add(1)

go func(n int) {
ctx.AddValue(n, n)
wg.Done()
}(n)
}

wg.Wait()

for n = 0; n < max; n++ {
v := ctx.Value(n)
if v == nil {
t.Fatalf("%s expects value to be not nil", t.Name())
}

if d, ok := v.(int); !ok {
t.Fatalf("%s expects value %d to be of type int", t.Name(), n)
} else if d != n {
t.Fatalf("%s expects value %d to be %d but has %d", t.Name(), n, n, d)
}
}
}

func TestFlowContext_Done(t *testing.T) {
ctx := NewContext()
defer ctx.Release()

cancelCtx, cancel := context.WithCancel(ctx.Ctx())
ctx.UpdateCtx(cancelCtx)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)

go func() {
<-ctx.Done()
wg.Done()
}()
}

cancel()

wg.Wait()
}
17 changes: 11 additions & 6 deletions control_impl.go
Expand Up @@ -14,7 +14,12 @@ type flowControl struct {
}

// NewControl constructs Control instance from context given.
// The function panics if the context given is nil.
func NewControl(ctx Context) Control {
if ctx == nil {
panic("context is nil")
}

oldCtx := ctx.Ctx()
cancelCtx, cancelFunc := context.WithCancel(oldCtx)
ctx.UpdateCtx(cancelCtx)
Expand All @@ -27,12 +32,12 @@ func NewControl(ctx Context) Control {
}

// Release releases resources.
func (flowCtrl flowControl) Release() {
func (flowCtrl *flowControl) Release() {
flowCtrl.Cancel(nil)
}

// Complete finishes the flow with success status.
func (flowCtrl flowControl) Complete(data interface{}) {
func (flowCtrl *flowControl) Complete(data interface{}) {
// Try to change the result from None to Completed and if it's successful
// finish the flow.
if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Completed.Int32()) {
Expand All @@ -42,7 +47,7 @@ func (flowCtrl flowControl) Complete(data interface{}) {
}

// Cancel cancels the execution of the flow.
func (flowCtrl flowControl) Cancel(data interface{}) {
func (flowCtrl *flowControl) Cancel(data interface{}) {
// Try to change the result from None to Canceled and if it's successful
// finish the flow.
if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Canceled.Int32()) {
Expand All @@ -52,7 +57,7 @@ func (flowCtrl flowControl) Cancel(data interface{}) {
}

// Fail cancels the execution of the flow with error.
func (flowCtrl flowControl) Fail(data interface{}, err error) {
func (flowCtrl *flowControl) Fail(data interface{}, err error) {
// Try to change the result from None to Failed and if it's successful
// finish the flow.
if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Failed.Int32()) {
Expand All @@ -63,14 +68,14 @@ func (flowCtrl flowControl) Fail(data interface{}, err error) {
}

// IsFinished tests if execution of the flow is either completed or canceled.
func (flowCtrl flowControl) IsFinished() bool {
func (flowCtrl *flowControl) IsFinished() bool {
r := atomic.LoadInt32(&flowCtrl.result)
return Result(r).IsFinished()
}

// Result returns the result code and the result data of the flow. The call
// to the function is effective only if the flow is finished.
func (flowCtrl flowControl) Result() (result Result, data interface{}, err error) {
func (flowCtrl *flowControl) Result() (result Result, data interface{}, err error) {
// Load the current result
r := atomic.LoadInt32(&flowCtrl.result)
result = Result(r)
Expand Down

0 comments on commit 9efce4e

Please sign in to comment.