Skip to content

Commit

Permalink
test: dispatcher 包完善测试用例
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Jan 12, 2024
1 parent 7528dc4 commit 90b7e4c
Show file tree
Hide file tree
Showing 3 changed files with 403 additions and 45 deletions.
37 changes: 37 additions & 0 deletions server/internal/dispatcher/dispatcher_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package dispatcher_test

import (
"fmt"
"github.com/kercylan98/minotaur/server/internal/dispatcher"
"sync"
"sync/atomic"
)

func ExampleNewDispatcher() {
m := new(atomic.Int64)
fm := new(atomic.Int64)
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
m.Add(1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
w.Done()
})
var producers = []string{"producer1", "producer2", "producer3"}
for i := 0; i < len(producers); i++ {
p := producers[i]
for i := 0; i < 10; i++ {
d.Put(&TestMessage{producer: p})
}
d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
fm.Add(1)
})
}
d.Start()
d.Expel()
w.Wait()
fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
// Output:
// producer num: 3, producer done: 3, finished: 30
}
156 changes: 149 additions & 7 deletions server/internal/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestNewDispatcher(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil && !c.shouldPanic {
Expand All @@ -54,23 +55,24 @@ func TestDispatcher_SetProducerDoneHandler(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.messageFinish = &atomic.Bool{}
w := new(sync.WaitGroup)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
w.Done()
})
d.SetProducerDoneHandler("producer", func(p string, dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
d.Put(&TestMessage{producer: c.producer})
d.SetProducerDoneHandler(c.producer, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
c.messageFinish.Store(true)
})
if c.cancel {
d.SetProducerDoneHandler("producer", nil)
d.SetProducerDoneHandler(c.producer, nil)
}
d.Put(&TestMessage{producer: "producer"})
w.Add(1)
d.Start()
w.Wait()
if c.messageFinish.Load() && c.cancel {
if c.cancel && c.messageFinish.Load() {
t.Errorf("%s should cancel, but not", c.name)
}
})
Expand All @@ -90,14 +92,15 @@ func TestDispatcher_SetClosedHandler(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.handlerFinishMsgCount = &atomic.Int64{}
w := new(sync.WaitGroup)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
time.Sleep(c.msgTime)
c.handlerFinishMsgCount.Add(1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
w.Done()
})
for i := 0; i < c.msgCount; i++ {
Expand All @@ -114,6 +117,41 @@ func TestDispatcher_SetClosedHandler(t *testing.T) {
}
}

func TestIncrCount(t *testing.T) {
var cases = []struct {
name string
producer string
messageDone *atomic.Bool
}{
{name: "TestIncrCount_Normal", producer: "producer"},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.messageDone = &atomic.Bool{}
w := new(sync.WaitGroup)
w.Add(1)
var d *dispatcher.Dispatcher[string, *TestMessage]
d = dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
c.messageDone.Store(true)
d.IncrCount(c.producer, -1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
w.Done()
})
d.Start()
d.IncrCount(c.producer, 1)
d.Expel()
d.Put(&TestMessage{producer: c.producer})
w.Wait()
if !c.messageDone.Load() {
t.Errorf("%s should done, but not", c.name)
}
})
}
}

func TestDispatcher_Expel(t *testing.T) {
var cases = []struct {
name string
Expand All @@ -127,14 +165,15 @@ func TestDispatcher_Expel(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.handlerFinishMsgCount = &atomic.Int64{}
w := new(sync.WaitGroup)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
time.Sleep(c.msgTime)
c.handlerFinishMsgCount.Add(1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
w.Done()
})
for i := 0; i < c.msgCount; i++ {
Expand Down Expand Up @@ -163,13 +202,14 @@ func TestDispatcher_UnExpel(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.closed = &atomic.Bool{}
w := new(sync.WaitGroup)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
w.Done()
})
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
c.closed.Store(true)
})
d.Put(&TestMessage{producer: "producer"})
Expand All @@ -188,3 +228,105 @@ func TestDispatcher_UnExpel(t *testing.T) {
})
}
}

func TestDispatcher_Put(t *testing.T) {
var cases = []struct {
name string
producer string
messageDone *atomic.Bool
}{
{name: "TestDispatcher_Put_Normal", producer: "producer"},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.messageDone = &atomic.Bool{}
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
c.messageDone.Store(true)
w.Done()
})
d.Start()
d.Put(&TestMessage{producer: c.producer})
d.Expel()
w.Wait()
if !c.messageDone.Load() {
t.Errorf("%s should done, but not", c.name)
}
})
}
}

func TestDispatcher_Start(t *testing.T) {
var cases = []struct {
name string
producer string
messageDone *atomic.Bool
}{
{name: "TestDispatcher_Start_Normal", producer: "producer"},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
c.messageDone = &atomic.Bool{}
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
c.messageDone.Store(true)
w.Done()
})
d.Start()
d.Put(&TestMessage{producer: c.producer})
d.Expel()
w.Wait()
if !c.messageDone.Load() {
t.Errorf("%s should done, but not", c.name)
}
})
}
}

func TestDispatcher_Name(t *testing.T) {
var cases = []struct {
name string
}{
{name: "TestDispatcher_Name_Normal"},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
if d.Name() != c.name {
t.Errorf("%s should equal %s, but not", c.name, c.name)
}
})
}
}

func TestDispatcher_Closed(t *testing.T) {
var cases = []struct {
name string
}{
{name: "TestDispatcher_Closed_Normal"},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() })
d.Start()
d.Expel()
w.Wait()
if !d.Closed() {
t.Errorf("%s should closed, but not", c.name)
}
})
}
}

0 comments on commit 90b7e4c

Please sign in to comment.