Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leak during concurrent UPDATE/DELETE #5675

Merged
merged 1 commit into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 48 additions & 14 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -2265,8 +2265,13 @@
* This method will:
* 1.scan compressed chunk
* 2.decompress the row
* 3.insert decompressed rows to uncompressed chunk
* 4.delete this row from compressed chunk
* 3.delete this row from compressed chunk
* 4.insert decompressed rows to uncompressed chunk
*
* Return value:
* if all 4 steps defined above pass set chunk_status_changed to true and return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the following is true:

  • the method will return true iff it processed all candidate rows
  • chunk_status_changed will be true iff there are rows decompressed

note:
because it only return true or an error right now - I think chunk_status_changed is not necessary anymore...

* if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on
* same chunk.
*/
static bool
decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
Expand Down Expand Up @@ -2307,7 +2312,6 @@
decompressor->compressed_datums,
decompressor->compressed_is_nulls);

row_decompressor_decompress_row(decompressor, NULL);
TM_FailureData tmfd;
TM_Result result;
result = table_tuple_delete(decompressor->in_rel,
Expand All @@ -2319,19 +2323,49 @@
&tmfd,
false);

if (result == TM_Updated || result == TM_Deleted)
{
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
return false;
}
if (result == TM_Invisible)
switch (result)
{
elog(ERROR, "attempted to lock invisible tuple");
return false;
/* If the tuple has been already deleted, most likely somebody
* decompressed the tuple already */
case TM_Deleted:
kgyrtkirk marked this conversation as resolved.
Show resolved Hide resolved
{
if (IsolationUsesXactSnapshot())
{
/* For Repeatable Read isolation level report error */
table_endscan(heapScan);
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
continue;
}
break;
/*
* If another transaction is updating the compressed data,
* we have to abort the transaction to keep consistency.
*/
case TM_Updated:

Check warning on line 2347 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2347

Added line #L2347 was not covered by tests
{
table_endscan(heapScan);
elog(ERROR, "tuple concurrently updated");

Check warning on line 2350 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2349-L2350

Added lines #L2349 - L2350 were not covered by tests
}
break;
case TM_Invisible:

Check warning on line 2353 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2353

Added line #L2353 was not covered by tests
{
table_endscan(heapScan);
elog(ERROR, "attempted to lock invisible tuple");

Check warning on line 2356 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2355-L2356

Added lines #L2355 - L2356 were not covered by tests
}
break;
case TM_Ok:
break;
default:

Check warning on line 2361 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2361

Added line #L2361 was not covered by tests
{
table_endscan(heapScan);
elog(ERROR, "unexpected tuple operation result: %d", result);

Check warning on line 2364 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L2363-L2364

Added lines #L2363 - L2364 were not covered by tests
}
break;
}
row_decompressor_decompress_row(decompressor, NULL);
*chunk_status_changed = true;
}
if (scankeys)
Expand Down