Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/native/cgo/ctrl.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const char *jetkvm_ui_event_code_to_name(int code) {

void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second)
{
state.streaming = video_get_streaming_status();
state.ready = ready;
state.error = error;
state.width = width;
Expand All @@ -69,6 +70,13 @@ void video_report_format(bool ready, const char *error, u_int16_t width, u_int16
}
}

void video_send_format_report() {
state.streaming = video_get_streaming_status();
if (video_state_handler != NULL) {
(*video_state_handler)(&state);
}
}

int video_send_frame(const uint8_t *frame, ssize_t len)
{
if (video_handler != NULL) {
Expand Down Expand Up @@ -367,6 +375,10 @@ void jetkvm_video_stop() {
video_stop_streaming();
}

uint8_t jetkvm_video_get_streaming_status() {
return video_get_streaming_status();
}

int jetkvm_video_set_quality_factor(float quality_factor) {
if (quality_factor <= 0 || quality_factor > 1) {
return -1;
Expand Down
3 changes: 3 additions & 0 deletions internal/native/cgo/ctrl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
typedef struct
{
bool ready;
uint8_t streaming;
const char *error;
u_int16_t width;
u_int16_t height;
Expand Down Expand Up @@ -56,6 +57,7 @@ int jetkvm_video_init(float quality_factor);
void jetkvm_video_shutdown();
void jetkvm_video_start();
void jetkvm_video_stop();
uint8_t jetkvm_video_get_streaming_status();
int jetkvm_video_set_quality_factor(float quality_factor);
float jetkvm_video_get_quality_factor();
int jetkvm_video_set_edid(const char *edid_hex);
Expand All @@ -64,6 +66,7 @@ char *jetkvm_video_log_status();
jetkvm_video_state_t *jetkvm_video_get_status();

void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second);
void video_send_format_report();
int video_send_frame(const uint8_t *frame, ssize_t len);


Expand Down
82 changes: 66 additions & 16 deletions internal/native/cgo/video.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ static void *venc_read_stream(void *arg)
}

uint32_t detected_width, detected_height;
bool detected_signal = false, streaming_flag = false, streaming_stopped = true;
bool detected_signal = false, streaming_flag = false;

bool streaming_stopped = true;
pthread_mutex_t streaming_stopped_mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_t *streaming_thread = NULL;
pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER;
Expand All @@ -370,8 +373,27 @@ void set_streaming_flag(bool flag)
pthread_mutex_lock(&streaming_mutex);
streaming_flag = flag;
pthread_mutex_unlock(&streaming_mutex);

video_send_format_report();
}

void set_streaming_stopped(bool stopped)
{
pthread_mutex_lock(&streaming_stopped_mutex);
streaming_stopped = stopped;
pthread_mutex_unlock(&streaming_stopped_mutex);

video_send_format_report();
}

bool get_streaming_stopped()
{
pthread_mutex_lock(&streaming_stopped_mutex);
bool stopped = streaming_stopped;
pthread_mutex_unlock(&streaming_stopped_mutex);
return stopped;
}

void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename)
{
FILE *file = fopen(filename, "wb");
Expand All @@ -385,8 +407,7 @@ void *run_video_stream(void *arg)

log_info("running video stream");

streaming_stopped = false;

set_streaming_stopped(false);
while (streaming_flag)
{
if (detected_signal == false)
Expand Down Expand Up @@ -528,6 +549,8 @@ void *run_video_stream(void *arg)
uint32_t num = 0;
VIDEO_FRAME_INFO_S stFrame;



while (streaming_flag)
{
FD_ZERO(&fds);
Expand All @@ -539,6 +562,7 @@ void *run_video_stream(void *arg)
if (r == 0)
{
log_info("select timeout");
ensure_sleep_mode_disabled();
break;
}
if (r == -1)
Expand Down Expand Up @@ -634,7 +658,7 @@ void *run_video_stream(void *arg)

log_info("video stream thread exiting");

streaming_stopped = true;
set_streaming_stopped(true);

return NULL;
}
Expand Down Expand Up @@ -670,9 +694,10 @@ void video_start_streaming()
log_info("starting video streaming");
if (streaming_thread != NULL)
{
if (streaming_stopped == true) {
bool stopped = get_streaming_stopped();
if (stopped == true) {
log_error("video streaming already stopped but streaming_thread is not NULL");
assert(streaming_stopped == true);
assert(stopped == true);
}
log_warn("video streaming already started");
return;
Expand All @@ -699,6 +724,21 @@ void video_start_streaming()
streaming_thread = new_thread;
}

bool wait_for_streaming_stopped()
{
int attempts = 0;
while (attempts < 30) {
if (get_streaming_stopped() == true) {
log_info("video streaming stopped after %d attempts", attempts);
return true;
}
usleep(100000); // 100ms
attempts++;
}
log_error("video streaming did not stop after 3s");
return false;
}

void video_stop_streaming()
{
if (streaming_thread == NULL) {
Expand All @@ -710,14 +750,7 @@ void video_stop_streaming()
set_streaming_flag(false);

log_info("waiting for video streaming thread to exit");
int attempts = 0;
while (!streaming_stopped && attempts < 30) {
usleep(100000); // 100ms
attempts++;
}
if (!streaming_stopped) {
log_error("video streaming thread did not exit after 30s");
}
wait_for_streaming_stopped();

pthread_join(*streaming_thread, NULL);
free(streaming_thread);
Expand All @@ -726,13 +759,30 @@ void video_stop_streaming()
log_info("video streaming stopped");
}

uint8_t video_get_streaming_status() {
// streaming flag can be false when stopping streaming
if (get_streaming_flag() == true) return 1;
if (get_streaming_stopped() == false) return 2;
return 0;
}

void video_restart_streaming()
{
if (get_streaming_flag() == true)
uint8_t streaming_status = video_get_streaming_status();
if (streaming_status == 0)
{
log_info("restarting video streaming");
log_info("will not restart video streaming because it's stopped");
return;
}

if (streaming_status == 2) {
video_stop_streaming();
}

if (!wait_for_streaming_stopped()) {
return;
}

video_start_streaming();
}

Expand Down
7 changes: 7 additions & 0 deletions internal/native/cgo/video.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ void video_start_streaming();
*/
void video_stop_streaming();

/**
* @brief Get the streaming status of the video
*
* @return uint8_t 1 if the video streaming is active, 2 if the video streaming is stopping, 0 otherwise
*/
uint8_t video_get_streaming_status();

/**
* @brief Set the quality factor of the video
*
Expand Down
10 changes: 10 additions & 0 deletions internal/native/cgo_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
func jetkvm_go_video_state_handler(state *C.jetkvm_video_state_t) {
videoState := VideoState{
Ready: bool(state.ready),
Streaming: VideoStreamingStatus(state.streaming),
Error: C.GoString(state.error),
Width: int(state.width),
Height: int(state.height),
Expand Down Expand Up @@ -168,6 +169,15 @@ func videoStop() {
C.jetkvm_video_stop()
}

func videoGetStreamingStatus() VideoStreamingStatus {
cgoLock.Lock()
defer cgoLock.Unlock()

isStreaming := C.jetkvm_video_get_streaming_status()

return VideoStreamingStatus(isStreaming)
}

func videoLogStatus() string {
cgoLock.Lock()
defer cgoLock.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions internal/native/cgo_notlinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func videoSetEDID(edid string) error {
return nil
}

func videoGetStreamingStatus() VideoStreamingStatus {
panicPlatformNotSupported()
return VideoStreamingStatusInactive
}

func crash() {
panicPlatformNotSupported()
}
1 change: 1 addition & 0 deletions internal/native/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (n *Native) handleVideoFrameChan() {
func (n *Native) handleVideoStateChan() {
for {
state := <-videoStateChan

n.onVideoStateChange(state)
}
}
Expand Down
6 changes: 0 additions & 6 deletions internal/native/grpc_servermethods.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,13 @@ func (s *grpcServer) VideoLogStatus(ctx context.Context, req *pb.Empty) (*pb.Vid
}

func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, error) {
procPrefix = "jetkvm: [native]"
setProcTitle(lastProcTitle)

if err := s.native.VideoStop(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.Empty{}, nil
}

func (s *grpcServer) VideoStart(ctx context.Context, req *pb.Empty) (*pb.Empty, error) {
procPrefix = "jetkvm: [native+video]"
setProcTitle(lastProcTitle)

if err := s.native.VideoStart(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
20 changes: 20 additions & 0 deletions internal/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ type NativeOptions struct {
OnNativeRestart func()
}

type VideoStreamingStatus uint8

const (
VideoStreamingStatusActive VideoStreamingStatus = 1
VideoStreamingStatusStopping VideoStreamingStatus = 2 // video is stopping, but not yet stopped
VideoStreamingStatusInactive VideoStreamingStatus = 0
)

func (s VideoStreamingStatus) String() string {
switch s {
case VideoStreamingStatusActive:
return "active"
case VideoStreamingStatusStopping:
return "stopping"
case VideoStreamingStatusInactive:
return "inactive"
}
return "unknown"
}

func NewNative(opts NativeOptions) *Native {
pid := os.Getpid()
nativeSubLogger := nativeLogger.With().Int("pid", pid).Str("scope", "native").Logger()
Expand Down
22 changes: 22 additions & 0 deletions internal/native/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ func monitorCrashSignal(ctx context.Context, logger *zerolog.Logger, nativeInsta
}
}

func updateProcessTitle(state *VideoState) {
if state == nil {
procPrefix = "jetkvm: [native]"
} else {
var status string
if state.Streaming == VideoStreamingStatusInactive {
status = "inactive"
} else if !state.Ready {
status = "not ready"
} else if state.Error != "" {
status = state.Error
} else {
status = fmt.Sprintf("%s,%dx%d,%.1ffps", state.Streaming.String(), state.Width, state.Height, state.FramePerSecond)
}
procPrefix = fmt.Sprintf("jetkvm: [native+video{%s}]", status)
}
setProcTitle(lastProcTitle)
}

// RunNativeProcess runs the native process mode
func RunNativeProcess(binaryName string) {
appCtx, appCtxCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -82,6 +101,9 @@ func RunNativeProcess(binaryName string) {
logger.Fatal().Err(err).Msg("failed to write frame to video stream socket")
}
}
nativeOptions.OnVideoStateChange = func(state VideoState) {
updateProcessTitle(&state)
}

// Create native instance
nativeInstance := NewNative(*nativeOptions)
Expand Down
Loading