Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GH-9344: pgsql pipeline mode proposal. #12644

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions UPGRADING
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ PHP 8.3 UPGRADE NOTES
in error messages (with libpq >= 9.6).
. Added pg_enter_pipeline_mode().
. Added pg_exit_pipeline_mode().
. Added pg_send_flush_request().
. Added pg_pipeline_sync().
. Added pg_pipeline_status().

Expand Down
171 changes: 135 additions & 36 deletions ext/pgsql/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -3595,6 +3595,9 @@ PHP_FUNCTION(pg_send_query)
char *query;
size_t len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3606,23 +3609,40 @@ PHP_FUNCTION(pg_send_query)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

if (is_non_blocking) {
if (!PQsendQuery(pgsql, query)) {
RETURN_FALSE;
}
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
if (!PQsendQuery(pgsql, query)) {
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
Expand Down Expand Up @@ -3667,6 +3687,9 @@ PHP_FUNCTION(pg_send_query_params)
char *query;
size_t query_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3678,17 +3701,26 @@ PHP_FUNCTION(pg_send_query_params)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -3727,7 +3759,15 @@ PHP_FUNCTION(pg_send_query_params)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -3761,6 +3801,9 @@ PHP_FUNCTION(pg_send_prepare)
char *query, *stmtname;
size_t stmtname_len, query_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3772,17 +3815,26 @@ PHP_FUNCTION(pg_send_prepare)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

if (!PQsendPrepare(pgsql, stmtname, query, 0, NULL)) {
if (is_non_blocking) {
Expand All @@ -3798,7 +3850,15 @@ PHP_FUNCTION(pg_send_prepare)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -3834,6 +3894,9 @@ PHP_FUNCTION(pg_send_execute)
char *stmtname;
size_t stmtname_len;
PGconn *pgsql;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif
int is_non_blocking;
int ret;

Expand All @@ -3845,17 +3908,26 @@ PHP_FUNCTION(pg_send_execute)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

is_non_blocking = PQisnonblocking(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (is_pipeline_mode) {
is_non_blocking = 1;
} else {
#endif
is_non_blocking = PQisnonblocking(pgsql);

if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
if (is_non_blocking == 0 && PQsetnonblocking(pgsql, 1) == -1) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}

if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
if (_php_pgsql_link_has_results(pgsql)) {
php_error_docref(NULL, E_NOTICE,
"There are results on this connection. Call pg_get_result() until it returns FALSE");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -3896,7 +3968,15 @@ PHP_FUNCTION(pg_send_execute)
}

if (is_non_blocking) {
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
ret = 0;
} else {
#endif
ret = PQflush(pgsql);
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
} else {
/* Wait to finish sending buffer */
while ((ret = PQflush(pgsql))) {
Expand Down Expand Up @@ -5891,6 +5971,8 @@ PHP_FUNCTION(pg_enter_pipeline_mode)
pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

PQsetnonblocking(pgsql_handle->conn, 1);

RETURN_BOOL(PQenterPipelineMode(pgsql_handle->conn));
}

Expand All @@ -5906,9 +5988,26 @@ PHP_FUNCTION(pg_exit_pipeline_mode)
pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

PQsetnonblocking(pgsql_handle->conn, 0);

RETURN_BOOL(PQexitPipelineMode(pgsql_handle->conn));
}

PHP_FUNCTION(pg_send_flush_request)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQsendFlushRequest(pgsql_handle->conn));
}

PHP_FUNCTION(pg_pipeline_sync)
{
zval *pgsql_link;
Expand Down
1 change: 1 addition & 0 deletions ext/pgsql/pgsql.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,7 @@ function pg_select(PgSql\Connection $connection, string $table_name, array $cond
#ifdef LIBPQ_HAS_PIPELINING
function pg_enter_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_exit_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_send_flush_request(PgSql\Connection $connection): bool {}
function pg_pipeline_sync(PgSql\Connection $connection): bool {}
function pg_pipeline_status(PgSql\Connection $connection): int {}
#endif
Expand Down
12 changes: 11 additions & 1 deletion ext/pgsql/pgsql_arginfo.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.