From 6dc613201ff80fbfb42d3468882aab4588dd9649 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 11 Feb 2026 12:01:59 -0800 Subject: [PATCH] (chore) make example server same as gateway --- examples/server/orchestrator/main.go | 57 +++++++++++++++++++--------- examples/server/speculator/main.go | 57 +++++++++++++++++++--------- 2 files changed, 80 insertions(+), 34 deletions(-) diff --git a/examples/server/orchestrator/main.go b/examples/server/orchestrator/main.go index 68f9d3b8..8bd9188d 100644 --- a/examples/server/orchestrator/main.go +++ b/examples/server/orchestrator/main.go @@ -6,6 +6,7 @@ import ( "net" "os" "os/signal" + "sync" "syscall" "time" @@ -30,7 +31,7 @@ func (s *OrchestratorServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb func main() { if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to start orchestrator server: %v\n", err) + fmt.Fprintf(os.Stderr, "Orchestrator server failure: %v\n", err) os.Exit(1) } } @@ -45,19 +46,35 @@ func run() error { // Initialize metrics scope scope := tally.NewTestScope("orchestrator", nil) + metricsStopCh := make(chan interface{}, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) go func() { + defer metricsWgDone.Done() + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - for range ticker.C { - snapshot := scope.Snapshot() - logger.Info("metrics snapshot", - zap.Any("counters", snapshot.Counters()), - zap.Any("gauges", snapshot.Gauges()), - zap.Any("timers", snapshot.Timers()), - ) + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } } }() + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + // Create gRPC server grpcServer := grpc.NewServer() @@ -81,20 +98,26 @@ func run() error { fmt.Printf("Orchestrator gRPC server is running on %s\n", port) fmt.Println("Press Ctrl+C to stop.") - // Start server in a goroutine + // Start server in a goroutine and wait for it to finish + serverErrCh := make(chan error, 1) go func() { - if err := grpcServer.Serve(listener); err != nil { - fmt.Fprintf(os.Stderr, "Failed to serve: %v\n", err) - } + serverErrCh <- grpcServer.Serve(listener) }() - // Wait for interrupt signal + // Wait for interrupt signal or server exit sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - fmt.Println("\nShutting down orchestrator server...") - grpcServer.GracefulStop() + select { + case <-sigCh: + fmt.Println("\nShutting down orchestrator server...") + grpcServer.GracefulStop() + _ = <-serverErrCh // Wait for the server to exit and ignore the error + case errCh := <-serverErrCh: + if errCh != nil { + err = fmt.Errorf("\nServer exited with error: %w\n", errCh) + } + } - return nil + return err } diff --git a/examples/server/speculator/main.go b/examples/server/speculator/main.go index 0db02c6d..a92ae2c8 100644 --- a/examples/server/speculator/main.go +++ b/examples/server/speculator/main.go @@ -6,6 +6,7 @@ import ( "net" "os" "os/signal" + "sync" "syscall" "time" @@ -30,7 +31,7 @@ func (s *SpeculatorServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.P func main() { if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to start speculator server: %v\n", err) + fmt.Fprintf(os.Stderr, "Speculator server failure: %v\n", err) os.Exit(1) } } @@ -45,19 +46,35 @@ func run() error { // Initialize metrics scope scope := tally.NewTestScope("speculator", nil) + metricsStopCh := make(chan interface{}, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) go func() { + defer metricsWgDone.Done() + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - for range ticker.C { - snapshot := scope.Snapshot() - logger.Info("metrics snapshot", - zap.Any("counters", snapshot.Counters()), - zap.Any("gauges", snapshot.Gauges()), - zap.Any("timers", snapshot.Timers()), - ) + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } } }() + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + // Create gRPC server grpcServer := grpc.NewServer() @@ -81,20 +98,26 @@ func run() error { fmt.Printf("Speculator gRPC server is running on %s\n", port) fmt.Println("Press Ctrl+C to stop.") - // Start server in a goroutine + // Start server in a goroutine and wait for it to finish + serverErrCh := make(chan error, 1) go func() { - if err := grpcServer.Serve(listener); err != nil { - fmt.Fprintf(os.Stderr, "Failed to serve: %v\n", err) - } + serverErrCh <- grpcServer.Serve(listener) }() - // Wait for interrupt signal + // Wait for interrupt signal or server exit sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - fmt.Println("\nShutting down speculator server...") - grpcServer.GracefulStop() + select { + case <-sigCh: + fmt.Println("\nShutting down speculator server...") + grpcServer.GracefulStop() + _ = <-serverErrCh // Wait for the server to exit and ignore the error + case errCh := <-serverErrCh: + if errCh != nil { + err = fmt.Errorf("\nServer exited with error: %w\n", errCh) + } + } - return nil + return err }