From 97cd5d3188b1d3a2ef8f7105d753eeb1ee02c20b Mon Sep 17 00:00:00 2001 From: lazedo Date: Sun, 31 Jan 2016 00:01:30 +0000 Subject: [PATCH] kazoo: add amqps connection support --- modules/kazoo/kazoo.c | 11 ++++ modules/kazoo/kz_amqp.c | 128 +++++++++++++++++++++++++++++----------- 2 files changed, 106 insertions(+), 33 deletions(-) diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index 23cbb83f6dc..28c97bd1229 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -109,6 +109,12 @@ db_func_t kz_pa_dbf; str kz_presentity_table = str_init("presentity"); str kz_db_url = {0,0}; +str kz_amqps_ca_cert = {0,0}; +str kz_amqps_cert = {0,0}; +str kz_amqps_key = {0,0}; +int kz_amqps_verify_peer = 1; +int kz_amqps_verify_hostname = 1; + str kz_query_timeout_avp = {0,0}; pv_spec_t kz_query_timeout_spec; @@ -201,6 +207,11 @@ static param_export_t params[] = { {"amqp_primary_zone", STR_PARAM, &dbk_primary_zone_name.s}, {"amqp_command_hashtable_size", INT_PARAM, &dbk_command_table_size}, {"amqp_result_avp", STR_PARAM, &kz_query_result_avp.s}, + {"amqps_ca_cert", STR_PARAM, &kz_amqps_ca_cert.s}, + {"amqps_cert", STR_PARAM, &kz_amqps_cert.s}, + {"amqps_key", STR_PARAM, &kz_amqps_key.s}, + {"amqps_verify_peer", INT_PARAM, &kz_amqps_verify_peer}, + {"amqps_verify_hostname", INT_PARAM, &kz_amqps_verify_hostname}, {0, 0, 0} }; diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index 535c5a515d0..b949064982f 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include "../../mem/mem.h" @@ -39,6 +40,12 @@ extern struct timeval kz_amqp_tv; extern struct timeval kz_qtimeout_tv; extern struct timeval kz_timer_tv; +extern str kz_amqps_ca_cert; +extern str kz_amqps_cert; +extern str kz_amqps_key; +extern int kz_amqps_verify_peer; +extern int kz_amqps_verify_hostname; + extern pv_spec_t kz_query_timeout_spec; const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL }; @@ -727,8 +734,88 @@ void kz_amqp_channel_close(kz_amqp_conn_ptr rmq, amqp_channel_t channel) { kz_amqp_error("closing channel", amqp_channel_close(rmq->conn, channel, AMQP_REPLY_SUCCESS)); } +int kz_ssl_initialized = 0; + +int kz_amqp_connection_open_ssl(kz_amqp_conn_ptr rmq) { + + if(!kz_ssl_initialized) { + kz_ssl_initialized = 1; + amqp_set_initialize_ssl_library(1); + } + + if (!(rmq->conn = amqp_new_connection())) { + LM_ERR("Failed to create new AMQP connection\n"); + goto error; + } + + rmq->socket = amqp_ssl_socket_new(rmq->conn); + if (!rmq->socket) { + LM_ERR("Failed to create SSL socket to AMQP broker\n"); + goto error; + } + + if (kz_amqps_ca_cert.s) { + if (amqp_ssl_socket_set_cacert(rmq->socket, kz_amqps_ca_cert.s)) { + LM_ERR("Failed to set CA certificate for amqps connection\n"); + goto nosocket; + } + } + + if (kz_amqps_cert.s && kz_amqps_key.s) { + if (amqp_ssl_socket_set_key(rmq->socket, kz_amqps_cert.s, kz_amqps_key.s)) { + LM_ERR("Failed to set client key/certificate for amqps connection\n"); + goto nosocket; + } + } + +#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8 + amqp_ssl_socket_set_verify(rmq->socket, kz_amqps_verify_peer | kz_amqps_verify_hostname); +#else + amqp_ssl_socket_set_verify_peer(rmq->socket, kz_amqps_verify_peer); + amqp_ssl_socket_set_verify_hostname(rmq->socket, kz_amqps_verify_hostname); +#endif + + if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) { + LM_ERR("Failed to open SSL socket to AMQP broker : %s : %i\n", + rmq->server->connection->info.host, rmq->server->connection->info.port); + goto nosocket; + } + + if (kz_amqp_error("Logging in", amqp_login(rmq->conn, + rmq->server->connection->info.vhost, + 0, + 131072, + dbk_use_hearbeats, + AMQP_SASL_METHOD_PLAIN, + rmq->server->connection->info.user, + rmq->server->connection->info.password))) { + + LM_ERR("Login to AMQP broker failed!\n"); + goto error; + } + + rmq->state = KZ_AMQP_CONNECTION_OPEN; + return 0; + +nosocket: + if (amqp_destroy_connection(rmq->conn) < 0) { + LM_ERR("cannot destroy connection\n"); + } + return -1; + + error: + kz_amqp_connection_close(rmq); + return -1; +} + int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) { + rmq->state = KZ_AMQP_CONNECTION_CLOSED; rmq->channel_count = rmq->channel_counter = 0; + + if(rmq->server->connection->info.ssl) + return kz_amqp_connection_open_ssl(rmq); + + rmq->channel_count = rmq->channel_counter = 0; if (!(rmq->conn = amqp_new_connection())) { LM_DBG("Failed to create new AMQP connection\n"); goto error; @@ -758,7 +845,8 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) { goto error; } - return 0; + rmq->state = KZ_AMQP_CONNECTION_OPEN; + return 0; error: kz_amqp_connection_close(rmq); @@ -2118,37 +2206,10 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) if(rmq->state != KZ_AMQP_CONNECTION_CLOSED) { kz_amqp_connection_close(rmq); } - rmq->state = KZ_AMQP_CONNECTION_CLOSED; - rmq->channel_count = rmq->channel_counter = 0; - if (!(rmq->conn = amqp_new_connection())) { - LM_DBG("Failed to create new AMQP connection\n"); - goto error; - } - rmq->socket = amqp_tcp_socket_new(rmq->conn); - if (!rmq->socket) { - LM_DBG("Failed to create TCP socket to AMQP broker\n"); - goto error; - } - - if (amqp_socket_open(rmq->socket, rmq->server->connection->info.host, rmq->server->connection->info.port)) { - LM_DBG("Failed to open TCP socket to AMQP broker\n"); - goto error; - } - - if (kz_amqp_error("Logging in", amqp_login(rmq->conn, - rmq->server->connection->info.vhost, - 0, - 131072, - dbk_use_hearbeats, - AMQP_SASL_METHOD_PLAIN, - rmq->server->connection->info.user, - rmq->server->connection->info.password))) { + if(kz_amqp_connection_open(rmq) != 0) + goto error; - LM_ERR("Login to AMQP broker failed!\n"); - goto error; - } - rmq->state = KZ_AMQP_CONNECTION_OPEN; kz_amqp_fire_connection_event("open", rmq->server->connection->info.host); for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { /* start cleanup */ @@ -2170,10 +2231,10 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) return 0; - error: - kz_amqp_connection_close(rmq); - kz_amqp_handle_server_failure(rmq); +error: + kz_amqp_handle_server_failure(rmq); return -1; + } void kz_amqp_reconnect_cb(int fd, short event, void *arg) @@ -2651,6 +2712,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) OK = kz_amqp_consume_error(consumer); break; default: + LM_ERR("AMQP_RESPONSE_LIBRARY_EXCEPTION %i\n", reply.library_error); OK = 0; break; };