Skip to content

Commit

Permalink
allow creation of multiple engines on the same protocol and port, pro…
Browse files Browse the repository at this point in the history
…viding a Stop on the specific engine
  • Loading branch information
Jeffrey Damick committed Nov 1, 2022
1 parent 20ff40b commit 4b47ab6
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 1 deletion.
22 changes: 22 additions & 0 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ func (s Engine) Dup() (dupFD int, err error) {
return
}

// Stop this specific Engine
func (s Engine) Stop(ctx context.Context) error {
if s.eng.isInShutdown() {
return errors.ErrEngineInShutdown
}

s.eng.signalShutdown()

ticker := time.NewTicker(shutdownPollInterval)
defer ticker.Stop()
for {
if s.eng.isInShutdown() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}

// Reader is an interface that consists of a number of methods for reading that Conn must implement.
type Reader interface {
// ================================== Non-concurrency-safe API's ==================================
Expand Down
85 changes: 84 additions & 1 deletion gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,94 @@ func testStop(t *testing.T, network, addr string) {
assert.NoError(t, err)
}

func TestEngineStop(t *testing.T) {
testEngineStop(t, "tcp", ":9998")
}

type testStopEngine struct {
*BuiltinEventEngine
tester *testing.T
network, addr, protoAddr string
eng Engine
stopIter int
name string
exchngCount int
}

func (s *testStopEngine) OnBoot(eng Engine) (action Action) {
s.eng = eng
return
}

func (t *testStopEngine) OnClose(c Conn, err error) (action Action) {
logging.Debugf("closing connection...")
return
}

func (t *testStopEngine) OnTraffic(c Conn) (action Action) {
buf, _ := c.Peek(-1)
_, _ = c.Write(buf)
_, _ = c.Discard(-1)
t.exchngCount++
return
}

func (t *testStopEngine) OnTick() (delay time.Duration, action Action) {
delay = time.Millisecond * 100
go func() {
conn, err := net.Dial(t.network, t.addr)
require.NoError(t.tester, err)
defer conn.Close()
data := []byte("Hello World! " + t.name)
_, _ = conn.Write(data)
_, err = conn.Read(data)
require.NoError(t.tester, err)

if t.stopIter <= 0 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
logging.Debugf("stop engine...", t.eng.Stop(ctx))
// waiting the engine shutdown.
_, err = conn.Read(data)
require.Error(t.tester, err)
}
t.stopIter--
}()
return
}

func testEngineStop(t *testing.T, network, addr string) {
events1 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "1", stopIter: 2}
events2 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "2", stopIter: 5}

result1 := make(chan error)
go func() {
err := Run(events1, events1.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result1 <- err
}()
// ensure the first handler processes before starting the next since the delay per tick is 100ms
time.Sleep(150 * time.Millisecond)
result2 := make(chan error)
go func() {
err := Run(events2, events2.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result2 <- err
}()

err := <-result1
assert.NoError(t, err)
err = <-result2
assert.NoError(t, err)
// make sure that each handler processed at least 1
require.Greater(t, events1.exchngCount, 0)
require.Greater(t, events2.exchngCount, 0)
require.Equal(t, 2+1+5+1, events1.exchngCount+events2.exchngCount)
}

// Test should not panic when we wake-up server_closed conn.
func TestClosedWakeUp(t *testing.T) {
events := &testClosedWakeUpServer{
tester: t,
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9998", protoAddr: "tcp://:9998",
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9999", protoAddr: "tcp://:9999",
clientClosed: make(chan struct{}),
serverClosed: make(chan struct{}),
wakeup: make(chan struct{}),
Expand Down

0 comments on commit 4b47ab6

Please sign in to comment.