Skip to content

Commit

Permalink
Fix GH-9344: pgsql pipeline mode proposal.
Browse files Browse the repository at this point in the history
Adding pg_send_flush_request.
Fix freeze after next execute pg_send_* on PQgetResult in _php_pgsql_link_has_results.
Set nonblocking for pipelining mode.
No flush client buffer in pg_send_* for pipelining mode.

Close GH-12644
  • Loading branch information
degtyaryov authored and devnexen committed Nov 10, 2023
1 parent 677db1b commit 6389800
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 50 deletions.
3 changes: 3 additions & 0 deletions NEWS
Expand Up @@ -44,6 +44,9 @@ PHP NEWS
. Fixed bug GH-11374 (Backport upstream fix, Different preg_match result
with -d pcre.jit=0). (mvorisek)

- PGSQL:
. Fixed pipeline mode GH-9344. (degtyaryov)

- SOAP:
. Fix potential crash with an edge case of persistent encoders. (nielsdos)
. Fixed bug #75306 (Memleak in SoapClient). (nielsdos)
Expand Down
1 change: 1 addition & 0 deletions UPGRADING
Expand Up @@ -432,6 +432,7 @@ PHP 8.3 UPGRADE NOTES
in error messages (with libpq >= 9.6).
. Added pg_enter_pipeline_mode().
. Added pg_exit_pipeline_mode().
. Added pg_send_flush_request().
. Added pg_pipeline_sync().
. Added pg_pipeline_status().

Expand Down
171 changes: 135 additions & 36 deletions ext/pgsql/pgsql.c
Expand Up @@ -3595,6 +3595,9 @@ PHP_FUNCTION(pg_send_query)
char *query;
size_t len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3606,23 +3609,40 @@ PHP_FUNCTION(pg_send_query)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

if (is_non_blocking) {
if (!PQsendQuery(pgsql, query)) {
RETURN_FALSE;
}
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
if (!PQsendQuery(pgsql, query)) {
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
Expand Down Expand Up @@ -3667,6 +3687,9 @@ PHP_FUNCTION(pg_send_query_params)
char *query;
size_t query_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3678,17 +3701,26 @@ PHP_FUNCTION(pg_send_query_params)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -3727,7 +3759,15 @@ PHP_FUNCTION(pg_send_query_params)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -3761,6 +3801,9 @@ PHP_FUNCTION(pg_send_prepare)
char *query, *stmtname;
size_t stmtname_len, query_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3772,17 +3815,26 @@ PHP_FUNCTION(pg_send_prepare)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

if (!PQsendPrepare(pgsql, stmtname, query, 0, NULL)) {
if (is_non_blocking) {
Expand All @@ -3798,7 +3850,15 @@ PHP_FUNCTION(pg_send_prepare)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -3834,6 +3894,9 @@ PHP_FUNCTION(pg_send_execute)
char *stmtname;
size_t stmtname_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3845,17 +3908,26 @@ PHP_FUNCTION(pg_send_execute)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -3896,7 +3968,15 @@ PHP_FUNCTION(pg_send_execute)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -5891,6 +5971,8 @@ PHP_FUNCTION(pg_enter_pipeline_mode)
pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

PQsetnonblocking(pgsql_handle->conn, 1);

RETURN_BOOL(PQenterPipelineMode(pgsql_handle->conn));
}

Expand All @@ -5906,9 +5988,26 @@ PHP_FUNCTION(pg_exit_pipeline_mode)
pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

PQsetnonblocking(pgsql_handle->conn, 0);

RETURN_BOOL(PQexitPipelineMode(pgsql_handle->conn));
}

PHP_FUNCTION(pg_send_flush_request)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQsendFlushRequest(pgsql_handle->conn));
}

PHP_FUNCTION(pg_pipeline_sync)
{
zval *pgsql_link;
Expand Down
1 change: 1 addition & 0 deletions ext/pgsql/pgsql.stub.php
Expand Up @@ -966,6 +966,7 @@ function pg_select(PgSql\Connection $connection, string $table_name, array $cond
#ifdef LIBPQ_HAS_PIPELINING
function pg_enter_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_exit_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_send_flush_request(PgSql\Connection $connection): bool {}
function pg_pipeline_sync(PgSql\Connection $connection): bool {}
function pg_pipeline_status(PgSql\Connection $connection): int {}
#endif
Expand Down
12 changes: 11 additions & 1 deletion ext/pgsql/pgsql_arginfo.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6389800

Please sign in to comment.