diff --git a/Makefile b/Makefile index e017e60..7064e1d 100644 --- a/Makefile +++ b/Makefile @@ -42,3 +42,9 @@ submake-isolation: $(MAKE) -C $(top_builddir)/src/test/isolation all temp-install: EXTRA_INSTALL=contrib/pg_query_state + +submake-progress_bar: + $(MAKE) -C $(top_builddir)/contrib/pg_query_state + +check_progress_bar: submake-progress_bar temp-install + $(prove_check) diff --git a/pg_query_state.c b/pg_query_state.c index 21a7846..fa1a9bb 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -1360,6 +1360,12 @@ progress_bar(PG_FUNCTION_ARGS) old_progress = 0; progress = 0; + if (SRF_IS_FIRSTCALL()) + { + pg_atomic_write_u32(&counterpart_userid->n_peers, 1); + params->reqid = ++reqid; + } + bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, bg_worker_procs, diff --git a/t/test_bad_progress_bar.pl b/t/test_bad_progress_bar.pl new file mode 100644 index 0000000..39d5110 --- /dev/null +++ b/t/test_bad_progress_bar.pl @@ -0,0 +1,67 @@ +# pg_query_state/t/test_bad_progress_bar.pl +# +# Check uncorrect launches of functions progress_bar(pid) +# and progress_bar_visual(pid, delay) + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# List of checks for bad cases: +# 1) appealing to a bad pid +# ------- requires DBI and DBD::Pg modules ------- +# 2) extracting the state of the process itself + +# Test whether we have both DBI and DBD::pg +my $dbdpg_rc = eval +{ + require DBI; + require DBD::Pg; + DBD::Pg->import(':async'); + 1; +}; + +# start backend for function progress_bar +my $node = PostgresNode->get_new_node('master'); +$node->init; +$node->start; +$node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +$node->restart; +$node->psql('postgres', 'CREATE EXTENSION pg_query_state;'); + +subtest 'Extracting from bad pid' => sub { + my $stderr; + $node->psql('postgres', 'SELECT * from progress_bar(-1)', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar"); + $node->psql('postgres', 'SELECT * from progress_bar(-1)_visual', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for progress_bar_visual"); +}; + +if ( not $dbdpg_rc) { + diag('DBI and DBD::Pg are not available, skip 2/3 tests'); +} + +SKIP: { + skip "DBI and DBD::Pg are not available", 2 if not $dbdpg_rc; + + my $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr($_)); + if ( !defined $dbh_status ) + { + die "Cannot connect to database for dbh with progress_bar\n"; + } + + my $pid_status = $dbh_status->{pg_pid}; + + subtest 'Extracting your own status' => sub { + $dbh_status->do('SELECT * from progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar"); + $dbh_status->do('SELECT * from progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for progress_bar_visual"); + }; + + $dbh_status->disconnect; +} + +$node->stop('fast'); diff --git a/tests/common.py b/tests/common.py index ac24e76..a852b82 100644 --- a/tests/common.py +++ b/tests/common.py @@ -161,6 +161,54 @@ def onetime_query_state(config, async_conn, query, args={}, num_workers=0): set_guc(async_conn, 'enable_mergejoin', 'on') return result, notices +def progress_bar(config, pid): + conn = psycopg2.connect(**config) + curs = conn.cursor() + + curs.callproc('progress_bar', (pid,)) + result = curs.fetchall() + notices = conn.notices[:] + conn.close() + + return result, notices + +def onetime_progress_bar(config, async_conn, query, args={}, num_workers=0): + """ + Get intermediate state of 'query' on connection 'async_conn' after number of 'steps' + of node executions from start of query + """ + + acurs = async_conn.cursor() + + set_guc(async_conn, 'enable_mergejoin', 'off') + set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers) + acurs.execute(query) + + # extract progress of current query + MAX_PG_QS_RETRIES = 10 + DELAY_BETWEEN_RETRIES = 0.1 + pg_qs_args = { + 'config': config, + 'pid': async_conn.get_backend_pid(), + } + for k, v in args.items(): + pg_qs_args[k] = v + n_retries = 0 + while True: + result, notices = progress_bar(**pg_qs_args) + n_retries += 1 + if len(result) > 0: + break + if n_retries >= MAX_PG_QS_RETRIES: + # pg_query_state callings don't return any result, more likely run + # query has completed + break + time.sleep(DELAY_BETWEEN_RETRIES) + wait(async_conn) + + set_guc(async_conn, 'enable_mergejoin', 'on') + return result, notices + def set_guc(async_conn, param, value): acurs = async_conn.cursor() acurs.execute('set %s to %s' % (param, value)) diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py index a6e02e9..d80844b 100644 --- a/tests/pg_qs_test_runner.py +++ b/tests/pg_qs_test_runner.py @@ -68,6 +68,7 @@ class TeardownException(Exception): pass test_formats, test_timing_buffers_conflicts, test_insert_on_conflict, + test_progress_bar, ] def setup(con): diff --git a/tests/test_cases.py b/tests/test_cases.py index 1750bb1..955b2b1 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -386,3 +386,18 @@ def test_timing_buffers_conflicts(config): and 'WARNING: buffers statistics disabled\n' in notices common.n_close((acon,)) + +def test_progress_bar(config): + """test progress_bar of simple query""" + + acon, = common.n_async_connect(config) + query = 'select * from foo join bar on foo.c1=bar.c1' + + qs, notices = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= 0 and qs[0][0] < 1 + first_qs = qs[0][0] + + qs, _ = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= first_qs and qs[0][0] < 1 + + common.n_close((acon,))