Skip to content

Commit

Permalink
interface change
Browse files Browse the repository at this point in the history
type of report and report channel to reporter is 1-to-many, thus
backend provider could have this assumption.
  • Loading branch information
mission-liao committed Dec 30, 2015
1 parent 14831d0 commit d871139
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 79 deletions.
6 changes: 5 additions & 1 deletion amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func TestAmqp127_0_0_1(t *testing.T) {

ch, err := conn.Channel()
ass.Nil(err)
defer conn.ReleaseChannel(ch)
if err != nil {
conn.ReleaseChannel(ch)
} else {
conn.ReleaseChannel(nil)
}

ass.Nil(conn.Close())
}
2 changes: 1 addition & 1 deletion amqp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (me *backend) ReporterHook(eventID int, payload interface{}) (err error) {
return
}

func (me *backend) Report(reports <-chan *dingo.ReportEnvelope) (id int, err error) {
func (me *backend) Report(name string, reports <-chan *dingo.ReportEnvelope) (id int, err error) {
quit, done, id := me.reporters.New(0)
go me._reporter_routine_(quit, done, me.reporters.Events(), reports)
return
Expand Down
6 changes: 4 additions & 2 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ type Reporter interface {
// - err: errors
ReporterHook(eventID int, payload interface{}) (err error)

// attach a report channel to backend.
// attach a report channel to backend. what dingo can promise is:
// - all reports belongs to the same task(name, id) would be sent through the same channel
//
// parameters:
// - name: all reports sent through this channel would be this name
// - reports: a input channel to receive reports from dingo.
// returns:
// - err: errors
Report(reports <-chan *ReportEnvelope) (id int, err error)
Report(name string, reports <-chan *ReportEnvelope) (id int, err error)
}

/*StoreEvent are those IDs of events that might be sent to StoreHook
Expand Down
2 changes: 1 addition & 1 deletion backend_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (bkd *localBackend) ReporterHook(eventID int, payload interface{}) (err err
return
}

func (bkd *localBackend) Report(reports <-chan *ReportEnvelope) (id int, err error) {
func (bkd *localBackend) Report(name string, reports <-chan *ReportEnvelope) (id int, err error) {
quit, done, id := bkd.reporters.New(0)
go bkd.reporterRoutine(quit, done, bkd.reporters.Events(), reports)

Expand Down
2 changes: 1 addition & 1 deletion backend_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLocalReporter(t *testing.T) {

// test case for Report/Unbind
reports := make(chan *dingo.ReportEnvelope, 10)
_, err = reporter.Report(reports)
_, err = reporter.Report("TestLocalReporter", reports)
ass.Nil(err)

// teardown
Expand Down
158 changes: 107 additions & 51 deletions backend_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import (
type BackendTestSuite struct {
suite.Suite

Gen func() (Backend, error)
Trans *fnMgr
Bkd Backend
Rpt Reporter
Sto Store
Reports chan *ReportEnvelope
Tasks []*Task
Gen func() (Backend, error)
Trans *fnMgr
Bkd Backend
Rpt Reporter
Sto Store
Tasks []*Task
}

func (ts *BackendTestSuite) SetupSuite() {
Expand All @@ -51,21 +50,13 @@ func (ts *BackendTestSuite) SetupTest() {
ts.Rpt, ts.Sto = ts.Bkd.(Reporter), ts.Bkd.(Store)
ts.NotNil(ts.Rpt)
ts.NotNil(ts.Sto)

ts.Reports = make(chan *ReportEnvelope, 10)
_, err = ts.Rpt.Report(ts.Reports)
ts.Nil(err)

ts.Tasks = []*Task{}
}

func (ts *BackendTestSuite) TearDownTest() {
ts.Nil(ts.Bkd.(Object).Close())
ts.Bkd, ts.Rpt, ts.Sto = nil, nil, nil

close(ts.Reports)
ts.Reports = nil

ts.Tasks = nil
}

Expand All @@ -74,34 +65,58 @@ func (ts *BackendTestSuite) TearDownTest() {
//

func (ts *BackendTestSuite) TestBasic() {
var err error
defer func() {
ts.Nil(err)
}()

// register an encoding for this method
ts.Nil(ts.Trans.Register("basic", func() {}))
err = ts.Trans.Register("basic", func() {})
if err != nil {
return
}

// compose a dummy task
task, err := ts.Trans.ComposeTask("basic", nil, []interface{}{})
ts.Nil(err)
if err != nil {
return
}

// trigger hook
ts.Nil(ts.Rpt.ReporterHook(ReporterEvent.BeforeReport, task))
err = ts.Rpt.ReporterHook(ReporterEvent.BeforeReport, task)
if err != nil {
return
}

// register a report channel
reports := make(chan *ReportEnvelope, 10)
_, err = ts.Rpt.Report("basic", reports)
if err != nil {
return
}

// send a report
report, err := task.composeReport(Status.Sent, make([]interface{}, 0), nil)
ts.Nil(err)
{
b, err := ts.Trans.EncodeReport(report)
ts.Nil(err)
ts.Reports <- &ReportEnvelope{
ID: report,
Body: b,
}
if err != nil {
return
}
b, err := ts.Trans.EncodeReport(report)
if err != nil {
return
}
reports <- &ReportEnvelope{
ID: report,
Body: b,
}

// polling
reports, err := ts.Sto.Poll(task)
ts.Nil(err)
ts.NotNil(reports)
rs, err := ts.Sto.Poll(task)
if err != nil {
return
}
ts.NotNil(rs)
select {
case v, ok := <-reports:
case v, ok := <-rs:
ts.True(ok)
if !ok {
break
Expand All @@ -112,19 +127,22 @@ func (ts *BackendTestSuite) TestBasic() {
}

// done polling
ts.Nil(ts.Sto.Done(task))
err = ts.Sto.Done(task)
if err != nil {
return
}

ts.Tasks = append(ts.Tasks, task)
}

func (ts *BackendTestSuite) send(task *Task, s int16) {
func (ts *BackendTestSuite) send(task *Task, out chan<- *ReportEnvelope, s int16) {
r, err := task.composeReport(s, nil, nil)
ts.Nil(err)

b, err := ts.Trans.EncodeReport(r)
ts.Nil(err)

ts.Reports <- &ReportEnvelope{task, b}
out <- &ReportEnvelope{task, b}
}

func (ts *BackendTestSuite) chk(task *Task, b []byte, s int16) {
Expand All @@ -138,14 +156,14 @@ func (ts *BackendTestSuite) chk(task *Task, b []byte, s int16) {
}
}

func (ts *BackendTestSuite) gen(task *Task, wait *sync.WaitGroup) {
func (ts *BackendTestSuite) gen(task *Task, out chan<- *ReportEnvelope, wait *sync.WaitGroup) {
defer wait.Done()

ts.Nil(ts.Rpt.ReporterHook(ReporterEvent.BeforeReport, task))

ts.send(task, Status.Sent)
ts.send(task, Status.Progress)
ts.send(task, Status.Success)
ts.send(task, out, Status.Sent)
ts.send(task, out, Status.Progress)
ts.send(task, out, Status.Success)
}

func (ts *BackendTestSuite) chks(task *Task, wait *sync.WaitGroup) {
Expand All @@ -162,20 +180,36 @@ func (ts *BackendTestSuite) chks(task *Task, wait *sync.WaitGroup) {
}

func (ts *BackendTestSuite) TestOrder() {
// send reports of tasks, make sure their order correct
ts.Nil(ts.Trans.Register("order", func() {}))

var (
tasks []*Task
wait sync.WaitGroup
err error
tasks []*Task
t *Task
wait sync.WaitGroup
reports = make(chan *ReportEnvelope, 10)
)
defer func() {
ts.Nil(err)
}()

// send reports of tasks, make sure their order correct
err = ts.Trans.Register("order", func() {})
if err != nil {
return
}

_, err = ts.Rpt.Report("order", reports)
if err != nil {
return
}

for i := 0; i < 100; i++ {
t, err := ts.Trans.ComposeTask("order", nil, nil)
ts.Nil(err)
t, err = ts.Trans.ComposeTask("order", nil, nil)
if err != nil {
return
}
if t != nil {
wait.Add(1)
go ts.gen(t, &wait)
go ts.gen(t, reports, &wait)

tasks = append(tasks, t)
}
Expand Down Expand Up @@ -211,22 +245,44 @@ func (ts *BackendTestSuite) TestSameID() {
countOfTypes = 10
countOfTasks = 10
tasks []*Task
t *Task
wait sync.WaitGroup
err error
)
defer func() {
ts.Nil(err)
}()

// register idMaker, task
for i := 0; i < countOfTypes; i++ {
name := fmt.Sprintf("SameID.%d", i)
ts.Nil(ts.Trans.AddIDMaker(100+i, &testSeqID{}))
ts.Nil(ts.Trans.Register(name, func() {}))
ts.Nil(ts.Trans.SetIDMaker(name, 100+i))
err = ts.Trans.AddIDMaker(100+i, &testSeqID{})
if err != nil {
return
}
err = ts.Trans.Register(name, func() {})
if err != nil {
return
}
err = ts.Trans.SetIDMaker(name, 100+i)
if err != nil {
return
}

reports := make(chan *ReportEnvelope, 10)
_, err = ts.Rpt.Report(name, reports)
if err != nil {
return
}

for j := 0; j < countOfTasks; j++ {
t, err := ts.Trans.ComposeTask(name, nil, nil)
ts.Nil(err)
t, err = ts.Trans.ComposeTask(name, nil, nil)
if err != nil {
return
}
if t != nil {
wait.Add(1)
go ts.gen(t, &wait)
go ts.gen(t, reports, &wait)

tasks = append(tasks, t)
}
Expand Down
2 changes: 1 addition & 1 deletion bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type bridge interface {
//
// proxy for Reporter
//
Report(reports <-chan *Report) (err error)
Report(name string, reports <-chan *Report) (err error)

//
// proxy for Store
Expand Down
6 changes: 3 additions & 3 deletions bridge_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func newLocalBridge(args ...interface{}) (b bridge) {
events: make(chan *Event, 10),
listeners: NewRoutines(),
reporters: NewRoutines(),
broker: make(chan *Task, 100), // TODO: config
pollers: make(chan *localStorePoller, 10), // TODO: config
broker: make(chan *Task, 100), // TODO: config
pollers: make(chan *localStorePoller, 100), // TODO: config
supported: ObjT.Reporter | ObjT.Store | ObjT.Producer | ObjT.Consumer,
}
b = v
Expand Down Expand Up @@ -151,7 +151,7 @@ func (bdg *localBridge) StopAllListeners() (err error) {
return
}

func (bdg *localBridge) Report(reports <-chan *Report) (err error) {
func (bdg *localBridge) Report(name string, reports <-chan *Report) (err error) {
bdg.objLock.Lock()
defer bdg.objLock.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions bridge_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (bdg *remoteBridge) StopAllListeners() (err error) {
return
}

func (bdg *remoteBridge) Report(reports <-chan *Report) (err error) {
func (bdg *remoteBridge) Report(name string, reports <-chan *Report) (err error) {
bdg.reporterLock.RLock()
defer bdg.reporterLock.RUnlock()

Expand All @@ -145,7 +145,7 @@ func (bdg *remoteBridge) Report(reports <-chan *Report) (err error) {
}

r := make(chan *ReportEnvelope, 10)
if _, err = bdg.reporter.Report(r); err != nil {
if _, err = bdg.reporter.Report(name, r); err != nil {
return
}

Expand Down
4 changes: 2 additions & 2 deletions bridge_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ts *remoteBridgeTestSuite) TestReturnFix() {

// attach a reporting channel
reports := make(chan *Report, 10)
ts.Nil(ts.bg.Report(reports))
ts.Nil(ts.bg.Report("ReturnFix", reports))

// poll the task
outputs, err := ts.bg.Poll(t)
Expand Down Expand Up @@ -124,7 +124,7 @@ func (ts *remoteBridgeTestSuite) TestReport() {
bg := newRemoteBridge(ts.trans)

// add a report channel, should fail
ts.NotNil(bg.Report(make(chan *Report, 10)))
ts.NotNil(bg.Report("TestReport", make(chan *Report, 10)))
}

func (ts *remoteBridgeTestSuite) TestPoll() {
Expand Down
Loading

0 comments on commit d871139

Please sign in to comment.