Skip to content

Commit

Permalink
[BACKPORT 2.16][#15064/#15033] CDCSDK: Restrict compaction only for C…
Browse files Browse the repository at this point in the history
…DC snapshot mode and for before image type

Summary:
"Original commit:
3c7146e/D21213
876753e/D21246"
During code analysis, we observed that compaction is by default restricted for the active CDC stream using the //cdc_sdk_safe_time//, which really does not require all the features of the CDC and It will unnecessarily consume disk space as well as impact the performance.
 So in this diff, we will only restrict the compaction:-
1. During snapshot operation.
2. when Before image is enabled for the stream.

We have added a few unit test cases around it.

[#15063]CDCSDK: Alter table support is broken with packed row

During the unit test case failure analysis we observed that for packed row decoding CDC need the corresponding schema and schema version in the //SchemaPackingStorage// object so that in the later stage it can decode the packed row in //PopulatePackedRows// method. but in the method //PopulateCDCSDKWriteRecord//, it is always taking the latest schema version, causing this issue. to handle this we will then pass the corresponding schema version, that CDC is collected from the system catalog table to //PopulateCDCSDKWriteRecord// method

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCompactionWithSnapshotAndNoBeforeImage
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestExpiredStreamWithCompaction
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCompactionWithoutBeforeImage

Running all the alter table unit testcases with FLAGS_ysql_enable_packed_row as true

Reviewers: srangavajjula, skumar, abharadwaj

Reviewed By: abharadwaj

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D21275
  • Loading branch information
sureshdash2022-yb committed Nov 22, 2022
1 parent 72f9087 commit f9a0dff
Show file tree
Hide file tree
Showing 3 changed files with 2,718 additions and 2,384 deletions.
4 changes: 3 additions & 1 deletion ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1722,7 +1722,9 @@ void CDCServiceImpl::GetChanges(
UpdateCheckpointAndActiveTime(
producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), op_id, session,
last_record_hybrid_time, record.source_type, false,
HybridTime::FromPB(resp->safe_hybrid_time())),
record.record_type == CDCRecordType::ALL
? HybridTime::FromPB(resp->safe_hybrid_time())
: HybridTime::kInvalid),
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
}

Expand Down
7 changes: 4 additions & 3 deletions ent/src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ Status PopulateCDCSDKWriteRecord(
const EnumOidLabelMap& enum_oid_label_map,
const CompositeAttsMap& composite_atts_map,
GetChangesResponsePB* resp,
const Schema& current_schema) {
const Schema& current_schema,
const SchemaVersion current_schema_version) {
auto tablet_ptr = VERIFY_RESULT(tablet_peer->shared_tablet_safe());
const auto& batch = msg->write().write_batch();
CDCSDKProtoRecordPB* proto_record = nullptr;
Expand All @@ -593,7 +594,7 @@ Status PopulateCDCSDKWriteRecord(
// We'll use DocDB key hash to identify the records that belong to the same row.
Slice prev_key;
Schema schema = current_schema;
SchemaVersion schema_version = tablet_ptr->metadata()->schema_version();
SchemaVersion schema_version = current_schema_version;
std::string table_name = tablet_ptr->metadata()->table_name();
SchemaPackingStorage schema_packing_storage;
schema_packing_storage.AddSchema(schema_version, schema);
Expand Down Expand Up @@ -1358,7 +1359,7 @@ Status GetChangesForCDCSDK(
if (!batch.has_transaction()) {
RETURN_NOT_OK(PopulateCDCSDKWriteRecord(
msg, stream_metadata, tablet_peer, enum_oid_label_map, composite_atts_map, resp,
current_schema));
current_schema, *cached_schema_version));

SetCheckpoint(
msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
Expand Down

0 comments on commit f9a0dff

Please sign in to comment.