Fix crash for concurrent drop and compress chunk #2688

Merged
37 changes: 24 additions & 13 deletions src/dimension_slice.c
@@ -99,8 +99,27 @@ ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord)
return 0;
}

+static bool
+tuple_is_deleted(TupleInfo *ti)
+{
+#if PG12_GE
+#ifdef USE_ASSERT_CHECKING
+	if (ti->lockresult == TM_Deleted)
+		Assert(ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid));
+#endif
+	return ti->lockresult == TM_Deleted;
+#else
+	/* If the tid and ctid in the lock failure data are the same, then this is
+	 * a delete. Otherwise it is an update and ctid is the new tuple ID. This
+	 * applies mostly to PG11, since PG12 has an explicit lockresult for
+	 * deleted tuples. */
+	return ti->lockresult == TM_Updated &&
+		   ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid);
+#endif
+}

static void
-lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
+lock_result_ok_or_abort(TupleInfo *ti)
{
switch (ti->lockresult)
{
@@ -109,37 +128,29 @@ lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
case TM_SelfModified:
case TM_Ok:
break;

#if PG12_GE
case TM_Deleted:
-			ereport(ERROR,
-					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-					 errmsg("dimension slice %d deleted by other transaction", slice->fd.id),
-					 errhint("Retry the operation again.")));
-			pg_unreachable();
-			break;
#endif
case TM_Updated:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d locked by other transaction", slice->fd.id),
errmsg("chunk %s by other transaction",
tuple_is_deleted(ti) ? "deleted" : "updated"),
errhint("Retry the operation again.")));
pg_unreachable();
break;

case TM_BeingModified:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d updated by other transaction", slice->fd.id),
errmsg("chunk updated by other transaction"),
errhint("Retry the operation again.")));
pg_unreachable();
break;

case TM_Invisible:
elog(ERROR, "attempt to lock invisible tuple");
pg_unreachable();
break;

case TM_WouldBlock:
default:
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
@@ -624,7 +635,7 @@ dimension_slice_tuple_found(TupleInfo *ti, void *data)
DimensionSlice **slice = data;
MemoryContext old;

-	lock_result_ok_or_abort(ti, *slice);
+	lock_result_ok_or_abort(ti);

old = MemoryContextSwitchTo(ti->mctx);
*slice = dimension_slice_from_slot(ti->slot);
6 changes: 2 additions & 4 deletions src/scanner.c
@@ -290,8 +290,6 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)

if (ctx->tuplock)
{
-		TM_FailureData tmfd;
#if PG12_GE
TupleTableSlot *slot = ictx->tinfo.slot;

@@ -304,7 +302,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
ctx->tuplock->lockmode,
ctx->tuplock->waitpolicy,
ctx->tuplock->lockflags,
&tmfd);
&ictx->tinfo.lockfd);

#else
HeapTuple tuple = ExecFetchSlotTuple(ictx->tinfo.slot);
@@ -317,7 +315,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
ctx->tuplock->waitpolicy,
false,
&buffer,
&tmfd);
&ictx->tinfo.lockfd);
/*
* A tuple lock pins the underlying buffer, so we need to
* unpin it.
2 changes: 2 additions & 0 deletions src/scanner.h
@@ -42,6 +42,8 @@ typedef struct TupleInfo
* in lockresult.
*/
TM_Result lockresult;
+	/* Failure data in case of failed tuple lock */
+	TM_FailureData lockfd;
Contributor:
Do we need to initialize this field somewhere as well before table_tuple_lock or heap_lock_tuple, since it was only used on the stack before?

@erimatnor (Contributor, Author), Nov 30, 2020:
It is usually embedded in a larger struct, which, AFAIK, is initialized properly in the places where it is used. Still, even if this were garbage, it is not something one should look at unless lockresult tells you to, in which case lockfd should also be set accordingly.

Contributor:
Sure, I was mostly thinking about tools like valgrind, which might raise additional warnings. Anyway, I don't think it is necessary to change anything.

int count;

/*
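A note on the pattern discussed in the thread above: here is a minimal sketch (hypothetical code, not part of this PR) of why prior initialization is unnecessary. lockfd is written by table_tuple_lock()/heap_lock_tuple() only when the lock attempt fails, and callers read it only after lockresult reports such a failure.

/* Hypothetical illustration, assuming this PR's TupleInfo from scanner.h.
 * lockfd may hold garbage after a successful lock, but it is never read
 * unless lockresult says the lock attempt failed. */
static void
report_lock_failure(TupleInfo *ti)
{
	switch (ti->lockresult)
	{
		case TM_Ok:
		case TM_SelfModified:
			return; /* success path: ti->lockfd is never inspected */
		case TM_Updated:
			/* failure path: the access method filled in ti->lockfd,
			 * so comparing tids to tell delete from update is safe */
			if (ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid))
				elog(NOTICE, "tuple deleted by a concurrent transaction");
			else
				elog(NOTICE, "tuple updated by a concurrent transaction");
			break;
		default:
			elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
	}
}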
237 changes: 237 additions & 0 deletions tsl/test/isolation/expected/deadlock_drop_chunks_compress.out
@@ -0,0 +1,237 @@
Parsed test spec with 2 sessions

starting permutation: s1_drop s1_commit s2_compress_chunk_1 s2_compress_chunk_2 s2_commit
Contributor:
This iterates over all possible step order combinations, right?

Contributor (Author):
Unless a test specifies explicit permutations, it will run all permutations.
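For reference, an isolation spec can pin the schedule with explicit permutation lines; a minimal, hypothetical excerpt using this test's own step names (the .spec file itself is not shown in this diff):

# run only this ordering instead of generating all interleavings
permutation "s1_drop" "s1_commit" "s2_compress_chunk_1" "s2_compress_chunk_2" "s2_commit"

Because this spec declares no permutation lines, the tester runs every valid interleaving of the steps, shown below.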

step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);

count

2
step s1_commit: COMMIT;
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

ERROR: chunk not found
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;

starting permutation: s1_drop s2_compress_chunk_1 s1_commit s2_compress_chunk_2 s2_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);

count

2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s1_commit: COMMIT;
step s2_compress_chunk_1: <... completed>
error in steps s1_commit s2_compress_chunk_1: ERROR: chunk deleted by other transaction
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;

starting permutation: s1_drop s2_compress_chunk_1 s2_compress_chunk_2 s1_commit s2_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);

count

2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s2_compress_chunk_1: <... completed>
ERROR: canceling statement due to lock timeout
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

ERROR: current transaction is aborted, commands ignored until end of transaction block
step s1_commit: COMMIT;
step s2_commit: COMMIT;

starting permutation: s1_drop s2_compress_chunk_1 s2_compress_chunk_2 s2_commit s1_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);

count

2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s2_compress_chunk_1: <... completed>
ERROR: canceling statement due to lock timeout
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;
step s1_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s1_drop s1_commit s2_compress_chunk_2 s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s2_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s1_drop s2_compress_chunk_2 s1_commit s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s1_drop s2_compress_chunk_2 s2_commit s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s2_commit: COMMIT;
step s1_drop: <... completed>
count

2
step s1_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s1_drop s1_commit s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s1_drop s2_commit s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_commit: COMMIT;
step s1_drop: <... completed>
count

2
step s1_commit: COMMIT;

starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s2_commit s1_drop s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;

count

1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;

count

1
step s2_commit: COMMIT;
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);

count

2
step s1_commit: COMMIT;
1 change: 1 addition & 0 deletions tsl/test/isolation/specs/CMakeLists.txt
@@ -14,6 +14,7 @@ list(APPEND TEST_FILES
continuous_aggs_insert.spec
continuous_aggs_multi.spec
continuous_aggs_concurrent_refresh.spec
+    deadlock_drop_chunks_compress.spec
)

if (CMAKE_BUILD_TYPE MATCHES Debug)