Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB][yugabyte/yugabyte-db#21591] Support to process update records with CHANGE #106

Merged
merged 27 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7ca385f
initial commit to for PG connector parity
vaibhav-yb Mar 11, 2024
9b0aab6
addressed review comments
vaibhav-yb Mar 18, 2024
9f9854e
added file
vaibhav-yb Mar 18, 2024
ed8c457
addressed review comments
vaibhav-yb Mar 18, 2024
c6cb244
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 19, 2024
b9c0adc
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 20, 2024
f825f15
initial commit to modify record for replica identity CHANGE
vaibhav-yb Mar 20, 2024
61aff29
commit with working set of changes for snapshot and streaming
vaibhav-yb Mar 26, 2024
221277e
custom replica identity storage class
vaibhav-yb Mar 26, 2024
2a6808c
added transformer for custom structure of messages when in CHANGE rep…
vaibhav-yb Mar 26, 2024
e5aec77
cleanup of the transformer
vaibhav-yb Mar 26, 2024
a6b6418
cleanup of the transformer
vaibhav-yb Mar 26, 2024
8c58146
fixed bug for pgschemabuilder
vaibhav-yb Mar 26, 2024
9e8b868
test changes
vaibhav-yb Mar 26, 2024
afa9972
addressed review comments
vaibhav-yb Mar 28, 2024
681b118
addressed review comments
vaibhav-yb Mar 28, 2024
245dd46
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Mar 28, 2024
76b4796
do not fetch replica identity while processing NOOP
vaibhav-yb Apr 2, 2024
be24a73
record structure will be transformed only with yboutput plugin now
vaibhav-yb Apr 4, 2024
6034614
resolved merge conflicts
vaibhav-yb May 21, 2024
351839d
added code to store replica identity to help process before images
vaibhav-yb May 30, 2024
87def88
added missing import
vaibhav-yb May 30, 2024
2f8eb02
Merge branch 'ybdb-debezium-2.5.2' into ybdb-debezium-252-vaibhav
vaibhav-yb Jun 11, 2024
8ca79fb
addressed review comments
vaibhav-yb Jun 24, 2024
1cc72f4
fixed issue while using yboutput plugin where publication was not bei…
vaibhav-yb Jun 25, 2024
d222229
addressed review comments
vaibhav-yb Jun 25, 2024
90435ef
addressed review comments to add tests verifying skipping of messages…
vaibhav-yb Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
Expand Down Expand Up @@ -57,10 +59,11 @@ public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter<P
private final PostgresConnection connection;
private final TableId tableId;
private final Map<String, Object> cachedOldToastedValues = new HashMap<>();
private final ReplicaIdentityInfo.ReplicaIdentity replicaIdentity;

public PostgresChangeRecordEmitter(PostgresPartition partition, OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema,
PostgresConnection connection, TableId tableId,
ReplicationMessage message) {
ReplicationMessage message, ReplicaIdentityInfo.ReplicaIdentity replicaIdentity) {
super(partition, offset, clock, connectorConfig);

this.schema = schema;
Expand All @@ -70,6 +73,14 @@ public PostgresChangeRecordEmitter(PostgresPartition partition, OffsetContext of

this.tableId = tableId;
Objects.requireNonNull(this.tableId);

this.replicaIdentity = replicaIdentity;
}

/**
 * Constructor retaining the signature without an explicit replica identity.
 * Delegates to the full constructor and passes
 * {@link ReplicaIdentityInfo.ReplicaIdentity#CHANGE} as the replica identity, so emitters
 * created through this overload follow the CHANGE code paths of this class.
 *
 * @param partition       partition the emitted records belong to
 * @param offset          offset context attached to emitted records
 * @param clock           clock handed to the parent emitter
 * @param connectorConfig connector configuration
 * @param schema          schema used by this emitter
 * @param connection      connection used by this emitter
 * @param tableId         identifier of the table the message refers to
 * @param message         replication message to emit a record for
 */
public PostgresChangeRecordEmitter(PostgresPartition partition, OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema,
PostgresConnection connection, TableId tableId,
ReplicationMessage message) {
// No replica identity supplied by the caller: fall back to CHANGE.
this(partition, offset, clock, connectorConfig, schema, connection, tableId, message, ReplicaIdentityInfo.ReplicaIdentity.CHANGE);
}

@Override
Expand Down Expand Up @@ -107,6 +118,11 @@ protected Object[] getOldColumnValues() {
case CREATE:
return null;
case UPDATE:
// YB Note: For replica identity CHANGE, there is no old column value available.
if (replicaIdentity == ReplicaIdentityInfo.ReplicaIdentity.CHANGE) {
return null;
}

vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
return columnValues(message.getOldTupleList(), tableId, true, true, true);
default:
return columnValues(message.getOldTupleList(), tableId, true, false, true);
Expand Down Expand Up @@ -151,7 +167,7 @@ private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSc
return schema.schemaFor(tableId);
}

private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged,
protected Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged,
boolean sourceOfToasted, boolean oldValues)
throws SQLException {
if (columns == null || columns.isEmpty()) {
Expand Down Expand Up @@ -188,12 +204,62 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
}
}
}
values[position] = value;

if (replicaIdentity == ReplicaIdentityInfo.ReplicaIdentity.CHANGE) {
if (!UnchangedToastedReplicationMessageColumn.isUnchangedToastedValue(value)) {
values[position] = new Object[]{value, Boolean.TRUE};
}
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.info("Replica identity is NOT CHANGE");
values[position] = value;
}
}
}
return values;
}

@Override
protected void emitUpdateRecord(Receiver<PostgresPartition> receiver, TableSchema tableSchema) throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();

Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
LOGGER.debug("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
return;
}

/*
* If skip.messages.without.change is configured true,
* Skip Publishing the message in case there is no change in monitored columns
* (Postgres) Only works if REPLICA IDENTITY is set to FULL - as oldValues won't be available
*/
if (skipMessagesWithoutChange() && Objects.nonNull(newValue) && newValue.equals(oldValue)) {
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.debug("No new values found for table '{}' in included columns from update message at '{}'; skipping record", tableSchema,
getOffset().getSourceInfo());
return;
}
// some configurations does not provide old values in case of updates
// in this case we handle all updates as regular ones

// YB Note: If replica identity is change, one hack is that we always know there will be no
// oldKey present so we should simply go ahead with this block. Also, oldKey would be null
// at this stage if replica identity is CHANGE
if (oldKey == null || replicaIdentity == ReplicaIdentityInfo.ReplicaIdentity.CHANGE || Objects.equals(oldKey, newKey)) {
Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
}
// PK update -> emit as delete and re-insert with new key
else {
emitUpdateAsPrimaryKeyChangeRecord(receiver, tableSchema, oldKey, newKey, oldValue, newValue);
}
}

private int getPosition(String columnName, Table table, Object[] values) {
final Column tableColumn = table.columnWithName(columnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,11 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"FULL - Records the old values of all columns in the row." +
"NOTHING - Records no information about the old row. This is the default for system tables.");

public static final Field REPLICA_IDENTITY = Field.create("replica.identity")
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
.withDisplayName("Internal: Replica Identity")
.withType(Type.STRING)
.withImportance(Importance.LOW);

public static final Field STREAM_PARAMS = Field.create("slot.stream.params")
.withDisplayName("Optional parameters to pass to the logical decoder when the stream is started.")
.withType(Type.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ protected PostgresSchema(PostgresConnectorConfig config, PostgresDefaultValueCon

private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter,
PostgresDefaultValueConverter defaultValueConverter) {
return new TableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjuster(),
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getFieldNamer(), false);
return new PGTableSchemaBuilder(valueConverter, defaultValueConverter, config, false /* multiPartitionMode */);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@

vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.connector.postgresql.connection.YBReplicaIdentity;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputReplicationMessage;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -46,6 +52,7 @@ public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeE
private final Snapshotter blockingSnapshotter;
private final SlotCreationResult slotCreatedInfo;
private final SlotState startingSlotInfo;
private final Map<TableId, YBReplicaIdentity> replicaIdentityMap;

public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory, PostgresSchema schema,
Expand All @@ -60,6 +67,7 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig
this.slotCreatedInfo = slotCreatedInfo;
this.startingSlotInfo = startingSlotInfo;
this.blockingSnapshotter = new AlwaysSnapshotter();
this.replicaIdentityMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -88,6 +96,22 @@ protected SnapshotContext<PostgresPartition, PostgresOffsetContext> prepare(Post
return new PostgresSnapshotContext(partition, connectorConfig.databaseName(), onDemand);
}

@Override
protected ChangeRecordEmitter<PostgresPartition> getChangeRecordEmitter(
PostgresPartition partition, PostgresOffsetContext offset, TableId tableId, Object[] row,
Instant timestamp) {
offset.event(tableId, timestamp);

YBReplicaIdentity ybReplicaIdentity = replicaIdentityMap.get(tableId);
if (ybReplicaIdentity == null) {
ybReplicaIdentity = new YBReplicaIdentity(connectorConfig, tableId);
replicaIdentityMap.put(tableId, ybReplicaIdentity);
}

return new YBSnapshotChangeRecordEmitter<>(partition, offset, row, getClock(),
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
connectorConfig, ybReplicaIdentity.getReplicaIdentity());
}

@Override
protected void connectionCreated(RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,17 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import io.debezium.connector.postgresql.connection.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
import io.debezium.connector.postgresql.connection.ReplicationStream;
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -81,6 +76,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
private Lsn lastCompletelyProcessedLsn;
private PostgresOffsetContext effectiveOffset;
private final Map<TableId, YBReplicaIdentity> replicaIdentityMap;

public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
PostgresConnection connection, PostgresEventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
Expand All @@ -96,7 +92,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
this.snapshotter = snapshotter;
this.replicationConnection = replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());

this.replicaIdentityMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -316,6 +312,15 @@ else if (message.getOperation() == Operation.MESSAGE) {
tableId,
message.getOperation());

LOGGER.info("Received record of type {}", message.getOperation());

// YB Note: Get the cached replica identity.
YBReplicaIdentity ybReplicaIdentity = replicaIdentityMap.get(tableId);
if (ybReplicaIdentity == null) {
ybReplicaIdentity = new YBReplicaIdentity(connectorConfig, tableId);
replicaIdentityMap.put(tableId, ybReplicaIdentity);
}

boolean dispatched = message.getOperation() != Operation.NOOP && dispatcher.dispatchDataChangeEvent(
partition,
tableId,
Expand All @@ -327,7 +332,8 @@ else if (message.getOperation() == Operation.MESSAGE) {
schema,
connection,
tableId,
message));
message,
ybReplicaIdentity.getReplicaIdentity()));

maybeWarnAboutGrowingWalBacklog(dispatched);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.data.Envelope;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Clock;

/**
 * Snapshot change record emitter for YugabyteDB that shapes the column values of a snapshot
 * row according to the replica identity it was created with.
 *
 * <p>Every record produced is a {@link Envelope.Operation#READ}. When the replica identity is
 * {@link ReplicaIdentityInfo.ReplicaIdentity#CHANGE}, each column value is wrapped in a
 * two-element array of the value and {@link Boolean#TRUE}; for any other replica identity the
 * values are passed through as they are.
 *
 * @param <P> partition type, a {@link PostgresPartition} or a subtype of it
 * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com)
 */
public class YBSnapshotChangeRecordEmitter<P extends PostgresPartition> extends RelationalChangeRecordEmitter<P> {
    /** Column values of the snapshot row this emitter was created for. */
    private final Object[] row;

    /** Replica identity deciding how the column values are shaped. */
    private final ReplicaIdentityInfo.ReplicaIdentity replicaIdentity;

    /**
     * @param partition       partition the row belongs to
     * @param offset          offset context attached to the emitted record
     * @param row             column values of the snapshot row
     * @param clock           clock handed to the parent emitter
     * @param connectorConfig connector configuration handed to the parent emitter
     * @param replicaIdentity replica identity used to shape the column values
     */
    public YBSnapshotChangeRecordEmitter(P partition, OffsetContext offset, Object[] row, Clock clock, RelationalDatabaseConnectorConfig connectorConfig,
                                         ReplicaIdentityInfo.ReplicaIdentity replicaIdentity) {
        super(partition, offset, clock, connectorConfig);

        this.replicaIdentity = replicaIdentity;
        this.row = row;
    }

    /** Snapshot rows are always emitted as READ events. */
    @Override
    public Envelope.Operation getOperation() {
        return Envelope.Operation.READ;
    }

    /**
     * Not supported: a READ record carries no previous row state.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }

    /**
     * Builds a fresh array holding the column values of the row.
     *
     * @return a new array of the same length as the row; under replica identity CHANGE each
     *         element is an {@code Object[]{value, Boolean.TRUE}} pair, otherwise the plain value
     */
    @Override
    protected Object[] getNewColumnValues() {
        final Object[] shaped = new Object[row.length];

        // Any identity other than CHANGE: plain element-wise copy of the row.
        if (replicaIdentity != ReplicaIdentityInfo.ReplicaIdentity.CHANGE) {
            System.arraycopy(row, 0, shaped, 0, row.length);
            return shaped;
        }

        // CHANGE: pair every value with Boolean.TRUE.
        int index = 0;
        for (Object cell : row) {
            shaped[index++] = new Object[]{ cell, Boolean.TRUE };
        }
        return shaped;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PostgresConnection extends JdbcConnection {
public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection";
public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
public static final String CONNECTION_GENERAL = "Debezium General";
public static final String CONNECTION_FETCH_REPLICA_IDENTITY = "Debezium YB Fetch Replica Identity";

private static final Pattern FUNCTION_DEFAULT_PATTERN = Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
private static final Pattern EXPRESSION_DEFAULT_PATTERN = Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public enum ReplicaIdentity {
FULL("UPDATE AND DELETE events will contain the previous values of all the columns"),
DEFAULT("UPDATE and DELETE events will contain previous values only for PK columns"),
INDEX("UPDATE and DELETE events will contain previous values only for columns present in the REPLICA IDENTITY index"),
UNKNOWN("Unknown REPLICA IDENTITY");
UNKNOWN("Unknown REPLICA IDENTITY"),
CHANGE("UPDATE events will contain values only for changed columns");
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved

private final String description;

Expand All @@ -77,6 +78,8 @@ static ReplicaIdentityInfo.ReplicaIdentity parseFromDB(String s) {
return INDEX;
case "f":
return FULL;
case "c":
return CHANGE;
default:
return UNKNOWN;
}
Expand Down
Loading