Fix leak during concurrent UPDATE/DELETE #5675
Conversation
@lkshminarayanan, @shhnwz: please review this pull request.
Codecov Report
@@ Coverage Diff @@
## main #5675 +/- ##
==========================================
- Coverage 90.99% 90.97% -0.03%
==========================================
Files 230 230
Lines 54573 54579 +6
==========================================
- Hits 49659 49653 -6
- Misses 4914 4926 +12
... and 7 files with indirect coverage changes
I had already noticed this issue, even without a test, in this comment; however, the PR was merged regardless of my comments or a follow-up being created. I think we have serious flaws in answering comments and acting on them.
tsl/src/compression/compression.c
Outdated
@@ -2322,16 +2321,22 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
if (result == TM_Updated || result == TM_Deleted)
Could we have some documentation about the return value contract
of this method?
That might be important, as there are call sites relying on it; see my previous comment in the other PR for an example usage.
I have documented it like this:
/*
* This method will:
 * 1. scan compressed chunk
 * 2. decompress the row
 * 3. delete this row from compressed chunk
 * 4. insert decompressed rows to uncompressed chunk
 *
 * Return value:
 * if all 4 steps defined above pass, set chunk_status_changed to true and return true
 * if step 3 fails, return false. Step 3 will fail if there are conflicting concurrent operations on the same chunk.
*/
static bool
decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
Bitmapset *null_columns, List *is_nulls, bool *chunk_status_changed)
I think that's just a comment about the operation, and has nothing to do with documentation.
I don't think that could be accepted as a contract; it just exposes the complexity of the method (leaving, most likely, the current implementation as the only one that could satisfy it).
I think it should be some clear-as-day statement like:
Decompresses all rows satisfying the provided scankeys.
It will return true if it has decompressed matching rows.
tsl/src/compression/compression.c
Outdated
table_endscan(heapScan);
return false;
this seems like unreachable code to me - isn't it?
For the Repeatable Read isolation level, this code does get executed.
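For context, the distinction can be modeled with a small standalone sketch (plain C, not the actual PostgreSQL API; all names here are illustrative): PostgreSQL's `IsolationUsesXactSnapshot()` is true for REPEATABLE READ and SERIALIZABLE, where the snapshot cannot be refreshed mid-transaction, so the only safe reaction to a concurrently deleted tuple is a serialization failure; under READ COMMITTED the tuple can simply be skipped.

```c
#include <stdbool.h>

/* Simplified model of the decision in the TM_Deleted branch.
 * Under READ COMMITTED a concurrently deleted tuple is skipped;
 * under snapshot isolation the statement must fail. */
typedef enum
{
	READ_COMMITTED,
	REPEATABLE_READ,
	SERIALIZABLE
} IsoLevel;

typedef enum
{
	ACTION_SKIP_TUPLE,
	ACTION_SERIALIZATION_ERROR
} Action;

static Action
on_concurrently_deleted_tuple(IsoLevel level)
{
	/* Mirrors IsolationUsesXactSnapshot(): true for REPEATABLE READ
	 * and SERIALIZABLE, false for READ COMMITTED. */
	bool uses_xact_snapshot = (level >= REPEATABLE_READ);

	return uses_xact_snapshot ? ACTION_SERIALIZATION_ERROR : ACTION_SKIP_TUPLE;
}
```

This is why the `table_endscan` + `ereport(ERROR, ...)` path is reachable in the PR's tests that run under REPEATABLE READ, even though it never fires under the default READ COMMITTED level.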
Let's put the table_endscan before the error, and I suppose we don't need the return statements.
My bad, I will take care of it from now on.
Let's add a bit of documentation on the cases we are handling here.
tsl/src/compression/compression.c
Outdated
continue;
}
break;
case TM_BeingModified:
that's nonsense... if it can't happen, why handle it at all?
tsl/src/compression/compression.c
Outdated
continue;
}
break;
case TM_BeingModified:
case TM_BeingModified:
/* TM_BeingModified cannot happen since we always wait for locks here
 * TM_SelfModified should never happen
 */
case TM_BeingModified:
When updating and deleting the same tuple while both transactions are running at the same time, we end up with a reference leak. This is because one of the queries in a transaction fails and we take the error path; however, we fail to close the table. This patch fixes the above-mentioned problem by closing the required tables. Fixes timescale#5674
* 4.insert decompressed rows to uncompressed chunk
*
* Return value:
* if all 4 steps defined above pass set chunk_status_changed to true and return true
I think the following is true:
- the method will return true iff it processed all candidate rows
- chunk_status_changed will be true iff there are rows decompressed

Note: because it only returns true or raises an error right now, I think chunk_status_changed is not necessary anymore...
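The simplification suggested here can be sketched in standalone C (hypothetical names, not the TimescaleDB code): if the function can only return true or raise an error, the boolean return and the `chunk_status_changed` out-parameter together carry a single bit of information, so one return value, e.g. a count of decompressed rows, can replace both.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical reshaping of the decompress_batches() return contract.
 * row_matches[i] models whether candidate tuple i was decompressed
 * (false models the "skip tuple" path). The returned count replaces
 * both the bool return and the chunk_status_changed out-parameter:
 * the caller derives chunk_status_changed as (count > 0), and any
 * failure would be reported via error rather than a false return. */
static size_t
decompress_batches_count(const bool *row_matches, size_t num_candidates)
{
	size_t decompressed = 0;

	for (size_t i = 0; i < num_candidates; i++)
	{
		if (row_matches[i])
			decompressed++;
	}
	return decompressed;
}
```

A caller would then write `chunk_status_changed = (decompress_batches_count(...) > 0);`, eliminating the extra out-parameter from the signature.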
LGTM!
It's up to you, but I'm in favor of some separation of concerns in the code to make it easier to understand.
My main concern here is that we have a while loop, and inside it a for with some specific logic to skip tuples, and then a switch with another specific logic; both decide whether we continue the while, and all this mix of continue and break in different contexts makes the code a bit confusing. So my suggestion below is to extract both logics into small check functions, something like:
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index f11359898..512fdde10 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -2261,6 +2261,78 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *
return scankeys;
}
+static bool
+decompress_batches_skip_tuple(HeapTuple compressed_tuple, TupleDesc in_desc,
+ Bitmapset *null_columns, List *is_nulls)
+{
+ bool skip_tuple = false;
+ int attrno = bms_next_member(null_columns, -1);
+ int pos = 0;
+ bool is_null_condition = 0;
+ bool seg_col_is_null = false;
+ for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno))
+ {
+ is_null_condition = list_nth_int(is_nulls, pos);
+ seg_col_is_null = heap_attisnull(compressed_tuple, attrno, in_desc);
+ if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition))
+ {
+ /*
+ * if segment by column in the scanned tuple has non null value
+ * and IS NULL is specified, OR segment by column has null value
+ * and IS NOT NULL is specified then skip this tuple
+ */
+ skip_tuple = true;
+ break;
+ }
+ pos++;
+ }
+
+ return skip_tuple;
+}
+
+static bool
+decompress_batches_tuple_deleted(TM_Result result, TableScanDesc heapScan)
+{
+ bool tuple_deleted = false;
+
+ switch (result)
+ {
+ /* If the tuple has been already deleted, most likely somebody
+ * decompressed the tuple already */
+ case TM_Deleted:
+ if (IsolationUsesXactSnapshot())
+ {
+ /* For Repeatable Read isolation level report error */
+ table_endscan(heapScan);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to concurrent update")));
+ }
+ break;
+ /*
+ * If another transaction is updating the compressed data,
+ * we have to abort the transaction to keep consistency.
+ */
+ case TM_Updated:
+ table_endscan(heapScan);
+ elog(ERROR, "tuple concurrently updated");
+ break;
+ case TM_Invisible:
+ table_endscan(heapScan);
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ case TM_Ok:
+ tuple_deleted = true;
+ break;
+ default:
+ table_endscan(heapScan);
+ elog(ERROR, "unexpected tuple operation result: %d", result);
+ break;
+ }
+
+ return tuple_deleted;
+}
+
/*
* This method will:
* 1.scan compressed chunk
@@ -2282,38 +2354,23 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
TableScanDesc heapScan =
table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys);
+
while ((compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
{
- bool skip_tuple = false;
- int attrno = bms_next_member(null_columns, -1);
- int pos = 0;
- bool is_null_condition = 0;
- bool seg_col_is_null = false;
- for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno))
- {
- is_null_condition = list_nth_int(is_nulls, pos);
- seg_col_is_null = heap_attisnull(compressed_tuple, attrno, decompressor->in_desc);
- if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition))
- {
- /*
- * if segment by column in the scanned tuple has non null value
- * and IS NULL is specified, OR segment by column has null value
- * and IS NOT NULL is specified then skip this tuple
- */
- skip_tuple = true;
- break;
- }
- pos++;
- }
- if (skip_tuple)
+ TM_FailureData tmfd;
+ TM_Result result;
+
+ if (decompress_batches_skip_tuple(compressed_tuple,
+ decompressor->in_desc,
+ null_columns,
+ is_nulls))
continue;
+
heap_deform_tuple(compressed_tuple,
decompressor->in_desc,
decompressor->compressed_datums,
decompressor->compressed_is_nulls);
- TM_FailureData tmfd;
- TM_Result result;
result = table_tuple_delete(decompressor->in_rel,
&compressed_tuple->t_self,
decompressor->mycid,
@@ -2323,54 +2380,17 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
&tmfd,
false);
- switch (result)
- {
- /* If the tuple has been already deleted, most likely somebody
- * decompressed the tuple already */
- case TM_Deleted:
- {
- if (IsolationUsesXactSnapshot())
- {
- /* For Repeatable Read isolation level report error */
- table_endscan(heapScan);
- ereport(ERROR,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("could not serialize access due to concurrent update")));
- }
- continue;
- }
- break;
- /*
- * If another transaction is updating the compressed data,
- * we have to abort the transaction to keep consistency.
- */
- case TM_Updated:
- {
- table_endscan(heapScan);
- elog(ERROR, "tuple concurrently updated");
- }
- break;
- case TM_Invisible:
- {
- table_endscan(heapScan);
- elog(ERROR, "attempted to lock invisible tuple");
- }
- break;
- case TM_Ok:
- break;
- default:
- {
- table_endscan(heapScan);
- elog(ERROR, "unexpected tuple operation result: %d", result);
- }
- break;
- }
+ if (!decompress_batches_tuple_deleted(result, heapScan))
+ continue;
+
row_decompressor_decompress_row(decompressor, NULL);
*chunk_status_changed = true;
}
+
if (scankeys)
pfree(scankeys);
table_endscan(heapScan);
+
return true;
}
Thanks Fabrizio for this comment. As you mentioned, this function seems confusing and likely error-prone in the future. Zoltan has the same concern, and we plan to open a new issue to refactor this code and do some cleanup as part of a new PR.
Automated backport to 2.10.x not done: cherry-pick failed. Git status
@sb230132 can you create the refactor issue and link to this PR for posterity? Thanks.
When updating and deleting the same tuple while both transactions are running at the same time, we end up with a reference leak. This is because one of the queries in a transaction fails and we take the error path; however, we fail to close the table.
This patch fixes the above-mentioned problem by closing the required tables.
Fixes #5674
Disable-check: force-changelog-changed
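The failure mode the patch fixes can be illustrated with a standalone sketch (all names hypothetical, not the TimescaleDB or PostgreSQL API): a resource opened before an error path must be released on that path as well, otherwise its reference leaks, which is exactly the `table_endscan`-before-`ereport(ERROR, ...)` ordering discussed in the review above.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Toy stand-in for an open table scan; open_scans tracks live references,
 * modeling the relation reference count that leaked in the bug report. */
typedef struct
{
	bool open;
} FakeScan;

static int open_scans = 0;

static FakeScan *
scan_begin(void)
{
	FakeScan *s = malloc(sizeof *s);

	s->open = true;
	open_scans++;
	return s;
}

static void
scan_end(FakeScan *s)
{
	s->open = false;
	open_scans--;
	free(s);
}

/* Buggy shape: the error path leaves the scan open (reference leak). */
static bool
process_buggy(bool hit_error)
{
	FakeScan *scan = scan_begin();

	if (hit_error)
		return false;	/* leak: scan never closed */
	scan_end(scan);
	return true;
}

/* Fixed shape: release the scan before taking the error path. */
static bool
process_fixed(bool hit_error)
{
	FakeScan *scan = scan_begin();

	if (hit_error)
	{
		scan_end(scan);	/* close before reporting the error */
		return false;
	}
	scan_end(scan);
	return true;
}
```

In the real code the error path is `ereport(ERROR, ...)`, which longjmps out of the function, so the cleanup call must textually precede it, as the patch does.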