From e694827eb967047c65f2d3b623d71ffde5696c71 Mon Sep 17 00:00:00 2001 From: Adam Shiervani Date: Thu, 20 Nov 2025 14:40:37 +0100 Subject: [PATCH 1/4] feat: handle grpc events --- internal/native/grpc_client.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/native/grpc_client.go b/internal/native/grpc_client.go index 300a22848..85a3201bd 100644 --- a/internal/native/grpc_client.go +++ b/internal/native/grpc_client.go @@ -79,6 +79,18 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) { // Start event stream go grpcClient.startEventStream() + // Start event handler to process events from the channel + go func() { + for { + select { + case event := <-grpcClient.eventCh: + grpcClient.handleEvent(event) + case <-grpcClient.eventDone: + return + } + } + }() + return grpcClient, nil } @@ -234,20 +246,6 @@ func (c *GRPCClient) handleEvent(event *pb.Event) { } } -// OnEvent registers an event handler -func (c *GRPCClient) OnEvent(eventType string, handler func(data interface{})) { - go func() { - for { - select { - case event := <-c.eventCh: - c.handleEvent(event) - case <-c.eventDone: - return - } - } - }() -} - // Close closes the gRPC client func (c *GRPCClient) Close() error { c.closeM.Lock() From 08937b119707eb9ba763c008f42d61b4a5e7f196 Mon Sep 17 00:00:00 2001 From: Adam Shiervani Date: Thu, 20 Nov 2025 14:56:17 +0100 Subject: [PATCH 2/4] refactor: remove redundant initialization of native and display components in Main function (#987) --- main.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/main.go b/main.go index a4d80fb74..88d2dec7c 100644 --- a/main.go +++ b/main.go @@ -70,9 +70,6 @@ func Main() { initOta() - initNative(systemVersionLocal, appVersionLocal) - initDisplay() - http.DefaultClient.Timeout = 1 * time.Minute // Initialize network From e3d2f96afd16cda0425db745e03ce3388d77f14d Mon Sep 17 00:00:00 2001 From: Siyuan Date: Thu, 20 Nov 2025 14:43:26 +0000 Subject: [PATCH 3/4] chore: use single channel for broadcasting events, as there won't be multiple StreamEventsClient anyway --- internal/native/grpc_server.go | 47 +++++++--------------------------- 1 file changed, 9 insertions(+), 38 deletions(-) diff --git a/internal/native/grpc_server.go b/internal/native/grpc_server.go index 304203ced..dc177ef98 100644 --- a/internal/native/grpc_server.go +++ b/internal/native/grpc_server.go @@ -15,18 +15,18 @@ import ( // grpcServer wraps the Native instance and implements the gRPC service type grpcServer struct { pb.UnimplementedNativeServiceServer - native *Native - logger *zerolog.Logger - eventChs []chan *pb.Event - eventM sync.Mutex + native *Native + logger *zerolog.Logger + eventCh chan *pb.Event + eventM sync.Mutex } // NewGRPCServer creates a new gRPC server for the native service func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { s := &grpcServer{ - native: n, - logger: logger, - eventChs: make([]chan *pb.Event, 0), + native: n, + logger: logger, + eventCh: make(chan *pb.Event, 100), } // Store original callbacks and wrap them to also broadcast events @@ -82,16 +82,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { } func (s *grpcServer) broadcastEvent(event *pb.Event) { - s.eventM.Lock() - defer s.eventM.Unlock() - - for _, ch := range s.eventChs { - select { - case ch <- event: - default: - // Channel full, skip - } - } + s.eventCh <- event } func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) { @@ -103,30 +94,10 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE setProcTitle("connected") defer setProcTitle("waiting") - eventCh := make(chan *pb.Event, 100) - - // Register this channel for events - s.eventM.Lock() - s.eventChs = append(s.eventChs, eventCh) - s.eventM.Unlock() - - // Unregister on exit - defer func() { - s.eventM.Lock() - defer s.eventM.Unlock() - for i, ch := range s.eventChs { - if ch == eventCh { - s.eventChs = append(s.eventChs[:i], s.eventChs[i+1:]...) - break - } - } - close(eventCh) - }() - // Stream events for { select { - case event := <-eventCh: + case event := <-s.eventCh: if err := stream.Send(event); err != nil { return err } From 6ee260f0b8d99404af19197498552b6a6b7797d6 Mon Sep 17 00:00:00 2001 From: Siyuan Date: Thu, 20 Nov 2025 14:50:40 +0000 Subject: [PATCH 4/4] fix: use single channel for broadcasting events, as there won't be multiple StreamEventsClient anyway --- internal/native/grpc_server.go | 58 +++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/internal/native/grpc_server.go b/internal/native/grpc_server.go index dc177ef98..9b54fb5b7 100644 --- a/internal/native/grpc_server.go +++ b/internal/native/grpc_server.go @@ -15,18 +15,20 @@ import ( // grpcServer wraps the Native instance and implements the gRPC service type grpcServer struct { pb.UnimplementedNativeServiceServer - native *Native - logger *zerolog.Logger - eventCh chan *pb.Event - eventM sync.Mutex + native *Native + logger *zerolog.Logger + eventStreamChan chan *pb.Event + eventStreamMu sync.Mutex + eventStreamCtx context.Context + eventStreamCancel context.CancelFunc } // NewGRPCServer creates a new gRPC server for the native service func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { s := &grpcServer{ - native: n, - logger: logger, - eventCh: make(chan *pb.Event, 100), + native: n, + logger: logger, + eventStreamChan: make(chan *pb.Event, 100), } // Store original callbacks and wrap them to also broadcast events @@ -82,7 +84,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { } func (s *grpcServer) broadcastEvent(event *pb.Event) { - s.eventCh <- event + s.eventStreamChan <- event } func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) { @@ -94,15 +96,49 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE setProcTitle("connected") defer setProcTitle("waiting") + // Cancel previous stream if exists + s.eventStreamMu.Lock() + if s.eventStreamCancel != nil { + s.logger.Debug().Msg("cancelling previous StreamEvents call") + s.eventStreamCancel() + } + + // Create a cancellable context for this stream + ctx, cancel := context.WithCancel(stream.Context()) + s.eventStreamCtx = ctx + s.eventStreamCancel = cancel + s.eventStreamMu.Unlock() + + // Clean up when this stream ends + defer func() { + s.eventStreamMu.Lock() + defer s.eventStreamMu.Unlock() + if s.eventStreamCtx == ctx { + s.eventStreamCancel = nil + s.eventStreamCtx = nil + } + cancel() + }() + // Stream events for { select { - case event := <-s.eventCh: + case event := <-s.eventStreamChan: + // Check if this stream is still the active one + s.eventStreamMu.Lock() + isActive := s.eventStreamCtx == ctx + s.eventStreamMu.Unlock() + + if !isActive { + s.logger.Debug().Msg("stream replaced by new call, exiting") + return context.Canceled + } + if err := stream.Send(event); err != nil { return err } - case <-stream.Context().Done(): - return stream.Context().Err() + case <-ctx.Done(): + return ctx.Err() } } }