Skip to content

Commit

Permalink
Fix GH-9344: pgsql pipeline mode proposal.
Browse files Browse the repository at this point in the history
Adding pg_enter_pipeline_mode, pg_exit_pipeline_mode,
pg_pipeline_sync and pg_pipeline_status.

Close GH-10868
  • Loading branch information
devnexen committed Apr 28, 2023
1 parent 732d92c commit 6a9061e
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 1 deletion.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ PHP NEWS
- PGSQL:
. pg_fetch_object raises a ValueError instead of an Exception.
(David Carlier)
. Added GH-9344, pipeline mode support. (David Carlier)

- Phar:
. Fix memory leak in phar_rename_archive(). (stkeke)
Expand Down
62 changes: 62 additions & 0 deletions ext/pgsql/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -5798,4 +5798,66 @@ PHP_FUNCTION(pg_select)
}
/* }}} */

#ifdef LIBPQ_HAS_PIPELINING
PHP_FUNCTION(pg_enter_pipeline_mode)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQenterPipelineMode(pgsql_handle->conn));
}

PHP_FUNCTION(pg_exit_pipeline_mode)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQexitPipelineMode(pgsql_handle->conn));
}

PHP_FUNCTION(pg_pipeline_sync)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQpipelineSync(pgsql_handle->conn));
}

PHP_FUNCTION(pg_pipeline_status)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
}

pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_LONG(PQpipelineStatus(pgsql_handle->conn));
}
#endif

#endif
30 changes: 30 additions & 0 deletions ext/pgsql/pgsql.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,29 @@
*/
const PGSQL_DML_STRING = UNKNOWN;

#ifdef LIBPQ_HAS_PIPELINING
/**
* @var int
* @cvalue PGRES_PIPELINE_SYNC
*/
const PGSQL_PIPELINE_SYNC = UNKNOWN;
/**
* @var int
* @cvalue PQ_PIPELINE_ON
*/
const PGSQL_PIPELINE_ON = UNKNOWN;
/**
* @var int
* @cvalue PQ_PIPELINE_OFF
*/
const PGSQL_PIPELINE_OFF = UNKNOWN;
/**
* @var int
* @cvalue PQ_PIPELINE_ABORTED
*/
const PGSQL_PIPELINE_ABORTED = UNKNOWN;
#endif

function pg_connect(string $connection_string, int $flags = 0): PgSql\Connection|false {}

function pg_pconnect(string $connection_string, int $flags = 0): PgSql\Connection|false {}
Expand Down Expand Up @@ -894,6 +917,13 @@ function pg_delete(PgSql\Connection $connection, string $table_name, array $cond
* @refcount 1
*/
function pg_select(PgSql\Connection $connection, string $table_name, array $conditions, int $flags = PGSQL_DML_EXEC, int $mode = PGSQL_ASSOC): array|string|false {}

#ifdef LIBPQ_HAS_PIPELINING
function pg_enter_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_exit_pipeline_mode(PgSql\Connection $connection): bool {}
function pg_pipeline_sync(PgSql\Connection $connection): bool {}
function pg_pipeline_status(PgSql\Connection $connection): int {}
#endif
}

namespace PgSql {
Expand Down
58 changes: 57 additions & 1 deletion ext/pgsql/pgsql_arginfo.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 99 additions & 0 deletions ext/pgsql/tests/pg_pipeline_sync.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
--TEST--
PostgreSQL pipeline mode
--EXTENSIONS--
pgsql
--SKIPIF--
<?php
include("skipif.inc");
if (!defined('PGSQL_PIPELINE_SYNC') || !function_exists('pg_send_query_params')) {
die('skip pipeline mode not available');
}
?>
--FILE--
<?php

include('config.inc');
include('nonblocking.inc');

if (!$db = pg_connect($conn_str, PGSQL_CONNECT_ASYNC)) {
die("pg_connect() error");
} elseif (pg_connection_status($db) === PGSQL_CONNECTION_BAD) {
die("pg_connect() error");
} elseif ($db_socket = pg_socket($db)) {
stream_set_blocking($db_socket, FALSE);
} else {
die("pg_socket() error");
}

while (true) {
switch ($status = pg_connect_poll($db)) {
case PGSQL_POLLING_READING:
nb_is_readable($db_socket);
break;
case PGSQL_POLLING_WRITING:
nb_is_writable($db_socket);
break;
case PGSQL_POLLING_FAILED:
die("async connection failed");
case PGSQL_POLLING_OK:
break 2;
default:
die("unknown poll status");
}
}

if (!pg_enter_pipeline_mode($db)) {
die('pg_enter_pipeline_mode{}');
}

if (!pg_send_query_params($db, "select $1 as index, now() + ($1||' day')::interval as time", array(1))) {
die('pg_send_query_params failed');
}

if (!pg_pipeline_sync($db)) {
die('pg_pipeline_sync failed');
}

if (pg_pipeline_status($db) !== PGSQL_PIPELINE_ON) {
die('pg_pipeline_status failed');
}

if (!($result = pg_get_result($db))) {
die('pg_get_result');
}

if (pg_result_status($result) !== PGSQL_TUPLES_OK) {
die('pg_result_status failed');
}

if (pg_num_rows($result) == -1) {
die('pg_num_rows failed');
}

if (!pg_fetch_row($result, null)) {
die('pg_fetch_row failed');
}

pg_free_result($result);

if (pg_get_result($db) !== false) {
die('pg_get_result failed');
}

if (($result = pg_get_result($db)) !== false) {
if (pg_result_status($result) !== PGSQL_PIPELINE_SYNC) {
die('pg_result_status failed');
}
}

if (!pg_exit_pipeline_mode($db)) {
die('pg_exit_pipeline_mode failed');
}

echo "OK";

pg_close($db);

?>
--EXPECT--
OK

0 comments on commit 6a9061e

Please sign in to comment.