From ff9a5b175537fc38171f36b641502405c08a557f Mon Sep 17 00:00:00 2001 From: wswaaa123 Date: Mon, 22 Jan 2024 15:13:16 +0800 Subject: [PATCH] feat: An attempt to add multithreaded packet forwarding support for videoroom. --- src/ice.c | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- src/ice.h | 19 ++++++- 2 files changed, 176 insertions(+), 6 deletions(-) diff --git a/src/ice.c b/src/ice.c index 7d334efa26..35f282fea2 100644 --- a/src/ice.c +++ b/src/ice.c @@ -227,6 +227,17 @@ static void *janus_ice_static_event_loop_thread(void *data) { janus_refcount_decrease(&loop->ref); return NULL; } +static void janus_relay_helper_destroy(janus_relay_helper* helper) { + if (helper && g_atomic_int_compare_and_exchange(&helper->destroyed, 0, 1)) + janus_refcount_decrease(&helper->ref); +} +static void janus_relay_helper_free(const janus_refcount* helper_ref) { + janus_relay_helper* helper = janus_refcount_containerof(helper_ref, janus_relay_helper, ref); + /* This helper can be destroyed, free all the resources */ + g_async_queue_unref(helper->queued_packets); + g_free(helper); +} + int janus_ice_get_static_event_loops(void) { return static_event_loops; } @@ -513,10 +524,65 @@ const char *janus_media_type_str(janus_media_type type) { /* Deallocation helpers for handles and related structs */ static void janus_ice_handle_free(const janus_refcount *handle_ref); +static void janus_ice_handle_helper_free(gpointer data, gpointer user_data); static void janus_ice_webrtc_free(janus_ice_handle *handle); static void janus_ice_plugin_session_free(const janus_refcount *app_handle_ref); static void janus_ice_peerconnection_free(const janus_refcount *pc_ref); static void janus_ice_peerconnection_medium_free(const janus_refcount *medium_ref); +static void* janus_ice_helper_thread(void* data); + +static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data); +static gboolean janus_ice_outgoing_stats_handle(gpointer user_data); +static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle* handle, janus_ice_queued_packet* pkt); + +/* Custom GSource for outgoing helper traffic */ +typedef struct janus_ice_outgoing_helper_traffic { + GSource parent; + janus_relay_helper* helper; + GDestroyNotify destroy; +} janus_ice_outgoing_helper_traffic; +static gboolean janus_ice_outgoing_traffic_helper_prepare(GSource* source, gint* timeout) { + janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source; + return (g_async_queue_length(t->helper->queued_packets) > 0); +} +static gboolean janus_ice_outgoing_traffic_helper_dispatch(GSource* source, GSourceFunc callback, gpointer user_data) { + janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source; + int ret = G_SOURCE_CONTINUE; + janus_ice_queued_packet* pkt = NULL; + while ((pkt = g_async_queue_try_pop(t->helper->queued_packets)) != NULL) { + if (janus_ice_outgoing_traffic_handle(t->helper->handle, pkt) == G_SOURCE_REMOVE) + ret = G_SOURCE_REMOVE; + } + return ret; +} +static void janus_ice_outgoing_traffic_helper_finalize(GSource* source) { + janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source; + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Finalizing helper loop source\n", t->helper->id); + + if (t->helper->mainloop != NULL && g_main_loop_is_running(t->helper->mainloop)) { + /* This handle had a dedicated event loop, quit it */ + g_main_loop_quit(t->helper->mainloop); + } + janus_refcount_decrease(&t->helper->ref); +} +static GSourceFuncs janus_ice_outgoing_traffic_helper_funcs = { + janus_ice_outgoing_traffic_helper_prepare, + NULL, /* We don't need check */ + janus_ice_outgoing_traffic_helper_dispatch, + janus_ice_outgoing_traffic_helper_finalize, + NULL, NULL +}; +static GSource* janus_ice_outgoing_traffic_helper_create(janus_relay_helper* helper, GDestroyNotify destroy) { + GSource* source = g_source_new(&janus_ice_outgoing_traffic_helper_funcs, sizeof(janus_ice_outgoing_helper_traffic)); + janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source; + char name[255]; + g_snprintf(name, sizeof(name), "helper-%"SCNu64, helper->id); + g_source_set_name(source, name); + janus_refcount_increase(&helper->ref); + t->helper = helper; + t->destroy = destroy; + return source; +} /* Custom GSource for outgoing traffic */ typedef struct janus_ice_outgoing_traffic { @@ -524,9 +590,7 @@ typedef struct janus_ice_outgoing_traffic { janus_ice_handle *handle; GDestroyNotify destroy; } janus_ice_outgoing_traffic; -static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data); -static gboolean janus_ice_outgoing_stats_handle(gpointer user_data); -static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt); + static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) { janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source; return (g_async_queue_length(t->handle->queued_packets) > 0); @@ -535,9 +599,14 @@ static gboolean janus_ice_outgoing_traffic_dispatch(GSource *source, GSourceFunc janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source; int ret = G_SOURCE_CONTINUE; janus_ice_queued_packet *pkt = NULL; + int i = 0; while((pkt = g_async_queue_try_pop(t->handle->queued_packets)) != NULL) { - if(janus_ice_outgoing_traffic_handle(t->handle, pkt) == G_SOURCE_REMOVE) - ret = G_SOURCE_REMOVE; +// if(janus_ice_outgoing_traffic_handle(t->handle, pkt) == G_SOURCE_REMOVE) +// ret = G_SOURCE_REMOVE; + janus_relay_helper* helper = (janus_relay_helper*)g_slist_nth_data(t->handle->threads, i++); + g_async_queue_push(helper->queued_packets, pkt); + if (i == 5) + i = 0; } return ret; } @@ -549,6 +618,11 @@ static void janus_ice_outgoing_traffic_finalize(GSource *source) { janus_ice_webrtc_free(t->handle); janus_refcount_decrease(&t->handle->ref); } else if(t->handle->mainloop != NULL && g_main_loop_is_running(t->handle->mainloop)) { + /*wsw clear helper threads */ + if (t->handle->helper_threads != 0) + { + g_list_foreach(t->handle->threads, janus_ice_handle_helper_free, NULL); + } /* This handle had a dedicated event loop, quit it */ g_main_loop_quit(t->handle->mainloop); } @@ -570,6 +644,38 @@ static GSource *janus_ice_outgoing_traffic_create(janus_ice_handle *handle, GDes janus_refcount_increase(&handle->ref); t->handle = handle; t->destroy = destroy; + + /* Create heler threads */ + int i = 5; + for (i; i <= handle->helper_threads; i++) + { + GError* error = NULL; + char tname[64]; + janus_relay_helper* helper = g_malloc0(sizeof(janus_relay_helper)); + helper->handle = handle; + helper->id = i; + helper->mainctx = g_main_context_new(); + helper->mainloop = g_main_loop_new(helper->mainctx, FALSE); + helper->source = janus_ice_outgoing_traffic_helper_create(helper, (GDestroyNotify)g_free); + g_source_set_priority(helper->source, G_PRIORITY_DEFAULT); + g_source_attach(helper->source, helper->mainctx); + helper->queued_packets = g_async_queue_new(); + janus_mutex_init(&helper->mutex); + janus_refcount_init(&helper->ref, janus_relay_helper_free); + /* Spawn a thread and add references */ + g_snprintf(tname, sizeof(tname), "vhelp %u-%"SCNu64"", helper->id, handle->handle_id); + janus_refcount_increase(&helper->ref); + helper->thread = g_thread_try_new(tname, &janus_ice_helper_thread, helper, &error); + if (error != NULL) { + /* TODO Should this be a hard failure? */ + JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n", + error->code, error->message ? error->message : "??"); + } + else { + janus_refcount_increase(&helper->ref); + handle->threads = g_list_append(handle->threads, helper); + } + } return source; } @@ -801,6 +907,17 @@ static void janus_ice_clear_queued_candidates(janus_ice_handle *handle) { } } +static void janus_ice_clear_helper_queued_packets(janus_relay_helper* helper) { + if (helper == NULL || helper->queued_packets == NULL) { + return; + } + janus_ice_queued_packet* pkt = NULL; + while (g_async_queue_length(helper->queued_packets) > 0) { + pkt = g_async_queue_try_pop(helper->queued_packets); + janus_ice_free_queued_packet(pkt); + } +} + static void janus_ice_clear_queued_packets(janus_ice_handle *handle) { if(handle == NULL || handle->queued_packets == NULL) { return; @@ -1368,6 +1485,27 @@ static void *janus_ice_handle_thread(void *data) { return NULL; } +/* Thread to take care of the helper loop */ +static void* janus_ice_helper_thread(void* data) { + janus_relay_helper* helper = data; + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread started; %p\n", helper->handle_id, helper); + if (helper->mainloop == NULL) { + JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid loop...\n", helper->id); + janus_refcount_decrease(&helper->ref); + g_thread_unref(g_thread_self()); + return NULL; + } + JANUS_LOG(LOG_DBG, "[%"SCNu64"] Looping...\n", helper->id); + g_main_loop_run(helper->mainloop); + //janus_ice_webrtc_free(handle); + helper->thread = NULL; + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread ended! %p\n", helper->id, helper); + /* Unref the handle */ + janus_refcount_decrease(&helper->ref); + g_thread_unref(g_thread_self()); + return NULL; +} + janus_ice_handle *janus_ice_handle_create(void *core_session, const char *opaque_id, const char *token) { if(core_session == NULL) return NULL; @@ -1580,6 +1718,13 @@ gint janus_ice_handle_destroy(void *core_session, janus_ice_handle *handle) { return 0; } +static void janus_ice_handle_helper_free(gpointer data, gpointer user_data) +{ + janus_relay_helper* helper = (janus_relay_helper*)data; + janus_ice_clear_helper_queued_packets(helper); + janus_relay_helper_destroy(helper); +} + static void janus_ice_handle_free(const janus_refcount *handle_ref) { janus_ice_handle *handle = janus_refcount_containerof(handle_ref, janus_ice_handle, ref); /* This stack can be destroyed, free all the resources */ @@ -1592,6 +1737,13 @@ static void janus_ice_handle_free(const janus_refcount *handle_ref) { janus_ice_clear_queued_packets(handle); g_async_queue_unref(handle->queued_packets); } + + /*wsw clear helper threads */ + if (handle->helper_threads != 0) + { + g_list_foreach(handle->threads, janus_ice_handle_helper_free, NULL); + } + if(static_event_loops == 0 && handle->mainloop != NULL) { g_main_loop_unref(handle->mainloop); handle->mainloop = NULL; @@ -4625,6 +4777,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu plugin ? plugin->get_package() : NULL, handle->opaque_id, handle->token); return G_SOURCE_REMOVE; } else if(pkt == &janus_ice_data_ready) { + JANUS_LOG(LOG_WARN, "pkt == &janus_ice_data_ready\n"); /* Data is writable on this PeerConnection, notify the plugin */ janus_plugin *plugin = (janus_plugin *)handle->app; if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) { diff --git a/src/ice.h b/src/ice.h index e9d121f2eb..170318921a 100644 --- a/src/ice.h +++ b/src/ice.h @@ -343,7 +343,6 @@ enum { SEQ_RECVED }; - /*! \brief Janus ICE handle */ struct janus_ice_handle { /*! \brief Opaque pointer to the core/peer session */ @@ -370,6 +369,10 @@ struct janus_ice_handle { GMainLoop *mainloop; /*! \brief In case static event loops are used, opaque pointer to the loop */ void *static_event_loop; + /*! \brief wsw Number of helper threads for relaying purpose*/ + int helper_threads; + /*! \brief wsw List of helper threads, if any*/ + GList* threads; /*! \brief GLib thread for the handle and libnice */ GThread *thread; /*! \brief GLib sources for outgoing traffic, recurring RTCP, and stats (and optionally TWCC) */ @@ -424,6 +427,20 @@ struct janus_ice_handle { janus_refcount ref; }; +/*\brief Janus ICE janus_relay_helper*/ +typedef struct janus_relay_helper { + struct janus_ice_handle* handle; + guint id; + GThread* thread; + GSource* source; + GMainContext* mainctx; + GMainLoop* mainloop; + GAsyncQueue* queued_packets; + volatile gint destroyed; + janus_mutex mutex; + janus_refcount ref; +} janus_relay_helper; + /*! \brief Janus handle WebRTC PeerConnection */ struct janus_ice_peerconnection { /*! \brief Janus ICE handle this stream belongs to */