Skip to content

Commit

Permalink
Transport-related events
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Jan 17, 2017
1 parent 5386fc3 commit 5e1c567
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 11 deletions.
32 changes: 29 additions & 3 deletions events.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,24 @@ void janus_events_notify_handlers(int type, guint64 session_id, ...) {
/* Core events also allocate a json_t object for its data, unref it */
json_t *body = va_arg(args, json_t *);
json_decref(body);
} else if(type == JANUS_EVENT_TYPE_PLUGIN || type == JANUS_EVENT_TYPE_TRANSPORT) {
} else if(type == JANUS_EVENT_TYPE_SESSION) {
/* Session events may allocate a json_t object for transport-related info, unref it */
va_arg(args, char *);
json_t *transport = va_arg(args, json_t *);
if(transport != NULL)
json_decref(transport);
} else if(type == JANUS_EVENT_TYPE_PLUGIN) {
/* Plugin originated events also allocate a json_t object for the plugin data, skip some arguments and unref it */
va_arg(args, guint64);
va_arg(args, char *);
json_t *data = va_arg(args, json_t *);
json_decref(data);
} else if(type == JANUS_EVENT_TYPE_TRANSPORT) {
/* Transport originated events also allocate a json_t object for the transport data, skip some arguments and unref it */
va_arg(args, char *);
va_arg(args, void *);
json_t *data = va_arg(args, json_t *);
json_decref(data);
}
va_end(args);
return;
Expand All @@ -100,6 +112,9 @@ void janus_events_notify_handlers(int type, guint64 session_id, ...) {
/* For sessions, there's just a generic event name (what happened) */
char *name = va_arg(args, char *);
json_object_set_new(body, "name", json_string(name));
json_t *transport = va_arg(args, json_t *);
if(transport != NULL)
json_object_set(body, "transport", transport);
break;
}
case JANUS_EVENT_TYPE_HANDLE: {
Expand Down Expand Up @@ -141,8 +156,7 @@ void janus_events_notify_handlers(int type, guint64 session_id, ...) {
body = va_arg(args, json_t *);
break;
}
case JANUS_EVENT_TYPE_PLUGIN:
case JANUS_EVENT_TYPE_TRANSPORT: {
case JANUS_EVENT_TYPE_PLUGIN: {
/* For plugin-originated events, there's the handle ID, the plugin name, and a generic, plugin specific, json_t object */
guint64 handle_id = va_arg(args, guint64);
if(handle_id > 0) /* Plugins and transports may not specify a session and handle ID for out of context events */
Expand All @@ -153,6 +167,18 @@ void janus_events_notify_handlers(int type, guint64 session_id, ...) {
json_object_set(body, "data", data);
break;
}
case JANUS_EVENT_TYPE_TRANSPORT: {
char *name = va_arg(args, char *);
json_object_set_new(body, "transport", json_string(name));
char *instance = va_arg(args, void *);
char id[32];
memset(id, 0, sizeof(id));
g_snprintf(id, sizeof(id), "%p", instance);
json_object_set_new(body, "id", json_string(id));
json_t *data = va_arg(args, json_t *);
json_object_set(body, "data", data);
break;
}
case JANUS_EVENT_TYPE_CORE: {
/* For core-related events, there's a json_t object with info on what happened */
body = va_arg(args, json_t *);
Expand Down
14 changes: 12 additions & 2 deletions events/janus_sampleevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,24 @@ static void *janus_sampleevh_handler(void *data) {
switch(type) {
case JANUS_EVENT_TYPE_SESSION:
/* This is a session related event. The only info that is
* provided is a name for the event itself. Here's an example
* of a new session being created:
* required is a name for the event itself: a "created"
* event may also contain transport info, in the form of
* the transport module that originated the session
* (e.g., "janus.transport.http") and an internal unique
* ID for the transport instance (which may be associated
* to a connection or anything else within the specifics
* of the transport module itself). Here's an example of
* a new session being created:
{
"type": 1,
"timestamp": 3583879627,
"session_id": 2004798115,
"event": {
"name": "created"
},
"transport": {
"transport": "janus.transport.http",
"id": "0x7fcb100008c0"
}
}
*/
Expand Down
34 changes: 29 additions & 5 deletions janus.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ gboolean janus_transport_is_api_secret_needed(janus_transport *plugin);
gboolean janus_transport_is_api_secret_valid(janus_transport *plugin, const char *apisecret);
gboolean janus_transport_is_auth_token_needed(janus_transport *plugin);
gboolean janus_transport_is_auth_token_valid(janus_transport *plugin, const char *token);
void janus_transport_notify_event(janus_transport *plugin, void *transport, json_t *event);

static janus_transport_callbacks janus_handler_transport =
{
Expand All @@ -328,6 +329,8 @@ static janus_transport_callbacks janus_handler_transport =
.is_api_secret_valid = janus_transport_is_api_secret_valid,
.is_auth_token_needed = janus_transport_is_auth_token_needed,
.is_auth_token_valid = janus_transport_is_auth_token_valid,
.events_is_enabled = janus_events_is_enabled,
.notify_event = janus_transport_notify_event,
};
GThreadPool *tasks = NULL;
void janus_transport_task(gpointer data, gpointer user_data);
Expand Down Expand Up @@ -407,7 +410,7 @@ static gboolean janus_check_sessions(gpointer user_data) {
}
/* Notify event handlers as well */
if(janus_events_is_enabled())
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session->session_id, "timeout");
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session->session_id, "timeout", NULL);

/* Mark the session as over, we'll deal with it later */
session->timeout = 1;
Expand Down Expand Up @@ -470,9 +473,6 @@ janus_session *janus_session_create(guint64 session_id) {
janus_mutex_lock(&sessions_mutex);
g_hash_table_insert(sessions, janus_uint64_dup(session->session_id), session);
janus_mutex_unlock(&sessions_mutex);
/* Notify event handlers */
if(janus_events_is_enabled())
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "created");
return session;
}

Expand Down Expand Up @@ -675,6 +675,17 @@ int janus_process_incoming_request(janus_request *request) {
session->source = janus_request_new(request->transport, request->instance, NULL, FALSE, NULL);
/* Notify the source that a new session has been created */
request->transport->session_created(request->instance, session->session_id);
/* Notify event handlers */
if(janus_events_is_enabled()) {
/* Session created, add info on the transport that originated it */
json_t *transport = json_object();
json_object_set_new(transport, "transport", json_string(session->source->transport->get_package()));
char id[32];
memset(id, 0, sizeof(id));
g_snprintf(id, sizeof(id), "%p", session->source->instance);
json_object_set_new(transport, "id", json_string(id));
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "created", transport);
}
/* Prepare JSON reply */
json_t *reply = json_object();
json_object_set_new(reply, "janus", json_string("success"));
Expand Down Expand Up @@ -846,7 +857,7 @@ int janus_process_incoming_request(janus_request *request) {
ret = janus_process_success(request, reply);
/* Notify event handlers as well */
if(janus_events_is_enabled())
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "destroyed");
janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "destroyed", NULL);
} else if(!strcasecmp(message_text, "detach")) {
if(handle == NULL) {
/* Query is an handle-level command */
Expand Down Expand Up @@ -2475,6 +2486,19 @@ gboolean janus_transport_is_auth_token_valid(janus_transport *plugin, const char
return token && janus_auth_check_token(token);
}

void janus_transport_notify_event(janus_transport *plugin, void *transport, json_t *event) {
/* A plugin asked to notify an event to the handlers */
if(!plugin || !event || !json_is_object(event))
return;
/* Notify event handlers */
if(janus_events_is_enabled()) {
janus_events_notify_handlers(JANUS_EVENT_TYPE_TRANSPORT,
0, plugin->get_package(), transport, event);
} else {
json_decref(event);
}
}

void janus_transport_task(gpointer data, gpointer user_data) {
JANUS_LOG(LOG_VERB, "Transport task pool, serving request\n");
janus_request *request = (janus_request *)data;
Expand Down
39 changes: 39 additions & 0 deletions transports/janus_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ static gint initialized = 0, stopping = 0;
static janus_transport_callbacks *gateway = NULL;
static gboolean http_janus_api_enabled = FALSE;
static gboolean http_admin_api_enabled = FALSE;
static gboolean notify_events = TRUE;

/* JSON serialization options */
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
Expand Down Expand Up @@ -572,6 +573,14 @@ int janus_http_init(janus_transport_callbacks *callback, const char *config_path
}
}

/* Check if we need to send events to handlers */
janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
if(events != NULL && events->value != NULL)
notify_events = janus_is_true(events->value);
if(!notify_events && callback->events_is_enabled()) {
JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_REST_NAME);
}

/* Check the base paths */
item = janus_config_get_item_drilldown(config, "general", "base_path");
if(item && item->value) {
Expand Down Expand Up @@ -1060,6 +1069,21 @@ int janus_http_handler(void *cls, struct MHD_Connection *connection, const char
*ptr = msg;
MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
ret = MHD_YES;
/* Notify handlers about this new transport instance */
if(notify_events && gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("request"));
json_object_set_new(info, "admin_api", json_false());
const union MHD_ConnectionInfo *conninfo = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_CLIENT_ADDRESS);
if(conninfo != NULL) {
char *ip = janus_address_to_ip((struct sockaddr *)conninfo->client_addr);
json_object_set_new(info, "ip", json_string(ip));
g_free(ip);
uint16_t port = janus_address_to_port((struct sockaddr *)conninfo->client_addr);
json_object_set_new(info, "port", json_integer(port));
}
gateway->notify_event(&janus_http_transport, msg, info);
}
} else {
JANUS_LOG(LOG_DBG, "Processing HTTP %s request on %s...\n", method, url);
}
Expand Down Expand Up @@ -1441,6 +1465,21 @@ int janus_http_admin_handler(void *cls, struct MHD_Connection *connection, const
*ptr = msg;
MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);
ret = MHD_YES;
/* Notify handlers about this new transport instance */
if(notify_events && gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("request"));
json_object_set_new(info, "admin_api", json_true());
const union MHD_ConnectionInfo *conninfo = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_CLIENT_ADDRESS);
if(conninfo != NULL) {
char *ip = janus_address_to_ip((struct sockaddr *)conninfo->client_addr);
json_object_set_new(info, "ip", json_string(ip));
g_free(ip);
uint16_t port = janus_address_to_port((struct sockaddr *)conninfo->client_addr);
json_object_set_new(info, "port", json_integer(port));
}
gateway->notify_event(&janus_http_transport, msg, info);
}
}
/* Parse request */
if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {
Expand Down
42 changes: 42 additions & 0 deletions transports/janus_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ janus_transport *create(void) {
static gboolean janus_mqtt_api_enabled_ = FALSE;
static gboolean janus_mqtt_admin_api_enabled_ = FALSE;

/* Event handlers */
static gboolean notify_events = TRUE;

/* JSON serialization options */
static size_t json_format_ = JSON_INDENT(3) | JSON_PRESERVE_ORDER;

Expand Down Expand Up @@ -204,6 +207,14 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
}
}

/* Check if we need to send events to handlers */
janus_config_item *events = janus_config_get_item_drilldown(config, "general", "events");
if(events != NULL && events->value != NULL)
notify_events = janus_is_true(events->value);
if(!notify_events && callback->events_is_enabled()) {
JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_MQTT_NAME);
}

/* Connect configuration */
janus_config_item *keep_alive_interval_item = janus_config_get_item_drilldown(config, "general", "keep_alive_interval");
ctx->connect.keep_alive_interval = (keep_alive_interval_item && keep_alive_interval_item->value) ? atoi(keep_alive_interval_item->value) : 20;
Expand Down Expand Up @@ -409,6 +420,14 @@ void janus_mqtt_session_over(void *context, guint64 session_id, gboolean timeout
void janus_mqtt_client_connection_lost(void *context, char *cause) {
JANUS_LOG(LOG_INFO, "MQTT connection lost cause of %s. Reconnecting...\n", cause);
/* Automatic reconnect */

/* Notify handlers about this transport being gone */
janus_mqtt_context *ctx = (janus_mqtt_context *)context;
if(notify_events && ctx && ctx->gateway && ctx->gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("reconnecting"));
ctx->gateway->notify_event(&janus_mqtt_transport_, context, info);
}
}

int janus_mqtt_client_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
Expand Down Expand Up @@ -462,12 +481,28 @@ void janus_mqtt_client_connect_success(void *context, MQTTAsync_successData *res
JANUS_LOG(LOG_ERR, "Can't subscribe to MQTT admin topic: %s, return code: %d\n", ctx->admin.subscribe.topic, rc);
}
}

/* Notify handlers about this new transport */
if(notify_events && ctx->gateway && ctx->gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("connected"));
ctx->gateway->notify_event(&janus_mqtt_transport_, context, info);
}
}

void janus_mqtt_client_connect_failure(void *context, MQTTAsync_failureData *response) {
int rc = response ? response->code : 0;
JANUS_LOG(LOG_ERR, "MQTT client has been failed connecting to the broker, return code: %d. Reconnecting...\n", rc);
/* Automatic reconnect */

/* Notify handlers about this transport failure */
janus_mqtt_context *ctx = (janus_mqtt_context *)context;
if(notify_events && ctx && ctx->gateway && ctx->gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("failed"));
json_object_set_new(info, "code", json_integer(rc));
ctx->gateway->notify_event(&janus_mqtt_transport_, context, info);
}
}

int janus_mqtt_client_reconnect(janus_mqtt_context *ctx) {
Expand Down Expand Up @@ -505,7 +540,14 @@ int janus_mqtt_client_disconnect(janus_mqtt_context *ctx) {
void janus_mqtt_client_disconnect_success(void *context, MQTTAsync_successData *response) {
JANUS_LOG(LOG_INFO, "MQTT client has been successfully disconnected. Destroying the client...\n");

/* Notify handlers about this transport being gone */
janus_mqtt_context *ctx = (janus_mqtt_context *)context;
if(notify_events && ctx && ctx->gateway && ctx->gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("disconnected"));
ctx->gateway->notify_event(&janus_mqtt_transport_, context, info);
}

janus_mqtt_client_destroy_context(&ctx);
}

Expand Down
Loading

0 comments on commit 5e1c567

Please sign in to comment.