Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock dimension slices when creating new chunk #2800

Merged
merged 1 commit into from Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/chunk.c
Expand Up @@ -113,7 +113,7 @@ static int chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_fu
uint16 limit);
static Datum chunks_return_srf(FunctionCallInfo fcinfo);
static int chunk_cmp(const void *ch1, const void *ch2);
static Chunk *chunk_find(Hypertable *ht, Point *p, bool resurrect);
static Chunk *chunk_find(Hypertable *ht, Point *p, bool resurrect, bool lock_slices);
static void init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
const char *table_name);

Expand Down Expand Up @@ -868,8 +868,10 @@ ts_chunk_create(Hypertable *ht, Point *p, const char *schema, const char *prefix
*/
LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);

/* Recheck if someone else created the chunk before we got the table lock */
chunk = chunk_find(ht, p, true);
/* Recheck if someone else created the chunk before we got the table
* lock. The returned chunk will have all slices locked so that they
* aren't removed. */
chunk = chunk_find(ht, p, true, true);

if (NULL == chunk)
{
Expand Down Expand Up @@ -1106,19 +1108,23 @@ dimension_slice_and_chunk_constraint_join(ChunkScanCtx *scanctx, DimensionVec *v
* to two chunks.
*/
static void
chunk_point_scan(ChunkScanCtx *scanctx, Point *p)
chunk_point_scan(ChunkScanCtx *scanctx, Point *p, bool lock_slices)
{
int i;

/* Scan all dimensions for slices enclosing the point */
for (i = 0; i < scanctx->space->num_dimensions; i++)
{
DimensionVec *vec;
ScanTupLock tuplock = {
.lockmode = LockTupleKeyShare,
.waitpolicy = LockWaitBlock,
};

vec = ts_dimension_slice_scan_limit(scanctx->space->dimensions[i].fd.id,
p->coordinates[i],
0,
NULL);
lock_slices ? &tuplock : NULL);

dimension_slice_and_chunk_constraint_join(scanctx, vec);
}
Expand Down Expand Up @@ -1346,7 +1352,7 @@ chunk_resurrect(Hypertable *ht, ChunkStub *stub)
* case it needs to live beyond the lifetime of the other data.
*/
static Chunk *
chunk_find(Hypertable *ht, Point *p, bool resurrect)
chunk_find(Hypertable *ht, Point *p, bool resurrect, bool lock_slices)
{
ChunkStub *stub;
Chunk *chunk = NULL;
Expand All @@ -1359,7 +1365,7 @@ chunk_find(Hypertable *ht, Point *p, bool resurrect)
ctx.early_abort = true;

/* Scan for the chunk matching the point */
chunk_point_scan(&ctx, p);
chunk_point_scan(&ctx, p, lock_slices);

/* Find the stub that has N matching dimension constraints */
stub = chunk_scan_ctx_get_chunk_stub(&ctx);
Expand Down Expand Up @@ -1393,9 +1399,9 @@ chunk_find(Hypertable *ht, Point *p, bool resurrect)
}

Chunk *
ts_chunk_find(Hypertable *ht, Point *p)
ts_chunk_find(Hypertable *ht, Point *p, bool lock_slices)
{
return chunk_find(ht, p, false);
return chunk_find(ht, p, false, lock_slices);
}

/*
Expand Down Expand Up @@ -3134,6 +3140,9 @@ ts_chunk_do_drop_chunks(Oid table_relid, Datum older_than_datum, Datum newer_tha
log_level,
user_supplied_table_name);
}

DEBUG_WAITPOINT("drop_chunks_end");

return dropped_chunk_names;
}

Expand Down
2 changes: 1 addition & 1 deletion src/chunk.h
Expand Up @@ -101,7 +101,7 @@ typedef enum CascadeToMaterializationOption
extern Chunk *ts_chunk_create(Hypertable *ht, Point *p, const char *schema, const char *prefix);
extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints);
extern TSDLLEXPORT ChunkStub *ts_chunk_stub_create(int32 id, int16 num_constraints);
extern Chunk *ts_chunk_find(Hypertable *ht, Point *p);
extern Chunk *ts_chunk_find(Hypertable *ht, Point *p, bool lock_slices);
extern Chunk **ts_chunk_find_all(Hyperspace *hs, List *dimension_vecs, LOCKMODE lockmode,
unsigned int *num_chunks);
extern List *ts_chunk_find_all_oids(Hyperspace *hs, List *dimension_vecs, LOCKMODE lockmode);
Expand Down
79 changes: 69 additions & 10 deletions src/dimension_slice.c
Expand Up @@ -110,6 +110,12 @@ lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)

#if PG12_GE
case TM_Deleted:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d deleted by other transaction", slice->fd.id),
errhint("Retry the operation again.")));
pg_unreachable();
break;
#endif
case TM_Updated:
ereport(ERROR,
Expand Down Expand Up @@ -144,9 +150,26 @@ static ScanTupleResult
dimension_vec_tuple_found(TupleInfo *ti, void *data)
{
DimensionVec **slices = data;
DimensionSlice *slice = dimension_slice_from_tuple(ti->tuple);
DimensionSlice *slice;

lock_result_ok_or_abort(ti, slice);
switch (ti->lockresult)
{
case TM_SelfModified:
case TM_Ok:
break;
#if PG12_GE
case TM_Deleted:
#endif
case TM_Updated:
/* Treat as not found */
return SCAN_CONTINUE;
default:
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
pg_unreachable();
break;
}

slice = dimension_slice_from_tuple(ti->tuple);
*slices = ts_dimension_vec_add_slice(slices, slice);

return SCAN_CONTINUE;
Expand Down Expand Up @@ -522,9 +545,29 @@ ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraint
/*
 * Scanner callback that fills in the caller-provided dimension slice from the
 * scanned tuple.
 *
 * The tuple may have been tuple-locked during the scan, so the lock result is
 * inspected first: a tuple that was concurrently deleted or updated is
 * treated as not found and the slice is left untouched. Any other lock
 * status is unexpected and raises an error.
 */
static ScanTupleResult
dimension_slice_fill(TupleInfo *ti, void *data)
{
	switch (ti->lockresult)
	{
		case TM_SelfModified:
		case TM_Ok:
		{
			DimensionSlice **slice = data;
			HeapTuple tuple = ti->tuple;

			memcpy(&(*slice)->fd, GETSTRUCT(tuple), sizeof(FormData_dimension_slice));
			break;
		}
#if PG12_GE
		case TM_Deleted:
#endif
		case TM_Updated:
			/* Same as not found */
			break;
		default:
			elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
			pg_unreachable();
			break;
	}

	return SCAN_DONE;
}

Expand All @@ -537,7 +580,7 @@ dimension_slice_fill(TupleInfo *ti, void *data)
* otherwise.
*/
bool
ts_dimension_slice_scan_for_existing(DimensionSlice *slice)
ts_dimension_slice_scan_for_existing(DimensionSlice *slice, ScanTupLock *tuplock)
{
ScanKeyData scankey[3];

Expand Down Expand Up @@ -565,7 +608,7 @@ ts_dimension_slice_scan_for_existing(DimensionSlice *slice)
&slice,
1,
AccessShareLock,
NULL,
tuplock,
CurrentMemoryContext);
}

Expand Down Expand Up @@ -738,20 +781,36 @@ dimension_slice_insert_relation(Relation rel, DimensionSlice *slice)

/*
 * Insert slices into the catalog.
 *
 * Only slices that don't already exist in the catalog will be inserted. Note
 * that all slices that already exist (i.e., have a valid ID) MUST be locked
 * with a tuple lock (e.g., FOR KEY SHARE) prior to calling this function
 * since they won't be created. Otherwise it is not possible to guarantee that
 * all slices still exist once the transaction commits.
 *
 * Returns the number of slices inserted.
 */
int
ts_dimension_slice_insert_multi(DimensionSlice **slices, int num_slices)
{
	Catalog *catalog = ts_catalog_get();
	Relation rel;
	int i, n = 0;

	rel = table_open(catalog_get_table_id(catalog, DIMENSION_SLICE), RowExclusiveLock);

	for (i = 0; i < num_slices; i++)
	{
		/* A slice with id == 0 has no catalog ID assigned yet, i.e., it does
		 * not exist in the catalog and must be inserted. */
		if (slices[i]->fd.id == 0)
		{
			dimension_slice_insert_relation(rel, slices[i]);
			n++;
		}
	}

	table_close(rel, RowExclusiveLock);

	return n;
}

static ScanTupleResult
Expand Down
4 changes: 2 additions & 2 deletions src/dimension_slice.h
Expand Up @@ -50,7 +50,7 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str
int limit, ScanTupLock *tuplock);
extern DimensionVec *ts_dimension_slice_collision_scan_limit(int32 dimension_id, int64 range_start,
int64 range_end, int limit);
extern bool ts_dimension_slice_scan_for_existing(DimensionSlice *slice);
extern bool ts_dimension_slice_scan_for_existing(DimensionSlice *slice, ScanTupLock *tuplock);
extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id,
ScanTupLock *tuplock,
MemoryContext mctx);
Expand All @@ -68,7 +68,7 @@ extern bool ts_dimension_slices_collide(DimensionSlice *slice1, DimensionSlice *
extern bool ts_dimension_slices_equal(DimensionSlice *slice1, DimensionSlice *slice2);
extern bool ts_dimension_slice_cut(DimensionSlice *to_cut, DimensionSlice *other, int64 coord);
extern void ts_dimension_slice_free(DimensionSlice *slice);
extern void ts_dimension_slice_insert_multi(DimensionSlice **slice, Size num_slices);
extern int ts_dimension_slice_insert_multi(DimensionSlice **slice, int num_slices);
extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSlice *right);
extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);

Expand Down
5 changes: 1 addition & 4 deletions src/hypercube.c
Expand Up @@ -256,11 +256,8 @@ ts_hypercube_calculate_from_point(Hyperspace *hs, Point *p, ScanTupLock *tuplock
* Check if there's already an existing slice with the calculated
* range. If a slice already exists, use that slice's ID instead
* of a new one.
*
* The tuples are already locked in
* `chunk_create_from_point_after_lock`, so nothing to do here.
*/
ts_dimension_slice_scan_for_existing(cube->slices[i]);
ts_dimension_slice_scan_for_existing(cube->slices[i], tuplock);
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/hypertable.c
Expand Up @@ -1009,7 +1009,7 @@ hypertable_chunk_store_add(Hypertable *h, Chunk *chunk)
}

static inline Chunk *
hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists, bool lock_chunk_slices)
{
Chunk *chunk;
ChunkStoreEntry *cse = ts_subspace_store_get(h->chunk_cache, point);
Expand All @@ -1025,7 +1025,7 @@ hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
* allocates a lot of transient data. We don't want this allocated on
* the cache's memory context.
*/
chunk = ts_chunk_find(h, point);
chunk = ts_chunk_find(h, point, lock_chunk_slices);

if (NULL == chunk)
{
Expand All @@ -1049,14 +1049,16 @@ hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
/*
 * Get the chunk enclosing the given point if one exists; never create a new
 * chunk and do not lock the chunk's dimension slices.
 *
 * Returns the chunk, or NULL if no chunk encloses the point.
 */
Chunk *
ts_hypertable_find_chunk_if_exists(Hypertable *h, Point *point)
{
	return hypertable_get_chunk(h, point, false, false);
}

/* Gets the chunk for a given point, creating it if it does not exist. If the
 * chunk already exists, all its dimension slices will be locked in FOR KEY
 * SHARE mode so they cannot be removed by a concurrent transaction. */
Chunk *
ts_hypertable_get_or_create_chunk(Hypertable *h, Point *point)
{
	return hypertable_get_chunk(h, point, true, true);
}

bool
Expand Down
86 changes: 77 additions & 9 deletions test/isolation/expected/dropchunks_race.out
@@ -1,22 +1,90 @@
Parsed test spec with 3 sessions
Parsed test spec with 5 sessions

starting permutation: s1a s2a s3a s3b
starting permutation: s3_chunks_found_wait s1_drop_chunks s2_drop_chunks s3_chunks_found_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s3_chunks_found_wait: SELECT debug_waitpoint_enable('drop_chunks_chunks_found');
debug_waitpoint_enable


step s1a: SELECT COUNT(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s2a: SELECT COUNT(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s3a: SELECT debug_waitpoint_release('drop_chunks_chunks_found');
step s1_drop_chunks: SELECT count(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s2_drop_chunks: SELECT count(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s3_chunks_found_release: SELECT debug_waitpoint_release('drop_chunks_chunks_found');
debug_waitpoint_release


step s1a: <... completed>
step s1_drop_chunks: <... completed>
count

1
step s2a: <... completed>
error in steps s3a s1a s2a: ERROR: some chunks could not be read since they are being concurrently updated
step s3b: SELECT COUNT(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
step s2_drop_chunks: <... completed>
count

0
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count

0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count

0
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp


starting permutation: s4_chunks_dropped_wait s1_drop_chunks s5_insert_new_chunk s4_chunks_dropped_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s4_chunks_dropped_wait: SELECT debug_waitpoint_enable('drop_chunks_end');
debug_waitpoint_enable


step s1_drop_chunks: SELECT count(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s5_insert_new_chunk: INSERT INTO dropchunks_race_t1 VALUES ('2020-03-01 10:30', 1, 2.2); <waiting ...>
step s4_chunks_dropped_release: SELECT debug_waitpoint_release('drop_chunks_end');
debug_waitpoint_release


step s1_drop_chunks: <... completed>
count

1
step s5_insert_new_chunk: <... completed>
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count

0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count

1
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp

Sun Mar 01 10:30:00 2020 PST1 2.2

starting permutation: s4_chunks_dropped_wait s1_drop_chunks s5_insert_old_chunk s4_chunks_dropped_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s4_chunks_dropped_wait: SELECT debug_waitpoint_enable('drop_chunks_end');
debug_waitpoint_enable


step s1_drop_chunks: SELECT count(*) FROM drop_chunks(TIMESTAMPTZ '2020-03-01', 'dropchunks_race_t1'); <waiting ...>
step s5_insert_old_chunk: INSERT INTO dropchunks_race_t1 VALUES ('2020-01-02 10:31', 1, 1.1); <waiting ...>
step s4_chunks_dropped_release: SELECT debug_waitpoint_release('drop_chunks_end');
debug_waitpoint_release


step s1_drop_chunks: <... completed>
count

1
step s5_insert_old_chunk: <... completed>
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count

0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count

1
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp

Thu Jan 02 10:31:00 2020 PST1 1.1