diff --git a/internal/native/cgo/ctrl.c b/internal/native/cgo/ctrl.c index 547d5694b..62d2c8b92 100644 --- a/internal/native/cgo/ctrl.c +++ b/internal/native/cgo/ctrl.c @@ -59,6 +59,7 @@ const char *jetkvm_ui_event_code_to_name(int code) { void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second) { + state.streaming = video_get_streaming_status(); state.ready = ready; state.error = error; state.width = width; @@ -69,6 +70,13 @@ void video_report_format(bool ready, const char *error, u_int16_t width, u_int16 } } +void video_send_format_report() { + state.streaming = video_get_streaming_status(); + if (video_state_handler != NULL) { + (*video_state_handler)(&state); + } +} + int video_send_frame(const uint8_t *frame, ssize_t len) { if (video_handler != NULL) { @@ -367,6 +375,10 @@ void jetkvm_video_stop() { video_stop_streaming(); } +uint8_t jetkvm_video_get_streaming_status() { + return video_get_streaming_status(); +} + int jetkvm_video_set_quality_factor(float quality_factor) { if (quality_factor <= 0 || quality_factor > 1) { return -1; diff --git a/internal/native/cgo/ctrl.h b/internal/native/cgo/ctrl.h index 774ee1473..59f9e4cdf 100644 --- a/internal/native/cgo/ctrl.h +++ b/internal/native/cgo/ctrl.h @@ -8,6 +8,7 @@ typedef struct { bool ready; + uint8_t streaming; const char *error; u_int16_t width; u_int16_t height; @@ -56,6 +57,7 @@ int jetkvm_video_init(float quality_factor); void jetkvm_video_shutdown(); void jetkvm_video_start(); void jetkvm_video_stop(); +uint8_t jetkvm_video_get_streaming_status(); int jetkvm_video_set_quality_factor(float quality_factor); float jetkvm_video_get_quality_factor(); int jetkvm_video_set_edid(const char *edid_hex); @@ -64,6 +66,7 @@ char *jetkvm_video_log_status(); jetkvm_video_state_t *jetkvm_video_get_status(); void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second); +void video_send_format_report(); int video_send_frame(const uint8_t *frame, ssize_t len); diff --git a/internal/native/cgo/video.c b/internal/native/cgo/video.c index 857acbbb9..9107c70ef 100644 --- a/internal/native/cgo/video.c +++ b/internal/native/cgo/video.c @@ -349,7 +349,10 @@ static void *venc_read_stream(void *arg) } uint32_t detected_width, detected_height; -bool detected_signal = false, streaming_flag = false, streaming_stopped = true; +bool detected_signal = false, streaming_flag = false; + +bool streaming_stopped = true; +pthread_mutex_t streaming_stopped_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_t *streaming_thread = NULL; pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -370,8 +373,27 @@ void set_streaming_flag(bool flag) pthread_mutex_lock(&streaming_mutex); streaming_flag = flag; pthread_mutex_unlock(&streaming_mutex); + + video_send_format_report(); +} + +void set_streaming_stopped(bool stopped) +{ + pthread_mutex_lock(&streaming_stopped_mutex); + streaming_stopped = stopped; + pthread_mutex_unlock(&streaming_stopped_mutex); + + video_send_format_report(); } +bool get_streaming_stopped() +{ + pthread_mutex_lock(&streaming_stopped_mutex); + bool stopped = streaming_stopped; + pthread_mutex_unlock(&streaming_stopped_mutex); + return stopped; +} + void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename) { FILE *file = fopen(filename, "wb"); @@ -385,8 +407,7 @@ void *run_video_stream(void *arg) log_info("running video stream"); - streaming_stopped = false; - + set_streaming_stopped(false); while (streaming_flag) { if (detected_signal == false) @@ -528,6 +549,8 @@ void *run_video_stream(void *arg) uint32_t num = 0; VIDEO_FRAME_INFO_S stFrame; + + while (streaming_flag) { FD_ZERO(&fds); @@ -539,6 +562,7 @@ void *run_video_stream(void *arg) if (r == 0) { log_info("select timeout"); + ensure_sleep_mode_disabled(); break; } if (r == -1) @@ -634,7 +658,7 @@ void *run_video_stream(void *arg) log_info("video stream thread exiting"); - streaming_stopped = true; + set_streaming_stopped(true); return NULL; } @@ -670,9 +694,10 @@ void video_start_streaming() log_info("starting video streaming"); if (streaming_thread != NULL) { - if (streaming_stopped == true) { + bool stopped = get_streaming_stopped(); + if (stopped == true) { log_error("video streaming already stopped but streaming_thread is not NULL"); - assert(streaming_stopped == true); + assert(stopped == true); } log_warn("video streaming already started"); return; @@ -699,6 +724,21 @@ void video_start_streaming() streaming_thread = new_thread; } +bool wait_for_streaming_stopped() +{ + int attempts = 0; + while (attempts < 30) { + if (get_streaming_stopped() == true) { + log_info("video streaming stopped after %d attempts", attempts); + return true; + } + usleep(100000); // 100ms + attempts++; + } + log_error("video streaming did not stop after 3s"); + return false; +} + void video_stop_streaming() { if (streaming_thread == NULL) { @@ -710,14 +750,7 @@ void video_stop_streaming() set_streaming_flag(false); log_info("waiting for video streaming thread to exit"); - int attempts = 0; - while (!streaming_stopped && attempts < 30) { - usleep(100000); // 100ms - attempts++; - } - if (!streaming_stopped) { - log_error("video streaming thread did not exit after 30s"); - } + wait_for_streaming_stopped(); pthread_join(*streaming_thread, NULL); free(streaming_thread); @@ -726,13 +759,30 @@ void video_stop_streaming() log_info("video streaming stopped"); } +uint8_t video_get_streaming_status() { + // streaming flag can be false when stopping streaming + if (get_streaming_flag() == true) return 1; + if (get_streaming_stopped() == false) return 2; + return 0; +} + void video_restart_streaming() { - if (get_streaming_flag() == true) + uint8_t streaming_status = video_get_streaming_status(); + if (streaming_status == 0) { - log_info("restarting video streaming"); + log_info("will not restart video streaming because it's stopped"); + return; + } + + if (streaming_status == 2) { video_stop_streaming(); } + + if (!wait_for_streaming_stopped()) { + return; + } + video_start_streaming(); } diff --git a/internal/native/cgo/video.h b/internal/native/cgo/video.h index 6fa00ca42..391f7dddb 100644 --- a/internal/native/cgo/video.h +++ b/internal/native/cgo/video.h @@ -31,6 +31,13 @@ void video_start_streaming(); */ void video_stop_streaming(); +/** + * @brief Get the streaming status of the video + * + * @return uint8_t 1 if the video streaming is active, 2 if the video streaming is stopping, 0 otherwise + */ +uint8_t video_get_streaming_status(); + /** * @brief Set the quality factor of the video * diff --git a/internal/native/cgo_linux.go b/internal/native/cgo_linux.go index b33eb5347..dcd25e42a 100644 --- a/internal/native/cgo_linux.go +++ b/internal/native/cgo_linux.go @@ -57,6 +57,7 @@ var ( func jetkvm_go_video_state_handler(state *C.jetkvm_video_state_t) { videoState := VideoState{ Ready: bool(state.ready), + Streaming: VideoStreamingStatus(state.streaming), Error: C.GoString(state.error), Width: int(state.width), Height: int(state.height), @@ -168,6 +169,15 @@ func videoStop() { C.jetkvm_video_stop() } +func videoGetStreamingStatus() VideoStreamingStatus { + cgoLock.Lock() + defer cgoLock.Unlock() + + isStreaming := C.jetkvm_video_get_streaming_status() + + return VideoStreamingStatus(isStreaming) +} + func videoLogStatus() string { cgoLock.Lock() defer cgoLock.Unlock() diff --git a/internal/native/cgo_notlinux.go b/internal/native/cgo_notlinux.go index 4602f7133..9bc77806d 100644 --- a/internal/native/cgo_notlinux.go +++ b/internal/native/cgo_notlinux.go @@ -123,6 +123,11 @@ func videoSetEDID(edid string) error { return nil } +func videoGetStreamingStatus() VideoStreamingStatus { + panicPlatformNotSupported() + return VideoStreamingStatusInactive +} + func crash() { panicPlatformNotSupported() } diff --git a/internal/native/chan.go b/internal/native/chan.go index 4162f2605..cd6d07af1 100644 --- a/internal/native/chan.go +++ b/internal/native/chan.go @@ -28,6 +28,7 @@ func (n *Native) handleVideoFrameChan() { func (n *Native) handleVideoStateChan() { for { state := <-videoStateChan + n.onVideoStateChange(state) } } diff --git a/internal/native/grpc_servermethods.go b/internal/native/grpc_servermethods.go index cc16dfd10..c1dea54fc 100644 --- a/internal/native/grpc_servermethods.go +++ b/internal/native/grpc_servermethods.go @@ -70,9 +70,6 @@ func (s *grpcServer) VideoLogStatus(ctx context.Context, req *pb.Empty) (*pb.Vid } func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { - procPrefix = "jetkvm: [native]" - setProcTitle(lastProcTitle) - if err := s.native.VideoStop(); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -80,9 +77,6 @@ func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, e } func (s *grpcServer) VideoStart(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { - procPrefix = "jetkvm: [native+video]" - setProcTitle(lastProcTitle) - if err := s.native.VideoStart(); err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/internal/native/native.go b/internal/native/native.go index 61c4b0ac7..87eebf185 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -40,6 +40,26 @@ type NativeOptions struct { OnNativeRestart func() } +type VideoStreamingStatus uint8 + +const ( + VideoStreamingStatusActive VideoStreamingStatus = 1 + VideoStreamingStatusStopping VideoStreamingStatus = 2 // video is stopping, but not yet stopped + VideoStreamingStatusInactive VideoStreamingStatus = 0 +) + +func (s VideoStreamingStatus) String() string { + switch s { + case VideoStreamingStatusActive: + return "active" + case VideoStreamingStatusStopping: + return "stopping" + case VideoStreamingStatusInactive: + return "inactive" + } + return "unknown" +} + func NewNative(opts NativeOptions) *Native { pid := os.Getpid() nativeSubLogger := nativeLogger.With().Int("pid", pid).Str("scope", "native").Logger() diff --git a/internal/native/server.go b/internal/native/server.go index ae983159d..f52289e8c 100644 --- a/internal/native/server.go +++ b/internal/native/server.go @@ -54,6 +54,25 @@ func monitorCrashSignal(ctx context.Context, logger *zerolog.Logger, nativeInsta } } +func updateProcessTitle(state *VideoState) { + if state == nil { + procPrefix = "jetkvm: [native]" + } else { + var status string + if state.Streaming == VideoStreamingStatusInactive { + status = "inactive" + } else if !state.Ready { + status = "not ready" + } else if state.Error != "" { + status = state.Error + } else { + status = fmt.Sprintf("%s,%dx%d,%.1ffps", state.Streaming.String(), state.Width, state.Height, state.FramePerSecond) + } + procPrefix = fmt.Sprintf("jetkvm: [native+video{%s}]", status) + } + setProcTitle(lastProcTitle) +} + // RunNativeProcess runs the native process mode func RunNativeProcess(binaryName string) { appCtx, appCtxCancel := context.WithCancel(context.Background()) @@ -82,6 +101,9 @@ func RunNativeProcess(binaryName string) { logger.Fatal().Err(err).Msg("failed to write frame to video stream socket") } } + nativeOptions.OnVideoStateChange = func(state VideoState) { + updateProcessTitle(&state) + } // Create native instance nativeInstance := NewNative(*nativeOptions) diff --git a/internal/native/video.go b/internal/native/video.go index c556a9383..176511c69 100644 --- a/internal/native/video.go +++ b/internal/native/video.go @@ -15,11 +15,12 @@ var extraLockTimeout = 5 * time.Second // VideoState is the state of the video stream. type VideoState struct { - Ready bool `json:"ready"` - Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range - Width int `json:"width"` - Height int `json:"height"` - FramePerSecond float64 `json:"fps"` + Ready bool `json:"ready"` + Streaming VideoStreamingStatus `json:"streaming"` + Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range + Width int `json:"width"` + Height int `json:"height"` + FramePerSecond float64 `json:"fps"` } func isSleepModeSupported() bool { @@ -27,15 +28,53 @@ func isSleepModeSupported() bool { return err == nil } +const sleepModeWaitTimeout = 3 * time.Second + +func (n *Native) waitForVideoStreamingStatus(status VideoStreamingStatus) error { + timeout := time.After(sleepModeWaitTimeout) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + if videoGetStreamingStatus() == status { + return nil + } + select { + case <-timeout: + return fmt.Errorf("timed out waiting for video streaming status to be %s", status.String()) + case <-ticker.C: + } + } +} + +// before calling this function, make sure to lock n.videoLock func (n *Native) setSleepMode(enabled bool) error { if !n.sleepModeSupported { return nil } bEnabled := "0" + shouldWait := false if enabled { bEnabled = "1" + + switch videoGetStreamingStatus() { + case VideoStreamingStatusActive: + n.l.Info().Msg("stopping video stream to enable sleep mode") + videoStop() + shouldWait = true + case VideoStreamingStatusStopping: + n.l.Info().Msg("video stream is stopping, will enable sleep mode in a few seconds") + shouldWait = true + } + } + + if shouldWait { + if err := n.waitForVideoStreamingStatus(VideoStreamingStatusInactive); err != nil { + return err + } } + return os.WriteFile(sleepModeFile, []byte(bEnabled), 0644) } @@ -159,3 +198,11 @@ func (n *Native) VideoStart() error { videoStart() return nil } + +// VideoGetStreamingStatus gets the streaming status of the video. +func (n *Native) VideoGetStreamingStatus() VideoStreamingStatus { + n.videoLock.Lock() + defer n.videoLock.Unlock() + + return videoGetStreamingStatus() +} diff --git a/webrtc.go b/webrtc.go index 76de29145..10c43ddf1 100644 --- a/webrtc.go +++ b/webrtc.go @@ -386,6 +386,7 @@ func newSession(config SessionConfig) (*Session, error) { isConnected = false onActiveSessionsChanged() if decrActiveSessions() == 0 { + scopedLogger.Info().Msg("last session disconnected, stopping video stream") onLastSessionDisconnected() } }