Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add C driver logging for send_nak_message and resend #1553

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
_context->flow_control_on_receiver_removed_func = NULL;
_context->on_name_resolve_func = NULL;

_context->send_nak_message_func = NULL;

if ((_context->termination_validator_func = aeron_driver_termination_validator_load(
AERON_CONFIG_GETENV_OR_DEFAULT(AERON_DRIVER_TERMINATION_VALIDATOR_ENV_VAR, "deny"))) == NULL)
{
Expand Down
12 changes: 12 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ typedef void (*aeron_driver_flow_control_strategy_on_receiver_change_func_t)(
const char *channel,
size_t receiver_count);

typedef void (*aeron_driver_send_nak_message_func_t)(
const struct sockaddr_storage *address,
int32_t session_id,
int32_t stream_id,
int32_t term_id,
int32_t term_offset,
int32_t nak_length,
size_t channel_length,
const char *channel);

typedef void (*aeron_driver_name_resolver_on_resolve_t)(
aeron_name_resolver_t *name_resolver,
int64_t duration_ns,
Expand Down Expand Up @@ -262,6 +272,8 @@ typedef struct aeron_driver_context_stct
aeron_driver_flow_control_strategy_on_receiver_change_func_t flow_control_on_receiver_added_func;
aeron_driver_flow_control_strategy_on_receiver_change_func_t flow_control_on_receiver_removed_func;

aeron_driver_send_nak_message_func_t send_nak_message_func;

aeron_driver_termination_validator_func_t termination_validator_func;
void *termination_validator_state;

Expand Down
134 changes: 117 additions & 17 deletions aeron-driver/src/main/c/agent/aeron_driver_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ static aeron_driver_agent_log_event_t log_events[] =
{ "NAME_RESOLUTION_HOST_NAME", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "ADD_DYNAMIC_DISSECTOR", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "DYNAMIC_DISSECTOR_EVENT", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "SEND_NAK_MESSAGE", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
};

#define AERON_DRIVER_EVENT_NUM_ELEMENTS (sizeof(log_events) / sizeof(aeron_driver_agent_log_event_t))
Expand Down Expand Up @@ -901,7 +902,7 @@ void aeron_driver_agent_flow_control_on_receiver_removed(
receiver_count);
}

int32_t aeron_driver_agent_socket_address_length(struct sockaddr_storage *address)
int32_t aeron_driver_agent_socket_address_length(const struct sockaddr_storage *address)
{
if (NULL == address)
{
Expand All @@ -919,6 +920,68 @@ int32_t aeron_driver_agent_socket_address_length(struct sockaddr_storage *addres
return 0;
}

void aeron_driver_agent_socket_address_copy(uint8_t *ptr, const struct sockaddr_storage *address, const size_t address_length)
{
if (AF_INET == address->ss_family)
{
struct sockaddr_in *address_in = (struct sockaddr_in *)address;
memcpy(ptr, &address_in->sin_addr, address_length);
}
else if (AF_INET6 == address->ss_family)
{
struct sockaddr_in6 *address_in6 = (struct sockaddr_in6 *)address;
memcpy(ptr, &address_in6->sin6_addr, address_length);
}
}

void aeron_driver_agent_send_nak_message(
const struct sockaddr_storage *address,
const int32_t session_id,
const int32_t stream_id,
const int32_t term_id,
const int32_t term_offset,
const int32_t nak_length,
const size_t channel_length,
const char *channel)
{
const size_t address_length = aeron_driver_agent_socket_address_length(address);

int32_t offset = aeron_mpsc_rb_try_claim(
&logging_mpsc_rb,
AERON_DRIVER_EVENT_SEND_NAK_MESSAGE,
sizeof(aeron_driver_agent_send_nak_message_header_t) +
address_length +
channel_length);

if (offset > 0)
{
uint8_t *ptr = (logging_mpsc_rb.buffer + offset);
aeron_driver_agent_send_nak_message_header_t *hdr =
(aeron_driver_agent_send_nak_message_header_t *)ptr;

hdr->time_ns = aeron_nano_clock();
hdr->session_id = session_id;
hdr->stream_id = stream_id;
hdr->term_id = term_id;
hdr->term_offset = term_offset;
hdr->nak_length = nak_length;
hdr->address_length = (int32_t)address_length;
hdr->channel_length = (int32_t)channel_length;

ptr += sizeof(aeron_driver_agent_send_nak_message_header_t);

if (NULL != address)
{
aeron_driver_agent_socket_address_copy(ptr, address, address_length);
}

ptr += address_length;
memcpy(ptr, channel, channel_length);

aeron_mpsc_rb_commit(&logging_mpsc_rb, offset);
}
}

void aeron_driver_agent_name_resolver_on_resolve(
aeron_name_resolver_t *name_resolver,
int64_t duration_ns,
Expand Down Expand Up @@ -963,22 +1026,10 @@ void aeron_driver_agent_name_resolver_on_resolve(

if (NULL != address)
{
if (AF_INET == address->ss_family)
{
struct sockaddr_in *address_in = (struct sockaddr_in *)address;
memcpy(
bodyPtr + resolverNameLength + hostnameLength,
&address_in->sin_addr,
addressLength);
}
else if (AF_INET6 == address->ss_family)
{
struct sockaddr_in6 *address_in6 = (struct sockaddr_in6 *)address;
memcpy(
bodyPtr + resolverNameLength + hostnameLength,
&address_in6->sin6_addr,
addressLength);
}
aeron_driver_agent_socket_address_copy(
bodyPtr + resolverNameLength + hostnameLength,
address,
addressLength);
}

aeron_mpsc_rb_commit(&logging_mpsc_rb, offset);
Expand Down Expand Up @@ -1234,6 +1285,11 @@ int aeron_driver_agent_init_logging_events_interceptors(aeron_driver_context_t *
context->on_host_name_func = aeron_driver_agent_name_resolver_on_host_name;
}

if (aeron_driver_agent_is_event_enabled(AERON_DRIVER_EVENT_SEND_NAK_MESSAGE))
{
context->send_nak_message_func = aeron_driver_agent_send_nak_message;
}

return 0;
}

Expand Down Expand Up @@ -1949,6 +2005,50 @@ void aeron_driver_agent_log_dissector(int32_t msg_type_id, const void *message,
break;
}

case AERON_DRIVER_EVENT_SEND_NAK_MESSAGE:
{
aeron_driver_agent_send_nak_message_header_t *hdr = (aeron_driver_agent_send_nak_message_header_t *)message;

char *address_str = "unknown-address";
char address_buf[INET6_ADDRSTRLEN] = { 0 };
uint8_t *address_ptr = (uint8_t *)message + sizeof(aeron_driver_agent_send_nak_message_header_t);
nbradac marked this conversation as resolved.
Show resolved Hide resolved

const char *addr_prefix = "";
const char *addr_suffix = "";
if (sizeof(((struct sockaddr_in *)0)->sin_addr) == hdr->address_length)
{
inet_ntop(AF_INET, address_ptr, address_buf, sizeof(address_buf));
address_str = address_buf;
}
else if (sizeof(((struct sockaddr_in6 *)0)->sin6_addr) == hdr->address_length)
{
addr_prefix = "[";
addr_suffix = "]";

inet_ntop(AF_INET, address_ptr, address_buf, sizeof(address_buf));
address_str = address_buf;
}

const char *channel = (const char *)address_ptr + hdr->address_length;

fprintf(
logfp,
"%s: address=%s%s%s sessionId=%d streamId=%d termId=%d termOffset=%d length=%d channel=%.*s\n",
aeron_driver_agent_dissect_log_header(hdr->time_ns, msg_type_id, length, length),
addr_prefix,
address_str,
addr_suffix,
hdr->session_id,
hdr->stream_id,
hdr->term_id,
hdr->term_offset,
hdr->nak_length,
hdr->channel_length,
channel);

break;
}

case AERON_DRIVER_EVENT_NAME_RESOLUTION_RESOLVE:
{
aeron_driver_agent_name_resolver_resolve_log_header_t *hdr =
Expand Down
27 changes: 26 additions & 1 deletion aeron-driver/src/main/c/agent/aeron_driver_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ typedef enum aeron_driver_agent_event_enum

// C-specific events. Note: event IDs are dynamic to avoid gaps in the sparse arrays.
AERON_DRIVER_EVENT_ADD_DYNAMIC_DISSECTOR = 54,
AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT = 55
AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT = 55,

AERON_DRIVER_EVENT_SEND_NAK_MESSAGE = 56
}
aeron_driver_agent_event_t;

Expand Down Expand Up @@ -164,6 +166,19 @@ typedef struct aeron_driver_agent_flow_control_receiver_change_log_header_stct
}
aeron_driver_agent_flow_control_receiver_change_log_header_t;

typedef struct aeron_driver_agent_send_nak_message_header_stct
{
int64_t time_ns;
int32_t session_id;
int32_t stream_id;
int32_t term_id;
int32_t term_offset;
int32_t nak_length;
int32_t address_length;
nbradac marked this conversation as resolved.
Show resolved Hide resolved
int32_t channel_length;
}
aeron_driver_agent_send_nak_message_header_t;

typedef struct aeron_driver_agent_name_resolver_resolve_log_header_stct
{
int64_t time_ns;
Expand Down Expand Up @@ -290,4 +305,14 @@ void aeron_driver_agent_flow_control_on_receiver_removed(
const char *channel,
size_t receiver_count);

void aeron_driver_agent_send_nak_message(
const struct sockaddr_storage *address,
int32_t session_id,
int32_t stream_id,
int32_t term_id,
int32_t term_offset,
int32_t nak_length,
size_t channel_length,
const char *channel);

#endif //AERON_DRIVER_AGENT_H
16 changes: 16 additions & 0 deletions aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ int aeron_receive_channel_endpoint_create(

_endpoint->cached_clock = context->receiver_cached_clock;

_endpoint->send_nak_message = context->send_nak_message_func;

if (NULL != straight_through_destination)
{
if (aeron_receive_channel_endpoint_add_destination(_endpoint, straight_through_destination) < 0)
Expand Down Expand Up @@ -332,6 +334,20 @@ int aeron_receive_channel_endpoint_send_nak(
}
}

aeron_driver_send_nak_message_func_t send_nak_message = endpoint->send_nak_message;
if (NULL != send_nak_message)
{
send_nak_message(
addr,
session_id,
stream_id,
term_id,
term_offset,
length,
endpoint->conductor_fields.udp_channel->uri_length,
endpoint->conductor_fields.udp_channel->original_uri);
}

return bytes_sent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ typedef struct aeron_receive_channel_endpoint_stct
aeron_udp_channel_transport_bindings_t *transport_bindings;
aeron_clock_cache_t *cached_clock;

aeron_driver_send_nak_message_func_t send_nak_message;

int64_t receiver_id;
volatile bool has_receiver_released;
struct
Expand Down
17 changes: 17 additions & 0 deletions aeron-driver/src/test/c/agent/aeron_driver_agent_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1563,3 +1563,20 @@ TEST_F(DriverAgentTest, shouldInitializeNameResolverFunctions)
EXPECT_NE(nullptr, m_context->on_host_name_func);
EXPECT_NE((void *)m_context->on_name_resolve_func, (void *)m_context->on_name_lookup_func);
}

TEST_F(DriverAgentTest, shouldNotSendNakMessageFunctionWhenNotConfigured)
{
aeron_driver_agent_init_logging_events_interceptors(m_context);

EXPECT_EQ(nullptr, m_context->send_nak_message_func);
}

TEST_F(DriverAgentTest, shouldInitializeSendNakMessageFunction)
{
EXPECT_EQ(nullptr, m_context->send_nak_message_func);

EXPECT_TRUE(aeron_driver_agent_logging_events_init("SEND_NAK_MESSAGE", nullptr));
aeron_driver_agent_init_logging_events_interceptors(m_context);

EXPECT_NE(nullptr, m_context->send_nak_message_func);
}