Skip to content
This repository has been archived by the owner on May 1, 2020. It is now read-only.

Commit

Permalink
Implement graceful server shutdown
Browse files Browse the repository at this point in the history
Implement graceful server shutdown by providing a new server API
method: server.Shutdown() that'll block the calling goroutine
until the webwire server finished processing ongoing signals and
requests. During the shutdown the server should wait until all
previously called handler callbacks return and must not
accept incoming connections, must reject new incoming requests
and should ignore any incoming signals.

Update setup utility to provide a new stopServer callback
which tries to automatically shutdown both the webwire
and the HTTP server.

Update examples to demonstrate correct server shutdown.
  • Loading branch information
romshark committed Mar 16, 2018
1 parent a7eee66 commit 678f860
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 10 deletions.
4 changes: 4 additions & 0 deletions client/verifyProtocolVersion.go
Expand Up @@ -34,6 +34,10 @@ func (clt *Client) verifyProtocolVersion() error {
return fmt.Errorf("Couldn't read metadata response body: %s", err)
}

if response.StatusCode == http.StatusServiceUnavailable {
return fmt.Errorf("Endpoint unavailable: %s", response.Status)
}

// Unmarshal response
var metadata struct {
ProtocolVersion string `json:"protocol-version"`
Expand Down
16 changes: 15 additions & 1 deletion examples/chatroom/server/main.go
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

wwr "github.com/qbeon/webwire-go"
)
Expand All @@ -16,7 +18,7 @@ func main() {
flag.Parse()

// Setup webwire server
_, _, addr, runServer, err := wwr.SetupServer(wwr.SetupOptions{
_, _, addr, runServer, stopServer, err := wwr.SetupServer(wwr.SetupOptions{
ServerAddress: *serverAddr,
ServerOptions: wwr.ServerOptions{
Hooks: wwr.Hooks{
Expand All @@ -36,6 +38,18 @@ func main() {
panic(fmt.Errorf("Failed setting up WebWire server: %s", err))
}

// Listen for OS signals and shutdown server in case of demanded termination
osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-osSignals
log.Printf("Termination demanded by the OS (%s), shutting down...", sig)
if err := stopServer(); err != nil {
log.Printf("Error during server shutdown: %s", err)
}
log.Println("Server gracefully terminated")
}()

// Launch server
log.Printf("Listening on %s", addr)
if err := runServer(); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions examples/echo/server/server.go
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

wwr "github.com/qbeon/webwire-go"
)
Expand All @@ -27,7 +29,7 @@ func main() {
flag.Parse()

// Setup webwire server
_, _, addr, runServer, err := wwr.SetupServer(wwr.SetupOptions{
_, _, addr, runServer, stopServer, err := wwr.SetupServer(wwr.SetupOptions{
ServerAddress: *serverAddr,
ServerOptions: wwr.ServerOptions{
Hooks: wwr.Hooks{
Expand All @@ -41,8 +43,20 @@ func main() {
panic(fmt.Errorf("Failed setting up WebWire server: %s", err))
}

// Listen for OS signals and shutdown server in case of demanded termination
osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-osSignals
log.Printf("Termination demanded by the OS (%s), shutting down...", sig)
if err := stopServer(); err != nil {
log.Printf("Error during server shutdown: %s", err)
}
log.Println("Server gracefully terminated")
}()

// Launch echo server
log.Printf("Listening on %s", addr)

if err := runServer(); err != nil {
panic(fmt.Errorf("WebWire server failed: %s", err))
}
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub/server/server.go
Expand Up @@ -29,7 +29,7 @@ func main() {
flag.Parse()

// Setup webwire server
_, _, addr, runServer, err := wwr.SetupServer(wwr.SetupOptions{
_, _, addr, runServer, _, err := wwr.SetupServer(wwr.SetupOptions{
ServerAddress: *serverAddr,
ServerOptions: wwr.ServerOptions{
Hooks: wwr.Hooks{
Expand Down
74 changes: 72 additions & 2 deletions server.go
Expand Up @@ -128,6 +128,10 @@ type Server struct {
hooks Hooks

// State
shutdown bool
shutdownRdy chan bool
currentOps uint32
opsLock sync.Mutex
clientsLock *sync.Mutex
clients []*Client
sessionsEnabled bool
Expand All @@ -154,6 +158,10 @@ func NewServer(opts ServerOptions) *Server {
hooks: opts.Hooks,

// State
shutdown: false,
shutdownRdy: make(chan bool),
currentOps: 0,
opsLock: sync.Mutex{},
clients: make([]*Client, 0),
clientsLock: &sync.Mutex{},
sessionsEnabled: sessionsEnabled,
Expand Down Expand Up @@ -299,12 +307,43 @@ func (srv *Server) handleSessionClosure(msg *Message) error {
// handleSignal handles incoming signals
// and returns an error if the ongoing connection cannot be proceeded
func (srv *Server) handleSignal(msg *Message) {
srv.opsLock.Lock()
// Ignore incoming signals during shutdown
if srv.shutdown {
srv.opsLock.Unlock()
return
}
srv.currentOps++
srv.opsLock.Unlock()

srv.hooks.OnSignal(context.WithValue(context.Background(), Msg, *msg))

// Mark signal as done and shutdown the server if scheduled and no ops are left
srv.opsLock.Lock()
srv.currentOps--
if srv.shutdown && srv.currentOps < 1 {
close(srv.shutdownRdy)
}
srv.opsLock.Unlock()
}

// handleRequest handles incoming requests
// and returns an error if the ongoing connection cannot be proceeded
func (srv *Server) handleRequest(msg *Message) {
srv.opsLock.Lock()
// Reject incoming requests during shutdown, return special shutdown error
if srv.shutdown {
srv.opsLock.Unlock()
// TODO: it'd be better to return a special shutdown error reply to avoid collision with
// user code which could make identical error codes cause reconnections on the client
msg.fail(Error{
Code: "SRV_SHUTDOWN",
})
return
}
srv.currentOps++
srv.opsLock.Unlock()

replyPayload, err := srv.hooks.OnRequest(
context.WithValue(context.Background(), Msg, *msg),
)
Expand All @@ -318,9 +357,17 @@ func (srv *Server) handleRequest(msg *Message) {
Message: err.Error(),
})
}
return
} else {
msg.fulfill(replyPayload)
}

// Mark request as done and shutdown the server if scheduled and no ops are left
srv.opsLock.Lock()
srv.currentOps--
if srv.shutdown && srv.currentOps < 1 {
close(srv.shutdownRdy)
}
msg.fulfill(replyPayload)
srv.opsLock.Unlock()
}

// handleMetadata handles endpoint metadata requests
Expand Down Expand Up @@ -365,6 +412,15 @@ func (srv *Server) ServeHTTP(
resp http.ResponseWriter,
req *http.Request,
) {
// Reject incoming connections during shutdown, pretend the server is temporarily unavailable
srv.opsLock.Lock()
if srv.shutdown {
srv.opsLock.Unlock()
http.Error(resp, "Server shutting down", http.StatusServiceUnavailable)
return
}
srv.opsLock.Unlock()

switch req.Method {
case "OPTIONS":
srv.hooks.OnOptions(resp)
Expand Down Expand Up @@ -456,3 +512,17 @@ func (srv *Server) deregisterSession(clt *Client) {
srv.errorLog.Printf("OnSessionClosed hook failed: %s", err)
}
}

// Shutdown appoints a server shutdown and blocks the calling goroutine until the server
// was gracefuly stopped finishing ongoing signals and requests. During the shutdown new
// requests and incoming connections are rejected while incoming signals are ignored.
func (srv *Server) Shutdown() {
srv.opsLock.Lock()
srv.shutdown = true
// Don't block if there's no currently processed operations
if srv.currentOps < 1 {
return
}
srv.opsLock.Unlock()
<-srv.shutdownRdy
}
16 changes: 13 additions & 3 deletions setupServer.go
@@ -1,6 +1,7 @@
package webwire

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -40,6 +41,7 @@ func SetupServer(opts SetupOptions) (
httpServer *http.Server,
addr string,
runFunc func() error,
stopFunc func() error,
err error,
) {
wwrSrv = NewServer(opts.ServerOptions)
Expand All @@ -59,21 +61,29 @@ func SetupServer(opts SetupOptions) (
// Initialize TCP/IP listener
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, nil, "", nil, fmt.Errorf("Failed setting up TCP/IP listener: %s", err)
return nil, nil, "", nil, nil, fmt.Errorf("Failed setting up TCP/IP listener: %s", err)
}

runFunc = func() (err error) {
// Launch server
err = httpServer.Serve(
tcpKeepAliveListener{listener.(*net.TCPListener)},
)
if err != nil {
if err != http.ErrServerClosed {
return fmt.Errorf("HTTP Server failure: %s", err)
}
return nil
}

stopFunc = func() error {
wwrSrv.Shutdown()
if err := httpServer.Shutdown(context.Background()); err != nil {
return fmt.Errorf("Couldn't properly shutdown HTTP server: %s", err)
}
return nil
}

addr = listener.Addr().String()

return wwrSrv, httpServer, addr, runFunc, nil
return wwrSrv, httpServer, addr, runFunc, stopFunc, nil
}
2 changes: 1 addition & 1 deletion test/test.go
Expand Up @@ -15,7 +15,7 @@ func setupServer(t *testing.T, opts wwr.ServerOptions) (*wwr.Server, string) {
// Setup headed server on arbitrary port
opts.WarnLog = os.Stdout
opts.ErrorLog = os.Stderr
srv, _, addr, run, err := wwr.SetupServer(wwr.SetupOptions{
srv, _, addr, run, _, err := wwr.SetupServer(wwr.SetupOptions{
ServerAddress: "127.0.0.1:0",
ServerOptions: opts,
})
Expand Down

0 comments on commit 678f860

Please sign in to comment.