Skip to content

Commit

Permalink
kazoo: add new functions & params
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo committed Nov 7, 2019
1 parent a7b8cb3 commit 6d036dc
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 32 deletions.
6 changes: 6 additions & 0 deletions src/modules/kazoo/kazoo.c
Expand Up @@ -88,6 +88,7 @@ int dbk_internal_loop_count = 5;
int dbk_consumer_loop_count = 10;
int dbk_consumer_ack_loop_count = 20;
int dbk_include_entity = 1;
int dbk_use_full_entity = 0;
int dbk_pua_mode = 1;
db_locking_t kz_pua_lock_type = DB_LOCKING_WRITE;
int dbk_use_hearbeats = 0;
Expand Down Expand Up @@ -140,6 +141,7 @@ static pv_export_t kz_mod_pvs[] = {
*/
static cmd_export_t cmds[] = {
{"kazoo_publish", (cmd_function) kz_amqp_publish, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
{"kazoo_publish", (cmd_function) kz_amqp_publish_ex, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
{"kazoo_query", (cmd_function) kz_amqp_query, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
{"kazoo_query", (cmd_function) kz_amqp_query_ex, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE},
{"kazoo_pua_publish", (cmd_function) kz_pua_publish, 1, 0, 0, ANY_ROUTE},
Expand All @@ -156,6 +158,9 @@ static cmd_export_t cmds[] = {
{"kazoo_encode", (cmd_function) kz_amqp_encode, 2, fixup_kz_amqp_encode, fixup_kz_amqp_encode_free, ANY_ROUTE},

{"kazoo_async_query", (cmd_function) kz_amqp_async_query, 5, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
{"kazoo_async_query", (cmd_function) kz_amqp_async_query_ex, 6, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
{"kazoo_query_async", (cmd_function) kz_amqp_async_query, 5, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},
{"kazoo_query_async", (cmd_function) kz_amqp_async_query_ex, 6, fixup_kz_async_amqp, fixup_kz_async_amqp_free, ANY_ROUTE},

{0, 0, 0, 0, 0, 0}
};
Expand All @@ -181,6 +186,7 @@ static param_export_t params[] = {
{"amqp_consumer_loop_count", INT_PARAM, &dbk_consumer_loop_count},
{"amqp_consumer_ack_loop_count", INT_PARAM, &dbk_consumer_ack_loop_count},
{"pua_include_entity", INT_PARAM, &dbk_include_entity},
{"presence_use_full_entity", INT_PARAM, &dbk_use_full_entity},
{"presentity_table", PARAM_STR, &kz_presentity_table},
{"db_url", PARAM_STR, &kz_db_url},
{"pua_mode", INT_PARAM, &dbk_pua_mode},
Expand Down
17 changes: 13 additions & 4 deletions src/modules/kazoo/kz_amqp.c
Expand Up @@ -1213,7 +1213,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
return ret;
}

int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags)
{
str json_s;
str exchange_s;
Expand Down Expand Up @@ -1253,6 +1253,10 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char

};

int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload)
{
return kz_amqp_publish_ex(msg, exchange, routing_key, payload, NULL);
}

char* last_payload_result = NULL;

Expand All @@ -1261,7 +1265,7 @@ int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value
return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
}

int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route)
int kz_amqp_async_query_ex(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route, char* _pub_flags)
{
str json_s;
str exchange_s;
Expand Down Expand Up @@ -1406,6 +1410,11 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key
return ret;
};

int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route)
{
return kz_amqp_async_query_ex(msg, _exchange, _routing_key, _payload, _cb_route, _err_route, NULL);
}

void kz_amqp_reset_last_result()
{
if(last_payload_result)
Expand Down Expand Up @@ -3230,8 +3239,8 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe)
set_non_blocking(cmd_pipe);
event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, kz_amqp_consumer_worker_cb, &pipe_ev);
event_add(&pipe_ev, NULL);
event_dispatch();
return 0;

return event_dispatch();
}

void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer)
Expand Down
2 changes: 2 additions & 0 deletions src/modules/kazoo/kz_amqp.h
Expand Up @@ -273,6 +273,7 @@ void kz_amqp_destroy();
int kz_amqp_add_connection(modparam_t type, void* val);

int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
int kz_amqp_publish_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _pub_flags);
int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
Expand All @@ -281,6 +282,7 @@ int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);

int kz_amqp_async_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _cb_route, char* _err_route);
int kz_amqp_async_query_ex(struct sip_msg* msg, char* _exchange, char* _routing_key, char* _payload, char* _cb_route, char* _err_route, char* _pub_flags);

//void kz_amqp_generic_consumer_loop(int child_no);
void kz_amqp_manager_loop(int child_no);
Expand Down
30 changes: 2 additions & 28 deletions src/modules/kazoo/kz_fixup.c
Expand Up @@ -108,38 +108,12 @@ int fixup_kz_amqp_encode_free(void** param, int param_no)

int fixup_kz_amqp(void** param, int param_no)
{
if (param_no == 1 || param_no == 2 || param_no == 3) {
return fixup_spve_null(param, 1);
}

if (param_no == 4) {
if (fixup_pvar_null(param, 1) != 0) {
LM_ERR("failed to fixup result pvar\n");
return -1;
}
if (((pv_spec_t *)(*param))->setf == NULL) {
LM_ERR("result pvar is not writeble\n");
return -1;
}
return 0;
}

LM_ERR("invalid parameter number <%d>\n", param_no);
return -1;
return fixup_spve_null(param, 1);
}

int fixup_kz_amqp_free(void** param, int param_no)
{
if (param_no == 1 || param_no == 2 || param_no == 3) {
return fixup_free_spve_null(param, 1);
}

if (param_no == 4) {
return fixup_free_pvar_null(param, 1);
}

LM_ERR("invalid parameter number <%d>\n", param_no);
return -1;
return fixup_free_spve_null(param, 1);
}


Expand Down

0 comments on commit 6d036dc

Please sign in to comment.