Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: An attempt to forward a handle's media stream with multiple threads for videoroom. #3323

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 158 additions & 5 deletions src/ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ static void *janus_ice_static_event_loop_thread(void *data) {
janus_refcount_decrease(&loop->ref);
return NULL;
}
static void janus_relay_helper_destroy(janus_relay_helper* helper) {
if (helper && g_atomic_int_compare_and_exchange(&helper->destroyed, 0, 1))
janus_refcount_decrease(&helper->ref);
}
static void janus_relay_helper_free(const janus_refcount* helper_ref) {
janus_relay_helper* helper = janus_refcount_containerof(helper_ref, janus_relay_helper, ref);
/* This helper can be destroyed, free all the resources */
g_async_queue_unref(helper->queued_packets);
g_free(helper);
}

int janus_ice_get_static_event_loops(void) {
return static_event_loops;
}
Expand Down Expand Up @@ -513,20 +524,73 @@ const char *janus_media_type_str(janus_media_type type) {

/* Deallocation helpers for handles and related structs */
static void janus_ice_handle_free(const janus_refcount *handle_ref);
static void janus_ice_handle_helper_free(gpointer data, gpointer user_data);
static void janus_ice_webrtc_free(janus_ice_handle *handle);
static void janus_ice_plugin_session_free(const janus_refcount *app_handle_ref);
static void janus_ice_peerconnection_free(const janus_refcount *pc_ref);
static void janus_ice_peerconnection_medium_free(const janus_refcount *medium_ref);
static void* janus_ice_helper_thread(void* data);

static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data);
static gboolean janus_ice_outgoing_stats_handle(gpointer user_data);
static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle* handle, janus_ice_queued_packet* pkt);

/* Custom GSource for outgoing helper traffic */
typedef struct janus_ice_outgoing_helper_traffic {
GSource parent;
janus_relay_helper* helper;
GDestroyNotify destroy;
} janus_ice_outgoing_helper_traffic;
static gboolean janus_ice_outgoing_traffic_helper_prepare(GSource* source, gint* timeout) {
janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source;
return (g_async_queue_length(t->helper->queued_packets) > 0);
}
static gboolean janus_ice_outgoing_traffic_helper_dispatch(GSource* source, GSourceFunc callback, gpointer user_data) {
janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source;
int ret = G_SOURCE_CONTINUE;
janus_ice_queued_packet* pkt = NULL;
while ((pkt = g_async_queue_try_pop(t->helper->queued_packets)) != NULL) {
if (janus_ice_outgoing_traffic_handle(t->helper->handle, pkt) == G_SOURCE_REMOVE)
ret = G_SOURCE_REMOVE;
}
return ret;
}
static void janus_ice_outgoing_traffic_helper_finalize(GSource* source) {
janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Finalizing helper loop source\n", t->helper->id);

if (t->helper->mainloop != NULL && g_main_loop_is_running(t->helper->mainloop)) {
/* This handle had a dedicated event loop, quit it */
g_main_loop_quit(t->helper->mainloop);
}
janus_refcount_decrease(&t->helper->ref);
}
static GSourceFuncs janus_ice_outgoing_traffic_helper_funcs = {
janus_ice_outgoing_traffic_helper_prepare,
NULL, /* We don't need check */
janus_ice_outgoing_traffic_helper_dispatch,
janus_ice_outgoing_traffic_helper_finalize,
NULL, NULL
};
static GSource* janus_ice_outgoing_traffic_helper_create(janus_relay_helper* helper, GDestroyNotify destroy) {
GSource* source = g_source_new(&janus_ice_outgoing_traffic_helper_funcs, sizeof(janus_ice_outgoing_helper_traffic));
janus_ice_outgoing_helper_traffic* t = (janus_ice_outgoing_helper_traffic*)source;
char name[255];
g_snprintf(name, sizeof(name), "helper-%"SCNu64, helper->id);
g_source_set_name(source, name);
janus_refcount_increase(&helper->ref);
t->helper = helper;
t->destroy = destroy;
return source;
}

/* Custom GSource for outgoing traffic */
typedef struct janus_ice_outgoing_traffic {
GSource parent;
janus_ice_handle *handle;
GDestroyNotify destroy;
} janus_ice_outgoing_traffic;
static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data);
static gboolean janus_ice_outgoing_stats_handle(gpointer user_data);
static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt);

static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) {
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
return (g_async_queue_length(t->handle->queued_packets) > 0);
Expand All @@ -535,9 +599,14 @@ static gboolean janus_ice_outgoing_traffic_dispatch(GSource *source, GSourceFunc
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
int ret = G_SOURCE_CONTINUE;
janus_ice_queued_packet *pkt = NULL;
int i = 0;
while((pkt = g_async_queue_try_pop(t->handle->queued_packets)) != NULL) {
if(janus_ice_outgoing_traffic_handle(t->handle, pkt) == G_SOURCE_REMOVE)
ret = G_SOURCE_REMOVE;
// if(janus_ice_outgoing_traffic_handle(t->handle, pkt) == G_SOURCE_REMOVE)
// ret = G_SOURCE_REMOVE;
janus_relay_helper* helper = (janus_relay_helper*)g_slist_nth_data(t->handle->threads, i++);
g_async_queue_push(helper->queued_packets, pkt);
if (i == 5)
i = 0;
}
return ret;
}
Expand All @@ -549,6 +618,11 @@ static void janus_ice_outgoing_traffic_finalize(GSource *source) {
janus_ice_webrtc_free(t->handle);
janus_refcount_decrease(&t->handle->ref);
} else if(t->handle->mainloop != NULL && g_main_loop_is_running(t->handle->mainloop)) {
/*wsw clear helper threads */
if (t->handle->helper_threads != 0)
{
g_list_foreach(t->handle->threads, janus_ice_handle_helper_free, NULL);
}
/* This handle had a dedicated event loop, quit it */
g_main_loop_quit(t->handle->mainloop);
}
Expand All @@ -570,6 +644,38 @@ static GSource *janus_ice_outgoing_traffic_create(janus_ice_handle *handle, GDes
janus_refcount_increase(&handle->ref);
t->handle = handle;
t->destroy = destroy;

/* Create heler threads */
int i = 5;
for (i; i <= handle->helper_threads; i++)
{
GError* error = NULL;
char tname[64];
janus_relay_helper* helper = g_malloc0(sizeof(janus_relay_helper));
helper->handle = handle;
helper->id = i;
helper->mainctx = g_main_context_new();
helper->mainloop = g_main_loop_new(helper->mainctx, FALSE);
helper->source = janus_ice_outgoing_traffic_helper_create(helper, (GDestroyNotify)g_free);
g_source_set_priority(helper->source, G_PRIORITY_DEFAULT);
g_source_attach(helper->source, helper->mainctx);
helper->queued_packets = g_async_queue_new();
janus_mutex_init(&helper->mutex);
janus_refcount_init(&helper->ref, janus_relay_helper_free);
/* Spawn a thread and add references */
g_snprintf(tname, sizeof(tname), "vhelp %u-%"SCNu64"", helper->id, handle->handle_id);
janus_refcount_increase(&helper->ref);
helper->thread = g_thread_try_new(tname, &janus_ice_helper_thread, helper, &error);
if (error != NULL) {
/* TODO Should this be a hard failure? */
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the helper thread...\n",
error->code, error->message ? error->message : "??");
}
else {
janus_refcount_increase(&helper->ref);
handle->threads = g_list_append(handle->threads, helper);
}
}
return source;
}

Expand Down Expand Up @@ -801,6 +907,17 @@ static void janus_ice_clear_queued_candidates(janus_ice_handle *handle) {
}
}

static void janus_ice_clear_helper_queued_packets(janus_relay_helper* helper) {
if (helper == NULL || helper->queued_packets == NULL) {
return;
}
janus_ice_queued_packet* pkt = NULL;
while (g_async_queue_length(helper->queued_packets) > 0) {
pkt = g_async_queue_try_pop(helper->queued_packets);
janus_ice_free_queued_packet(pkt);
}
}

static void janus_ice_clear_queued_packets(janus_ice_handle *handle) {
if(handle == NULL || handle->queued_packets == NULL) {
return;
Expand Down Expand Up @@ -1368,6 +1485,27 @@ static void *janus_ice_handle_thread(void *data) {
return NULL;
}

/* Thread to take care of the helper loop */
static void* janus_ice_helper_thread(void* data) {
janus_relay_helper* helper = data;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread started; %p\n", helper->handle_id, helper);
if (helper->mainloop == NULL) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid loop...\n", helper->id);
janus_refcount_decrease(&helper->ref);
g_thread_unref(g_thread_self());
return NULL;
}
JANUS_LOG(LOG_DBG, "[%"SCNu64"] Looping...\n", helper->id);
g_main_loop_run(helper->mainloop);
//janus_ice_webrtc_free(handle);
helper->thread = NULL;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread ended! %p\n", helper->id, helper);
/* Unref the handle */
janus_refcount_decrease(&helper->ref);
g_thread_unref(g_thread_self());
return NULL;
}

janus_ice_handle *janus_ice_handle_create(void *core_session, const char *opaque_id, const char *token) {
if(core_session == NULL)
return NULL;
Expand Down Expand Up @@ -1580,6 +1718,13 @@ gint janus_ice_handle_destroy(void *core_session, janus_ice_handle *handle) {
return 0;
}

static void janus_ice_handle_helper_free(gpointer data, gpointer user_data)
{
janus_relay_helper* helper = (janus_relay_helper*)data;
janus_ice_clear_helper_queued_packets(helper);
janus_relay_helper_destroy(helper);
}

static void janus_ice_handle_free(const janus_refcount *handle_ref) {
janus_ice_handle *handle = janus_refcount_containerof(handle_ref, janus_ice_handle, ref);
/* This stack can be destroyed, free all the resources */
Expand All @@ -1592,6 +1737,13 @@ static void janus_ice_handle_free(const janus_refcount *handle_ref) {
janus_ice_clear_queued_packets(handle);
g_async_queue_unref(handle->queued_packets);
}

/*wsw clear helper threads */
if (handle->helper_threads != 0)
{
g_list_foreach(handle->threads, janus_ice_handle_helper_free, NULL);
}

if(static_event_loops == 0 && handle->mainloop != NULL) {
g_main_loop_unref(handle->mainloop);
handle->mainloop = NULL;
Expand Down Expand Up @@ -4625,6 +4777,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
plugin ? plugin->get_package() : NULL, handle->opaque_id, handle->token);
return G_SOURCE_REMOVE;
} else if(pkt == &janus_ice_data_ready) {
JANUS_LOG(LOG_WARN, "pkt == &janus_ice_data_ready\n");
/* Data is writable on this PeerConnection, notify the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) {
Expand Down
19 changes: 18 additions & 1 deletion src/ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ enum {
SEQ_RECVED
};


/*! \brief Janus ICE handle */
struct janus_ice_handle {
/*! \brief Opaque pointer to the core/peer session */
Expand All @@ -370,6 +369,10 @@ struct janus_ice_handle {
GMainLoop *mainloop;
/*! \brief In case static event loops are used, opaque pointer to the loop */
void *static_event_loop;
/*! \brief wsw Number of helper threads for relaying purpose*/
int helper_threads;
/*! \brief wsw List of helper threads, if any*/
GList* threads;
/*! \brief GLib thread for the handle and libnice */
GThread *thread;
/*! \brief GLib sources for outgoing traffic, recurring RTCP, and stats (and optionally TWCC) */
Expand Down Expand Up @@ -424,6 +427,20 @@ struct janus_ice_handle {
janus_refcount ref;
};

/*\brief Janus ICE janus_relay_helper*/
typedef struct janus_relay_helper {
struct janus_ice_handle* handle;
guint id;
GThread* thread;
GSource* source;
GMainContext* mainctx;
GMainLoop* mainloop;
GAsyncQueue* queued_packets;
volatile gint destroyed;
janus_mutex mutex;
janus_refcount ref;
} janus_relay_helper;

/*! \brief Janus handle WebRTC PeerConnection */
struct janus_ice_peerconnection {
/*! \brief Janus ICE handle this stream belongs to */
Expand Down