Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
erimatnor committed Jan 27, 2023
1 parent 08a1654 commit 4a56906
Showing 1 changed file with 140 additions and 8 deletions.
148 changes: 140 additions & 8 deletions tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ async_request_send_internal(AsyncRequest *req, int elevel)

if (req->stmt_name)
{
elog(LOG, "calling PQsendQueryPrepared() to %s", remote_connection_node_name(req->conn));
elog(LOG, "calling PQsendQueryPrepared() to %s stmt: %s",
remote_connection_node_name(req->conn), req->stmt_name);

ret = PQsendQueryPrepared(remote_connection_get_pg_conn(req->conn),
req->stmt_name,
Expand All @@ -177,7 +178,8 @@ async_request_send_internal(AsyncRequest *req, int elevel)
}
else
{
elog(LOG, "calling PQsendQueryParams() to %s", remote_connection_node_name(req->conn));
elog(LOG, "calling PQsendQueryParams() to %s SQL: %s",
remote_connection_node_name(req->conn), req->sql);

/*
* We intentionally do not specify parameter types here, but leave the
Expand Down Expand Up @@ -720,7 +722,8 @@ get_single_response_nonblocking2(AsyncRequestSet *set)
{
AsyncRequest *req = lfirst(lc);
PGconn *pg_conn = remote_connection_get_pg_conn(req->conn);

int ret = 0;

switch (req->state)
{
case DEFERRED:
Expand All @@ -742,9 +745,14 @@ get_single_response_nonblocking2(AsyncRequestSet *set)
case EXECUTING:
elog(LOG, "checking busy on %s", remote_connection_node_name(req->conn));

if (0 == PQisBusy(pg_conn))
ret = PQisBusy(pg_conn);

elog(LOG, "PQisBusy() on %s returned %d",
remote_connection_node_name(req->conn), ret);

if (0 == ret)
{
elog(LOG, "gettint request on %s", remote_connection_node_name(req->conn));
elog(LOG, "getting request on %s", remote_connection_node_name(req->conn));

PGresult *res = PQgetResult(pg_conn);

Expand All @@ -761,17 +769,24 @@ get_single_response_nonblocking2(AsyncRequestSet *set)
set->requests = list_delete_ptr(set->requests, req);
remote_connection_set_status(req->conn, CONN_IDLE);
async_request_set_state(req, COMPLETED);
elog(LOG, "node %s is completed", remote_connection_node_name(req->conn));

/* set changed so rerun function */
return get_single_response_nonblocking(set);
return get_single_response_nonblocking2(set);
}
return &async_response_result_create(req, res)->base;
}
else
{
elog(LOG, "connection to %s is busy", remote_connection_node_name(req->conn));
}

break;
case COMPLETED:
return async_response_error_create("request already completed");
}
}
elog(LOG, "get_single_response return NULL with num_requests=%d", list_length(set->requests));

return NULL;
}
Expand Down Expand Up @@ -895,6 +910,120 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
return result;
}

static AsyncResponse *
wait_to_consume_data2(AsyncRequestSet *set, TimestampTz end_time)
{
/*
* Looks like there is no good way to modify a WaitEventSet so we have to
* make a new one, otherwise we can't turn off wait events
*/
WaitEventSet *we_set;
ListCell *lc;
int rc;
WaitEvent event;
AsyncRequest *wait_req;
AsyncResponse *result;
long timeout_ms = -1L;

Assert(list_length(set->requests) > 0);

if (end_time != TS_NO_TIMEOUT)
{
TimestampTz now = GetCurrentTimestamp();
long secs;
int microsecs;

if (now >= end_time)
return async_response_timeout_create();

TimestampDifference(now, end_time, &secs, &microsecs);
timeout_ms = secs * 1000 + (microsecs / 1000);
}

we_set = CreateWaitEventSet(CurrentMemoryContext, list_length(set->requests) + 2);

/* always wait for my latch */
AddWaitEventToSet(we_set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) MyLatch, NULL);
AddWaitEventToSet(we_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);

foreach (lc, set->requests)
{
AsyncRequest *req = lfirst(lc);

elog(LOG, "adding %s to wait set", remote_connection_node_name(req->conn));

AddWaitEventToSet(we_set,
WL_SOCKET_READABLE,
PQsocket(remote_connection_get_pg_conn(req->conn)),
NULL,
req);
}

while (true)
{
wait_req = NULL;

CHECK_FOR_INTERRUPTS();

rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, PG_WAIT_EXTENSION);

if (rc == 0)
{
result = async_response_timeout_create();
break;
}

if (event.events & ~(WL_SOCKET_READABLE | WL_LATCH_SET))
{
/*
* Sanity check on the wait result: we haven't requested anything
* other than my latch or the socket becoming readable.
*/
result = async_response_error_create(
psprintf("Unexpected event 0x%X while waiting for async request result",
event.events));
break;
}

if (event.events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}

if (event.events & WL_SOCKET_READABLE)
{
wait_req = event.user_data;
Assert(wait_req != NULL);
PGconn *pg_conn = remote_connection_get_pg_conn(wait_req->conn);

if (0 == PQconsumeInput(pg_conn))
{
/* An error has occurred, remove connection from set. */
set->requests = list_delete_ptr(set->requests, wait_req);
result = &async_response_communication_error_create(wait_req)->base;
break;
}

/*
* From postgres docs on PQConsumeInput():
* Note that the result does not say whether any input data was
* actually collected. After calling PQconsumeInput, the
* application can check PQisBusy and/or PQnotifies to see if their
* state has changed.
*/
if (PQisBusy(pg_conn) == 0)
{
result = NULL;
break;
}
}
}

FreeWaitEventSet(we_set);
return result;
}

/* Return NULL when nothing more to do in set */
AsyncResponse *
async_request_set_wait_any_response_deadline(AsyncRequestSet *set, TimestampTz endtime)
Expand Down Expand Up @@ -962,7 +1091,10 @@ async_request_set_wait_any_response_deadline2(AsyncRequestSet *set, TimestampTz
{
elog(LOG, "getting single response");
response = get_single_response_nonblocking2(set);
elog(LOG, "DONE: getting single response=%p", response);
elog(LOG,
"DONE: getting single response=%p num_requests=%d",
response,
list_length(set->requests));

if (response != NULL)
break;
Expand All @@ -972,7 +1104,7 @@ async_request_set_wait_any_response_deadline2(AsyncRequestSet *set, TimestampTz
return NULL;

elog(LOG, "Waiting to consume data");
response = wait_to_consume_data(set, endtime);
response = wait_to_consume_data2(set, endtime);
elog(LOG, "DONE: Waiting to consume data");

if (response != NULL)
Expand Down

0 comments on commit 4a56906

Please sign in to comment.