diff --git a/conn.go b/conn.go index 61e5753..9a7d6f8 100644 --- a/conn.go +++ b/conn.go @@ -71,7 +71,6 @@ type ConnBase struct { isLocalConnCfgSealed bool localConnCfgMap map[string]ConnCfg connMap map[string]Conn - innerMap map[string]any connMutex sync.Mutex } @@ -81,7 +80,6 @@ func NewConnBase() *ConnBase { isLocalConnCfgSealed: false, localConnCfgMap: make(map[string]ConnCfg), connMap: make(map[string]Conn), - innerMap: make(map[string]any), } } @@ -125,12 +123,6 @@ func (base *ConnBase) GetConn(name string) (Conn, Err) { return conn, Ok() } -// InnerMap gets a singular map in a ConnBase/Proc for communicating among -// multiple data accesses. -func (base *ConnBase) InnerMap() map[string]any { - return base.innerMap -} - func (base *ConnBase) begin() { base.isLocalConnCfgSealed = true isGlobalConnCfgSealed = true diff --git a/conn_test.go b/conn_test.go index 1cdce3d..4af73d1 100644 --- a/conn_test.go +++ b/conn_test.go @@ -55,6 +55,7 @@ func (cfg FooConnCfg) CreateConn() (Conn, Err) { type BarConn struct { Label string + store map[string]string } func (conn *BarConn) Commit() Err { @@ -67,13 +68,17 @@ func (conn *BarConn) Rollback() { func (conn *BarConn) Close() { logs.PushBack("BarConn#Close") } +func (conn *BarConn) Store(name, value string) { + conn.store[name] = value +} type BarConnCfg struct { Label string + Store map[string]string } func (cfg BarConnCfg) CreateConn() (Conn, Err) { - return &BarConn{Label: cfg.Label}, Ok() + return &BarConn{Label: cfg.Label, store: cfg.Store}, Ok() } func TestAddGlobalConnCfg(t *testing.T) { @@ -133,7 +138,6 @@ func TestNewConnBase(t *testing.T) { assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 0) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) } func TestConnBase_addLocalConnCfg(t *testing.T) { @@ -145,21 +149,18 @@ func TestConnBase_addLocalConnCfg(t *testing.T) { assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 0) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.addLocalConnCfg("foo", FooConnCfg{}) assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 1) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.addLocalConnCfg("bar", &BarConnCfg{}) assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 2) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) } func TestConnBase_begin(t *testing.T) { @@ -172,7 +173,6 @@ func TestConnBase_begin(t *testing.T) { assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 0) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.addLocalConnCfg("foo", FooConnCfg{}) @@ -180,7 +180,6 @@ func TestConnBase_begin(t *testing.T) { assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 1) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.begin() @@ -188,7 +187,6 @@ func TestConnBase_begin(t *testing.T) { assert.True(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 1) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.addLocalConnCfg("bar", &BarConnCfg{}) @@ -196,7 +194,6 @@ func TestConnBase_begin(t *testing.T) { assert.True(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 1) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) base.isLocalConnCfgSealed = false @@ -206,7 +203,6 @@ func TestConnBase_begin(t *testing.T) { assert.False(t, base.isLocalConnCfgSealed) assert.Equal(t, len(base.localConnCfgMap), 2) assert.Equal(t, len(base.connMap), 0) - assert.Equal(t, len(base.innerMap), 0) } func TestConnBase_GetConn_withLocalConnCfg(t *testing.T) { @@ -311,24 +307,6 @@ func TestConnBase_GetConn_failToCreateConn(t *testing.T) { } } -func TestConnBase_InnerMap(t *testing.T) { - Clear() - defer Clear() - - base := NewConnBase() - - m := base.InnerMap() - assert.Nil(t, m["param"]) - m["param"] = 123 - - m = base.InnerMap() - assert.Equal(t, m["param"], 123) - m["param"] = 456 - - m = base.InnerMap() - assert.Equal(t, m["param"], 456) -} - func TestConnBase_commit(t *testing.T) { Clear() defer Clear() diff --git a/dax.go b/dax.go index aad3522..f0c1d5b 100644 --- a/dax.go +++ b/dax.go @@ -4,10 +4,8 @@ package sabi -// Dax is an interface for a set of data accesses, and requires two method: -// #GetConn which gets a connection to an external data access, and #InnerMap -// which gets a map to communicate data among multiple data accesses. +// Dax is an interface for a set of data accesses, and requires a method: +// #GetConn which gets a connection to an external data access. type Dax interface { GetConn(name string) (Conn, Err) - InnerMap() map[string]any } diff --git a/proc.go b/proc.go index a1880a3..208718c 100644 --- a/proc.go +++ b/proc.go @@ -23,7 +23,7 @@ func (proc Proc[D]) AddLocalConnCfg(name string, cfg ConnCfg) { // RunTxn is a method which runs logic functions specified as arguments in a // transaction. -func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) (map[string]any, Err) { +func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) Err { proc.connBase.begin() err := Ok() @@ -45,13 +45,13 @@ func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) (map[string]any, Err) { proc.connBase.close() - return proc.connBase.innerMap, err + return err } // NewTxn is a method which creates a transaction having specified logic // functions. -func (proc Proc[D]) NewTxn(logics ...func(dax D) Err) Txn[D] { - return Txn[D]{ +func (proc Proc[D]) Txn(logics ...func(dax D) Err) Runner { + return txnRunner[D]{ logics: logics, connBase: proc.connBase, dax: proc.dax, diff --git a/proc_test.go b/proc_test.go index 6ca34bc..51bd1c6 100644 --- a/proc_test.go +++ b/proc_test.go @@ -51,11 +51,11 @@ func NewBarSetDataDax(base sabi.Dax) BarSetDataDax { } func (dax BarSetDataDax) SetData(data string) sabi.Err { - _, err := dax.GetBarConn("bar") + conn, err := dax.GetBarConn("bar") if !err.IsOk() { return err } - dax.InnerMap()["result"] = data + conn.Store("result", data) return sabi.Ok() } @@ -80,12 +80,15 @@ func TestProc_RunTxn(t *testing.T) { sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{}) sabi.SealGlobalConnCfgs() + store := make(map[string]string) + proc := NewProc() - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) - m, err := proc.RunTxn(GetAndSetDataLogic) + err := proc.RunTxn(GetAndSetDataLogic) assert.True(t, err.IsOk()) - assert.Equal(t, m["result"], "GETDATA") + + assert.Equal(t, store["result"], "GETDATA") } func TestProc_RunTxn_failToGetConn(t *testing.T) { @@ -95,12 +98,14 @@ func TestProc_RunTxn_failToGetConn(t *testing.T) { sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{}) sabi.SealGlobalConnCfgs() + store := make(map[string]string) + proc := NewProc() - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) sabi.WillFailToCreateFooConn = true - m, err := proc.RunTxn(GetAndSetDataLogic) + err := proc.RunTxn(GetAndSetDataLogic) switch err.Reason().(type) { case sabi.FailToCreateConn: assert.Equal(t, err.Get("Name"), "foo") @@ -112,7 +117,8 @@ func TestProc_RunTxn_failToGetConn(t *testing.T) { default: assert.Fail(t, err.Error()) } - assert.Nil(t, m["result"]) + + assert.Equal(t, store["result"], "") } func TestProc_RunTxn_failToCommitConn(t *testing.T) { @@ -122,12 +128,14 @@ func TestProc_RunTxn_failToCommitConn(t *testing.T) { sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{}) sabi.SealGlobalConnCfgs() + store := make(map[string]string) + proc := NewProc() - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) sabi.WillFailToCommitFooConn = true - m, err := proc.RunTxn(GetAndSetDataLogic) + err := proc.RunTxn(GetAndSetDataLogic) switch err.Reason().(type) { case sabi.FailToCommitConn: errs := err.Get("Errors").(map[string]sabi.Err) @@ -140,5 +148,6 @@ func TestProc_RunTxn_failToCommitConn(t *testing.T) { default: assert.Fail(t, err.Error()) } - assert.Equal(t, m["result"], "GETDATA") + + assert.Equal(t, store["result"], "GETDATA") } diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..4458bc0 --- /dev/null +++ b/runner.go @@ -0,0 +1,121 @@ +// Copyright (C) 2022 Takayuki Sato. All Rights Reserved. +// This program is free software under MIT License. +// See the file LICENSE in this distribution for more details. + +package sabi + +import ( + "strconv" +) + +type /* error reasons */ ( + // FailToRunInParallel is a error reason which indicates some runner which + // is runned in parallel failed. + FailToRunInParallel struct { + Errors map[string]Err + } +) + +// Runner is an interface which has #Run method and is runned by RunSeq or +// RunPara functions. +type Runner interface { + Run() Err +} + +type seqRunner struct { + runners []Runner +} + +func (r seqRunner) Run() Err { + for _, runner := range r.runners { + err := runner.Run() + if !err.IsOk() { + return err + } + } + return Ok() +} + +// Seq is a function which creates a runner which runs multiple runners +// specified as arguments sequencially. +func Seq(runners ...Runner) Runner { + return seqRunner{runners: runners[:]} +} + +type paraRunner struct { + runners []Runner +} + +func (r paraRunner) Run() Err { + ch := make(chan Err) + + for _, runner := range r.runners { + go func(runner Runner, ch chan Err) { + err := runner.Run() + ch <- err + }(runner, ch) + } + + errs := make(map[string]Err) + n := len(r.runners) + for i := 0; i < n; i++ { + select { + case err := <-ch: + if !err.IsOk() { + errs[strconv.Itoa(i)] = err + } + } + } + + if len(errs) > 0 { + return ErrBy(FailToRunInParallel{Errors: errs}) + } + + return Ok() +} + +// Para is a function which creates a runner which runs multiple runners +// specified as arguments in parallel. +func Para(runners ...Runner) Runner { + return paraRunner{runners: runners[:]} +} + +// RunSeq is a function which runs specified runners sequencially. +func RunSeq(runners ...Runner) Err { + for _, runner := range runners { + err := runner.Run() + if !err.IsOk() { + return err + } + } + return Ok() +} + +// RunPara is a function which runs specified runners in parallel. +func RunPara(runners ...Runner) Err { + ch := make(chan Err) + + for _, runner := range runners { + go func(runner Runner, ch chan Err) { + err := runner.Run() + ch <- err + }(runner, ch) + } + + errs := make(map[string]Err) + n := len(runners) + for i := 0; i < n; i++ { + select { + case err := <-ch: + if !err.IsOk() { + errs[strconv.Itoa(i)] = err + } + } + } + + if len(errs) > 0 { + return ErrBy(FailToRunInParallel{Errors: errs}) + } + + return Ok() +} diff --git a/runner_test.go b/runner_test.go new file mode 100644 index 0000000..34ade9e --- /dev/null +++ b/runner_test.go @@ -0,0 +1,209 @@ +package sabi_test + +import ( + "container/list" + "github.com/stretchr/testify/assert" + "github.com/sttk-go/sabi" + "testing" + "time" +) + +type ( + FailToRun struct { + Name string + } +) + +var logs list.List +var errorRunnerName string + +func ClearLogs() { + logs.Init() + errorRunnerName = "" +} + +type MyRunner struct { + Name string + Wait time.Duration +} + +func (r MyRunner) Run() sabi.Err { + time.Sleep(r.Wait) + if r.Name == errorRunnerName { + return sabi.ErrBy(FailToRun{Name: r.Name}) + } + logs.PushBack(r.Name) + return sabi.Ok() +} + +func TestSeq(t *testing.T) { + ClearLogs() + defer ClearLogs() + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + r3 := sabi.Seq(r0, r1, r2) + + err := r3.Run() + assert.True(t, err.IsOk()) + + assert.Equal(t, logs.Len(), 3) + assert.Equal(t, logs.Front().Value, "r-0") + assert.Equal(t, logs.Front().Next().Value, "r-1") + assert.Equal(t, logs.Front().Next().Next().Value, "r-2") +} + +func TestSeq_FailToRun(t *testing.T) { + ClearLogs() + defer ClearLogs() + + errorRunnerName = "r-1" + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + r3 := sabi.Seq(r0, r1, r2) + + err := r3.Run() + assert.False(t, err.IsOk()) + switch err.Reason().(type) { + case FailToRun: + assert.Equal(t, err.Get("Name"), "r-1") + default: + assert.Fail(t, err.Error()) + } + + assert.Equal(t, logs.Len(), 1) + assert.Equal(t, logs.Front().Value, "r-0") +} + +func TestPara(t *testing.T) { + ClearLogs() + defer ClearLogs() + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + r3 := sabi.Para(r0, r1, r2) + + err := r3.Run() + assert.True(t, err.IsOk()) + + assert.Equal(t, logs.Len(), 3) + assert.Equal(t, logs.Front().Value, "r-1") + assert.Equal(t, logs.Front().Next().Value, "r-2") + assert.Equal(t, logs.Front().Next().Next().Value, "r-0") +} + +func TestPara_FailToRun(t *testing.T) { + ClearLogs() + defer ClearLogs() + + errorRunnerName = "r-1" + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + r3 := sabi.Para(r0, r1, r2) + + err := r3.Run() + assert.False(t, err.IsOk()) + switch err.Reason().(type) { + case sabi.FailToRunInParallel: + errs := err.Get("Errors").(map[string]sabi.Err) + assert.Equal(t, len(errs), 1) + assert.Equal(t, errs["0"].Error(), "{reason=FailToRun, Name=r-1}") + default: + assert.Fail(t, err.Error()) + } + + assert.Equal(t, logs.Len(), 2) + assert.Equal(t, logs.Front().Value, "r-2") + assert.Equal(t, logs.Front().Next().Value, "r-0") +} + +func TestRunSeq(t *testing.T) { + ClearLogs() + defer ClearLogs() + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + + err := sabi.RunSeq(r0, r1, r2) + assert.True(t, err.IsOk()) + + assert.Equal(t, logs.Len(), 3) + assert.Equal(t, logs.Front().Value, "r-0") + assert.Equal(t, logs.Front().Next().Value, "r-1") + assert.Equal(t, logs.Front().Next().Next().Value, "r-2") +} + +func TestRunSeq_FailToRun(t *testing.T) { + ClearLogs() + defer ClearLogs() + + errorRunnerName = "r-1" + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + + err := sabi.RunSeq(r0, r1, r2) + + assert.False(t, err.IsOk()) + switch err.Reason().(type) { + case FailToRun: + assert.Equal(t, err.Get("Name"), "r-1") + default: + assert.Fail(t, err.Error()) + } + + assert.Equal(t, logs.Len(), 1) + assert.Equal(t, logs.Front().Value, "r-0") +} + +func TestRunPara(t *testing.T) { + ClearLogs() + defer ClearLogs() + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + + err := sabi.RunPara(r0, r1, r2) + assert.True(t, err.IsOk()) + + assert.Equal(t, logs.Len(), 3) + assert.Equal(t, logs.Front().Value, "r-1") + assert.Equal(t, logs.Front().Next().Value, "r-2") + assert.Equal(t, logs.Front().Next().Next().Value, "r-0") +} + +func TestRunPara_FailToRun(t *testing.T) { + ClearLogs() + defer ClearLogs() + + errorRunnerName = "r-1" + + r0 := MyRunner{Name: "r-0", Wait: 50 * time.Millisecond} + r1 := MyRunner{Name: "r-1", Wait: 10 * time.Millisecond} + r2 := MyRunner{Name: "r-2", Wait: 20 * time.Millisecond} + + err := sabi.RunPara(r0, r1, r2) + + assert.False(t, err.IsOk()) + switch err.Reason().(type) { + case sabi.FailToRunInParallel: + errs := err.Get("Errors").(map[string]sabi.Err) + assert.Equal(t, len(errs), 1) + assert.Equal(t, errs["0"].Error(), "{reason=FailToRun, Name=r-1}") + default: + assert.Fail(t, err.Error()) + } + + assert.Equal(t, logs.Len(), 2) + assert.Equal(t, logs.Front().Value, "r-2") + assert.Equal(t, logs.Front().Next().Value, "r-0") +} diff --git a/txn.go b/txn.go index d7ee6e3..140ad6f 100644 --- a/txn.go +++ b/txn.go @@ -4,15 +4,13 @@ package sabi -// Txn is a structure type which represents a transaction. -type Txn[D any] struct { +type txnRunner[D any] struct { logics []func(D) Err connBase *ConnBase dax D } -// Run is a method to run a transaction of holding logic functions. -func (txn Txn[D]) Run() (map[string]any, Err) { +func (txn txnRunner[D]) Run() Err { txn.connBase.begin() err := Ok() @@ -34,5 +32,5 @@ func (txn Txn[D]) Run() (map[string]any, Err) { txn.connBase.close() - return txn.connBase.innerMap, err + return err } diff --git a/txn_test.go b/txn_test.go index c2ead86..9b56051 100644 --- a/txn_test.go +++ b/txn_test.go @@ -10,30 +10,35 @@ func TestTxn_Run(t *testing.T) { sabi.Clear() defer sabi.Clear() + store := make(map[string]string) + proc := NewProc() proc.AddLocalConnCfg("foo", sabi.FooConnCfg{}) - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) - txn := proc.NewTxn(GetAndSetDataLogic) + txn := proc.Txn(GetAndSetDataLogic) - m, err := txn.Run() + err := txn.Run() assert.True(t, err.IsOk()) - assert.Equal(t, m["result"], "GETDATA") + + assert.Equal(t, store["result"], "GETDATA") } func TestTxn_Run_failToGetConn(t *testing.T) { sabi.Clear() defer sabi.Clear() + store := make(map[string]string) + proc := NewProc() proc.AddLocalConnCfg("foo", sabi.FooConnCfg{}) - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) - txn := proc.NewTxn(GetAndSetDataLogic) + txn := proc.Txn(GetAndSetDataLogic) sabi.WillFailToCreateFooConn = true - m, err := txn.Run() + err := txn.Run() switch err.Reason().(type) { case sabi.FailToCreateConn: assert.Equal(t, err.Get("Name"), "foo") @@ -45,22 +50,25 @@ func TestTxn_Run_failToGetConn(t *testing.T) { default: assert.Fail(t, err.Error()) } - assert.Nil(t, m["result"]) + + assert.Equal(t, store["result"], "") } func TestTxn_Run_failToCommitConn(t *testing.T) { sabi.Clear() defer sabi.Clear() + store := make(map[string]string) + proc := NewProc() proc.AddLocalConnCfg("foo", sabi.FooConnCfg{}) - proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{}) + proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store}) - txn := proc.NewTxn(GetAndSetDataLogic) + txn := proc.Txn(GetAndSetDataLogic) sabi.WillFailToCommitFooConn = true - m, err := txn.Run() + err := txn.Run() switch err.Reason().(type) { case sabi.FailToCommitConn: errs := err.Get("Errors").(map[string]sabi.Err) @@ -73,5 +81,6 @@ func TestTxn_Run_failToCommitConn(t *testing.T) { default: assert.Fail(t, err.Error()) } - assert.Equal(t, m["result"], "GETDATA") + + assert.Equal(t, store["result"], "GETDATA") }