Skip to content

Commit

Permalink
kazoo: add consistent worker key
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo committed May 4, 2017
1 parent 8e730d6 commit a2fde7d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/modules/kazoo/kz_amqp.c
Expand Up @@ -384,6 +384,8 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
kz_amqp_bytes_free(bind->event_key);
if(bind->event_subkey.bytes)
kz_amqp_bytes_free(bind->event_subkey);
if(bind->consistent_worker_key)
shm_free(bind->consistent_worker_key);
shm_free(bind);
}

Expand Down Expand Up @@ -1771,6 +1773,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
int no_ack = 1;
int federate = 0;
int consistent_worker = 0;
str* consistent_worker_key = NULL;
int wait_for_consumer_ack = 1;
kz_amqp_queue_ptr queue = NULL;
kz_amqp_exchange_ptr exchange = NULL;
Expand Down Expand Up @@ -1825,6 +1828,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
consistent_worker = json_object_get_int(tmpObj);
}

tmpObj = kz_json_get_object(json_obj, "consistent-worker-key");
if(tmpObj != NULL) {
consistent_worker_key = kz_str_dup_from_char((char*)json_object_get_string(tmpObj));
}

tmpObj = kz_json_get_object(json_obj, "exchange-bindings");
if(tmpObj != NULL) {
exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj);
Expand Down Expand Up @@ -1853,6 +1861,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
bind->wait_for_consumer_ack = wait_for_consumer_ack;
bind->federate = federate;
bind->consistent_worker = consistent_worker;
bind->consistent_worker_key = consistent_worker_key;


kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
Expand Down Expand Up @@ -2866,6 +2875,8 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
char buffer[100];
kz_amqp_cmd_ptr cmd = NULL;
kz_amqp_consumer_delivery_ptr ptr = NULL;
json_obj_ptr json_obj = NULL;
json_object* JObj = NULL;
str* message_id = NULL;
int idx = envelope->channel-1;
int worker = 0;
Expand All @@ -2878,7 +2889,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
}
memset(json_data, 0, msg_size + 1);
memcpy(json_data, (char*)envelope->message.body.bytes, msg_size);
json_obj_ptr json_obj = kz_json_parse(json_data);
json_obj = kz_json_parse(json_data);
pkg_free(json_data);
if (json_obj == NULL) {
LM_ERR("error parsing json body\n");
Expand All @@ -2888,7 +2899,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone));


json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
if(JObj != NULL) {
const char* _kz_server_id_str = json_object_get_string(JObj);
sprintf(buffer, "consumer://%d/%s", _kz_server_id, _kz_server_id_str);
Expand Down Expand Up @@ -2932,8 +2943,14 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e

if(bind && bind->consistent_worker) {
str rk;
rk.s = (char*)envelope->routing_key.bytes;
rk.len = (int)envelope->routing_key.len;
if(bind->consistent_worker_key != NULL &&
(JObj = kz_json_get_object(json_obj, bind->consistent_worker_key->s)) != NULL) {
rk.s = (char*)json_object_get_string(JObj);
rk.len = strlen(rk.s);
} else {
rk.s = (char*)envelope->routing_key.bytes;
rk.len = (int)envelope->routing_key.len;
}
worker = core_hash(&rk, NULL, dbk_consumer_workers);
LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker);
} else {
Expand Down
1 change: 1 addition & 0 deletions src/modules/kazoo/kz_amqp.h
Expand Up @@ -217,6 +217,7 @@ typedef struct {
amqp_boolean_t wait_for_consumer_ack;
amqp_boolean_t federate;
amqp_boolean_t consistent_worker;
str* consistent_worker_key;
} kz_amqp_bind, *kz_amqp_bind_ptr;

typedef struct {
Expand Down

0 comments on commit a2fde7d

Please sign in to comment.