Support unique constraints on compressed chunks
This patch allows unique constraints on compressed chunks. When
INSERTing into a compressed chunk with unique constraints, any
potentially conflicting compressed batches are decompressed so that
PostgreSQL can do regular constraint checking on the INSERT.
Of the ON CONFLICT variants, only INSERT ON CONFLICT DO NOTHING is
supported with this patch; DO UPDATE is still rejected.
To determine which batches conflict, only segmentby information is
considered. This will be enhanced in a follow-up patch to also use
orderby metadata so that fewer batches need to be decompressed.
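
A minimal SQL sketch of the behavior described above (table, column, and constraint names are illustrative and not taken from this commit):

CREATE TABLE metrics (
    ts        timestamptz NOT NULL,
    device_id int         NOT NULL,
    value     float8,
    UNIQUE (ts, device_id)
);
SELECT create_hypertable('metrics', 'ts');

ALTER TABLE metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id'
);

INSERT INTO metrics VALUES ('2023-03-01 00:00', 1, 1.0);
SELECT compress_chunk(c) FROM show_chunks('metrics') c;

-- Before this patch the INSERT below was rejected outright because the target
-- chunk is compressed and has a unique constraint. With this patch, batches
-- whose device_id segment matches the incoming row are decompressed first, so
-- PostgreSQL can perform regular constraint checking and raise unique_violation.
INSERT INTO metrics VALUES ('2023-03-01 00:00', 1, 2.0);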
svenklemm committed Mar 13, 2023
1 parent c02cb76 commit db626a9
Showing 22 changed files with 772 additions and 75 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,8 @@ accidentally triggering the load of a previous DB version.**
* #5212 Allow pushdown of reference table joins
* #5312 Add timeout support to the ping_data_node()
* #5361 Add parallel support for partialize_agg()
* #5252 Improve unique constraint support on compressed hypertables
* #5312 Add timeout support to ping_data_node()

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
6 changes: 6 additions & 0 deletions src/chunk.c
@@ -4378,6 +4378,12 @@ ts_chunk_is_compressed(const Chunk *chunk)
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}

bool
ts_chunk_is_distributed(const Chunk *chunk)
{
return chunk->data_nodes != NIL;
}

/* Note that only a compressed chunk can have partial flag set */
bool
ts_chunk_is_partial(const Chunk *chunk)
1 change: 1 addition & 0 deletions src/chunk.h
@@ -206,6 +206,7 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chun
extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_partial(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_distributed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_validate_chunk_status_for_operation(Oid chunk_relid,
int32 chunk_status,
ChunkOperation cmd,
2 changes: 2 additions & 0 deletions src/copy.c
@@ -324,6 +324,7 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
ChunkInsertState *cis =
ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
buffer->point,
buffer->slots[0],
NULL /* on chunk changed function */,
NULL /* payload for on chunk changed function */);

@@ -948,6 +949,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
point,
myslot,
on_chunk_insert_state_changed,
bistate);

3 changes: 3 additions & 0 deletions src/cross_module_fn.h
@@ -33,6 +33,7 @@
typedef struct JsonbParseState JsonbParseState;
typedef struct Hypertable Hypertable;
typedef struct Chunk Chunk;
typedef struct ChunkInsertState ChunkInsertState;
typedef struct CopyChunkState CopyChunkState;

typedef struct CrossModuleFunctions
@@ -136,6 +137,8 @@ typedef struct CrossModuleFunctions
PGFunction create_compressed_chunk;
PGFunction compress_chunk;
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
TupleTableSlot *slot);
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql
2 changes: 1 addition & 1 deletion src/indexing.c
@@ -270,7 +270,7 @@ indexing_create_and_verify_hypertable_indexes(const Hypertable *ht, bool create_
table_close(tblrel, AccessShareLock);
}

bool
bool TSDLLEXPORT
ts_indexing_relation_has_primary_or_unique_index(Relation htrel)
{
Bitmapset *key_attrs = RelationGetIndexAttrBitmap(htrel, INDEX_ATTR_BITMAP_KEY);
2 changes: 1 addition & 1 deletion src/indexing.h
@@ -24,6 +24,6 @@ extern TSDLLEXPORT Oid ts_indexing_find_clustered_index(Oid table_relid);

extern void ts_indexing_mark_as_valid(Oid index_id);
extern bool ts_indexing_mark_as_invalid(Oid index_id);
extern bool ts_indexing_relation_has_primary_or_unique_index(Relation htrel);
extern bool TSDLLEXPORT ts_indexing_relation_has_primary_or_unique_index(Relation htrel);

#endif /* TIMESCALEDB_INDEXING_H */
28 changes: 25 additions & 3 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
@@ -58,6 +58,7 @@ destroy_chunk_insert_state(void *cis)
*/
extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
TupleTableSlot *slot,
const on_chunk_changed_func on_chunk_changed, void *data)
{
ChunkInsertState *cis;
Expand Down Expand Up @@ -91,6 +92,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* where the chunk already exists.
*/
bool found;
Assert(slot);
Chunk *chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);

#if PG14_GE
@@ -106,7 +108,12 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
}
else
{
found = true;
}

if (!chunk)
elog(ERROR, "no chunk found or created");

/* get the filtered list of "available" DNs for this chunk but only if it's replicated */
if (found && dispatch->hypertable->fd.replication_factor > 1)
@@ -126,12 +133,26 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
list_free(chunk_data_nodes);
}

if (!chunk)
elog(ERROR, "no chunk found or created");

cis = ts_chunk_insert_state_create(chunk, dispatch);
ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);

if (found && ts_chunk_is_compressed(chunk) && !ts_chunk_is_distributed(chunk))
{
/*
* If this is an INSERT into a compressed chunk with UNIQUE or
* PRIMARY KEY constraints we need to make sure any batches that could
* potentially lead to a conflict are in the decompressed chunk so
* postgres can do proper constraint checking.
*/
if (ts_cm_functions->decompress_batches_for_insert)
ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot);
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functionality not supported under the current \"%s\" license",
ts_guc_license)));
}

MemoryContextSwitchTo(old_context);
}
else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
@@ -308,6 +329,7 @@ chunk_dispatch_exec(CustomScanState *node)
/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
point,
slot,
on_chunk_insert_state_changed,
state);

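A rough SQL-level illustration of the branch added above (continuing the illustrative metrics example): the new code path only runs for compressed, non-distributed chunks and delegates to the TSL decompress_batches_for_insert hook, so an Apache-only build raises the license error shown in the diff instead.

-- Only compressed batches with device_id = 1 need to be decompressed for
-- constraint checking; batches belonging to other devices stay compressed.
INSERT INTO metrics VALUES ('2023-03-01 00:05', 1, 0.5);

-- Inspect chunk compression status afterwards; the chunk is expected to remain
-- flagged as compressed, with the touched batches moved to its uncompressed part.
SELECT chunk_name, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'metrics';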
2 changes: 1 addition & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -84,7 +84,7 @@ typedef void (*on_chunk_changed_func)(ChunkInsertState *state, void *data);
extern ChunkDispatch *ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags);
extern void ts_chunk_dispatch_destroy(ChunkDispatch *chunk_dispatch);
extern ChunkInsertState *
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p,
ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p, TupleTableSlot *slot,
const on_chunk_changed_func on_chunk_changed, void *data);

extern TSDLLEXPORT Path *ts_chunk_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath,
14 changes: 3 additions & 11 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
@@ -597,21 +597,13 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
CHUNK_INSERT,
true);

if (has_compressed_chunk && onconflict_action != ONCONFLICT_NONE)
if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("insert with ON CONFLICT clause is not supported on "
"compressed chunks")));
errmsg(
"INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks")));

rel = table_open(chunk->table_id, RowExclusiveLock);
if (has_compressed_chunk && ts_indexing_relation_has_primary_or_unique_index(rel))
{
table_close(rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("insert into a compressed chunk that has primary or unique constraint is "
"not supported")));
}

MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context);
relinfo = create_chunk_result_relation_info(dispatch, rel);
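With the check above relaxed, ON CONFLICT handling on compressed chunks roughly looks like this (a sketch using the same illustrative table as above):

-- Now accepted: conflicting batches are decompressed and the duplicate row is skipped.
INSERT INTO metrics VALUES ('2023-03-01 00:00', 1, 2.0)
ON CONFLICT DO NOTHING;

-- Still rejected on compressed chunks:
INSERT INTO metrics VALUES ('2023-03-01 00:00', 1, 2.0)
ON CONFLICT (ts, device_id) DO UPDATE SET value = EXCLUDED.value;
-- ERROR:  INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks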
16 changes: 2 additions & 14 deletions src/process_utility.c
@@ -2679,7 +2679,7 @@ process_index_start(ProcessUtilityArgs *args)
hcache = ts_hypertable_cache_pin();
ht = ts_hypertable_cache_get_entry_rv(hcache, stmt->relation);

if (NULL == ht)
if (!ht)
{
/* Check if the relation is a Continuous Aggregate */
cagg = ts_continuous_agg_find_by_rv(stmt->relation);
@@ -2702,7 +2702,7 @@ }
}
}

if (NULL == ht)
if (!ht)
{
ts_cache_release(hcache);
return DDL_CONTINUE;
@@ -2716,18 +2716,6 @@ process_index_start(ProcessUtilityArgs *args)
/* Make the RangeVar for the underlying materialization hypertable */
stmt->relation = makeRangeVar(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name), -1);
}
else if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
{
/* unique indexes are not allowed on compressed hypertables*/
if (stmt->unique || stmt->primary || stmt->isconstraint)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported on hypertables that have compression "
"enabled")));
}
}

ts_hypertable_permissions_check_by_id(ht->fd.id);
add_hypertable_to_process_args(args, ht);
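With the blanket rejection removed above, a statement like the following is no longer refused in process_index_start() and proceeds to regular index processing (a sketch on the same illustrative table; compression-specific validation elsewhere may still apply):

-- Previously: ERROR: operation not supported on hypertables that have compression enabled
CREATE UNIQUE INDEX metrics_ts_device_idx ON metrics (ts, device_id);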
