Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: improve comments on Conn and test cases #471

Merged
merged 4 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,28 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

gerr "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
packetLen int
rspChMap sync.Map
}

func (ev *clientEvents) OnBoot(e Engine) Action {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
return None
}

func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) {
c.SetContext([]byte{})
rspCh := make(chan []byte, 1)
Expand Down Expand Up @@ -68,6 +78,13 @@ func (ev *clientEvents) OnTick() (delay time.Duration, action Action) {
return
}

func (ev *clientEvents) OnShutdown(e Engine) {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

func TestServeWithGnetClient(t *testing.T) {
// start an engine
// connect 10 clients
Expand Down Expand Up @@ -279,7 +296,7 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
workerPool: goPool.Default(),
}
var err error
ts.clientEV = &clientEvents{packetLen: streamLen, svr: ts}
ts.clientEV = &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
ts.clientEV,
WithLogLevel(logging.DebugLevel),
Expand Down
6 changes: 4 additions & 2 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"net"
"strconv"
"sync"
"syscall"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -66,14 +67,15 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := engine{
ln: &listener{network: "udp"},
ln: &listener{},
opts: options,
eventHandler: eh,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}
if options.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
eventHandler: eh,
}
cli.el = &eventloop{
Expand Down
28 changes: 12 additions & 16 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ func (c *conn) resetBuffer() {
c.inboundBuffer.Reset()
}

// ================================== Non-concurrency-safe API's ==================================

func (c *conn) Read(p []byte) (n int, err error) {
if c.inboundBuffer.IsEmpty() {
n = copy(p, c.buffer)
Expand Down Expand Up @@ -395,18 +393,6 @@ func (c *conn) OutboundBuffered() int {
return c.outboundBuffer.Buffered()
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
Expand Down Expand Up @@ -434,8 +420,6 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
return socket.SetKeepAlivePeriod(c.fd, int(d.Seconds()))
}

// ==================================== Concurrency-safe API's ====================================

func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {
if c.isDatagram {
defer func() {
Expand Down Expand Up @@ -481,3 +465,15 @@ func (c *conn) Close() error {
return
}, nil)
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}
27 changes: 12 additions & 15 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ func (c *conn) resetBuffer() {
c.inboundBuffer.Reset()
}

// ================================== Non-concurrency-safe API's ==================================

func (c *conn) Read(p []byte) (n int, err error) {
if c.inboundBuffer.IsEmpty() {
n = copy(p, c.buffer.B)
Expand Down Expand Up @@ -264,17 +262,6 @@ func (c *conn) OutboundBuffered() int {
return 0
}

func (*conn) SetDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}
func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
Expand Down Expand Up @@ -412,8 +399,6 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
// this method is only implemented for compatibility, don't use it on Windows.
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }

// ==================================== Concurrency-safe API's ====================================

func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
Expand Down Expand Up @@ -482,3 +467,15 @@ func (c *conn) CloseWithCallback(cb AsyncCallback) error {
}
return nil
}

func (*conn) SetDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}
19 changes: 12 additions & 7 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package gnet
import (
"context"
"runtime"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -44,6 +45,7 @@ type engine struct {

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}
eventHandler EventHandler // user eventHandler
}
Expand All @@ -58,7 +60,9 @@ func (eng *engine) shutdown(err error) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}

eng.workerPool.shutdown()
eng.workerPool.once.Do(func() {
eng.workerPool.shutdown()
})
}

func (eng *engine) startEventLoops() {
Expand All @@ -70,6 +74,7 @@ func (eng *engine) startEventLoops() {

func (eng *engine) closeEventLoops() {
eng.lb.iterate(func(i int, el *eventloop) bool {
el.ln.close()
_ = el.poller.Close()
return true
})
Expand All @@ -85,7 +90,6 @@ func (eng *engine) startSubReactors() {
func (eng *engine) activateEventLoops(numEventLoop int) (err error) {
network, address := eng.ln.network, eng.ln.address
ln := eng.ln
eng.ln = nil
var striker *eventloop
// Create loops locally and bind the listeners.
for i := 0; i < numEventLoop; i++ {
Expand Down Expand Up @@ -190,17 +194,15 @@ func (eng *engine) stop(s Engine) {

eng.eventHandler.OnShutdown(s)

// Notify all loops to close by closing all listeners
// Notify all event-loops to exit.
eng.lb.iterate(func(i int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err)
}
return true
})

if eng.mainLoop != nil {
eng.ln.close()
err := eng.mainLoop.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err)
Expand All @@ -216,15 +218,17 @@ func (eng *engine) stop(s Engine) {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

// Close all listeners and pollers of event-loops.
eng.closeEventLoops()

if eng.mainLoop != nil {
eng.ln.close()
err := eng.mainLoop.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
}

// Put the engine into the shutdown state.
atomic.StoreInt32(&eng.inShutdown, 1)
}

Expand All @@ -249,7 +253,8 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
eventHandler: eventHandler,
}
switch options.LB {
Expand Down
5 changes: 4 additions & 1 deletion engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gnet
import (
"context"
"runtime"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -38,6 +39,7 @@ type engine struct {

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}
eventHandler EventHandler // user eventHandler
}
Expand Down Expand Up @@ -123,7 +125,8 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}

switch options.LB {
Expand Down
Loading