Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
erimatnor committed Jan 26, 2023
1 parent 8db10e4 commit 7f4a128
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
15 changes: 14 additions & 1 deletion tsl/src/nodes/data_node_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,26 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
{
AsyncResponseResult *rsp;
List *results = NIL;

unsigned num_responses = 0;

sds->next_tuple = 0;

elog(LOG, "awaiting all responses from data nodes");

while ((rsp = async_request_set_wait_any_result(reqset)))
{
DataNodeState *ss = async_response_result_get_user_data(rsp);
PGresult *res = async_response_result_get_pg_result(rsp);
ExecStatusType status = PQresultStatus(res);
bool report_error = true;

if (num_responses == 0)
{
elog(LOG, "got first response from %s", remote_connection_node_name(ss->conn));
}

num_responses++;

switch (status)
{
case PGRES_TUPLES_OK:
Expand Down Expand Up @@ -685,6 +695,9 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
ss->next_tuple = 0;
}


elog(LOG, "got %u responses from data nodes", num_responses);

return results;
}

Expand Down
9 changes: 6 additions & 3 deletions tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,11 +729,12 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
timeout_ms = secs * 1000 + (microsecs / 1000);
}

we_set = CreateWaitEventSet(CurrentMemoryContext, list_length(set->requests) + 1);
we_set = CreateWaitEventSet(CurrentMemoryContext, list_length(set->requests) + 2);

/* always wait for my latch */
AddWaitEventToSet(we_set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) MyLatch, NULL);

AddWaitEventToSet(we_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);

foreach (lc, set->requests)
{
AsyncRequest *req = lfirst(lc);
Expand All @@ -748,6 +749,9 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
while (true)
{
wait_req = NULL;

CHECK_FOR_INTERRUPTS();

rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, PG_WAIT_EXTENSION);

if (rc == 0)
Expand All @@ -771,7 +775,6 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
if (event.events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}

if (event.events & WL_SOCKET_READABLE)
Expand Down

0 comments on commit 7f4a128

Please sign in to comment.