Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix chunk status when inserting into chunks #5533

Merged
merged 1 commit into from Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Expand Up @@ -135,6 +135,18 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
}

cis = ts_chunk_insert_state_create(chunk, dispatch);

/*
* We might have been blocked by a compression operation
 * while trying to fetch the above lock, so let's update the
* chunk catalog data because the status might have changed.
*
* This works even in higher levels of isolation since
* catalog data is always read from latest snapshot.
*/
chunk = ts_chunk_get_by_relid(chunk->table_id, true);
ts_set_compression_status(cis, chunk);

ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);

if (found && ts_chunk_is_compressed(chunk) && !ts_chunk_is_distributed(chunk))
Expand Down
13 changes: 9 additions & 4 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
Expand Up @@ -608,10 +608,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
state->rel = rel;
state->result_relation_info = relinfo;
state->estate = dispatch->estate;

state->chunk_compressed = ts_chunk_is_compressed(chunk);
if (state->chunk_compressed)
state->chunk_partial = ts_chunk_is_partial(chunk);
ts_set_compression_status(state, chunk);

if (relinfo->ri_RelationDesc->rd_rel->relhasindex && relinfo->ri_IndexRelationDescs == NULL)
ExecOpenIndices(relinfo, onconflict_action != ONCONFLICT_NONE);
Expand Down Expand Up @@ -718,6 +715,14 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
return state;
}

/*
 * Set the compression status fields (chunk_compressed / chunk_partial) of a
 * chunk insert state from the given chunk's catalog data.
 *
 * Called both when the insert state is first created and when the state is
 * refreshed after re-reading the chunk from the catalog (a concurrent
 * compression operation may have changed the status in the meantime).
 *
 * NOTE(review): chunk_partial is now unconditionally assigned. The previous
 * code only wrote it when the chunk was compressed, which could leave a
 * stale chunk_partial = true behind when this function is used to refresh
 * an existing state whose chunk has since been fully decompressed.
 */
void
ts_set_compression_status(ChunkInsertState *state, const Chunk *chunk)
{
	state->chunk_compressed = ts_chunk_is_compressed(chunk);
	state->chunk_partial = state->chunk_compressed && ts_chunk_is_partial(chunk);
}

extern void
ts_chunk_insert_state_destroy(ChunkInsertState *state)
{
Expand Down
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_insert_state.h
Expand Up @@ -68,4 +68,5 @@ extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkD
extern void ts_chunk_insert_state_destroy(ChunkInsertState *state);

OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
/* Set chunk_compressed/chunk_partial on the insert state from the chunk's catalog data */
void ts_set_compression_status(ChunkInsertState *state, const Chunk *chunk);
#endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */
8 changes: 6 additions & 2 deletions tsl/src/compression/compression.c
Expand Up @@ -1935,8 +1935,12 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo

bms_free(key_columns);

TableScanDesc heapScan =
table_beginscan(in_rel, GetTransactionSnapshot(), num_scankeys, scankeys);
/*
* Using latest snapshot to scan the heap since we are doing this to build
* the index on the uncompressed chunks in order to do speculative insertion
* which is always built from all tuples (even in higher levels of isolation).
*/
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);

for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;
Expand Down