diff --git a/context_impl.go b/context_impl.go index 59efaa5..4e4da52 100644 --- a/context_impl.go +++ b/context_impl.go @@ -3,63 +3,66 @@ package floc import ( "context" "sync" - "sync/atomic" ) type flowContext struct { - ctx atomic.Value - mu sync.Mutex + ctx context.Context + mu sync.RWMutex } func NewContext() Context { ctx := &flowContext{ - ctx: atomic.Value{}, - mu: sync.Mutex{}, + ctx: context.TODO(), + mu: sync.RWMutex{}, } - ctx.ctx.Store(context.TODO()) - return ctx } // Release releases resources. -func (flowCtx flowContext) Release() { +func (flowCtx *flowContext) Release() { } // Ctx returns the underlying context. -func (flowCtx flowContext) Ctx() context.Context { - return flowCtx.ctx.Load().(context.Context) +func (flowCtx *flowContext) Ctx() context.Context { + flowCtx.mu.RLock() + defer flowCtx.mu.RUnlock() + + return flowCtx.ctx } // UpdateCtx sets the new underlying context. -func (flowCtx flowContext) UpdateCtx(ctx context.Context) { +func (flowCtx *flowContext) UpdateCtx(ctx context.Context) { flowCtx.mu.Lock() defer flowCtx.mu.Unlock() - flowCtx.ctx.Store(ctx) + flowCtx.ctx = ctx } // Done returns a channel that's closed when the flow done. // Successive calls to Done return the same value. -func (flowCtx flowContext) Done() <-chan struct{} { - ctx := flowCtx.ctx.Load().(context.Context) - return ctx.Done() +func (flowCtx *flowContext) Done() <-chan struct{} { + flowCtx.mu.RLock() + defer flowCtx.mu.RUnlock() + + return flowCtx.ctx.Done() } // Value returns the value associated with this context for key, // or nil if no value is associated with key. -func (flowCtx flowContext) Value(key interface{}) (value interface{}) { - ctx := flowCtx.ctx.Load().(context.Context) +func (flowCtx *flowContext) Value(key interface{}) (value interface{}) { + flowCtx.mu.RLock() + ctx := flowCtx.ctx + flowCtx.mu.RUnlock() + return ctx.Value(key) } // Create a new context with value and make it the current. -func (flowCtx flowContext) AddValue(key, value interface{}) { +func (flowCtx *flowContext) AddValue(key, value interface{}) { flowCtx.mu.Lock() defer flowCtx.mu.Unlock() - oldCtx := flowCtx.ctx.Load().(context.Context) - newCtx := context.WithValue(oldCtx, key, value) - flowCtx.ctx.Store(newCtx) + flowCtx.ctx = context.WithValue(flowCtx.ctx, key, value) } diff --git a/context_impl_test.go b/context_impl_test.go new file mode 100644 index 0000000..8596520 --- /dev/null +++ b/context_impl_test.go @@ -0,0 +1,89 @@ +package floc + +import ( + "context" + "sync" + "testing" +) + +func TestNewContext(t *testing.T) { + ctx := NewContext() + ctx.Release() +} + +func TestFlowContext_Ctx(t *testing.T) { + const key = "KEY" + const value = "VALUE" + + ctx := NewContext() + defer ctx.Release() + + innerCtx := ctx.Ctx() + ctx.UpdateCtx(context.WithValue(innerCtx, key, value)) + + v := ctx.Ctx().Value(key) + if v == nil { + t.Fatalf("%s expects value to be not nil", t.Name()) + } + + if s, ok := v.(string); !ok { + t.Fatalf("%s expects value to be of type string", t.Name()) + } else if s != value { + t.Fatalf("%s expects value to be %s but has %s", t.Name(), value, s) + } +} + +func TestFlowContext_Value(t *testing.T) { + const max = 1000 + + ctx := NewContext() + defer ctx.Release() + + var wg sync.WaitGroup + var n int + for n = 0; n < max; n++ { + wg.Add(1) + + go func(n int) { + ctx.AddValue(n, n) + wg.Done() + }(n) + } + + wg.Wait() + + for n = 0; n < max; n++ { + v := ctx.Value(n) + if v == nil { + t.Fatalf("%s expects value to be not nil", t.Name()) + } + + if d, ok := v.(int); !ok { + t.Fatalf("%s expects value %d to be of type int", t.Name(), n) + } else if d != n { + t.Fatalf("%s expects value %d to be %d but has %d", t.Name(), n, n, d) + } + } +} + +func TestFlowContext_Done(t *testing.T) { + ctx := NewContext() + defer ctx.Release() + + cancelCtx, cancel := context.WithCancel(ctx.Ctx()) + ctx.UpdateCtx(cancelCtx) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + + go func() { + <-ctx.Done() + wg.Done() + }() + } + + cancel() + + wg.Wait() +} diff --git a/control_impl.go b/control_impl.go index 6353fcb..06352bf 100644 --- a/control_impl.go +++ b/control_impl.go @@ -14,7 +14,12 @@ type flowControl struct { } // NewControl constructs Control instance from context given. +// The function panics if the context given is nil. func NewControl(ctx Context) Control { + if ctx == nil { + panic("context is nil") + } + oldCtx := ctx.Ctx() cancelCtx, cancelFunc := context.WithCancel(oldCtx) ctx.UpdateCtx(cancelCtx) @@ -27,12 +32,12 @@ func NewControl(ctx Context) Control { } // Release releases resources. -func (flowCtrl flowControl) Release() { +func (flowCtrl *flowControl) Release() { flowCtrl.Cancel(nil) } // Complete finishes the flow with success status. -func (flowCtrl flowControl) Complete(data interface{}) { +func (flowCtrl *flowControl) Complete(data interface{}) { // Try to change the result from None to Completed and if it's successful // finish the flow. if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Completed.Int32()) { @@ -42,7 +47,7 @@ func (flowCtrl flowControl) Complete(data interface{}) { } // Cancel cancels the execution of the flow. -func (flowCtrl flowControl) Cancel(data interface{}) { +func (flowCtrl *flowControl) Cancel(data interface{}) { // Try to change the result from None to Canceled and if it's successful // finish the flow. if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Canceled.Int32()) { @@ -52,7 +57,7 @@ func (flowCtrl flowControl) Cancel(data interface{}) { } // Fail cancels the execution of the flow with error. -func (flowCtrl flowControl) Fail(data interface{}, err error) { +func (flowCtrl *flowControl) Fail(data interface{}, err error) { // Try to change the result from None to Failed and if it's successful // finish the flow. if atomic.CompareAndSwapInt32(&flowCtrl.result, None.Int32(), Failed.Int32()) { @@ -63,14 +68,14 @@ func (flowCtrl flowControl) Fail(data interface{}, err error) { } // IsFinished tests if execution of the flow is either completed or canceled. -func (flowCtrl flowControl) IsFinished() bool { +func (flowCtrl *flowControl) IsFinished() bool { r := atomic.LoadInt32(&flowCtrl.result) return Result(r).IsFinished() } // Result returns the result code and the result data of the flow. The call // to the function is effective only if the flow is finished. -func (flowCtrl flowControl) Result() (result Result, data interface{}, err error) { +func (flowCtrl *flowControl) Result() (result Result, data interface{}, err error) { // Load the current result r := atomic.LoadInt32(&flowCtrl.result) result = Result(r) diff --git a/control_impl_test.go b/control_impl_test.go new file mode 100644 index 0000000..d0e2674 --- /dev/null +++ b/control_impl_test.go @@ -0,0 +1,166 @@ +package floc + +import ( + "fmt" + "testing" +) + +var templateData = []struct { + data interface{} + test func(sample interface{}) bool +}{ + {nil, func(sample interface{}) bool { return sample == nil }}, + {string("ABC"), func(sample interface{}) bool { s, ok := sample.(string); return ok && s == "ABC" }}, + {int(123), func(sample interface{}) bool { d, ok := sample.(int); return ok && d == 123 }}, + {float64(3.1415), func(sample interface{}) bool { f, ok := sample.(float64); return ok && f >= 3.1415 && f <= 3.14151 }}, +} + +func TestNewControl(t *testing.T) { + ctx := NewContext() + defer ctx.Release() + + ctrl := NewControl(ctx) + defer ctrl.Release() + + if ctrl.IsFinished() == true { + t.Fatalf("%s expects Control to be not finished", t.Name()) + } + + result, _, _ := ctrl.Result() + if result.IsNone() == false { + t.Fatalf("%s expects result to be None but has %s", t.Name(), result.String()) + } +} + +func TestNewControl_Panic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatalf("%s must panic when ctx is nil", t.Name()) + } + }() + + ctrl := NewControl(nil) + defer ctrl.Release() +} + +func TestFlowControl_Release(t *testing.T) { + ctx := NewContext() + defer ctx.Release() + + ctrl := NewControl(ctx) + ctrl.Release() + + if ctrl.IsFinished() == false { + result, _, _ := ctrl.Result() + t.Fatalf("%s expects Control to be finished on Release but has state %s", t.Name(), result.String()) + } + + result, _, _ := ctrl.Result() + if result.IsCanceled() == false { + t.Fatalf("%s expects Control to be Canceled bu has %s", t.Name(), result.String()) + } +} + +func TestFlowControl_Release2(t *testing.T) { + ctx := NewContext() + defer ctx.Release() + + ctrl := NewControl(ctx) + ctrl.Complete(nil) + ctrl.Release() + + if ctrl.IsFinished() == false { + result, _, _ := ctrl.Result() + t.Fatalf("%s expects Control to be finished on Release but has state %s", t.Name(), result.String()) + } + + result, _, _ := ctrl.Result() + if result.IsCompleted() == false { + t.Fatalf("%s expects Control to be Completed but has %s", t.Name(), result.String()) + } +} + +func TestFlowControl_Cancel(t *testing.T) { + for i, td := range templateData { + ctx := NewContext() + ctrl := NewControl(ctx) + + ctrl.Cancel(td.data) + result, data, err := ctrl.Result() + if err != nil { + t.Fatalf("%s expects error to be nil bu has %s", t.Name(), err.Error()) + } else if result.IsCanceled() == false { + t.Fatalf("%s expects result to be Canceled but has %s", t.Name(), result.String()) + } else if td.test(data) == false { + t.Fatalf("%s failed to test data sample %d:%v", t.Name(), i, data) + } + + ctrl.Release() + ctx.Release() + } +} + +func TestFlowControl_Complete(t *testing.T) { + for i, td := range templateData { + ctx := NewContext() + ctrl := NewControl(ctx) + + ctrl.Complete(td.data) + result, data, err := ctrl.Result() + if err != nil { + t.Fatalf("%s expects error to be nil bu has %s", t.Name(), err.Error()) + } else if result.IsCompleted() == false { + t.Fatalf("%s expects result to be Completed but has %s", t.Name(), result.String()) + } else if td.test(data) == false { + t.Fatalf("%s failed to test data sample %d:%v", t.Name(), i, data) + } + + ctrl.Release() + ctx.Release() + } +} + +func TestFlowControl_Fail(t *testing.T) { + for i, td := range templateData { + ctx := NewContext() + ctrl := NewControl(ctx) + + ctrl.Fail(td.data, fmt.Errorf("fail %d", i)) + result, data, err := ctrl.Result() + if err == nil { + t.Fatalf("%s expects error to be not nil", t.Name()) + } else if result.IsFailed() == false { + t.Fatalf("%s expects result to be Failed but has %s", t.Name(), result.String()) + } else if td.test(data) == false { + t.Fatalf("%s failed to test data sample %d:%v", t.Name(), i, data) + } + + ctrl.Release() + ctx.Release() + } +} + +func TestFlowControl_IsFinished(t *testing.T) { + var finishers = []func(ctrl Control){ + func(ctrl Control) { ctrl.Complete(nil) }, + func(ctrl Control) { ctrl.Cancel(nil) }, + func(ctrl Control) { ctrl.Fail(nil, nil) }, + } + + for _, finish := range finishers { + ctx := NewContext() + ctrl := NewControl(ctx) + + if ctrl.IsFinished() == true { + t.Fatalf("%s expects Control to be not finished", t.Name()) + } + + finish(ctrl) + if ctrl.IsFinished() == false { + t.Fatalf("%s expects Control to be finished", t.Name()) + } + + ctrl.Release() + ctx.Release() + } +} diff --git a/result_set_test.go b/result_set_test.go new file mode 100644 index 0000000..c8b01aa --- /dev/null +++ b/result_set_test.go @@ -0,0 +1,39 @@ +package floc + +import "testing" + +func TestResultSet(t *testing.T) { + NewResultSet() + NewResultSet(None) + NewResultSet(None, Completed, Canceled, Failed) +} + +func TestResultSet_Contains(t *testing.T) { + set := NewResultSet(None, Completed) + + if set.Contains(None) == false { + t.Fatalf("%s expects None to be in set", t.Name()) + } + + if set.Contains(Completed) == false { + t.Fatalf("%s expects Completed to be in set", t.Name()) + } + + if set.Contains(Canceled) == true { + t.Fatalf("%s expects Canceled to be not in set", t.Name()) + } + + if set.Contains(Failed) == true { + t.Fatalf("%s expects Failed to be not in set", t.Name()) + } +} + +func TestResultSet_Panic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatalf("%s must panic", t.Name()) + } + }() + + NewResultSet(None, Completed, Canceled, Failed, Result(100)) +} diff --git a/result_test.go b/result_test.go new file mode 100644 index 0000000..4f7b673 --- /dev/null +++ b/result_test.go @@ -0,0 +1,115 @@ +package floc + +import ( + "fmt" + "testing" +) + +func TestResult_IsNone(t *testing.T) { + r := None + + if r.IsNone() == false { + t.Fatalf("%s expects None but has %s", t.Name(), r.String()) + } + + if r.IsValid() == false { + t.Fatalf("%s expects None to be valid", t.Name()) + } +} + +func TestResult_IsCanceled(t *testing.T) { + r := Canceled + + if r.IsCanceled() == false { + t.Fatalf("%s expects Canceled but has %s", t.Name(), r.String()) + } + + if r.IsValid() == false { + t.Fatalf("%s expects Canceled to be valid", t.Name()) + } +} + +func TestResult_IsCompleted(t *testing.T) { + r := Completed + + if r.IsCompleted() == false { + t.Fatalf("%s expects Completed but has %s", t.Name(), r.String()) + } + + if r.IsValid() == false { + t.Fatalf("%s expects Completed to be valid", t.Name()) + } +} + +func TestResult_IsFailed(t *testing.T) { + r := Failed + + if r.IsFailed() == false { + t.Fatalf("%s expects Failed but has %s", t.Name(), r.String()) + } + + if r.IsValid() == false { + t.Fatalf("%s expects Failed to be valid", t.Name()) + } +} + +func TestResult_IsValid(t *testing.T) { + const n = 1000 + + r := Result(n) + + if r.IsValid() == true { + t.Fatalf("%s expects %s to be invalid", t.Name(), r.String()) + } + + s := fmt.Sprintf("Result(%d)", n) + if r.String() != s { + t.Fatalf("%s expects %s but has %s", t.Name(), s, r.String()) + } +} + +func TestResult_IsFinished(t *testing.T) { + if None.IsFinished() == true { + t.Fatalf("%s expects None to be not finished", t.Name()) + } + + if Completed.IsFinished() == false { + t.Fatalf("%s expects Completed to be finished", t.Name()) + } + + if Canceled.IsFinished() == false { + t.Fatalf("%s expects Canceled to be finished", t.Name()) + } + + if Failed.IsFinished() == false { + t.Fatalf("%s expects Failed to be finished", t.Name()) + } +} + +func TestResult_Int32(t *testing.T) { + var n int32 + for n = 0; n < 1000; n++ { + r := Result(n) + if r.Int32() != n { + t.Fatalf("%s expects Result to be %d but has %d", t.Name(), n, r.Int32()) + } + } +} + +func TestResult_String(t *testing.T) { + if None.String() != "None" { + t.Fatalf("%s expects None bu has %s", t.Name(), None.String()) + } + + if Completed.String() != "Completed" { + t.Fatalf("%s expects Completed bu has %s", t.Name(), Completed.String()) + } + + if Canceled.String() != "Canceled" { + t.Fatalf("%s expects Canceled bu has %s", t.Name(), Canceled.String()) + } + + if Failed.String() != "Failed" { + t.Fatalf("%s expects Failed bu has %s", t.Name(), Failed.String()) + } +} diff --git a/run.go b/run.go index be1790e..203de19 100644 --- a/run.go +++ b/run.go @@ -2,6 +2,8 @@ package floc import "gopkg.in/workanator/go-floc.v2/errors" +const locRun = "Run" + func Run(job Job) (result Result, data interface{}, err error) { // Return invalid job error if the job is nil if job == nil { @@ -25,7 +27,7 @@ func Run(job Job) (result Result, data interface{}, err error) { // Return Failed if unhandled error left after the execution. if unhandledErr != nil { - return Failed, nil, unhandledErr + return Failed, nil, errors.NewErrLocation(unhandledErr, locRun) } return None, nil, nil diff --git a/run_test.go b/run_test.go new file mode 100644 index 0000000..a73ab03 --- /dev/null +++ b/run_test.go @@ -0,0 +1,155 @@ +package floc + +import ( + "fmt" + "gopkg.in/workanator/go-floc.v2/errors" + "testing" +) + +func TestRun_NilJob(t *testing.T) { + result, data, err := Run(nil) + + if result.IsNone() == false { + t.Fatalf("%s expects result to be None but has %s", t.Name(), result.String()) + } + + if data != nil { + t.Fatalf("%s expects data to be nil but has %v", t.Name(), data) + } + + if err == nil { + t.Fatalf("%s expects error to be not nil", t.Name()) + } +} + +func TestRun_None(t *testing.T) { + flow := func(Context, Control) error { + return nil + } + + result, data, err := Run(flow) + + if result.IsNone() == false { + t.Fatalf("%s expects result to be None but has %s", t.Name(), result.String()) + } + + if data != nil { + t.Fatalf("%s expects data to be nil but has %v", t.Name(), data) + } + + if err != nil { + t.Fatalf("%s expects error to be nil but has %s", t.Name(), err.Error()) + } +} + +func TestRun_Completed(t *testing.T) { + const tpl int = 1 + + flow := func(ctx Context, ctrl Control) error { + ctrl.Complete(tpl) + return nil + } + + result, data, err := Run(flow) + + if result.IsCompleted() == false { + t.Fatalf("%s expects result to be Completed but has %s", t.Name(), result.String()) + } + + if data == nil { + t.Fatalf("%s expects data to be not nil", t.Name()) + } else if d, ok := data.(int); !ok { + t.Fatalf("%s expects data to be of type int but has %v", t.Name(), data) + } else if d != tpl { + t.Fatalf("%s expects data to be %d but has %d", t.Name(), tpl, d) + } + + if err != nil { + t.Fatalf("%s expects error to be nil but has %s", t.Name(), err.Error()) + } +} + +func TestRun_Canceled(t *testing.T) { + const tpl float64 = 3.1415 + + flow := func(ctx Context, ctrl Control) error { + ctrl.Cancel(tpl) + return nil + } + + result, data, err := Run(flow) + + if result.IsCanceled() == false { + t.Fatalf("%s expects result to be Canceled but has %s", t.Name(), result.String()) + } + + if data == nil { + t.Fatalf("%s expects data to be not nil", t.Name()) + } else if f, ok := data.(float64); !ok { + t.Fatalf("%s expects data to be of type float64 but has %v", t.Name(), data) + } else if f < tpl-0.0001 || f > tpl+0.0001 { + t.Fatalf("%s expects data to be %f but has %f", t.Name(), tpl, f) + } + + if err != nil { + t.Fatalf("%s expects error to be nil but has %s", t.Name(), err.Error()) + } +} + +func TestRun_Failed(t *testing.T) { + const tplData string = "REASON" + var tplError error = fmt.Errorf("failed because of %s", tplData) + + flow := func(ctx Context, ctrl Control) error { + ctrl.Fail(tplData, tplError) + return nil + } + + result, data, err := Run(flow) + + if result.IsFailed() == false { + t.Fatalf("%s expects result to be Failed but has %s", t.Name(), result.String()) + } + + if data == nil { + t.Fatalf("%s expects data to be not nil", t.Name()) + } else if s, ok := data.(string); !ok { + t.Fatalf("%s expects data to be of type string but has %v", t.Name(), data) + } else if s != tplData { + t.Fatalf("%s expects data to be %s but has %s", t.Name(), tplData, s) + } + + if err == nil { + t.Fatalf("%s expects error to be not nil", t.Name()) + } else if err.Error() != tplError.Error() { + t.Fatalf("%s expects error message to be %s but has %s", t.Name(), tplError.Error(), err.Error()) + } +} + +func TestRun_UnhandledError(t *testing.T) { + var tplError error = fmt.Errorf("something happened") + + flow := func(ctx Context, ctrl Control) error { + return tplError + } + + result, data, err := Run(flow) + + if result.IsFailed() == false { + t.Fatalf("%s expects result to be Failed but has %s", t.Name(), result.String()) + } + + if data != nil { + t.Fatalf("%s expects data to be nil but has %v", t.Name(), data) + } + + if err == nil { + t.Fatalf("%s expects error to be not nil", t.Name()) + } else if e, ok := err.(errors.ErrLocation); !ok { + t.Fatalf("%s expects error to be of type ErrLocation but has %v", t.Name(), err) + } else if e.Err().Error() != tplError.Error() { + t.Fatalf("%s expects error message to be %s but has %s", t.Name(), tplError.Error(), e.Err().Error()) + } else if e.Where() != locRun { + t.Fatalf("%s expects error location to be %s but has %s", t.Name(), locRun, e.Where()) + } +}