Skip to content

Commit

Permalink
kazoo : fix, send timeout callback to consumer process
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo committed Jul 1, 2015
1 parent 5b4d2a6 commit 67db972
Showing 1 changed file with 37 additions and 4 deletions.
41 changes: 37 additions & 4 deletions modules/kazoo/kz_amqp.c
Expand Up @@ -2034,7 +2034,32 @@ void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
int n = route_get(&main_rt, cmd->err_route);
struct action *a = main_rt.rlist[n];
tmb.t_continue(cmd->t_hash, cmd->t_label, a);
kz_amqp_free_pipe_cmd(cmd);
}

int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
{
cmd->return_code = -1;
kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
if(ptr == NULL) {
LM_ERR("NO MORE SHARED MEMORY!");
return 0;
}
memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
ptr->cmd = cmd;

consumer++;
if(consumer >= dbk_consumer_processes) {
consumer = 0;
}

if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), cmd->payload);
kz_amqp_free_consumer_delivery(ptr);
return 0;
}

return 1;

}

void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
Expand All @@ -2047,7 +2072,7 @@ void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
);
if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
kz_amqp_cb_error(retrieved_cmd);
kz_send_worker_error_event(retrieved_cmd);
} else {
retrieved_cmd->return_code = -1;
lock_release(&retrieved_cmd->lock);
Expand Down Expand Up @@ -2577,6 +2602,9 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
}
if(idx < dbk_channels) {
cmd = kz_cmd_retrieve(message_id);
if(cmd)
cmd->return_code = AMQP_RESPONSE_NORMAL;

/*
if(cmd != NULL) {
cmd->return_code = 0;
Expand Down Expand Up @@ -2791,8 +2819,13 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);

if(cmd->cmd) {
kz_amqp_set_last_result(cmd->payload);
kz_amqp_cb_ok(cmd->cmd);
if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
kz_amqp_set_last_result(cmd->payload);
kz_amqp_cb_ok(cmd->cmd);
} else {
kz_amqp_reset_last_result();
kz_amqp_cb_error(cmd->cmd);
}
} else {
kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
}
Expand Down

0 comments on commit 67db972

Please sign in to comment.