Skip to content

Commit

Permalink
improve converge
Browse files Browse the repository at this point in the history
  • Loading branch information
mission-liao committed Dec 27, 2015
1 parent e6541a9 commit 63a257a
Show file tree
Hide file tree
Showing 21 changed files with 723 additions and 279 deletions.
21 changes: 5 additions & 16 deletions bridge_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,17 @@ func (bdg *localBridge) AddListener(rcpt <-chan *TaskReceipt) (tasks <-chan *Tas
}
}
clean:
finished := false
for {
select {
case t, ok := <-input:
if !ok {
finished = true
break
break clean
}
if out(t) {
finished = true
break clean
}
default:
finished = true
}
if finished {
break
break clean
}
}
close(output)
Expand Down Expand Up @@ -260,13 +255,11 @@ func (bdg *localBridge) Report(reports <-chan *Report) (err error) {
}
}
clean:
finished := false
for {
select {
case v, ok := <-inputs:
if !ok {
finished = true
break
break clean
}

if !outF(v) {
Expand All @@ -276,11 +269,7 @@ func (bdg *localBridge) Report(reports <-chan *Report) (err error) {
)
}
default:
finished = true
}

if finished {
break
break clean
}
}

Expand Down
26 changes: 11 additions & 15 deletions bridge_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,27 @@ func (bdg *remoteBridge) Report(reports <-chan *Report) (err error) {
Body: b,
}
}
finished:
for {
select {
case _, _ = <-quit:
break finished
goto clean
case v, ok := <-input:
if !ok {
break finished
goto clean
}
out(v)
}
}

clean:
for {
select {
case v, ok := <-input:
if !ok {
return
break clean
}
out(v)
default:
return
break clean
}
}
}(bdg.reporters.New(), bdg.reporters.Wait(), bdg.events, reports, r)
Expand Down Expand Up @@ -258,32 +257,29 @@ func (bdg *remoteBridge) Poll(t *Task) (reports <-chan *Report, err error) {
done = r.Done()
return done
}

finished:
for {
select {
case _, _ = <-quit:
break finished
goto clean
case v, ok := <-inputs:
if !ok {
break finished
goto clean
}
if out(v) {
break finished
goto clean
}
}
}

cleared:
clean:
for {
select {
case v, ok := <-inputs:
if !ok {
break cleared
break clean
}
out(v)
default:
break cleared
break clean
}
}
}(bdg.storers.New(), bdg.storers.Wait(), bdg.storers.Events(), r, reports2)
Expand Down
89 changes: 89 additions & 0 deletions bridge_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,92 @@ func (ts *remoteBridgeTestSuite) TestReturnFix() {
ts.Equal(float64(6), v)
}
}

func (ts *remoteBridgeTestSuite) TestSendTask() {
var (
bg = newRemoteBridge(ts.trans)
err error
)
defer func() {
ts.Nil(err)
}()

// register a task
err = ts.trans.Register("TestSendTask", func() {})
if err != nil {
return
}

// compose a task
task, err := ts.trans.ComposeTask("TestSendTask", nil, nil)
if err != nil {
return
}

// send it, should fail
ts.NotNil(bg.SendTask(task))
}

func (ts *remoteBridgeTestSuite) TestAddListener() {
bg := newRemoteBridge(ts.trans)

// add a new listener, should fail
tasks, err := bg.AddListener(make(chan *TaskReceipt, 10))
ts.Nil(tasks)
ts.NotNil(err)
}

func (ts *remoteBridgeTestSuite) TestAddNamedListener() {
var (
bg = newRemoteBridge(ts.trans)
err error
)
defer func() {
ts.Nil(err)
}()

// register a task
err = ts.trans.Register("TestAddNamedListener", func() {})
if err != nil {
return
}

// add a named listener, should fail
tasks, err2 := bg.AddNamedListener("TestAddNamedListener", make(chan *TaskReceipt, 10))
ts.Nil(tasks)
ts.NotNil(err2)
}

func (ts *remoteBridgeTestSuite) TestReport() {
bg := newRemoteBridge(ts.trans)

// add a report channel, should fail
ts.NotNil(bg.Report(make(chan *Report, 10)))
}

func (ts *remoteBridgeTestSuite) TestPoll() {
var (
bg = newRemoteBridge(ts.trans)
err error
)
defer func() {
ts.Nil(err)
}()

// register a task
err = ts.trans.Register("TestPoll", func() {})
if err != nil {
return
}

// compose a task
task, err := ts.trans.ComposeTask("TestPoll", nil, nil)
if err != nil {
return
}

// poll it, should fail
reports, err2 := bg.Poll(task)
ts.Nil(reports)
ts.NotNil(err2)
}
1 change: 1 addition & 0 deletions broker_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (brk *localBroker) consumerRoutine(quit <-chan int, wait *sync.WaitGroup, e
}
}
clean:
// TODO: clean up
}

//
Expand Down
15 changes: 15 additions & 0 deletions dingo.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func NewApp(nameOfBridge string, cfg *Config) (app *App, err error) {

func (dg *App) attachObject(obj Object, types int) (err error) {
if obj == nil {
err = errors.New("object to be attached is nil")
return
}

Expand Down Expand Up @@ -353,6 +354,8 @@ returns:
- err: any error produced
*/
func (dg *App) Allocate(name string, count, share int) (remain int, err error) {
remain = count

// check if this name register
_, err = dg.trans.GetOption(name)
if err != nil {
Expand Down Expand Up @@ -443,6 +446,11 @@ ObjT.Consumer|ObjT.Reporter, if reporting is not reuqired(make sure there is no
then only ObjT.Consumer is used.
*/
func (dg *App) Use(obj Object, types int) (id int, used int, err error) {
if obj == nil {
err = errors.New("object to be attached is nil")
return
}

dg.objsLock.Lock()
defer dg.objsLock.Unlock()

Expand Down Expand Up @@ -498,6 +506,13 @@ func (dg *App) Use(obj Object, types int) (id int, used int, err error) {
}
}
}
if types&ObjT.NamedConsumer == ObjT.NamedConsumer {
namedConsumer, ok = obj.(NamedConsumer)
if !ok {
err = errors.New("named consumer is not found")
return
}
}
if types&ObjT.NamedConsumer == ObjT.NamedConsumer {
namedConsumer, ok = obj.(NamedConsumer)
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions dingo_amqp_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
//

type amqpRedisSingleAppTestSuite struct {
dingo.DingoSingleAppTestSuite
DingoSingleAppTestSuite
}

func TestDingoAmqpRedisSingleAppSuite(t *testing.T) {
suite.Run(t, &amqpRedisSingleAppTestSuite{
dingo.DingoSingleAppTestSuite{
DingoSingleAppTestSuite{
GenApp: func() (app *dingo.App, err error) {
app, err = dingo.NewApp("remote", nil)
if err != nil {
Expand Down Expand Up @@ -54,12 +54,12 @@ func TestDingoAmqpRedisSingleAppSuite(t *testing.T) {
//

type amqpRedisMultiAppTestSuite struct {
dingo.DingoMultiAppTestSuite
DingoMultiAppTestSuite
}

func TestDingoAmqpRedisMultiAppSuite(t *testing.T) {
suite.Run(t, &amqpRedisMultiAppTestSuite{
dingo.DingoMultiAppTestSuite{
DingoMultiAppTestSuite{
CountOfCallers: 3,
CountOfWorkers: 3,
GenCaller: func() (app *dingo.App, err error) {
Expand Down
8 changes: 4 additions & 4 deletions dingo_amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
//

type amqpSingleAppTestSuite struct {
dingo.DingoSingleAppTestSuite
DingoSingleAppTestSuite
}

func TestDingoAmqpSingleAppSuite(t *testing.T) {
suite.Run(t, &amqpSingleAppTestSuite{
dingo.DingoSingleAppTestSuite{
DingoSingleAppTestSuite{
GenApp: func() (app *dingo.App, err error) {
app, err = dingo.NewApp("remote", nil)
if err != nil {
Expand Down Expand Up @@ -53,12 +53,12 @@ func TestDingoAmqpSingleAppSuite(t *testing.T) {
//

type amqpMultiAppTestSuite struct {
dingo.DingoMultiAppTestSuite
DingoMultiAppTestSuite
}

func TestDingoAmqpMultiAppSuite(t *testing.T) {
suite.Run(t, &amqpMultiAppTestSuite{
dingo.DingoMultiAppTestSuite{
DingoMultiAppTestSuite{
CountOfCallers: 3,
CountOfWorkers: 3,
GenCaller: func() (app *dingo.App, err error) {
Expand Down
Loading

0 comments on commit 63a257a

Please sign in to comment.