From 7b9fb11eb31adea2967926eaa062ab1424d57f2e Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Tue, 21 Apr 2026 22:33:50 +0200 Subject: [PATCH 1/6] servermon: stop when management server is not reachable --- servermon/monitor.go | 54 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/servermon/monitor.go b/servermon/monitor.go index ead4753a..d0ef8af4 100644 --- a/servermon/monitor.go +++ b/servermon/monitor.go @@ -67,7 +67,10 @@ func (m Monitor) Run(ctx context.Context) error { return fmt.Errorf("dial: %w", err) } - rpcConn := jsonrpc2.NewConn(ctx, websocket.NewObjectStream(wsConn), &m) + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + rpcConn := jsonrpc2.NewConn(runCtx, websocket.NewObjectStream(wsConn), &m) defer rpcConn.Close() listTicker := time.NewTicker(1 * time.Second) @@ -77,6 +80,16 @@ func (m Monitor) Run(ctx context.Context) error { defer checkTicker.Stop() joined := atomic.Bool{} + lastSuccessfulPlayerFetch := atomic.Int64{} + lastSuccessfulPlayerFetch.Store(time.Now().UnixNano()) + errCh := make(chan error, 1) + + reportErr := func(err error) { + select { + case errCh <- err: + default: + } + } workloadID := os.Getenv("PLATFORMD_WORKLOAD_ID") if workloadID == "" { @@ -86,14 +99,28 @@ func (m Monitor) Run(ctx context.Context) error { logger := m.logger.With("workload_id", workloadID) go func() { - for range listTicker.C { + for { + select { + case <-runCtx.Done(): + return + case <-listTicker.C: + } + players := make([]player, 0) - if err := rpcConn.Call(ctx, "minecraft:players", nil, &players); err != nil { - logger.ErrorContext(ctx, "failed to call players", "err", err) + if err := rpcConn.Call(runCtx, "minecraft:players", nil, &players); err != nil { + logger.ErrorContext(runCtx, "failed to call players", "err", err) + + lastSuccess := time.Unix(0, lastSuccessfulPlayerFetch.Load()) + if time.Since(lastSuccess) >= m.conf.PlayerCountCheckInterval { + reportErr(fmt.Errorf("management api unreachable for %s: %w", m.conf.PlayerCountCheckInterval, err)) + return + } + continue } + lastSuccessfulPlayerFetch.Store(time.Now().UnixNano()) logger.Debug("got players", "player_count", len(players)) if len(players) > 0 { @@ -103,10 +130,16 @@ func (m Monitor) Run(ctx context.Context) error { }() go func() { - for range checkTicker.C { + for { + select { + case <-runCtx.Done(): + return + case <-checkTicker.C: + } + if joined.Load() { joined.Store(false) - break + continue } logger.Info( @@ -126,8 +159,13 @@ func (m Monitor) Run(ctx context.Context) error { } }() - <-ctx.Done() - return nil + select { + case <-ctx.Done(): + return nil + case err := <-errCh: + cancel() + return err + } } // Handle is present, because jsonrpc2 crashes if we pass a nil handler to jsonrpc2.NewConn From d009cc64d6fcdbc66ab1951fccf0e9f057c0bb9c Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Wed, 22 Apr 2026 17:22:28 +0200 Subject: [PATCH 2/6] fix: call channel directly --- servermon/monitor.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/servermon/monitor.go b/servermon/monitor.go index d0ef8af4..874a1579 100644 --- a/servermon/monitor.go +++ b/servermon/monitor.go @@ -84,13 +84,6 @@ func (m Monitor) Run(ctx context.Context) error { lastSuccessfulPlayerFetch.Store(time.Now().UnixNano()) errCh := make(chan error, 1) - reportErr := func(err error) { - select { - case errCh <- err: - default: - } - } - workloadID := os.Getenv("PLATFORMD_WORKLOAD_ID") if workloadID == "" { return fmt.Errorf("PLATFORMD_WORKLOAD_ID not set") @@ -113,7 +106,7 @@ func (m Monitor) Run(ctx context.Context) error { lastSuccess := time.Unix(0, lastSuccessfulPlayerFetch.Load()) if time.Since(lastSuccess) >= m.conf.PlayerCountCheckInterval { - reportErr(fmt.Errorf("management api unreachable for %s: %w", m.conf.PlayerCountCheckInterval, err)) + errCh <- fmt.Errorf("failed to fetch player count for %v: %w", time.Since(lastSuccess), err) return } From a39ed02184d1341ef688c4f626c0acc000d25196 Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Thu, 23 Apr 2026 15:25:47 +0200 Subject: [PATCH 3/6] revert changes --- servermon/monitor.go | 45 +++++++------------------------------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/servermon/monitor.go b/servermon/monitor.go index 874a1579..d18aef15 100644 --- a/servermon/monitor.go +++ b/servermon/monitor.go @@ -67,10 +67,7 @@ func (m Monitor) Run(ctx context.Context) error { return fmt.Errorf("dial: %w", err) } - runCtx, cancel := context.WithCancel(ctx) - defer cancel() - - rpcConn := jsonrpc2.NewConn(runCtx, websocket.NewObjectStream(wsConn), &m) + rpcConn := jsonrpc2.NewConn(ctx, websocket.NewObjectStream(wsConn), &m) defer rpcConn.Close() listTicker := time.NewTicker(1 * time.Second) @@ -80,9 +77,6 @@ func (m Monitor) Run(ctx context.Context) error { defer checkTicker.Stop() joined := atomic.Bool{} - lastSuccessfulPlayerFetch := atomic.Int64{} - lastSuccessfulPlayerFetch.Store(time.Now().UnixNano()) - errCh := make(chan error, 1) workloadID := os.Getenv("PLATFORMD_WORKLOAD_ID") if workloadID == "" { @@ -92,28 +86,14 @@ func (m Monitor) Run(ctx context.Context) error { logger := m.logger.With("workload_id", workloadID) go func() { - for { - select { - case <-runCtx.Done(): - return - case <-listTicker.C: - } - + for range listTicker.C { players := make([]player, 0) - if err := rpcConn.Call(runCtx, "minecraft:players", nil, &players); err != nil { - logger.ErrorContext(runCtx, "failed to call players", "err", err) - - lastSuccess := time.Unix(0, lastSuccessfulPlayerFetch.Load()) - if time.Since(lastSuccess) >= m.conf.PlayerCountCheckInterval { - errCh <- fmt.Errorf("failed to fetch player count for %v: %w", time.Since(lastSuccess), err) - return - } - + if err := rpcConn.Call(ctx, "minecraft:players", nil, &players); err != nil { + logger.ErrorContext(ctx, "failed to call players", "err", err) continue } - lastSuccessfulPlayerFetch.Store(time.Now().UnixNano()) logger.Debug("got players", "player_count", len(players)) if len(players) > 0 { @@ -123,13 +103,7 @@ func (m Monitor) Run(ctx context.Context) error { }() go func() { - for { - select { - case <-runCtx.Done(): - return - case <-checkTicker.C: - } - + for range checkTicker.C { if joined.Load() { joined.Store(false) continue @@ -152,13 +126,8 @@ func (m Monitor) Run(ctx context.Context) error { } }() - select { - case <-ctx.Done(): - return nil - case err := <-errCh: - cancel() - return err - } + <-ctx.Done() + return nil } // Handle is present, because jsonrpc2 crashes if we pass a nil handler to jsonrpc2.NewConn From 1d20143c8c713702e9d0a23417f09e7e7b3a118e Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Thu, 23 Apr 2026 18:04:24 +0200 Subject: [PATCH 4/6] test: add close test --- servermon/monitor_test.go | 53 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/servermon/monitor_test.go b/servermon/monitor_test.go index f0e9896d..86796f0f 100644 --- a/servermon/monitor_test.go +++ b/servermon/monitor_test.go @@ -21,7 +21,8 @@ import ( ) type fakeManagementAPI struct { - result func() []player + result func() []player + closeConn chan struct{} } type player struct { @@ -45,6 +46,13 @@ func (f fakeManagementAPI) serveWs(w http.ResponseWriter, r *http.Request) { return } + if f.closeConn != nil { + go func() { + <-f.closeConn + conn.Close() + }() + } + for { var req jsonrpc2.Request if err := conn.ReadJSON(&req); err != nil { @@ -160,3 +168,46 @@ func TestServerMonKeepsWorkloadWhenPlayersArePresent(t *testing.T) { Id: wlID, }) } + +func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { + var ( + wlID = "blabla" + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + wlMock = mock.NewMockV1alpha2WorkloadServiceClient(t) + closeConn = make(chan struct{}) + fake = fakeManagementAPI{ + result: func() []player { + return []player{} + }, + closeConn: closeConn, + } + mon = servermon.New( + slog.New(slog.NewTextHandler(os.Stdout, nil)), + servermon.Config{ + PlayerCountCheckInterval: 5 * time.Second, + MCServerManagementAPIEndpoint: "ws://localhost:30751", + }, + wlMock, + ) + ) + + _ = os.Setenv("PLATFORMD_WORKLOAD_ID", wlID) + defer cancel() + + go fake.Run(t, 30751) + + errCh := make(chan error, 1) + go func() { + errCh <- mon.Run(ctx) + }() + + time.Sleep(1 * time.Second) + close(closeConn) + + select { + case err := <-errCh: + require.Error(t, err) + case <-time.After(5 * time.Second): + t.Fatal("Run did not exit after management API became unreachable") + } +} From 95090574ef66eca0e41f3486362b439e6dec5e88 Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Thu, 23 Apr 2026 18:07:49 +0200 Subject: [PATCH 5/6] test: test case we fixed --- servermon/monitor_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/servermon/monitor_test.go b/servermon/monitor_test.go index 86796f0f..bce32f82 100644 --- a/servermon/monitor_test.go +++ b/servermon/monitor_test.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "os" + "sync/atomic" "testing" "time" @@ -175,8 +176,12 @@ func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) wlMock = mock.NewMockV1alpha2WorkloadServiceClient(t) closeConn = make(chan struct{}) + playerCount = atomic.Int32{} fake = fakeManagementAPI{ result: func() []player { + if playerCount.Load() > 0 { + return []player{{ID: "1", Name: "A"}} + } return []player{} }, closeConn: closeConn, @@ -184,7 +189,7 @@ func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { mon = servermon.New( slog.New(slog.NewTextHandler(os.Stdout, nil)), servermon.Config{ - PlayerCountCheckInterval: 5 * time.Second, + PlayerCountCheckInterval: 2 * time.Second, MCServerManagementAPIEndpoint: "ws://localhost:30751", }, wlMock, @@ -201,7 +206,10 @@ func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { errCh <- mon.Run(ctx) }() - time.Sleep(1 * time.Second) + playerCount.Store(1) + time.Sleep(2 * time.Second) + playerCount.Store(0) + time.Sleep(200 * time.Millisecond) close(closeConn) select { From 27148175ae948efa02ffafe1730ce549a4261c22 Mon Sep 17 00:00:00 2001 From: InvalidJoker <0limitdev@gmail.com> Date: Thu, 23 Apr 2026 18:15:49 +0200 Subject: [PATCH 6/6] test: no error channel anymore --- servermon/monitor_test.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/servermon/monitor_test.go b/servermon/monitor_test.go index bce32f82..42199dfa 100644 --- a/servermon/monitor_test.go +++ b/servermon/monitor_test.go @@ -199,11 +199,18 @@ func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { _ = os.Setenv("PLATFORMD_WORKLOAD_ID", wlID) defer cancel() + wlMock. + EXPECT(). + StopWorkload(mocky.Anything, &workloadv1alpha2.WorkloadStopRequest{ + Id: wlID, + }). + Return(&workloadv1alpha2.WorkloadStopResponse{}, nil) + go fake.Run(t, 30751) - errCh := make(chan error, 1) go func() { - errCh <- mon.Run(ctx) + err := mon.Run(ctx) + require.NoError(t, err) }() playerCount.Store(1) @@ -212,10 +219,5 @@ func TestServerMonExitsWhenManagementAPIUnreachable(t *testing.T) { time.Sleep(200 * time.Millisecond) close(closeConn) - select { - case err := <-errCh: - require.Error(t, err) - case <-time.After(5 * time.Second): - t.Fatal("Run did not exit after management API became unreachable") - } + <-ctx.Done() }