allow creation of multiple engines on the same protocol and port #419

Merged · 4 commits · Nov 4, 2022
24 changes: 24 additions & 0 deletions gnet.go
@@ -70,6 +70,29 @@ func (s Engine) Dup() (dupFD int, err error) {
return
}

// Stop gracefully shuts down this Engine without interrupting any active event-loops;
// it waits indefinitely for connections and event-loops to be closed before shutting down.
func (s Engine) Stop(ctx context.Context) error {
if s.eng.isInShutdown() {
return errors.ErrEngineInShutdown
}

s.eng.signalShutdown()

// Poll the shutdown status at shutdownPollInterval until the engine is
// fully stopped, or give up when the caller's context is done.
ticker := time.NewTicker(shutdownPollInterval)
defer ticker.Stop()
for {
if s.eng.isInShutdown() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}

// Reader is an interface that consists of a number of methods for reading that Conn must implement.
type Reader interface {
// ================================== Non-concurrency-safe API's ==================================
@@ -401,6 +424,7 @@

// Stop gracefully shuts down the engine without interrupting any active event-loops,
// it waits indefinitely for connections and event-loops to be closed and then shuts down.
// Deprecated: The global Stop only shuts down the most recently registered
// Engine for a given protocol and IP:Port, which can leak earlier Engines if
// you invoke gnet.Run multiple times with the same protocol and IP:Port while
// WithReuseAddr(true) and WithReusePort(true) are enabled. Use Engine.Stop instead.
func Stop(ctx context.Context, protoAddr string) error {
var eng *engine
if s, ok := allEngines.Load(protoAddr); ok {
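For illustration, a minimal usage sketch of the new Engine.Stop; the handler type and helper below are hypothetical, not part of this diff. It assumes the Engine handle is captured in OnBoot, the same pattern the tests below use:

package gnetexample

import (
	"context"
	"time"

	"github.com/panjf2000/gnet/v2"
)

// echoServer is a hypothetical handler that keeps the Engine handed to
// OnBoot so the engine can be stopped directly later.
type echoServer struct {
	gnet.BuiltinEventEngine
	eng gnet.Engine
}

func (s *echoServer) OnBoot(eng gnet.Engine) gnet.Action {
	s.eng = eng
	return gnet.None
}

// stopWithTimeout bounds the graceful shutdown: Stop signals the engine,
// then polls until it is fully down (nil) or the context expires (ctx.Err()).
func stopWithTimeout(s *echoServer, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return s.eng.Stop(ctx)
}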
88 changes: 87 additions & 1 deletion gnet_test.go
@@ -928,11 +928,97 @@ func testStop(t *testing.T, network, addr string) {
assert.NoError(t, err)
}

func TestEngineStop(t *testing.T) {
testEngineStop(t, "tcp", ":9998")
}

type testStopEngine struct {
*BuiltinEventEngine
tester *testing.T
network, addr, protoAddr string
eng Engine
stopIter int64
name string
exchngCount int64
}

func (t *testStopEngine) OnBoot(eng Engine) (action Action) {
t.eng = eng
return
}

func (t *testStopEngine) OnClose(c Conn, err error) (action Action) {
logging.Debugf("closing connection...")
return
}

func (t *testStopEngine) OnTraffic(c Conn) (action Action) {
buf, _ := c.Peek(-1)
_, _ = c.Write(buf)
_, _ = c.Discard(-1)
atomic.AddInt64(&t.exchngCount, 1)
return
}

func (t *testStopEngine) OnTick() (delay time.Duration, action Action) {
delay = time.Millisecond * 100
go func() {
conn, err := net.Dial(t.network, t.addr)
require.NoError(t.tester, err)
defer conn.Close()
data := []byte("Hello World! " + t.name)
_, _ = conn.Write(data)
_, err = conn.Read(data)
require.NoError(t.tester, err)

iter := atomic.LoadInt64(&t.stopIter)
if iter <= 0 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
logging.Debugf("stop engine...", t.eng.Stop(ctx))
// waiting the engine shutdown.
_, err = conn.Read(data)
require.Error(t.tester, err)
}
atomic.AddInt64(&t.stopIter, -1)
}()
return
}

func testEngineStop(t *testing.T, network, addr string) {
events1 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "1", stopIter: 2}
events2 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "2", stopIter: 5}

result1 := make(chan error)
go func() {
err := Run(events1, events1.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result1 <- err
}()
// ensure the first handler processes a tick before the next engine starts, since the delay per tick is 100ms
time.Sleep(150 * time.Millisecond)
result2 := make(chan error)
go func() {
err := Run(events2, events2.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result2 <- err
}()

err := <-result1
assert.NoError(t, err)
err = <-result2
assert.NoError(t, err)
// make sure that each handler processed at least one exchange
require.Greater(t, events1.exchngCount, int64(0))
require.Greater(t, events2.exchngCount, int64(0))
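// With SO_REUSEPORT the kernel may route any given dial to either engine, so
// only the combined total is deterministic: events1 ticks 3 times (stopIter
// 2 → 0) and events2 ticks 6 times (5 → 0), i.e. (2+1)+(5+1) exchanges.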
require.Equal(t, int64(2+1+5+1), events1.exchngCount+events2.exchngCount)
// stop an already stopped engine
require.Equal(t, gerr.ErrEngineInShutdown, events1.eng.Stop(context.Background()))
}

// Test should not panic when we wake up a server-closed conn.
func TestClosedWakeUp(t *testing.T) {
events := &testClosedWakeUpServer{
tester: t,
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9998", protoAddr: "tcp://:9998",
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9999", protoAddr: "tcp://:9999",
clientClosed: make(chan struct{}),
serverClosed: make(chan struct{}),
wakeup: make(chan struct{}),
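Taken together, a hypothetical end-to-end sketch of what this PR enables: two engines bound to the same protocol and port, each shut down through its own Engine handle. The server type, address, and timings are illustrative only.

package main

import (
	"context"
	"time"

	"github.com/panjf2000/gnet/v2"
)

type server struct {
	gnet.BuiltinEventEngine
	eng gnet.Engine
}

func (s *server) OnBoot(eng gnet.Engine) gnet.Action {
	s.eng = eng
	return gnet.None
}

func main() {
	a, b := new(server), new(server)
	opts := []gnet.Option{gnet.WithReuseAddr(true), gnet.WithReusePort(true)}
	// With SO_REUSEADDR/SO_REUSEPORT enabled, both engines can bind tcp://:9000.
	go func() { _ = gnet.Run(a, "tcp://:9000", opts...) }()
	go func() { _ = gnet.Run(b, "tcp://:9000", opts...) }()
	time.Sleep(time.Second) // crude wait for both engines to boot (sketch only)

	// The deprecated global gnet.Stop would resolve only the engine registered
	// last under "tcp://:9000" and leak the other; stopping each Engine handle
	// shuts both down.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_ = a.eng.Stop(ctx)
	_ = b.eng.Stop(ctx)
}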