Skip to content

Commit

Permalink
Merge branch 'kotsachin-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
schmiddy committed May 25, 2015
2 parents 13e3591 + 8fc8b65 commit d3a99db
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 31 deletions.
63 changes: 57 additions & 6 deletions bin/pg_repack.c
Expand Up @@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by);
static bool repack_table_indexes(PGresult *index_details);
static bool repack_all_indexes(char *errbuf, size_t errsize);
static void repack_cleanup(bool fatal, const repack_table *table);
static void repack_cleanup_callback(bool fatal, void *userdata);
static bool rebuild_indexes(const repack_table *table);

static char *getstr(PGresult *res, int row, int col);
Expand Down Expand Up @@ -235,13 +236,20 @@ static bool only_indexes = false;
static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */
static bool dryrun = false;
static unsigned int temp_obj_num = 0; /* temporary objects counter */

/* buffer should have at least 11 bytes */
static char *
utoa(unsigned int value, char *buffer)
{
sprintf(buffer, "%u", value);
return buffer;
/* XXX: originally, we would just return buffer here without
* the pgut_strdup(). But repack_cleanup_callback() seems to
* depend on getting back a freshly strdup'd copy of buffer,
* not sure why. So now we are leaking a tiny bit of memory
* with each utoa() call.
*/
return pgut_strdup(buffer);
}

static pgut_option options[] =
Expand Down Expand Up @@ -898,6 +906,13 @@ rebuild_indexes(const repack_table *table)

ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
#endif
/* XXX: the errno != EINTR check means we won't bail
* out on SIGINT. We should probably just remove this
* check, though it seems we also need to fix up
* the on_interrupt handling for workers' index
* builds (those PGconns don't seem to have c->cancel
* set, so we don't cancel the in-progress builds).
*/
if (ret < 0 && errno != EINTR)
elog(ERROR, "poll() failed: %d, %d", ret, errno);

Expand Down Expand Up @@ -990,7 +1005,7 @@ static void
repack_one_table(repack_table *table, const char *orderby)
{
PGresult *res = NULL;
const char *params[2];
const char *params[3];
int num;
char *vxid = NULL;
char buffer[12];
Expand Down Expand Up @@ -1041,6 +1056,9 @@ repack_one_table(repack_table *table, const char *orderby)

if (dryrun)
return;

/* push repack_cleanup_callback() on stack to clean temporary objects */
pgut_atexit_push(repack_cleanup_callback, &table->target_oid);

/*
* 1. Setup advisory lock and trigger on main table.
Expand Down Expand Up @@ -1146,8 +1164,11 @@ repack_one_table(repack_table *table, const char *orderby)
CLEARPGRES(res);

command(table->create_pktype, 0, NULL);
temp_obj_num++;
command(table->create_log, 0, NULL);
temp_obj_num++;
command(table->create_trigger, 0, NULL);
temp_obj_num++;
command(table->enable_trigger, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
command(sql.data, 0, NULL);
Expand Down Expand Up @@ -1283,6 +1304,7 @@ repack_one_table(repack_table *table, const char *orderby)
goto cleanup;

command(table->create_table, 0, NULL);
temp_obj_num++;
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns)
command(table->drop_columns, 0, NULL);
Expand Down Expand Up @@ -1375,8 +1397,10 @@ repack_one_table(repack_table *table, const char *orderby)
elog(DEBUG2, "---- drop ----");

command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
command("SELECT repack.repack_drop($1)", 1, params);
params[1] = utoa(temp_obj_num, buffer);
command("SELECT repack.repack_drop($1, $2)", 2, params);
command("COMMIT", 0, NULL);
temp_obj_num = 0; /* reset temporary object counter after cleanup */

/*
* 7. Analyze.
Expand All @@ -1395,7 +1419,7 @@ repack_one_table(repack_table *table, const char *orderby)

/* Release advisory lock on table. */
params[0] = REPACK_LOCK_PREFIX_STR;
params[1] = buffer;
params[1] = utoa(table->target_oid, buffer);

res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))",
2, params);
Expand Down Expand Up @@ -1675,6 +1699,31 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
return ret;
}

/* This function calls to repack_drop() to clean temporary objects on error
* in creation of temporary objects.
*/
void
repack_cleanup_callback(bool fatal, void *userdata)
{
Oid target_table = *(Oid *) userdata;
const char *params[2];
char buffer[12];

if(fatal)
{
params[0] = utoa(target_table, buffer);
params[1] = utoa(temp_obj_num, buffer);

/* testing PQstatus() of connection and conn2, as we do
* in repack_cleanup(), doesn't seem to work here,
* so just use an unconditional reconnect().
*/
reconnect(ERROR);
command("SELECT repack.repack_drop($1, $2)", 2, params);
temp_obj_num = 0; /* reset temporary object counter after cleanup */
}
}

/*
* The userdata pointing a table being re-organized. We need to cleanup temp
* objects before the program exits.
Expand All @@ -1689,7 +1738,7 @@ repack_cleanup(bool fatal, const repack_table *table)
else
{
char buffer[12];
const char *params[1];
const char *params[2];

/* Try reconnection if not available. */
if (PQstatus(connection) != CONNECTION_OK ||
Expand All @@ -1698,7 +1747,9 @@ repack_cleanup(bool fatal, const repack_table *table)

/* do cleanup */
params[0] = utoa(table->target_oid, buffer);
command("SELECT repack.repack_drop($1)", 1, params);
params[1] = utoa(temp_obj_num, buffer);
command("SELECT repack.repack_drop($1, $2)", 2, params);
temp_obj_num = 0; /* reset temporary object counter after cleanup */
}
}

Expand Down
20 changes: 16 additions & 4 deletions bin/pgut/pgut.c
Expand Up @@ -64,7 +64,7 @@ static void on_before_exec(pgutConn *conn);
static void on_after_exec(pgutConn *conn);
static void on_interrupt(void);
static void on_cleanup(void);
static void exit_or_abort(int exitcode);
static void exit_or_abort(int exitcode, int elevel);

void
pgut_init(int argc, char **argv)
Expand Down Expand Up @@ -872,7 +872,10 @@ pgut_errfinish(int dummy, ...)
edata->detail.data);

if (pgut_abort_level <= edata->elevel && edata->elevel <= PANIC)
exit_or_abort(edata->code);
{
in_cleanup = true; /* need to be set for cleaning temporary objects on error */
exit_or_abort(edata->code, edata->elevel);
}
}

#ifndef PGUT_OVERRIDE_ELOG
Expand Down Expand Up @@ -1180,7 +1183,9 @@ call_atexit_callbacks(bool fatal)
pgut_atexit_item *item;

for (item = pgut_atexit_stack; item; item = item->next)
{
item->callback(fatal, item->userdata);
}
}

static void
Expand All @@ -1193,12 +1198,19 @@ on_cleanup(void)
}

static void
exit_or_abort(int exitcode)
exit_or_abort(int exitcode, int elevel)
{
if (in_cleanup)

if (in_cleanup && FATAL > elevel)
{
/* oops, error in cleanup*/
call_atexit_callbacks(true);
exit(exitcode);
}
else if (FATAL <= elevel <= PANIC)
{
/* on FATAL or PANIC */
call_atexit_callbacks(true);
abort();
}
else
Expand Down
2 changes: 1 addition & 1 deletion lib/pg_repack.sql.in
Expand Up @@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_swap'
LANGUAGE C VOLATILE STRICT;

CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS
CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS
'MODULE_PATHNAME', 'repack_drop'
LANGUAGE C VOLATILE STRICT;

Expand Down
60 changes: 40 additions & 20 deletions lib/repack.c
Expand Up @@ -928,6 +928,7 @@ Datum
repack_drop(PG_FUNCTION_ARGS)
{
Oid oid = PG_GETARG_OID(0);
int numobj = PG_GETARG_INT32(1);
const char *relname = get_quoted_relname(oid);
const char *nspname = get_quoted_nspname(oid);

Expand All @@ -943,14 +944,41 @@ repack_drop(PG_FUNCTION_ARGS)
/* connect to SPI manager */
repack_init();

/* drop log table: must be done before dropping the pk type,
* since the log table is dependent on the pk type. (That's
* why we check numobj > 1 here.)
*/
if (numobj > 1)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
oid);
--numobj;
}

/* drop type for pk type */
if (numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TYPE IF EXISTS repack.pk_%u",
oid);
--numobj;
}

/*
* drop repack trigger: We have already dropped the trigger in normal
* cases, but it can be left on error.
*/
execute_with_format(
SPI_OK_UTILITY,
"DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE",
nspname, relname);
if (numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE",
nspname, relname);
--numobj;
}

#if PG_VERSION_NUM < 80400
/* delete autovacuum settings */
Expand All @@ -965,23 +993,15 @@ repack_drop(PG_FUNCTION_ARGS)
oid, oid);
#endif

/* drop log table */
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
oid);

/* drop temp table */
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.table_%u CASCADE",
oid);

/* drop type for log table */
execute_with_format(
SPI_OK_UTILITY,
"DROP TYPE IF EXISTS repack.pk_%u CASCADE",
oid);
if (numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.table_%u CASCADE",
oid);
--numobj;
}

SPI_finish();

Expand Down

0 comments on commit d3a99db

Please sign in to comment.