diff --git a/.travis.yml b/.travis.yml index 28464d8..d9236a0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: go go: - - 1.7.4 - - 1.8rc1 + - 1.8 - tip os: @@ -15,4 +14,4 @@ install: script: - make bundle - make check - \ No newline at end of file + diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 0cb9ebc..17f9f6e 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -18,6 +18,7 @@ The configuration for Gaurun has some sections. The example is [here](conf/gauru |queues |int64 |size of internal queue for push notification|8192 |`-q` options can overwrite | |notification_max|int64 |limit of push notifications once |100 | | |pusher_max |int64 |maximum goroutines for asynchronous pushing |0 |If the value is less than or equal to zero, each worker pushes synchronously| +|shutdown_timeout|int64 |timeout to wait for connections to return to idle when server shutdown (second) | 10 | | ## iOS Section diff --git a/README.md b/README.md index 0d17a2f..a2771a1 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ There are two way to install Gaurun; using a precompiled binary or install from To install a precompiled binary, download the appropriate zip package for your OS and architecture from [here](https://github.com/mercari/gaurun/releases). Once the zip is downloaded, unzip it and place the binary where you want to use (if you want to access it from the command-line, make sure to put it on `$PATH`). -To compile from source, you need Go1.7.3 or later (including `$GOPATH` setup) and [glide](https://github.com/Masterminds/glide) for dependency management. After setup, then clone the source code by running the following command, +To compile from source, you need Go1.8 or later (including `$GOPATH` setup) and [glide](https://github.com/Masterminds/glide) for dependency management. After setup, then clone the source code by running the following command, ```bash $ mkdir -p $GOPATH/src/github.com/mercari diff --git a/cmd/gaurun/gaurun.go b/cmd/gaurun/gaurun.go index 326afd8..55e2227 100644 --- a/cmd/gaurun/gaurun.go +++ b/cmd/gaurun/gaurun.go @@ -1,12 +1,15 @@ package main import ( + "context" "flag" "fmt" "io/ioutil" + "net/http" "os" "os/signal" "syscall" + "time" "github.com/mercari/gaurun/gaurun" ) @@ -85,8 +88,8 @@ func main() { } } - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGHUP) + sigHUPChan := make(chan os.Signal, 1) + signal.Notify(sigHUPChan, syscall.SIGHUP) sighupHandler := func() { if err := accessLogReopener.Reopen(); err != nil { @@ -97,7 +100,7 @@ func main() { } } - go signalHandler(sigChan, sighupHandler) + go signalHandler(sigHUPChan, sighupHandler) if err := gaurun.InitHttpClient(); err != nil { gaurun.LogSetupFatal(fmt.Errorf("failed to init http client")) @@ -105,8 +108,52 @@ func main() { gaurun.InitStat() gaurun.StartPushWorkers(gaurun.ConfGaurun.Core.WorkerNum, gaurun.ConfGaurun.Core.QueueNum) - gaurun.RegisterHTTPHandlers() - gaurun.RunHTTPServer() + mux := http.NewServeMux() + gaurun.RegisterHandlers(mux) + + server := &http.Server{ + Handler: mux, + } + go func() { + gaurun.LogError.Info("start server") + if err := gaurun.RunServer(server, &gaurun.ConfGaurun); err != nil { + gaurun.LogError.Info(fmt.Sprintf("failed to serve: %s", err)) + } + }() + + // Graceful shutdown (kicked by SIGTERM). + // + // First, it shutdowns server and stops accepting new requests. + // Then wait until all remaining queues in buffer are flushed. + sigTERMChan := make(chan os.Signal, 1) + signal.Notify(sigTERMChan, syscall.SIGTERM) + + <-sigTERMChan + gaurun.LogError.Info("shutdown server") + timeout := time.Duration(conf.Core.ShutdownTimeout) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + gaurun.LogError.Error(fmt.Sprintf("failed to shutdown server: %v", err)) + } + + // Start a goroutine to log number of job queue. + go func() { + for { + queue := len(gaurun.QueueNotification) + if queue == 0 { + break + } + + gaurun.LogError.Info(fmt.Sprintf("wait until queue is empty. Current queue len: %d", queue)) + time.Sleep(1 * time.Second) + } + }() + + // Block until all pusher worker job is done. + gaurun.PusherWg.Wait() + + gaurun.LogError.Info("successfully shutdown") } func signalHandler(ch <-chan os.Signal, sighupFn func()) { diff --git a/conf/gaurun.toml b/conf/gaurun.toml index 7fc9f8c..31fc896 100644 --- a/conf/gaurun.toml +++ b/conf/gaurun.toml @@ -4,6 +4,7 @@ port = "1056" workers = 8 queues = 8192 notification_max = 100 +shutdown_timeout = 30 [android] apikey = "apikey for GCM" diff --git a/gaurun/conf.go b/gaurun/conf.go index f47b8b7..9b26d18 100644 --- a/gaurun/conf.go +++ b/gaurun/conf.go @@ -25,6 +25,7 @@ type SectionCore struct { QueueNum int64 `toml:"queues"` NotificationMax int64 `toml:"notification_max"` PusherMax int64 `toml:"pusher_max"` + ShutdownTimeout int64 `toml:"shutdown_timeout"` } type SectionAndroid struct { @@ -64,6 +65,7 @@ func BuildDefaultConf() ConfToml { conf.Core.QueueNum = 8192 conf.Core.NotificationMax = 100 conf.Core.PusherMax = 0 + conf.Core.ShutdownTimeout = 10 // Android conf.Android.ApiKey = "" conf.Android.Enabled = true diff --git a/gaurun/server.go b/gaurun/server.go index c9b1d89..475f36d 100644 --- a/gaurun/server.go +++ b/gaurun/server.go @@ -1,7 +1,7 @@ package gaurun import ( - "log" + "fmt" "net" "net/http" "os" @@ -9,39 +9,75 @@ import ( "strings" statsGo "github.com/fukata/golang-stats-api-handler" + "github.com/lestrrat/go-server-starter/listener" ) -func RegisterHTTPHandlers() { - http.HandleFunc("/push", PushNotificationHandler) - http.HandleFunc("/stat/app", StatsHandler) - http.HandleFunc("/config/app", ConfigHandler) +func RegisterHandlers(mux *http.ServeMux) { + mux.HandleFunc("/push", PushNotificationHandler) + mux.HandleFunc("/stat/app", StatsHandler) + mux.HandleFunc("/config/app", ConfigHandler) + mux.HandleFunc("/config/pushers", ConfigPushersHandler) + statsGo.PrettyPrintEnabled() - http.HandleFunc("/stat/go", statsGo.Handler) - http.HandleFunc("/config/pushers", ConfigPushersHandler) + mux.HandleFunc("/stat/go", statsGo.Handler) } -func RunHTTPServer() { - // Listen TCP Port - if _, err := strconv.Atoi(ConfGaurun.Core.Port); err == nil { - http.ListenAndServe(":"+ConfGaurun.Core.Port, nil) +// getListener returns a listener. +func getListener(conf *ConfToml) (net.Listener, error) { + // By default, it starts to listen a listener provided + // by `go-server-starter`. If not, then check port defined + // in configuration file. + listeners, err := listener.ListenAll() + if err != nil && err != listener.ErrNoListeningTarget { + return nil, err + } + + if len(listeners) > 0 { + return listeners[0], nil + } + + // If port is empty, nothing to listen so returns error. + port := conf.Core.Port + if len(port) == 0 { + return nil, fmt.Errorf("no port to listen") + } + + // Try to listen as TCP port, first + if _, err := strconv.Atoi(port); err == nil { + l, err := net.Listen("tcp", ":"+port) + if err != nil { + return nil, err + } + return l, nil } - // Listen UNIX Socket - if strings.HasPrefix(ConfGaurun.Core.Port, "unix:/") { - sockPath := ConfGaurun.Core.Port[5:] + // Try to listen as UNIX socket. + if strings.HasPrefix(port, "unix:/") { + sockPath := port[5:] + fi, err := os.Lstat(sockPath) if err == nil && (fi.Mode()&os.ModeSocket) == os.ModeSocket { - err := os.Remove(sockPath) - if err != nil { - log.Fatal("failed to remove " + sockPath) + if err := os.Remove(sockPath); err != nil { + return nil, fmt.Errorf("failed to remove socket path: %s", err) } } + l, err := net.Listen("unix", sockPath) if err != nil { - log.Fatal("failed to listen: " + sockPath) + return nil, fmt.Errorf("failed to listen unix socket: %s", err) } - http.Serve(l, nil) + + return l, nil + } + + return nil, fmt.Errorf("invalid port %s (it must be number or path start with 'unix:/')", port) +} + +func RunServer(server *http.Server, conf *ConfToml) error { + l, err := getListener(conf) + if err != nil { + return err } - log.Fatal("port parameter is invalid: " + ConfGaurun.Core.Port) + return server.Serve(l) } diff --git a/gaurun/worker.go b/gaurun/worker.go index 234e952..5ed3913 100644 --- a/gaurun/worker.go +++ b/gaurun/worker.go @@ -3,6 +3,7 @@ package gaurun import ( "fmt" "strings" + "sync" "sync/atomic" "github.com/RobotsAndPencils/buford/push" @@ -11,6 +12,12 @@ import ( var ( // PusherCountAll is the shared value between workers PusherCountAll int64 + + // PusherWg is global wait group for pusher worker. + // It increments when new pusher is swapned and decrements when job is done. + // + // This is used to block main process to shutdown while pusher is still working. + PusherWg sync.WaitGroup ) func init() { @@ -41,6 +48,8 @@ func isExternalServerError(err error, platform int) bool { } func pushSync(pusher func(req RequestGaurunNotification) error, req RequestGaurunNotification, retryMax int) { + PusherWg.Add(1) + defer PusherWg.Done() Retry: err := pusher(req) if err != nil && req.Retry < retryMax && isExternalServerError(err, req.Platform) { @@ -50,6 +59,7 @@ Retry: } func pushAsync(pusher func(req RequestGaurunNotification) error, req RequestGaurunNotification, retryMax int, pusherCount *int64) { + defer PusherWg.Done() Retry: err := pusher(req) if err != nil && req.Retry < retryMax && isExternalServerError(err, req.Platform) { @@ -97,7 +107,7 @@ func pushNotificationWorker() { // as the increment in goroutine runs asynchronously. atomic.AddInt64(&pusherCount, 1) atomic.AddInt64(&PusherCountAll, 1) - + PusherWg.Add(1) go pushAsync(pusher, notification, retryMax, &pusherCount) continue } else { diff --git a/glide.lock b/glide.lock index e17a56e..9b304b8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 75e4d22bceb424fb5a3f2357f1dce75d9dd9634604ec4a88e3e40c6bd656c11d -updated: 2016-11-24T19:59:46.47680243+09:00 +hash: d3eeb4e68bd90fea7711484b9032c3ae0249bda1305dc5f501e872e2e767a303 +updated: 2017-01-24T12:49:27.499393495+09:00 imports: - name: github.com/BurntSushi/toml version: bbd5bb678321a0d6e58f1099321dfa73391c1b6f @@ -7,6 +7,10 @@ imports: version: 4b86f9c0ead51cc410d05655596e30f281ed9071 - name: github.com/fukata/golang-stats-api-handler version: ab9f90f16caab828afda479fd34bfbbbba2efcee +- name: github.com/lestrrat/go-server-starter + version: 901cec093d58ca36652e8d7906e4bd3e32793801 + subpackages: + - listener - name: github.com/mercari/gcm version: 987b1dc4ce9034b698395d35de1aadf999388b8f - name: github.com/RobotsAndPencils/buford diff --git a/glide.yaml b/glide.yaml index 2789faa..53abb32 100644 --- a/glide.yaml +++ b/glide.yaml @@ -13,3 +13,4 @@ import: - package: github.com/stretchr/testify - package: github.com/uber-go/zap - package: github.com/client9/reopen +- package: github.com/lestrrat/go-server-starter