Skip to content

Commit

Permalink
new: added Proc, Txn, Runner, Seq, Para, RunSeq, and RunPara (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
sttk committed Sep 20, 2022
1 parent ba6151f commit 0d1a59f
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 72 deletions.
8 changes: 0 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type ConnBase struct {
isLocalConnCfgSealed bool
localConnCfgMap map[string]ConnCfg
connMap map[string]Conn
innerMap map[string]any
connMutex sync.Mutex
}

Expand All @@ -81,7 +80,6 @@ func NewConnBase() *ConnBase {
isLocalConnCfgSealed: false,
localConnCfgMap: make(map[string]ConnCfg),
connMap: make(map[string]Conn),
innerMap: make(map[string]any),
}
}

Expand Down Expand Up @@ -125,12 +123,6 @@ func (base *ConnBase) GetConn(name string) (Conn, Err) {
return conn, Ok()
}

// InnerMap gets a singular map in a ConnBase/Proc for communicating among
// multiple data accesses.
func (base *ConnBase) InnerMap() map[string]any {
return base.innerMap
}

func (base *ConnBase) begin() {
base.isLocalConnCfgSealed = true
isGlobalConnCfgSealed = true
Expand Down
34 changes: 6 additions & 28 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (cfg FooConnCfg) CreateConn() (Conn, Err) {

type BarConn struct {
Label string
store map[string]string
}

func (conn *BarConn) Commit() Err {
Expand All @@ -67,13 +68,17 @@ func (conn *BarConn) Rollback() {
func (conn *BarConn) Close() {
logs.PushBack("BarConn#Close")
}
func (conn *BarConn) Store(name, value string) {
conn.store[name] = value
}

type BarConnCfg struct {
Label string
Store map[string]string
}

func (cfg BarConnCfg) CreateConn() (Conn, Err) {
return &BarConn{Label: cfg.Label}, Ok()
return &BarConn{Label: cfg.Label, store: cfg.Store}, Ok()
}

func TestAddGlobalConnCfg(t *testing.T) {
Expand Down Expand Up @@ -133,7 +138,6 @@ func TestNewConnBase(t *testing.T) {
assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 0)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)
}

func TestConnBase_addLocalConnCfg(t *testing.T) {
Expand All @@ -145,21 +149,18 @@ func TestConnBase_addLocalConnCfg(t *testing.T) {
assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 0)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.addLocalConnCfg("foo", FooConnCfg{})

assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 1)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.addLocalConnCfg("bar", &BarConnCfg{})

assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 2)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)
}

func TestConnBase_begin(t *testing.T) {
Expand All @@ -172,31 +173,27 @@ func TestConnBase_begin(t *testing.T) {
assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 0)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.addLocalConnCfg("foo", FooConnCfg{})

assert.False(t, isGlobalConnCfgSealed)
assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 1)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.begin()

assert.True(t, isGlobalConnCfgSealed)
assert.True(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 1)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.addLocalConnCfg("bar", &BarConnCfg{})

assert.True(t, isGlobalConnCfgSealed)
assert.True(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 1)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)

base.isLocalConnCfgSealed = false

Expand All @@ -206,7 +203,6 @@ func TestConnBase_begin(t *testing.T) {
assert.False(t, base.isLocalConnCfgSealed)
assert.Equal(t, len(base.localConnCfgMap), 2)
assert.Equal(t, len(base.connMap), 0)
assert.Equal(t, len(base.innerMap), 0)
}

func TestConnBase_GetConn_withLocalConnCfg(t *testing.T) {
Expand Down Expand Up @@ -311,24 +307,6 @@ func TestConnBase_GetConn_failToCreateConn(t *testing.T) {
}
}

func TestConnBase_InnerMap(t *testing.T) {
Clear()
defer Clear()

base := NewConnBase()

m := base.InnerMap()
assert.Nil(t, m["param"])
m["param"] = 123

m = base.InnerMap()
assert.Equal(t, m["param"], 123)
m["param"] = 456

m = base.InnerMap()
assert.Equal(t, m["param"], 456)
}

func TestConnBase_commit(t *testing.T) {
Clear()
defer Clear()
Expand Down
6 changes: 2 additions & 4 deletions dax.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

package sabi

// Dax is an interface for a set of data accesses, and requires two method:
// #GetConn which gets a connection to an external data access, and #InnerMap
// which gets a map to communicate data among multiple data accesses.
// Dax is an interface for a set of data accesses, and requires a method:
// #GetConn which gets a connection to an external data access.
type Dax interface {
GetConn(name string) (Conn, Err)
InnerMap() map[string]any
}
8 changes: 4 additions & 4 deletions proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (proc Proc[D]) AddLocalConnCfg(name string, cfg ConnCfg) {

// RunTxn is a method which runs logic functions specified as arguments in a
// transaction.
func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) (map[string]any, Err) {
func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) Err {
proc.connBase.begin()

err := Ok()
Expand All @@ -45,13 +45,13 @@ func (proc Proc[D]) RunTxn(logics ...func(dax D) Err) (map[string]any, Err) {

proc.connBase.close()

return proc.connBase.innerMap, err
return err
}

// NewTxn is a method which creates a transaction having specified logic
// functions.
func (proc Proc[D]) NewTxn(logics ...func(dax D) Err) Txn[D] {
return Txn[D]{
func (proc Proc[D]) Txn(logics ...func(dax D) Err) Runner {
return txnRunner[D]{
logics: logics,
connBase: proc.connBase,
dax: proc.dax,
Expand Down
31 changes: 20 additions & 11 deletions proc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func NewBarSetDataDax(base sabi.Dax) BarSetDataDax {
}

func (dax BarSetDataDax) SetData(data string) sabi.Err {
_, err := dax.GetBarConn("bar")
conn, err := dax.GetBarConn("bar")
if !err.IsOk() {
return err
}
dax.InnerMap()["result"] = data
conn.Store("result", data)
return sabi.Ok()
}

Expand All @@ -80,12 +80,15 @@ func TestProc_RunTxn(t *testing.T) {
sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{})
sabi.SealGlobalConnCfgs()

store := make(map[string]string)

proc := NewProc()
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{})
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store})

m, err := proc.RunTxn(GetAndSetDataLogic)
err := proc.RunTxn(GetAndSetDataLogic)
assert.True(t, err.IsOk())
assert.Equal(t, m["result"], "GETDATA")

assert.Equal(t, store["result"], "GETDATA")
}

func TestProc_RunTxn_failToGetConn(t *testing.T) {
Expand All @@ -95,12 +98,14 @@ func TestProc_RunTxn_failToGetConn(t *testing.T) {
sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{})
sabi.SealGlobalConnCfgs()

store := make(map[string]string)

proc := NewProc()
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{})
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store})

sabi.WillFailToCreateFooConn = true

m, err := proc.RunTxn(GetAndSetDataLogic)
err := proc.RunTxn(GetAndSetDataLogic)
switch err.Reason().(type) {
case sabi.FailToCreateConn:
assert.Equal(t, err.Get("Name"), "foo")
Expand All @@ -112,7 +117,8 @@ func TestProc_RunTxn_failToGetConn(t *testing.T) {
default:
assert.Fail(t, err.Error())
}
assert.Nil(t, m["result"])

assert.Equal(t, store["result"], "")
}

func TestProc_RunTxn_failToCommitConn(t *testing.T) {
Expand All @@ -122,12 +128,14 @@ func TestProc_RunTxn_failToCommitConn(t *testing.T) {
sabi.AddGlobalConnCfg("foo", sabi.FooConnCfg{})
sabi.SealGlobalConnCfgs()

store := make(map[string]string)

proc := NewProc()
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{})
proc.AddLocalConnCfg("bar", &sabi.BarConnCfg{Store: store})

sabi.WillFailToCommitFooConn = true

m, err := proc.RunTxn(GetAndSetDataLogic)
err := proc.RunTxn(GetAndSetDataLogic)
switch err.Reason().(type) {
case sabi.FailToCommitConn:
errs := err.Get("Errors").(map[string]sabi.Err)
Expand All @@ -140,5 +148,6 @@ func TestProc_RunTxn_failToCommitConn(t *testing.T) {
default:
assert.Fail(t, err.Error())
}
assert.Equal(t, m["result"], "GETDATA")

assert.Equal(t, store["result"], "GETDATA")
}
121 changes: 121 additions & 0 deletions runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (C) 2022 Takayuki Sato. All Rights Reserved.
// This program is free software under MIT License.
// See the file LICENSE in this distribution for more details.

package sabi

import (
"strconv"
)

type /* error reasons */ (
// FailToRunInParallel is a error reason which indicates some runner which
// is runned in parallel failed.
FailToRunInParallel struct {
Errors map[string]Err
}
)

// Runner is an interface which has #Run method and is runned by RunSeq or
// RunPara functions.
type Runner interface {
Run() Err
}

type seqRunner struct {
runners []Runner
}

func (r seqRunner) Run() Err {
for _, runner := range r.runners {
err := runner.Run()
if !err.IsOk() {
return err
}
}
return Ok()
}

// Seq is a function which creates a runner which runs multiple runners
// specified as arguments sequencially.
func Seq(runners ...Runner) Runner {
return seqRunner{runners: runners[:]}
}

type paraRunner struct {
runners []Runner
}

func (r paraRunner) Run() Err {
ch := make(chan Err)

for _, runner := range r.runners {
go func(runner Runner, ch chan Err) {
err := runner.Run()
ch <- err
}(runner, ch)
}

errs := make(map[string]Err)
n := len(r.runners)
for i := 0; i < n; i++ {
select {
case err := <-ch:
if !err.IsOk() {
errs[strconv.Itoa(i)] = err
}
}
}

if len(errs) > 0 {
return ErrBy(FailToRunInParallel{Errors: errs})
}

return Ok()
}

// Para is a function which creates a runner which runs multiple runners
// specified as arguments in parallel.
func Para(runners ...Runner) Runner {
return paraRunner{runners: runners[:]}
}

// RunSeq is a function which runs specified runners sequencially.
func RunSeq(runners ...Runner) Err {
for _, runner := range runners {
err := runner.Run()
if !err.IsOk() {
return err
}
}
return Ok()
}

// RunPara is a function which runs specified runners in parallel.
func RunPara(runners ...Runner) Err {
ch := make(chan Err)

for _, runner := range runners {
go func(runner Runner, ch chan Err) {
err := runner.Run()
ch <- err
}(runner, ch)
}

errs := make(map[string]Err)
n := len(runners)
for i := 0; i < n; i++ {
select {
case err := <-ch:
if !err.IsOk() {
errs[strconv.Itoa(i)] = err
}
}
}

if len(errs) > 0 {
return ErrBy(FailToRunInParallel{Errors: errs})
}

return Ok()
}

0 comments on commit 0d1a59f

Please sign in to comment.