Skip to content

Commit

Permalink
Recurse CLUSTER command to chunks
Browse files Browse the repository at this point in the history
Clustering a table means reordering it according to an index.
This operation requires an exclusive lock and extensive
processing time. On large hypertables with many chunks, CLUSTER
risks blocking a lot of operations by holding locks for a long time.
This is alleviated by processing each chunk in a new transaction,
ensuring locks are only held on one chunk at a time.
  • Loading branch information
erimatnor committed Nov 20, 2017
1 parent 9c7191e commit 5d0cbc1
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 8 deletions.
56 changes: 54 additions & 2 deletions src/chunk_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <optimizer/var.h>
#include <commands/defrem.h>
#include <commands/tablecmds.h>
#include <commands/cluster.h>
#include <access/xact.h>

#include "chunk_index.h"
#include "hypertable.h"
Expand Down Expand Up @@ -483,6 +485,44 @@ chunk_index_scan(int indexid, ScanKeyData scankey[], int nkeys,
#define chunk_index_scan_update(idxid, scankey, nkeys, tuple_found, data) \
chunk_index_scan(idxid, scankey, nkeys, tuple_found, data, RowExclusiveLock)

static bool
chunk_index_collect(TupleInfo *ti, void *data)
{
List **mappings = data;
FormData_chunk_index *chunk_index = (FormData_chunk_index *) GETSTRUCT(ti->tuple);
Chunk *chunk = chunk_get_by_id(chunk_index->chunk_id, 0, true);
Oid nspoid = get_rel_namespace(chunk->table_id);
ChunkIndexMapping *cim = palloc(sizeof(ChunkIndexMapping));

cim->chunkoid = chunk->table_id;
cim->indexoid = get_relname_relid(NameStr(chunk_index->index_name), nspoid);
cim->parent_indexoid = get_relname_relid(NameStr(chunk_index->hypertable_index_name), nspoid);
*mappings = lappend(*mappings, cim);

return true;
}

List *
chunk_index_get_mappings(Hypertable *ht, Oid hypertable_indexrelid)
{
ScanKeyData scankey[2];
const char *indexname = get_rel_name(hypertable_indexrelid);
List *mappings = NIL;

ScanKeyInit(&scankey[0],
Anum_chunk_index_hypertable_id_hypertable_index_name_idx_hypertable_id,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(ht->fd.id));
ScanKeyInit(&scankey[1],
Anum_chunk_index_hypertable_id_hypertable_index_name_idx_hypertable_index_name,
BTEqualStrategyNumber, F_NAMEEQ,
DirectFunctionCall1(namein, CStringGetDatum((indexname))));

chunk_index_scan(CHUNK_INDEX_HYPERTABLE_ID_HYPERTABLE_INDEX_NAME_IDX,
scankey, 2, chunk_index_collect, &mappings, AccessShareLock);

return mappings;
}

static bool
chunk_index_tuple_delete(TupleInfo *ti, void *data)
{
Expand Down Expand Up @@ -513,7 +553,8 @@ chunk_index_delete_children_of(Hypertable *ht, Oid hypertable_indexrelid, bool s
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(ht->fd.id));
ScanKeyInit(&scankey[1],
Anum_chunk_index_hypertable_id_hypertable_index_name_idx_hypertable_index_name,
BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(indexname));
BTEqualStrategyNumber, F_NAMEEQ,
DirectFunctionCall1(namein, CStringGetDatum((indexname))));

return chunk_index_scan_update(CHUNK_INDEX_HYPERTABLE_ID_HYPERTABLE_INDEX_NAME_IDX,
scankey, 2, chunk_index_tuple_delete, &should_drop);
Expand All @@ -530,7 +571,8 @@ chunk_index_delete(Chunk *chunk, Oid chunk_indexrelid, bool drop_index)
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(chunk->fd.id));
ScanKeyInit(&scankey[1],
Anum_chunk_index_chunk_id_index_name_idx_index_name,
BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(indexname));
BTEqualStrategyNumber, F_NAMEEQ,
DirectFunctionCall1(namein, CStringGetDatum(indexname)));

return chunk_index_scan_update(CHUNK_INDEX_CHUNK_ID_INDEX_NAME_IDX,
scankey, 2, chunk_index_tuple_delete, &drop_index);
Expand Down Expand Up @@ -660,3 +702,13 @@ chunk_index_set_tablespace(Hypertable *ht, Oid hypertable_indexrelid, const char
scankey, 2, chunk_index_tuple_set_tablespace,
(char *) tablespace);
}

void
chunk_index_mark_clustered(Oid chunkrelid, Oid indexrelid)
{
Relation rel = heap_open(chunkrelid, AccessShareLock);

mark_index_clustered(rel, indexrelid, true);
CommandCounterIncrement();
heap_close(rel, AccessShareLock);
}
10 changes: 10 additions & 0 deletions src/chunk_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
typedef struct Chunk Chunk;
typedef struct Hypertable Hypertable;

typedef struct ChunkIndexMapping
{
Oid chunkoid;
Oid parent_indexoid;
Oid indexoid;
} ChunkIndexMapping;

extern void chunk_index_create_all(int32 hypertable_id, Oid hypertable_relid, int32 chunk_id, Oid chunkrelid);
extern Oid chunk_index_create_from_stmt(IndexStmt *stmt, int32 chunk_id, Oid chunkrelid, int32 hypertable_id, Oid hypertable_indexrelid);
extern int chunk_index_delete_children_of(Hypertable *ht, Oid hypertable_indexrelid, bool should_drop);
Expand All @@ -15,4 +22,7 @@ extern int chunk_index_rename(Chunk *chunk, Oid chunk_indexrelid, const char *ne
extern int chunk_index_rename_parent(Hypertable *ht, Oid hypertable_indexrelid, const char *newname);
extern int chunk_index_set_tablespace(Hypertable *ht, Oid hypertable_indexrelid, const char *tablespace);
extern void chunk_index_create_from_constraint(int32 hypertable_id, Oid hypertable_constaint, int32 chunk_id, Oid chunk_constraint);
extern List *chunk_index_get_mappings(Hypertable *ht, Oid hypertable_indexrelid);
extern void chunk_index_mark_clustered(Oid chunkrelid, Oid indexrelid);

#endif /* TIMESCALEDB_CHUNK_INDEX_H */
165 changes: 159 additions & 6 deletions src/process_utility.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <postgres.h>
#include <nodes/parsenodes.h>
#include <nodes/nodes.h>
#include <nodes/makefuncs.h>
#include <tcop/utility.h>
#include <catalog/namespace.h>
#include <catalog/pg_inherits_fn.h>
Expand All @@ -13,12 +14,15 @@
#include <commands/defrem.h>
#include <commands/trigger.h>
#include <commands/tablecmds.h>
#include <commands/cluster.h>
#include <commands/event_trigger.h>
#include <access/htup_details.h>
#include <access/xact.h>
#include <utils/rel.h>
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include <utils/builtins.h>
#include <utils/snapmgr.h>
#include <parser/parse_utilcmd.h>

#include <miscadmin.h>
Expand Down Expand Up @@ -970,6 +974,159 @@ process_index_end(Node *parsetree, CollectedCommand *cmd)
return handled;
}

static Oid
find_clustered_index(Oid table_relid)
{
Relation rel;
ListCell *index;
Oid index_relid = InvalidOid;

rel = heap_open(table_relid, NoLock);

/* We need to find the index that has indisclustered set. */
foreach(index, RelationGetIndexList(rel))
{
HeapTuple idxtuple;
Form_pg_index indexForm;

index_relid = lfirst_oid(index);
idxtuple = SearchSysCache1(INDEXRELID,
ObjectIdGetDatum(index_relid));
if (!HeapTupleIsValid(idxtuple))
elog(ERROR, "cache lookup failed for index %u", index_relid);
indexForm = (Form_pg_index) GETSTRUCT(idxtuple);

if (indexForm->indisclustered)
{
ReleaseSysCache(idxtuple);
break;
}
ReleaseSysCache(idxtuple);
index_relid = InvalidOid;
}

heap_close(rel, NoLock);

if (!OidIsValid(index_relid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("there is no previously clustered index for table \"%s\"",
get_rel_name(table_relid))));

return index_relid;
}

/*
* Cluster a hypertable.
*
* The functionality to cluster all chunks of a hypertable is based on the
* regular cluster function's mode to cluster multiple tables. Since clustering
* involves taking exclusive locks on all tables for extensive periods of time,
* each subtable is clustered in its own transaction. This will release all
* locks on subtables once they are done.
*/
static bool
process_cluster_start(Node *parsetree, ProcessUtilityContext context)
{
ClusterStmt *stmt = (ClusterStmt *) parsetree;
Cache *hcache;
Hypertable *ht;

Assert(IsA(stmt, ClusterStmt));

/* If this is a re-cluster on all tables, there is nothing we need to do */
if (NULL == stmt->relation)
return false;

hcache = hypertable_cache_pin();
ht = hypertable_cache_get_entry_rv(hcache, stmt->relation);

if (NULL != ht)
{
bool is_top_level = (context == PROCESS_UTILITY_TOPLEVEL);
Oid index_relid;
List *chunk_indexes;
ListCell *lc;
MemoryContext old,
mcxt;

if (!pg_class_ownercheck(ht->main_table_relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
get_rel_name(ht->main_table_relid));

/*
* If CLUSTER is run inside a user transaction block; we bail out or
* otherwise we'd be holding locks way too long.
*/
PreventTransactionChain(is_top_level, "CLUSTER");

if (NULL == stmt->indexname)
index_relid = find_clustered_index(ht->main_table_relid);
else
index_relid = get_relname_relid(stmt->indexname,
get_rel_namespace(ht->main_table_relid));

if (!OidIsValid(index_relid))
{
/* Let regular process utility handle */
cache_release(hcache);
return false;
}

/*
* The list of chunks and their indexes need to be on a memory context
* that will survive moving to a new transaction for each chunk
*/
mcxt = AllocSetContextCreate(PortalContext,
"Hypertable cluster",
ALLOCSET_DEFAULT_SIZES);

/*
* Get a list of chunks and indexes that correspond to the
* hypertable's index
*/
old = MemoryContextSwitchTo(mcxt);
chunk_indexes = chunk_index_get_mappings(ht, index_relid);
MemoryContextSwitchTo(old);

/* Commit to get out of starting transaction */
PopActiveSnapshot();
CommitTransactionCommand();

foreach(lc, chunk_indexes)
{
ChunkIndexMapping *cim = lfirst(lc);

/* Start a new transaction for each relation. */
StartTransactionCommand();
/* functions in indexes may want a snapshot set */
PushActiveSnapshot(GetTransactionSnapshot());

/*
* We must mark each chunk index as clustered before calling
* cluster_rel() because it expects indexes that need to be
* rechecked (due to new transaction) to already have that mark
* set
*/
chunk_index_mark_clustered(cim->chunkoid, cim->indexoid);

/* Do the job. */
cluster_rel(cim->chunkoid, cim->indexoid, true, stmt->verbose);
PopActiveSnapshot();
CommitTransactionCommand();
}
/* Start a new transaction for the cleanup work. */
StartTransactionCommand();

/* Clean up working storage */
MemoryContextDelete(mcxt);
}

cache_release(hcache);

return false;
}

/*
* Process create table statements.
*
Expand Down Expand Up @@ -1403,12 +1560,8 @@ process_ddl_command_start(Node *parsetree,
handled = process_reindex(parsetree);
break;
case T_ClusterStmt:

/*
* CLUSTER command on hypertables do not yet recurse to chunks.
* This requires mapping a hypertable index to corresponding
* indexes on each chunk.
*/
handled = process_cluster_start(parsetree, context);
break;
default:
break;
}
Expand Down
Loading

0 comments on commit 5d0cbc1

Please sign in to comment.