Skip to content

Commit

Permalink
DBZ-4276 Unify unavailable value placeholder handling
Browse files Browse the repository at this point in the history
Deprecate the `toasted.value.placeholder` option in PostgreSQL in favor
of the new `unavailable.value.placeholder` option that is shared by all
relational connectors.
  • Loading branch information
Naros authored and gunnarmorling committed Nov 16, 2021
1 parent 0fbfd88 commit ca17352
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector

// Fixed interval of 10,000 ms (10 seconds).
// NOTE(review): judging by the name this is the poll time used in archive-log-only mode -- confirm against its usages.
protected final static Duration ARCHIVE_LOG_ONLY_POLL_TIME = Duration.ofMillis(10_000);

// Default placeholder text emitted in place of a column value that is unavailable.
protected final static String DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER = "__debezium_unavailable_value";

// Re-declares the shared relational PORT field, changing only its default to this connector's DEFAULT_PORT.
public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
.withDefault(DEFAULT_PORT);

Expand Down Expand Up @@ -345,15 +343,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
"bigger than log.mining.scn.gap.detection.gap.size.min, and the time difference of current SCN and previous end SCN is smaller than " +
" this value, consider it a SCN gap.");

/**
 * Connector option {@code unavailable.value.placeholder}: a medium-importance string option whose value is
 * the constant Debezium emits when the original column value is unavailable and not provided by the database.
 * Defaults to {@code DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER}.
 */
public static final Field UNAVAILABLE_VALUE_PLACEHOLDER = Field.create("unavailable.value.placeholder")
.withDisplayName("Unavailable value placeholder")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withDefault(DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER)
.withImportance(Importance.MEDIUM)
.withDescription("Specify the constant that will be provided by Debezium to indicate that " +
"the original value is unavailable and not provided by the database.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
Expand Down Expand Up @@ -381,7 +370,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
SNAPSHOT_ENHANCEMENT_TOKEN,
SNAPSHOT_LOCKING_MODE,
RAC_NODES,
UNAVAILABLE_VALUE_PLACEHOLDER,
LOG_MINING_ARCHIVE_LOG_HOURS,
LOG_MINING_BATCH_SIZE_DEFAULT,
LOG_MINING_BATCH_SIZE_MIN,
Expand Down Expand Up @@ -426,7 +414,6 @@ public static ConfigDef configDef() {
private final StreamingAdapter streamingAdapter;
private final String snapshotEnhancementToken;
private final SnapshotLockingMode snapshotLockingMode;
private final String unavailableValuePlaceholder;

// LogMiner options
private final LogMiningStrategy logMiningStrategy;
Expand Down Expand Up @@ -465,7 +452,6 @@ public OracleConnectorConfig(Configuration config) {
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.unavailableValuePlaceholder = config.getString(UNAVAILABLE_VALUE_PLACEHOLDER);
this.lobEnabled = config.getBoolean(LOB_ENABLED);

this.streamingAdapter = this.connectorAdapter.getInstance(this);
Expand Down Expand Up @@ -934,13 +920,6 @@ public Set<String> getRacNodes() {
return racNodes;
}

/**
 * Returns the placeholder that stands in for values that are unavailable.
 *
 * @return the value stored in the {@code unavailableValuePlaceholder} field
 */
public String getUnavailableValuePlaceholder() {
    return this.unavailableValuePlaceholder;
}

/**
* @return String token to replace
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,24 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"The default is set to 0 ms, which disables tracking xmin.")
.withValidation(Field::isNonNegativeLong);

/**
 * Connector option {@code toasted.value.placeholder}: the constant emitted when the original value is a
 * toasted value not provided by the database. A value starting with {@code hex:} is treated as
 * hexadecimal encoded octets.
 *
 * @deprecated use {@code unavailable.value.placeholder} instead.
 */
@Deprecated
public static final Field TOASTED_VALUE_PLACEHOLDER = Field.create("toasted.value.placeholder")
.withDisplayName("Toasted value placeholder")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2))
.withWidth(Width.MEDIUM)
.withDefault("__debezium_unavailable_value")
.withDefault(DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER)
.withImportance(Importance.MEDIUM)
.withValidation(PostgresConnectorConfig::validateToastedValuePlaceholder)
.withDescription("Specify the constant that will be provided by Debezium to indicate that " +
"the original value is a toasted value not provided by the database. " +
"If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");
"If starts with 'hex:' prefix it is expected that the rest of the string represents hexadecimal encoded octets. " +
"Deprecated, use 'unavailable.value.placeholder' instead.");

/**
 * Re-declares the shared {@code RelationalDatabaseConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER} field with a
 * description that speaks of toasted values and documents the {@code hex:} prefix. Only the description is
 * overridden here.
 */
public static final Field UNAVAILABLE_VALUE_PLACEHOLDER = RelationalDatabaseConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER
.withDescription("Specify the constant that will be provided by Debezium to indicate that " +
"the original value is a toasted value not provided by the database. " +
"If starts with 'hex:' prefix it is expected that the rest of the string represents hexadecimal encoded octets.");

private final TruncateHandlingMode truncateHandlingMode;
private final HStoreHandlingMode hStoreHandlingMode;
Expand Down Expand Up @@ -1103,7 +1111,10 @@ protected Duration xminFetchInterval() {
}

protected byte[] toastedValuePlaceholder() {
final String placeholder = getConfig().getString(TOASTED_VALUE_PLACEHOLDER);
String placeholder = getConfig().getString(TOASTED_VALUE_PLACEHOLDER);
if (Strings.isNullOrEmpty(placeholder)) {
placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
}
if (placeholder.startsWith("hex:")) {
return Strings.hexStringToByteArray(placeholder.substring(4));
}
Expand Down Expand Up @@ -1209,6 +1220,16 @@ private static int validateTruncateHandlingMode(Configuration config, Field fiel
return errors;
}

/**
 * Validator for the deprecated {@code toasted.value.placeholder} option. It never reports a validation
 * problem; it only logs a deprecation warning that points to {@code unavailable.value.placeholder}
 * when the deprecated option has been set explicitly.
 *
 * @param config the configuration being validated
 * @param field the field being validated; not used
 * @param problems receiver for validation problems; never written to
 * @return always {@code 0}
 */
private static int validateToastedValuePlaceholder(Configuration config, Field field, Field.ValidationOutput problems) {
    // Read the raw key instead of going through getString(Field): the Field overload substitutes the
    // field's default when the key is absent, and TOASTED_VALUE_PLACEHOLDER declares a default, so the
    // value would never be empty and the warning would be logged even when the option is not configured.
    final String placeholder = config.getString(TOASTED_VALUE_PLACEHOLDER.name());
    if (!Strings.isNullOrEmpty(placeholder)) {
        LOGGER.warn("Configuration property '{}' is deprecated and will be removed in future versions. Please use '{}' instead.",
                TOASTED_VALUE_PLACEHOLDER.name(),
                UNAVAILABLE_VALUE_PLACEHOLDER.name());
    }
    return 0;
}

@Override
public String getContextName() {
return Module.contextName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public abstract class RelationalDatabaseConnectorConfig extends CommonConnectorC
public static final String DATABASE_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.whitelist\" is already specified";

public static final long DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
public static final String DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER = "__debezium_unavailable_value";

/**
* The set of predefined DecimalHandlingMode options or aliases.
Expand Down Expand Up @@ -569,6 +570,15 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
+ " If you are excluding a lot of tables the default behavour should work well.")
.withDefault(false);

/**
 * Connector option {@code unavailable.value.placeholder}: a medium-importance string option whose value is
 * the constant Debezium emits when the original column value is unavailable and not provided by the database.
 * Defaults to {@link #DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER}.
 */
public static final Field UNAVAILABLE_VALUE_PLACEHOLDER = Field.create("unavailable.value.placeholder")
.withDisplayName("Unavailable value placeholder")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withDefault(DEFAULT_UNAVAILABLE_VALUE_PLACEHOLDER)
.withImportance(Importance.MEDIUM)
.withDescription("Specify the constant that will be provided by Debezium to indicate that " +
"the original value is unavailable and not provided by the database.");

protected static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
.type(
SERVER_NAME)
Expand Down Expand Up @@ -600,7 +610,8 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
PROPAGATE_COLUMN_SOURCE_TYPE,
PROPAGATE_DATATYPE_SOURCE_TYPE,
SNAPSHOT_FULL_COLUMN_SCAN_FORCE,
DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY)
DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY,
UNAVAILABLE_VALUE_PLACEHOLDER)
.create();

private final RelationalTableFilters tableFilters;
Expand Down Expand Up @@ -681,6 +692,10 @@ public String getHeartbeatActionQuery() {
return heartbeatActionQuery;
}

/**
 * Reads the {@code unavailable.value.placeholder} option from this connector's configuration.
 *
 * @return the string configured for {@link #UNAVAILABLE_VALUE_PLACEHOLDER}
 */
public String getUnavailableValuePlaceholder() {
    final String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
    return placeholder;
}

/**
 * Reads the {@code SNAPSHOT_LOCK_TIMEOUT_MS} option and converts it to a {@link Duration}.
 *
 * @return the configured value, interpreted as milliseconds
 */
public Duration snapshotLockTimeout() {
    final long timeoutMillis = getConfig().getLong(SNAPSHOT_LOCK_TIMEOUT_MS);
    return Duration.ofMillis(timeoutMillis);
}
Expand Down
11 changes: 9 additions & 2 deletions documentation/modules/ROOT/pages/connectors/postgresql.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,7 @@ There is no safe way for {prodname} to read the missing value out-of-bands direc
* Tables with `REPLICA IDENTITY FULL` - TOAST column values are part of the `before` and `after` fields in change events just like any other column.
* Tables with `REPLICA IDENTITY DEFAULT` - When receiving an `UPDATE` event from the database, any unchanged TOAST column value that is not part of the replica identity is not contained in the event.
Similarly, when receiving a `DELETE` event, no TOAST columns, if any, are in the `before` field.
As {prodname} cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property, `toasted.value.placeholder`.
As {prodname} cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property, `unavailable.value.placeholder`.

ifdef::community[]
[IMPORTANT]
Expand Down Expand Up @@ -2963,7 +2963,14 @@ endif::community[]
|[[postgresql-property-toasted-value-placeholder]]<<postgresql-property-toasted-value-placeholder, `+toasted.value.placeholder+`>>
|`__debezium_unavailable_value`
|Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database.
If the setting of `toasted.value.placeholder` starts with the `hex:` prefix it is expected that the rest of the string represents hexadecimally encoded octets. See {link-prefix}:{link-postgresql-connector}#postgresql-toasted-values[toasted values] for additional details.
If the setting of `toasted.value.placeholder` starts with the `hex:` prefix it is expected that the rest of the string represents hexadecimally encoded octets. See {link-prefix}:{link-postgresql-connector}#postgresql-toasted-values[toasted values] for additional details. +
+
_This option is deprecated; use `unavailable.value.placeholder` instead._

|[[postgresql-property-unavailable-value-placeholder]]<<postgresql-property-unavailable-value-placeholder, `+unavailable.value.placeholder+`>>
|`__debezium_unavailable_value`
|Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database.
If the setting of `unavailable.value.placeholder` starts with the `hex:` prefix, the rest of the string is expected to represent hexadecimally encoded octets. See {link-prefix}:{link-postgresql-connector}#postgresql-toasted-values[toasted values] for additional details.

|[[postgresql-property-provide-transaction-metadata]]<<postgresql-property-provide-transaction-metadata, `+provide.transaction.metadata+`>>
|`false`
Expand Down

0 comments on commit ca17352

Please sign in to comment.