Skip to content
This repository has been archived by the owner on Jun 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request signalwire#7 from inteliquent/rayo_connections
Browse files Browse the repository at this point in the history
Rayo connections
  • Loading branch information
xmppjingle committed Jul 29, 2019
2 parents 534ca9a + 7b6bcf4 commit 7eb6850
Showing 1 changed file with 52 additions and 15 deletions.
67 changes: 52 additions & 15 deletions src/mod/event_handlers/mod_rayo/xmpp_streams.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "sasl.h"

#define MAX_QUEUE_LEN 25000
#define MAX_OUTBOUND_HOSTS 16

/**
* Context for all streams
Expand Down Expand Up @@ -131,6 +132,26 @@ struct xmpp_stream {
void *user_private;
};

/**
* Outbound connection information
*/
struct xmpp_outbound {
/** context for this stream */
struct xmpp_stream_context *context;
/** stream pool */
switch_memory_pool_t *pool;
/** domain for this context */
const char *domain;
/** outbound addresses */
char *addresses;
/** port of this stream */
int port;
/** outbound hosts */
char *hosts[MAX_OUTBOUND_HOSTS];
/** outbound hosts count */
int host_count;
};

/**
* A socket listening for new connections
*/
Expand Down Expand Up @@ -1339,19 +1360,30 @@ static struct xmpp_stream *xmpp_stream_create(struct xmpp_stream_context *contex
*/
static void *SWITCH_THREAD_FUNC xmpp_outbound_stream_thread(switch_thread_t *thread, void *obj)
{
struct xmpp_stream *stream = (struct xmpp_stream *)obj;
struct xmpp_stream_context *context = stream->context;
struct xmpp_outbound *outbound = (struct xmpp_outbound *)obj;
struct xmpp_stream *stream = NULL;
struct xmpp_stream_context *context = outbound->context;
int current_host = -1;
switch_socket_t *socket;
int warned = 0;

switch_thread_rwlock_rdlock(context->shutdown_rwlock);

/* connect to server */
while (!context->shutdown) {
struct xmpp_stream *new_stream = NULL;
switch_memory_pool_t *pool;
switch_sockaddr_t *sa;

current_host = (current_host + 1) % outbound->host_count;

/* create new stream for connection */
if (stream) {
xmpp_stream_destroy(stream);
}
switch_core_new_memory_pool(&pool);
stream = xmpp_stream_create(context, pool, outbound->hosts[current_host], outbound->port, 1, 0);
stream->jid = switch_core_strdup(pool, outbound->domain);

if (switch_sockaddr_info_get(&sa, stream->address, SWITCH_UNSPEC, stream->port, 0, stream->pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s:%i, failed to get sockaddr info!\n", stream->address, stream->port);
goto fail;
Expand Down Expand Up @@ -1385,13 +1417,6 @@ static void *SWITCH_THREAD_FUNC xmpp_outbound_stream_thread(switch_thread_t *thr

/* re-establish connection if not shutdown */
if (!context->shutdown) {
/* create new stream for reconnection */
switch_core_new_memory_pool(&pool);
new_stream = xmpp_stream_create(stream->context, pool, stream->address, stream->port, 1, 0);
new_stream->jid = switch_core_strdup(pool, stream->jid);
xmpp_stream_destroy(stream);
stream = new_stream;

switch_yield(1000 * 1000); /* 1000 ms */
continue;
}
Expand All @@ -1411,9 +1436,13 @@ static void *SWITCH_THREAD_FUNC xmpp_outbound_stream_thread(switch_thread_t *thr

fail:

xmpp_stream_destroy(stream);
if (stream) {
xmpp_stream_destroy(stream);
}

switch_thread_rwlock_unlock(context->shutdown_rwlock);
switch_core_destroy_memory_pool(&outbound->pool);

return NULL;
}

Expand Down Expand Up @@ -1465,7 +1494,7 @@ static void xmpp_listener_destroy(struct xmpp_listener *listener)
*/
switch_status_t xmpp_stream_context_connect(struct xmpp_stream_context *context, const char *peer_domain, const char *peer_address, int peer_port)
{
struct xmpp_stream *stream;
struct xmpp_outbound *outbound;
switch_memory_pool_t *pool;
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
Expand All @@ -1482,12 +1511,20 @@ switch_status_t xmpp_stream_context_connect(struct xmpp_stream_context *context,

/* start outbound stream thread */
switch_core_new_memory_pool(&pool);
stream = xmpp_stream_create(context, pool, peer_address, peer_port, 1, 0);
stream->jid = switch_core_strdup(pool, peer_domain);
if (!(outbound = switch_core_alloc(pool, sizeof(*outbound)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
return SWITCH_STATUS_MEMERR;
}
outbound->pool = pool;
outbound->addresses = switch_core_strdup(pool, peer_address);
outbound->domain = switch_core_strdup(pool, peer_domain);
outbound->port = peer_port;
outbound->host_count = switch_split(outbound->addresses, ',', outbound->hosts);
outbound->context = context;
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, xmpp_outbound_stream_thread, stream, pool);
switch_thread_create(&thread, thd_attr, xmpp_outbound_stream_thread, outbound, pool);

return SWITCH_STATUS_SUCCESS;
}
Expand Down

0 comments on commit 7eb6850

Please sign in to comment.