diff --git a/acceptor_windows.go b/acceptor_windows.go index 7ce46cd49..7dcc8259a 100644 --- a/acceptor_windows.go +++ b/acceptor_windows.go @@ -21,9 +21,17 @@ package gnet -import "time" +import ( + "runtime" + "time" +) + +func (svr *server) listenerRun(lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } -func (svr *server) listenerRun() { var err error defer func() { svr.signalShutdown(err) }() var packet [0x10000]byte diff --git a/errors/errors.go b/errors/errors.go index 81d3cc0a7..645e86890 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -23,6 +23,8 @@ package errors import "errors" var ( + // ErrTooManyEventLoopThreads occurs when attempting to set up more than 10,000 event-loop goroutines under LockOSThread mode. + ErrTooManyEventLoopThreads = errors.New("too many event-loops under LockOSThread mode") // ErrUnsupportedProtocol occurs when trying to use protocol that is not supported. ErrUnsupportedProtocol = errors.New("only unix, tcp/tcp4/tcp6, udp/udp4/udp6 are supported") // ErrUnsupportedTCPProtocol occurs when trying to use an unsupported TCP protocol. diff --git a/eventloop_unix.go b/eventloop_unix.go index 271e01bd8..93f272afb 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -25,6 +25,7 @@ package gnet import ( "os" + "runtime" "time" "github.com/panjf2000/gnet/errors" @@ -51,7 +52,12 @@ func (el *eventloop) closeAllConns() { } } -func (el *eventloop) loopRun() { +func (el *eventloop) loopRun(lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + defer func() { el.closeAllConns() el.ln.close() diff --git a/eventloop_windows.go b/eventloop_windows.go index cf560db93..92342b0ca 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -22,6 +22,7 @@ package gnet import ( + "runtime" "time" "github.com/panjf2000/gnet/pool/bytebuffer" @@ -39,7 +40,12 @@ type eventloop struct { calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount } -func (el *eventloop) loopRun() { +func (el *eventloop) loopRun(lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + var err error defer func() { if el.idx == 0 && el.svr.opts.Ticker { diff --git a/gnet.go b/gnet.go index 338671526..c51ec212f 100644 --- a/gnet.go +++ b/gnet.go @@ -28,6 +28,7 @@ import ( "sync/atomic" "time" + "github.com/panjf2000/gnet/errors" "github.com/panjf2000/gnet/internal/logging" ) @@ -238,6 +239,14 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err } defer logging.Cleanup() + // The maximum number of operating system threads that the Go program can use is initially set to 10000, + // which should be the maximum amount of I/O event-loops locked to OS threads users can start up. + if options.LockOSThread && options.NumEventLoop > 10000 { + logging.DefaultLogger.Errorf("too many event-loops under LockOSThread mode, should be less than 10,000 "+ + "while you are trying to set up %d\n", options.NumEventLoop) + return errors.ErrTooManyEventLoopThreads + } + network, addr := parseProtoAddr(protoAddr) var ln *listener diff --git a/gnet_test.go b/gnet_test.go index 33cb96d77..e4aaa505f 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -530,7 +530,7 @@ func testServe(network, addr string, reuseport, multicore, async bool, nclients nclients: nclients, workerPool: goroutine.Default(), } - must(Serve(ts, network+"://"+addr, WithMulticore(multicore), WithReusePort(reuseport), WithTicker(true), + must(Serve(ts, network+"://"+addr, WithLockOSThread(async), WithMulticore(multicore), WithReusePort(reuseport), WithTicker(true), WithTCPKeepAlive(time.Minute*1), WithLoadBalancing(lb))) } @@ -1022,3 +1022,12 @@ func testCloseConnection(network, addr string) { events := &testCloseConnectionServer{network: network, addr: addr} must(Serve(events, network+"://"+addr, WithTicker(true))) } + +func TestServerOptionsCheck(t *testing.T) { + if err := Serve(&EventServer{}, "tcp://:3500", WithNumEventLoop(10001), WithLockOSThread(true)); err != errors.ErrTooManyEventLoopThreads { + t.Fail() + t.Log("error returned with LockOSThread option") + } else { + t.Log("got expected result") + } +} diff --git a/options.go b/options.go index 60fc68cf4..0eab585ac 100644 --- a/options.go +++ b/options.go @@ -45,6 +45,12 @@ type Options struct { // assigned to the value of logical CPUs usable by the current process. Multicore bool + // LockOSThread is used to determine whether each I/O event-loop is associated to an OS thread, it is useful when you + // need some kind of mechanisms like thread local storage, or invoke certain C libraries (such as graphics lib: GLib) + // that require thread-level manipulation via cgo, or want all I/O event-loops to actually run in parallel for a + // potential higher performance. + LockOSThread bool + // LB represents the load-balancing algorithm used when assigning new connections. LB LoadBalancing @@ -83,6 +89,13 @@ func WithMulticore(multicore bool) Option { } } +// WithLockOSThread sets up lockOSThread mode for I/O event-loops. +func WithLockOSThread(lockOSThread bool) Option { + return func(opts *Options) { + opts.LockOSThread = lockOSThread + } +} + // WithLoadBalancing sets up the load-balancing algorithm in gnet server. func WithLoadBalancing(lb LoadBalancing) Option { return func(opts *Options) { diff --git a/reactor_bsd.go b/reactor_bsd.go index d862d504b..53f839a11 100644 --- a/reactor_bsd.go +++ b/reactor_bsd.go @@ -23,12 +23,20 @@ package gnet import ( + "runtime" + "github.com/panjf2000/gnet/errors" "github.com/panjf2000/gnet/internal/netpoll" ) -func (svr *server) activateMainReactor() { +func (svr *server) activateMainReactor(lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + defer svr.signalShutdown() + switch err := svr.mainLoop.poller.Polling(func(fd int, filter int16) error { return svr.acceptNewConnection(fd) }); err { case errors.ErrServerShutdown: svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err) @@ -38,7 +46,12 @@ func (svr *server) activateMainReactor() { } } -func (svr *server) activateSubReactor(el *eventloop) { +func (svr *server) activateSubReactor(el *eventloop, lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + defer func() { el.closeAllConns() if el.idx == 0 && svr.opts.Ticker { diff --git a/reactor_linux.go b/reactor_linux.go index df6d3562c..14d873645 100644 --- a/reactor_linux.go +++ b/reactor_linux.go @@ -21,12 +21,20 @@ package gnet import ( + "runtime" + "github.com/panjf2000/gnet/errors" "github.com/panjf2000/gnet/internal/netpoll" ) -func (svr *server) activateMainReactor() { +func (svr *server) activateMainReactor(lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + defer svr.signalShutdown() + switch err := svr.mainLoop.poller.Polling(func(fd int, ev uint32) error { return svr.acceptNewConnection(fd) }); err { case errors.ErrServerShutdown: svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err) @@ -35,7 +43,12 @@ func (svr *server) activateMainReactor() { } } -func (svr *server) activateSubReactor(el *eventloop) { +func (svr *server) activateSubReactor(el *eventloop, lockOSThread bool) { + if lockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + defer func() { el.closeAllConns() if el.idx == 0 && svr.opts.Ticker { diff --git a/server_unix.go b/server_unix.go index ad587765a..b0cf307f7 100644 --- a/server_unix.go +++ b/server_unix.go @@ -70,7 +70,7 @@ func (svr *server) startEventLoops() { svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool { svr.wg.Add(1) go func() { - el.loopRun() + el.loopRun(svr.opts.LockOSThread) svr.wg.Done() }() return true @@ -88,7 +88,7 @@ func (svr *server) startSubReactors() { svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool { svr.wg.Add(1) go func() { - svr.activateSubReactor(el) + svr.activateSubReactor(el, svr.opts.LockOSThread) svr.wg.Done() }() return true @@ -163,7 +163,7 @@ func (svr *server) activateReactors(numEventLoop int) error { // Start main reactor in background. svr.wg.Add(1) go func() { - svr.activateMainReactor() + svr.activateMainReactor(svr.opts.LockOSThread) svr.wg.Done() }() } else { diff --git a/server_windows.go b/server_windows.go index 8fcfbf7f1..3460e5181 100644 --- a/server_windows.go +++ b/server_windows.go @@ -78,7 +78,7 @@ func (svr *server) signalShutdown(err error) { func (svr *server) startListener() { svr.listenerWG.Add(1) go func() { - svr.listenerRun() + svr.listenerRun(svr.opts.LockOSThread) svr.listenerWG.Done() }() } @@ -97,7 +97,7 @@ func (svr *server) startEventLoops(numEventLoop int) { svr.loopWG.Add(svr.subEventLoopSet.len()) svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool { - go el.loopRun() + go el.loopRun(svr.opts.LockOSThread) return true }) }