Skip to content

Commit

Permalink
More debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
erimatnor committed Jan 26, 2023
1 parent 7f4a128 commit d1c41c4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
30 changes: 24 additions & 6 deletions tsl/src/nodes/data_node_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ send_batch_to_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
Assert(NUM_STORED_TUPLES(ss) <= sds->flush_threshold);
Assert(NUM_STORED_TUPLES(ss) > 0);

elog(LOG, "sending batch to %s", remote_connection_node_name(ss->conn));

ss->num_tuples_sent = 0;

while (
Expand Down Expand Up @@ -570,6 +572,8 @@ send_batch_to_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
* batch/node */
stmt_params_reset(sds->stmt_params);

elog(LOG, "DONE sending batch to %s", remote_connection_node_name(ss->conn));

return req;
}

Expand Down Expand Up @@ -642,11 +646,11 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
AsyncResponseResult *rsp;
List *results = NIL;
unsigned num_responses = 0;

sds->next_tuple = 0;

elog(LOG, "awaiting all responses from data nodes");

while ((rsp = async_request_set_wait_any_result(reqset)))
{
DataNodeState *ss = async_response_result_get_user_data(rsp);
Expand All @@ -656,11 +660,11 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)

if (num_responses == 0)
{
elog(LOG, "got first response from %s", remote_connection_node_name(ss->conn));
elog(LOG, "got first response from %s", remote_connection_node_name(ss->conn));
}

num_responses++;

switch (status)
{
case PGRES_TUPLES_OK:
Expand Down Expand Up @@ -695,9 +699,8 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
ss->next_tuple = 0;
}


elog(LOG, "got %u responses from data nodes", num_responses);

return results;
}

Expand Down Expand Up @@ -822,6 +825,8 @@ handle_flush(DataNodeDispatchState *sds)

Assert(sds->state == SD_FLUSH || sds->state == SD_LAST_FLUSH);

elog(LOG, "flushing all data nodes");

/* Save the requests and responses in the batch memory context since they
* need to survive across several iterations of the executor loop when
* there is a RETURNING clause. The batch memory context is cleared the
Expand All @@ -835,6 +840,7 @@ handle_flush(DataNodeDispatchState *sds)
pfree(reqset);
}

elog(LOG, "DONE flushing all data nodes");
data_node_dispatch_set_state(sds, SD_RETURNING);
MemoryContextSwitchTo(oldcontext);
}
Expand Down Expand Up @@ -1010,20 +1016,32 @@ data_node_dispatch_exec(CustomScanState *node)
switch (sds->state)
{
case SD_READ:
elog(LOG, "handle read");
handle_read(sds);
elog(LOG, "DONE: handle read");
break;
case SD_FLUSH:
elog(LOG, "handle flush");
handle_flush(sds);
elog(LOG, "DONE: handle flush");
break;
case SD_LAST_FLUSH:
elog(LOG, "handle last flush");
handle_flush(sds);
elog(LOG, "DONE: handle last flush");
break;
case SD_RETURNING:
elog(LOG, "handle returning");
slot = handle_returning(sds);
/* If a tuple was read, return it and wait to get called again */
done = !TupIsNull(slot);
elog(LOG, "DONE: handle returning");
break;
case SD_DONE:
done = true;
elog(LOG, "handle DONE");
Assert(TupIsNull(slot));
elog(LOG, "DONE: handle DONE");
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
/* always wait for my latch */
AddWaitEventToSet(we_set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) MyLatch, NULL);
AddWaitEventToSet(we_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);

foreach (lc, set->requests)
{
AsyncRequest *req = lfirst(lc);
Expand Down

0 comments on commit d1c41c4

Please sign in to comment.