Skip to content

Commit

Permalink
Use wire messages for all message types in all tests.
Browse files Browse the repository at this point in the history
Signed-off-by: James Hamlin <jfhamlin@gmail.com>
  • Loading branch information
jfhamlin committed Jan 5, 2015
1 parent 88f6c5c commit b9032d0
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 7 deletions.
30 changes: 29 additions & 1 deletion src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
static raft_bool_t should_begin_election(raft_state_t* p_state);
static raft_status_t begin_election(raft_state_t* p_state);

static raft_status_t send_append_entries(raft_state_t* p_state,
raft_nodeid_t recipient_id,
raft_append_entries_args_t* p_args);

static raft_status_t send_request_vote(raft_state_t* p_state,
raft_nodeid_t recipient_id,
raft_request_vote_args_t* p_args);
Expand Down Expand Up @@ -95,7 +99,7 @@ raft_status_t raft_tick(raft_state_t* p_state,

for (uint32_t i = 0; i < p_config->node_count - 1; ++i) {
if (p_config->p_nodeids[i] != p_state->p.self) {
p_config->cb.pf_append_entries_rpc(p_config->p_nodeids[i], &args);
send_append_entries(p_state, p_config->p_nodeids[i], &args);
}
}

Expand Down Expand Up @@ -168,6 +172,30 @@ static raft_bool_t should_begin_election(raft_state_t* p_state) {
*******************************************************************************
******************************************************************************/

static raft_status_t send_append_entries(raft_state_t* p_state,
raft_nodeid_t recipient_id,
raft_append_entries_args_t* p_args) {
raft_config_t const* p_config = p_state->p_config;

raft_status_t status;
if (p_config->cb.pf_append_entries_rpc) {
status = p_config->cb.pf_append_entries_rpc(recipient_id, p_args);
} else {
raft_envelope_t envelope = { 0 };
status = raft_write_append_entries_envelope(&envelope,
recipient_id,
p_args);
if (RAFT_SUCCESS(status)) {
status = p_config->cb.pf_send_message(recipient_id,
envelope.p_message,
envelope.message_size);
} else {
raft_dealloc_envelope(&envelope);
}
}
return status;
}

static raft_status_t send_request_vote(raft_state_t* p_state,
raft_nodeid_t recipient_id,
raft_request_vote_args_t* p_args) {
Expand Down
24 changes: 24 additions & 0 deletions src/raft_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,30 @@ raft_status_t raft_recv_message(raft_state_t* p_state,
raft_status_t status = RAFT_STATUS_OK;
/* TODO: Validate message version number */
switch (raft_message_type(p_message_bytes)) {
case MSG_TYPE_APPEND_ENTRIES:
{
raft_append_entries_args_t args;
status = raft_read_append_entries_args(&args,
p_message_bytes,
buffer_size);
if (RAFT_FAILURE(status)) {
return status;
}
status = raft_recv_append_entries(p_state, &args);
break;
}
case MSG_TYPE_APPEND_ENTRIES_RESPONSE:
{
raft_append_entries_response_args_t args;
status = raft_read_append_entries_response_args(&args,
p_message_bytes,
buffer_size);
if (RAFT_FAILURE(status)) {
return status;
}
status = raft_recv_append_entries_response(p_state, &args);
break;
}
case MSG_TYPE_REQUEST_VOTE:
{
raft_request_vote_args_t args;
Expand Down
9 changes: 6 additions & 3 deletions src/raft_wire.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,12 @@ raft_status_t raft_read_append_entries_args(raft_append_entries_args_t* p_args,
RM_U32(&num_entries);
RM(leader_commit);

raft_log_entry_t* p_entries = calloc(num_entries, sizeof(raft_log_entry_t));
if (p_entries == NULL) {
goto fail_oom;
raft_log_entry_t* p_entries = NULL;
if (num_entries > 0) {
p_entries = calloc(num_entries, sizeof(raft_log_entry_t));
if (p_entries == NULL) {
goto fail_oom;
}
}

for (uint32_t ii = 0; ii < num_entries; ++ii) {
Expand Down
5 changes: 2 additions & 3 deletions tests/test_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ static raft_state_t* make_raft_node(uint32_t id) {
p_config->election_timeout_max_ms = 1000;
p_config->election_timeout_min_ms = 500;

p_config->cb.pf_append_entries_rpc = &append_entries_rpc;
p_config->cb.pf_append_entries_response_rpc = &append_entries_response_rpc;

/* p_config->cb.pf_append_entries_rpc = &append_entries_rpc; */
/* p_config->cb.pf_append_entries_response_rpc = &append_entries_response_rpc; */
/* p_config->cb.pf_request_vote_rpc = &request_vote_rpc; */
/* p_config->cb.pf_request_vote_response_rpc = &request_vote_response_rpc; */

Expand Down

0 comments on commit b9032d0

Please sign in to comment.