Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Change the sl_log_* selection query from using a complicated where cl…

…ause

into a series of UNION ALL queries with slightly simpler qualifications.
The PostgreSQL optimizer failed on the old query to apply a lower bound
on the index scankey, causing the query to always select from the beginning
of the table. With a large backlog this caused significant time to be lost,
visible as "time to first row". The new query delivers the first log row
usually within milliseconds regardless of the size of sl_log_*.

New feature explain_interval.
This new slon.conf variable defines an interval in seconds at which the
remote worker will output the current log selection query together with
its EXPLAIN query plan.
  • Loading branch information...
commit cfdf5c511e390775fdeb43dc37b07dffc61a3b20 1 parent 6f60815
Jan Wieck authored
View
14 doc/adminguide/slonconf.sgml
@@ -137,6 +137,20 @@
<varlistentry id="slon-config-logging-log-timestamp-format" xreflabel="slon_conf_log_timestamp_format">
<term><varname>log_timestamp_format</varname> (<type>string</type>)</term>
<indexterm>
+ <primary><varname>explain_interval</varname> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>An interval in seconds at which the remote worker thread will
+ output the query, used to select log rows from the data provider, together
+ with its EXPLAIN query plan. The default value of 0 turns this feature off.
+ The allowed range is 0 (off) to 86400 (once per day).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="slon-config-logging-explain-interval" xreflabel="slon_conf_explain_interval">
+ <term><varname>explain_interval</varname> (<type>integer</type>)</term>
+ <indexterm>
<primary><varname>log_timestamp_format</varname> configuration parameter</primary>
</indexterm>
<listitem>
View
8 share/slon.conf-sample
@@ -68,6 +68,12 @@
# Default is '%Y-%m-%d %H:%M:%S %Z'
#log_timestamp_format='%Y-%m-%d %H:%M:%S %Z'
+# An interval in seconds at which the remote worker will output the
+# query used to select log rows together with its query plan. The
+# default value of 0 turns this feature off.
+# Range: [0-86400], default: 0
+#explain_interval=0
+
# Where to write the pid file. Default: no pid file
#pid_file='/path/to/your/pidfile'
@@ -108,4 +114,4 @@
# lag_interval="8 minutes"
# Directory in which to stow sync archive files
-# archive_dir="/tmp/somewhere"
+# archive_dir="/tmp/somewhere"
View
13 src/slon/confoptions.c
@@ -723,6 +723,19 @@ static struct config_int ConfigureNamesInt[] =
30000 /* max val */
},
+ {
+ {
+ (const char *) "explain_interval", /* conf name */
+ gettext_noop("Interval in seconds in which the remote worker will report an explain of the log selection query"), /* short desc */
+ gettext_noop("Interval in seconds in which the remote worker will report an explain of the log selection query"), /* long desc */
+ SLON_C_INT /* config type */
+ },
+ &explain_interval, /* var name */
+ 0, /* default val (never) */
+ 0, /* min val */
+ 86400 /* max val (1 day) */
+ },
+
{{0}}
};
View
584 src/slon/remote_worker.c
@@ -172,7 +172,8 @@ struct ProviderInfo_s
pthread_mutex_t helper_lock;
pthread_cond_t helper_cond;
WorkGroupStatus helper_status;
- SlonDString helper_qualification;
+ SlonDString helper_query;
+ int log_status;
ProviderSet *set_head;
ProviderSet *set_tail;
@@ -242,6 +243,9 @@ static pthread_mutex_t node_confirm_lock = PTHREAD_MUTEX_INITIALIZER;
int sync_group_maxsize;
int sync_max_rowsize;
int sync_max_largemem;
+int explain_interval;
+time_t explain_lastsec;
+int explain_thistime;
static int last_sync_group_size;
static int next_sync_group_size;
@@ -1693,7 +1697,7 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup)
pthread_mutex_init(&(provider->helper_lock), NULL);
pthread_mutex_lock(&(provider->helper_lock));
pthread_cond_init(&(provider->helper_cond), NULL);
- dstring_init(&(provider->helper_qualification));
+ dstring_init(&(provider->helper_query));
provider->helper_status = SLON_WG_IDLE;
if (pthread_create(&(provider->helper_thread), NULL,
sync_helper, (void *) provider) != 0)
@@ -1832,7 +1836,7 @@ adjust_provider_info(SlonNode *node, WorkerGroupData *wd, int cleanup)
free(provider->pa_conninfo);
provider->pa_conninfo = NULL;
DLLIST_REMOVE(wd->provider_head, wd->provider_tail, provider);
- dstring_free(&(provider->helper_qualification));
+ dstring_free(&(provider->helper_query));
#ifdef SLON_MEMDEBUG
memset(provider, 55, sizeof(ProviderInfo));
#endif
@@ -3675,10 +3679,9 @@ sync_event(SlonNode *node, SlonConn *local_conn,
struct timeval tv_start;
struct timeval tv_now;
- SlonDString new_qual;
SlonDString query;
SlonDString lsquery;
- SlonDString *provider_qual;
+ SlonDString *provider_query;
SlonDString actionseq_subquery;
int actionlist_len;
@@ -3830,13 +3833,6 @@ sync_event(SlonNode *node, SlonConn *local_conn,
}
}
- dstring_init(&new_qual);
-
- (void) slon_mkquery(&new_qual,
- "(log_txid < '%s' and "
- "\"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s'))",
- event->ev_maxtxid_c, event->ev_snapshot_c);
-
min_ssy_seqno = -1;
for (provider = wd->provider_head; provider; provider = provider->next)
{
@@ -3846,13 +3842,56 @@ sync_event(SlonNode *node, SlonConn *local_conn,
int ntuples2;
int tupno2;
int ntables_total = 0;
- int added_or_to_provider = 0;
+ int rc;
+ int need_union;
+
+ need_union = 0;
+ provider_query = &(provider->helper_query);
+ dstring_reset(provider_query);
+ (void) slon_mkquery(provider_query,
+ "declare LOG cursor for ");
+
+ /*
+ * Get the current sl_log_status value for this provider
+ */
+ (void) slon_mkquery(&query, "select last_value from %s.sl_log_status",
+ rtcfg_namespace);
- provider_qual = &(provider->helper_qualification);
- dstring_reset(provider_qual);
- (void) slon_mkquery(provider_qual,
- "where log_origin = %d and ( ",
- node->no_id);
+ start_monitored_event(&pm);
+ res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
+ monitor_provider_query(&pm);
+
+ rc = PQresultStatus(res1);
+ if (rc != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: \"%s\" %s %s\n",
+ node->no_id, dstring_data(&query),
+ PQresStatus(rc),
+ PQresultErrorMessage(res1));
+ PQclear(res1);
+ dstring_free(&query);
+ dstring_free(&lsquery);
+ archive_terminate(node);
+ return 60;
+ }
+ if (PQntuples(res1) != 1)
+ {
+ slon_log(SLON_ERROR,
+ "remoteWorkerThread_%d: \"%s\" %s returned %d tuples\n",
+ node->no_id, dstring_data(&query),
+ PQresStatus(rc), PQntuples(res1));
+ PQclear(res1);
+ dstring_free(&query);
+ dstring_free(&lsquery);
+ archive_terminate(node);
+ return 60;
+ }
+ provider->log_status = strtol(PQgetvalue(res1, 0, 0), NULL, 10);
+ PQclear(res1);
+ slon_log(SLON_DEBUG2,
+ "remoteWorkerThread_%d_%d: current remote log_status = %d\n",
+ node->no_id, provider->no_id, provider->log_status);
/*
* Select all sets we receive from this provider and which are not
@@ -3885,7 +3924,6 @@ sync_event(SlonNode *node, SlonConn *local_conn,
node->no_id, dstring_data(&query),
PQresultErrorMessage(res1));
PQclear(res1);
- dstring_free(&new_qual);
dstring_free(&query);
dstring_free(&lsquery);
archive_terminate(node);
@@ -3899,7 +3937,6 @@ sync_event(SlonNode *node, SlonConn *local_conn,
if (ntuples1 == 0)
{
PQclear(res1);
- slon_appendquery(provider_qual, " false ) ");
continue;
}
num_sets += ntuples1;
@@ -3945,7 +3982,6 @@ sync_event(SlonNode *node, SlonConn *local_conn,
PQresultErrorMessage(res2));
PQclear(res2);
PQclear(res1);
- dstring_free(&new_qual);
dstring_free(&query);
dstring_free(&lsquery);
archive_terminate(node);
@@ -3961,38 +3997,287 @@ sync_event(SlonNode *node, SlonConn *local_conn,
{
PQclear(res2);
- if (!added_or_to_provider)
- {
- slon_appendquery(provider_qual, " ( false ) ");
- added_or_to_provider = 1;
- }
-
continue;
}
ntables_total += ntuples2;
/*
- * ... and build up a query qualification that is
- *
- * and ( (log_tableid in (<tables_in_set>) and
- * (<snapshot_qual_of_new_sync>) and (<snapshot_qual_of_setsync>)
- * ) OR ( <next_set_from_this_provider> ) )
- *
- * If we were using AND's there then no rows will ever end up
- * being selected when you have multiple sets.
+ * ... and build up the provider query
*/
- if (added_or_to_provider)
+ switch (provider->log_status)
{
- slon_appendquery(provider_qual, "or (\n log_tableid in (");
+ case 0:
+ case 2:
+ case 3: if (need_union)
+ {
+ slon_appendquery(provider_query, " union all ");
+ }
+ need_union = 1;
+
+ /*
+ * First for the big chunk that does the index
+ * scan with upper and lower bounds:
+ *
+ * select ... from sl_log_1
+ * where log_origin = X
+ * and log_tableid in (<this set's tables>)
+ */
+ slon_appendquery(provider_query,
+ "select log_origin, log_txid, log_tableid, "
+ "log_actionseq, log_cmdtype, "
+ "octet_length(log_cmddata), "
+ "case when octet_length(log_cmddata) <= %d "
+ "then log_cmddata "
+ "else null end "
+ "from %s.sl_log_1 "
+ "where log_origin = %d "
+ "and log_tableid in (",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ node->no_id);
+ for (tupno2 = 0; tupno2 < ntuples2; tupno2++)
+ {
+ if (tupno2 > 0)
+ dstring_addchar(provider_query, ',');
+ dstring_append(provider_query,
+ PQgetvalue(res2, tupno2, 0));
+ }
+ dstring_append(provider_query, ") ");
+
+ /*
+ * and log_txid >= '<maxxid_last_snapshot>'
+ * and log_txid < '<maxxid_this_snapshot>'
+ * and txid_visible_in_snapshot(log_txid, '<this_snapshot>')
+ */
+ slon_appendquery(provider_query,
+ "and log_txid >= '%s' "
+ "and log_txid < '%s' "
+ "and \"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s') ",
+ ssy_maxxid,
+ event->ev_maxtxid_c,
+ event->ev_snapshot_c);
+
+ /*
+ * and (<actionseq_qual_on_first_sync>)
+ */
+ actionlist_len = strlen(ssy_action_list);
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d_%d: "
+ "ssy_action_list length: %d\n",
+ node->no_id, provider->no_id,
+ actionlist_len);
+ slon_log(SLON_DEBUG4, "remoteWorkerThread_%d_%d: "
+ "ssy_action_list value: %s\n",
+ node->no_id, provider->no_id,
+ ssy_action_list);
+ if (actionlist_len > 0)
+ {
+ dstring_init(&actionseq_subquery);
+ compress_actionseq(ssy_action_list, &actionseq_subquery);
+ slon_appendquery(provider_query,
+ " and (%s)",
+ dstring_data(&actionseq_subquery));
+ dstring_free(&actionseq_subquery);
+ }
+
+ /*
+ * Now do it all over again to get the log rows
+ * from in-progress transactions at snapshot one
+ * that have committed by the time of snapshot two.
+ * again, we do:
+ *
+ * select ... from sl_log_1
+ * where log_origin = X
+ * and log_tableid in (<this set's tables>)
+ */
+ slon_appendquery(provider_query,
+ "union all "
+ "select log_origin, log_txid, log_tableid, "
+ "log_actionseq, log_cmdtype, "
+ "octet_length(log_cmddata), "
+ "case when octet_length(log_cmddata) <= %d "
+ "then log_cmddata "
+ "else null end "
+ "from %s.sl_log_1 "
+ "where log_origin = %d "
+ "and log_tableid in (",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ node->no_id);
+ for (tupno2 = 0; tupno2 < ntuples2; tupno2++)
+ {
+ if (tupno2 > 0)
+ dstring_addchar(provider_query, ',');
+ dstring_append(provider_query,
+ PQgetvalue(res2, tupno2, 0));
+ }
+ dstring_append(provider_query, ") ");
+
+ /*
+ * and log_txid in (select
+ * txid_snapshot_xip('<last_snapshot>'))
+ * and txit_visible_in_snapshot(log_txid, '<this_snapshot>')
+ */
+ slon_appendquery(provider_query,
+ "and log_txid in (select * from "
+ "\"pg_catalog\".txid_snapshot_xip('%s') "
+ "except "
+ "select * from "
+ "\"pg_catalog\".txid_snapshot_xip('%s') )",
+ ssy_snapshot,
+ event->ev_snapshot_c);
+
+ /*
+ * and (<actionseq_qual_on_first_sync>)
+ */
+ actionlist_len = strlen(ssy_action_list);
+ if (actionlist_len > 0)
+ {
+ dstring_init(&actionseq_subquery);
+ compress_actionseq(ssy_action_list, &actionseq_subquery);
+ slon_appendquery(provider_query,
+ " and (%s)",
+ dstring_data(&actionseq_subquery));
+ dstring_free(&actionseq_subquery);
+ }
+ break;
}
- else
+
+ switch (provider->log_status)
{
- slon_appendquery(provider_qual, " (\n log_tableid in (");
- added_or_to_provider = 1;
+ case 1:
+ case 2:
+ case 3: if (need_union)
+ {
+ slon_appendquery(provider_query, " union all ");
+ }
+ need_union = 1;
+
+ /*
+ * First for the big chunk that does the index
+ * scan with upper and lower bounds:
+ *
+ * select ... from sl_log_2
+ * where log_origin = X
+ * and log_tableid in (<this set's tables>)
+ */
+ slon_appendquery(provider_query,
+ "select log_origin, log_txid, log_tableid, "
+ "log_actionseq, log_cmdtype, "
+ "octet_length(log_cmddata), "
+ "case when octet_length(log_cmddata) <= %d "
+ "then log_cmddata "
+ "else null end "
+ "from %s.sl_log_2 "
+ "where log_origin = %d "
+ "and log_tableid in (",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ node->no_id);
+ for (tupno2 = 0; tupno2 < ntuples2; tupno2++)
+ {
+ if (tupno2 > 0)
+ dstring_addchar(provider_query, ',');
+ dstring_append(provider_query,
+ PQgetvalue(res2, tupno2, 0));
+ }
+ dstring_append(provider_query, ") ");
+
+ /*
+ * and log_txid >= '<maxxid_last_snapshot>'
+ * and log_txid < '<maxxid_this_snapshot>'
+ * and txid_visible_in_snapshot(log_txid, '<this_snapshot>')
+ */
+ slon_appendquery(provider_query,
+ "and log_txid >= '%s' "
+ "and log_txid < '%s' "
+ "and \"pg_catalog\".txid_visible_in_snapshot(log_txid, '%s') ",
+ ssy_maxxid,
+ event->ev_maxtxid_c,
+ event->ev_snapshot_c);
+
+ /*
+ * and (<actionseq_qual_on_first_sync>)
+ */
+ actionlist_len = strlen(ssy_action_list);
+ slon_log(SLON_DEBUG2, " ssy_action_list length: %d\n",
+ actionlist_len);
+ slon_log(SLON_DEBUG4, " ssy_action_list value: %s\n",
+ ssy_action_list);
+ if (actionlist_len > 0)
+ {
+ dstring_init(&actionseq_subquery);
+ compress_actionseq(ssy_action_list, &actionseq_subquery);
+ slon_appendquery(provider_query,
+ " and (%s)",
+ dstring_data(&actionseq_subquery));
+ dstring_free(&actionseq_subquery);
+ }
+
+ /*
+ * Now do it all over again to get the log rows
+ * from in-progress transactions at snapshot one
+ * that have committed by the time of snapshot two.
+ * again, we do:
+ *
+ * select ... from sl_log_2
+ * where log_origin = X
+ * and log_tableid in (<this set's tables>)
+ */
+ slon_appendquery(provider_query,
+ "union all "
+ "select log_origin, log_txid, log_tableid, "
+ "log_actionseq, log_cmdtype, "
+ "octet_length(log_cmddata), "
+ "case when octet_length(log_cmddata) <= %d "
+ "then log_cmddata "
+ "else null end "
+ "from %s.sl_log_2 "
+ "where log_origin = %d "
+ "and log_tableid in (",
+ sync_max_rowsize,
+ rtcfg_namespace,
+ node->no_id);
+ for (tupno2 = 0; tupno2 < ntuples2; tupno2++)
+ {
+ if (tupno2 > 0)
+ dstring_addchar(provider_query, ',');
+ dstring_append(provider_query,
+ PQgetvalue(res2, tupno2, 0));
+ }
+ dstring_append(provider_query, ") ");
+
+ /*
+ * and log_txid in (select
+ * txid_snapshot_xip('<last_snapshot>'))
+ * and txit_visible_in_snapshot(log_txid, '<this_snapshot>')
+ */
+ slon_appendquery(provider_query,
+ "and log_txid in (select * from "
+ "\"pg_catalog\".txid_snapshot_xip('%s') "
+ "except "
+ "select * from "
+ "\"pg_catalog\".txid_snapshot_xip('%s') )",
+ ssy_snapshot,
+ event->ev_snapshot_c);
+
+ /*
+ * and (<actionseq_qual_on_first_sync>)
+ */
+ actionlist_len = strlen(ssy_action_list);
+ if (actionlist_len > 0)
+ {
+ dstring_init(&actionseq_subquery);
+ compress_actionseq(ssy_action_list, &actionseq_subquery);
+ slon_appendquery(provider_query,
+ " and (%s)",
+ dstring_data(&actionseq_subquery));
+ dstring_free(&actionseq_subquery);
+ }
+ break;
}
- /* the <tables_in_set> tab_id list */
+ /* Remember info about the tables in the set */
for (tupno2 = 0; tupno2 < ntuples2; tupno2++)
{
int tab_id = strtol(PQgetvalue(res2, tupno2, 0), NULL, 10);
@@ -4029,39 +4314,6 @@ sync_event(SlonNode *node, SlonConn *local_conn,
break;
}
}
-
- if (tupno2 > 0)
- dstring_addchar(provider_qual, ',');
- dstring_append(provider_qual, PQgetvalue(res2, tupno2, 0));
- }
-
- /* add the <snapshot_qual_of_new_sync> */
- slon_appendquery(provider_qual,
- ")\n and %s\n and ",
- dstring_data(&new_qual));
-
- /* add the <snapshot_qual_of_setsync> */
- slon_appendquery(provider_qual,
- "(log_txid >= '%s' or "
- "log_txid IN (select * from \"pg_catalog\".txid_snapshot_xip('%s')))",
- ssy_maxxid, ssy_snapshot);
- actionlist_len = strlen(ssy_action_list);
- slon_log(SLON_DEBUG4, " ssy_action_list value: %s\n",
- ssy_action_list);
- slon_log(SLON_DEBUG2, " ssy_action_list length: %d\n",
- actionlist_len);
- if (actionlist_len == 0)
- {
- slon_appendquery(provider_qual, "\n) ");
- }
- else
- {
- dstring_init(&actionseq_subquery);
- compress_actionseq(ssy_action_list, &actionseq_subquery);
- slon_appendquery(provider_qual,
- " and (%s)\n) ",
- dstring_data(&actionseq_subquery));
- dstring_free(&actionseq_subquery);
}
PQclear(res2);
@@ -4069,22 +4321,33 @@ sync_event(SlonNode *node, SlonConn *local_conn,
PQclear(res1);
/*
- * We didn't add anything good in the provider clause. That shouldn't
- * be!
+ * Finally add the order by clause.
*/
- if (added_or_to_provider)
- {
- /* close out our OR block */
- slon_appendquery(provider_qual, ")");
- }
- else
+ dstring_append(provider_query, " order by log_actionseq");
+ dstring_terminate(provider_query);
+
+ /*
+ * Check that we at least select something from the provider.
+ */
+ if (!need_union)
{
- slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: Didn't add or to provider\n", node->no_id);
+ /*
+ * This should never happen.
+ */
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Nothing selected from provider.\n", node->no_id);
+ dstring_free(&query);
+ dstring_free(&lsquery);
+ if (archive_dir)
+ {
+ rc = archive_close(node);
+ if (rc < 0)
+ slon_retry();
+ }
+ return 60;
}
}
- dstring_free(&new_qual);
-
/*
* If we have found no sets needing sync at all, why bother the helpers?
*/
@@ -4146,6 +4409,27 @@ sync_event(SlonNode *node, SlonConn *local_conn,
PQclear(res1);
/*
+ * If we have an explain_interval, run the query through explain
+ * and output the query as well as the resulting query plan.
+ */
+ if (explain_interval > 0)
+ {
+ struct timeval current_time;
+
+ gettimeofday(&current_time, NULL);
+
+ if (explain_lastsec + explain_interval <= current_time.tv_sec)
+ {
+ explain_thistime = true;
+ explain_lastsec = current_time.tv_sec;
+ }
+ else
+ {
+ explain_thistime = false;
+ }
+ }
+
+ /*
* Time to get the helpers busy.
*/
wd->workgroup_status = SLON_WG_BUSY;
@@ -4210,10 +4494,10 @@ sync_event(SlonNode *node, SlonConn *local_conn,
if (PQresultStatus(res1) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "\"%s\" %s - qualification was: %s\n",
+ "\"%s\" %s - query was: %s\n",
node->no_id, dstring_data(&(wgline->data)),
PQresultErrorMessage(res1),
- dstring_data(&(wgline->provider->helper_qualification)));
+ dstring_data(&(wgline->provider->helper_query)));
num_errors++;
}
PQclear(res1);
@@ -4231,10 +4515,10 @@ sync_event(SlonNode *node, SlonConn *local_conn,
if (PQresultStatus(res1) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
- "\"%s\" %s - qualification was: %s\n",
+ "\"%s\" %s - query was: %s\n",
node->no_id, dstring_data(&(wgline->data)),
PQresultErrorMessage(res1),
- dstring_data(&(wgline->provider->helper_qualification)));
+ dstring_data(&(wgline->provider->helper_query)));
num_errors++;
}
#ifdef SLON_CHECK_CMDTUPLES
@@ -4245,10 +4529,10 @@ sync_event(SlonNode *node, SlonConn *local_conn,
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
"replication query did not affect "
"one data row (cmdTuples = %s) - "
- "query was: %s - qualification was: %s\n",
+ "query was: %s - query was: %s\n",
node->no_id, PQcmdTuples(res1),
dstring_data(&(wgline->data)),
- dstring_data(&(wgline->provider->helper_qualification)));
+ dstring_data(&(wgline->provider->helper_query)));
num_errors++;
}
else
@@ -4678,87 +4962,67 @@ sync_helper(void *cdata)
node->no_id, provider->no_id, log_status);
/*
- * Open a cursor that reads the log data.
- *
- * Depending on sl_log_status select from sl_log_1, sl_log_2 or
- * both.
+ * See if we have to run the query through EXPLAIN first
*/
- switch (log_status)
+ if (explain_thistime)
{
- case 0:
- (void) slon_mkquery(&query,
- "declare LOG cursor for select "
- " log_origin, log_txid, log_tableid, "
- " log_actionseq, log_cmdtype, "
- " octet_length(log_cmddata), "
- " case when octet_length(log_cmddata) <= %d "
- " then log_cmddata "
- " else null end "
- "from %s.sl_log_1 %s order by log_actionseq; ",
- sync_max_rowsize,
- rtcfg_namespace,
- dstring_data(&(provider->helper_qualification)));
- break;
+ SlonDString explain_query;
- case 1:
- (void) slon_mkquery(&query,
- "declare LOG cursor for select "
- " log_origin, log_txid, log_tableid, "
- " log_actionseq, log_cmdtype, "
- " octet_length(log_cmddata), "
- " case when octet_length(log_cmddata) <= %d "
- " then log_cmddata "
- " else null end "
- "from %s.sl_log_2 %s order by log_actionseq; ",
- sync_max_rowsize,
- rtcfg_namespace,
- dstring_data(&(provider->helper_qualification)));
- break;
-
- case 2:
- case 3:
- (void) slon_mkquery(&query,
- "declare LOG cursor for select * from ("
- " select log_origin, log_txid, log_tableid, "
- " log_actionseq, log_cmdtype, "
- " octet_length(log_cmddata), "
- " case when octet_length(log_cmddata) <= %d "
- " then log_cmddata "
- " else null end "
- " from %s.sl_log_1 %s "
- " union all "
- " select log_origin, log_txid, log_tableid, "
- " log_actionseq, log_cmdtype, "
- " octet_length(log_cmddata), "
- " case when octet_length(log_cmddata) <= %d "
- " then log_cmddata "
- " else null end "
- " from %s.sl_log_2 %s) as log_union "
- "order by log_actionseq; ",
- sync_max_rowsize,
- rtcfg_namespace,
- dstring_data(&(provider->helper_qualification)),
- sync_max_rowsize,
- rtcfg_namespace,
- dstring_data(&(provider->helper_qualification)));
- break;
+ /*
+ * Let Postgres EXPLAIN the query plan for the current
+ * log selection query
+ */
+ dstring_init(&explain_query);
+ slon_mkquery(&explain_query, "explain %s",
+ dstring_data(&(provider->helper_query)));
- default:
- slon_log(SLON_ERROR,
- "remoteWorkerThread_%d: unexpected log_status %d\n",
- node->no_id, log_status);
+ res = PQexec(dbconn, dstring_data(&explain_query));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s",
+ node->no_id, provider->no_id,
+ dstring_data(&explain_query),
+ PQresultErrorMessage(res));
+ PQclear(res);
+ dstring_free(&explain_query);
errors++;
break;
+ }
+
+ slon_log(SLON_INFO,
+ "remoteWorkerThread_%d_%d: "
+ "Log selection query: %s\n",
+ node->no_id, provider->no_id,
+ dstring_data(&explain_query));
+ slon_log(SLON_INFO,
+ "remoteWorkerThread_%d_%d: Query Plan:\n",
+ node->no_id, provider->no_id);
+
+ ntuples = PQntuples(res);
+ for (tupno = 0; tupno < ntuples; tupno++)
+ {
+ slon_log(SLON_INFO,
+ "remoteWorkerThread_%d_%d: PLAN %s\n",
+ node->no_id, provider->no_id,
+ PQgetvalue(res, tupno, 0));
+ }
+ slon_log(SLON_INFO,
+ "remoteWorkerThread_%d_%d: PLAN_END\n",
+ node->no_id, provider->no_id);
+
+ PQclear(res);
+ dstring_free(&explain_query);
}
- if (errors)
- break;
gettimeofday(&tv_start, NULL);
first_fetch = true;
res = NULL;
+ /*
+ * Open a cursor that reads the log data.
+ */
start_monitored_event(&pm);
- if (query_execute(node, dbconn, &query) < 0)
+ if (query_execute(node, dbconn, &(provider->helper_query)) < 0)
{
errors++;
break;
@@ -5207,7 +5471,7 @@ sync_helper(void *cdata)
node->no_id, provider->no_id);
pthread_mutex_lock(&(provider->helper_lock));
provider->helper_status = SLON_WG_DONE;
- dstring_reset(&provider->helper_qualification);
+ dstring_reset(&provider->helper_query);
pthread_mutex_unlock(&(provider->helper_lock));
slon_log(SLON_DEBUG4,
View
1  src/slon/slon.h
@@ -525,6 +525,7 @@ extern void *remoteListenThread_main(void *cdata);
extern int sync_group_maxsize;
extern int sync_max_rowsize;
extern int sync_max_largemem;
+extern int explain_interval;
/* ----------
Please sign in to comment.
Something went wrong with that request. Please try again.