Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions src/mod/event_handlers/mod_event_socket/mod_event_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct listener {
time_t linger_timeout;
struct listener *next;
switch_pollfd_t *pollfd;
uint8_t lock_acquired;
uint8_t finished;
};

typedef struct listener listener_t;
Expand Down Expand Up @@ -163,7 +165,7 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);

static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
static void launch_listener_thread(listener_t *listener);
static switch_status_t launch_listener_thread(listener_t *listener);

static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{
Expand Down Expand Up @@ -523,9 +525,17 @@ SWITCH_STANDARD_APP(socket_function)
if (switch_test_flag(listener, LFLAG_ASYNC)) {
const char *var;

launch_listener_thread(listener);
if (launch_listener_thread(listener) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Failed to start listener\n");
return;
}

while (switch_channel_ready(channel) && !switch_test_flag(listener, LFLAG_CONNECTED)) {
/* Wait until listener_thread acquires session read lock */
while (!listener->lock_acquired && !listener->finished) {
switch_cond_next();
}

while (switch_channel_ready(channel) && !listener->finished && !switch_test_flag(listener, LFLAG_CONNECTED)) {
switch_cond_next();
}

Expand Down Expand Up @@ -2637,10 +2647,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)

if ((session = listener->session)) {
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Unable to lock session!\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
locked = 0;
session = NULL;
goto done;
}

listener->lock_acquired = 1;
}

if (!listener->sock) {
Expand Down Expand Up @@ -2791,7 +2804,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
}
switch_mutex_unlock(listener->filter_mutex);

if (listener->session) {
if (listener->session && locked) {
channel = switch_core_session_get_channel(listener->session);
}

Expand Down Expand Up @@ -2823,7 +2836,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
}

if (listener->session) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
if (locked) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
}
switch_clear_flag_locked(listener, LFLAG_SESSION);
if (locked) {
switch_core_session_rwunlock(listener->session);
Expand All @@ -2837,20 +2852,22 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
prefs.threads--;
switch_mutex_unlock(globals.listener_mutex);

listener->finished = 1;

return NULL;
}


/* Create a thread for the socket and launch it */
static void launch_listener_thread(listener_t *listener)
static switch_status_t launch_listener_thread(listener_t *listener)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;

switch_threadattr_create(&thd_attr, listener->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
}

static int config(void)
Expand Down Expand Up @@ -3033,8 +3050,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
launch_listener_thread(listener);
continue;
if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
continue;
}
}

Expand Down