Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for connector losing tablet list upon restart and RPC failures. #270

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<version.kafka>3.3.1</version.kafka>
<version.org.slf4j>1.7.36</version.org.slf4j>
<version.logback>1.4.0</version.logback>
<version.ybclient>0.8.64-20230929.065833-2</version.ybclient>
<version.ybclient>0.8.65-SNAPSHOT</version.ybclient>
<version.gson>2.8.9</version.gson>

<!--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class YugabyteDBSnapshotChangeEventSource extends AbstractSnapshotChangeE
private final YugabyteDBConnection connection;

private final AsyncYBClient asyncClient;
private final YBClient syncClient;
private YBClient syncClient;

private OpId lastCompletelyProcessedLsn;

Expand Down Expand Up @@ -262,16 +262,28 @@ protected void setCheckpointWithRetryBeforeSnapshot(
+ "retry {} of {} after {} milli-seconds. Exception message: {}", retryCount,
this.connectorConfig.maxConnectorRetries(),
this.connectorConfig.connectorRetryDelayMs(), e.getMessage());
LOGGER.debug("Stacktrace: ", e);
LOGGER.warn("Stacktrace: ", e);

try {
if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}

/**
* Pause the flow before moving further to retry.
*/
private void pauseBeforeRetryingError() {
try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
}
}
}

/**
Expand Down Expand Up @@ -304,16 +316,14 @@ protected GetCheckpointResponse getCheckpointWithRetry(YBTable ybTable, String t
tabletId, retryCount,
connectorConfig.maxConnectorRetries(),
connectorConfig.connectorRetryDelayMs(), e.getMessage());
LOGGER.debug("Stacktrace: ", e);

try {
final Metronome retryMetronome =
Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
LOGGER.warn("Stacktrace: ", e);

if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}

Expand Down Expand Up @@ -665,13 +675,12 @@ protected SnapshotResult<YugabyteDBOffsetContext> doExecute(ChangeEventSourceCon
this.connectorConfig.maxConnectorRetries(),
this.connectorConfig.connectorRetryDelayMs(), e);

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}

Expand Down Expand Up @@ -740,6 +749,13 @@ public void markSnapshotDoneOnServer(YBPartition partition, YugabyteDBOffsetCont

LOGGER.warn("Error while marking snapshot completed on service for table {} tablet {}, will attempt retry {} of {} for error {}",
partition.getTableId(), partition.getTabletId(), retryCount, connectorConfig.maxConnectorRetries(), e);

if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class YugabyteDBStreamingChangeEventSource implements
protected OpId lastCompletelyProcessedLsn;

protected final AsyncYBClient asyncYBClient;
protected final YBClient syncClient;
protected YBClient syncClient;
protected YugabyteDBTypeRegistry yugabyteDBTypeRegistry;
protected final Map<String, OpId> checkPointMap;
protected final ChangeEventQueue<DataChangeEvent> queue;
Expand Down Expand Up @@ -211,13 +211,12 @@ protected void bootstrapTabletWithRetry(List<Pair<String,String>> tabletPairList
LOGGER.warn("Error while trying to get the checkpoint for tablet {}; will attempt retry {} of {} after {} milli-seconds. Exception message: {}",
entry.getValue(), retryCountForGetCheckpoint, connectorConfig.maxConnectorRetries(), connectorConfig.connectorRetryDelayMs(), e.getMessage());

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}
}
Expand Down Expand Up @@ -253,17 +252,16 @@ protected void bootstrapTabletWithRetry(List<Pair<String,String>> tabletPairList
throw e;
}

if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

// If there are retries left, perform them after the specified delay.
LOGGER.warn("Error while trying to bootstrap tablet {}; will attempt retry {} of {} after {} milli-seconds. Exception message: {}",
entry.getValue(), retryCountForBootstrapping, maxBootstrapRetries, connectorConfig.connectorRetryDelayMs(), e.getMessage());

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
}
pauseBeforeRetryingError();
}
}
}
Expand Down Expand Up @@ -293,13 +291,12 @@ protected void markNoSnapshotNeeded(YBTable ybTable, String tabletId) throws Exc
LOGGER.warn("Error while marking no snapshot on service for table {} tablet {}, will attempt retry {} of {} for error {}",
ybTable.getTableId(), tabletId, retryCount, connectorConfig.maxConnectorRetries(), e);

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}
}
Expand Down Expand Up @@ -348,14 +345,32 @@ protected void getChanges2(ChangeEventSourceContext context,
// This schemaNeeded map here would have the elements as <tableId.tabletId>:<boolean-value>
Map<String, Boolean> schemaNeeded = new HashMap<>();
Map<String, Long> tabletSafeTime = new HashMap<>();
for (Pair<String, String> entry : tabletPairList) {
List<Pair<String, String>> elementsToBeRemoved = new ArrayList<>();
for (int i = 0; i < tabletPairList.size(); ++i) {
Pair<String, String> entry = tabletPairList.get(i);
// entry.getValue() will give the tabletId
OpId opId = YBClientUtils.getOpIdFromGetTabletListResponse(
tabletListResponse.get(entry.getKey()), entry.getValue());

if (opId == null) {
throw new RuntimeException(String.format("OpId for the given tablet {} was not found in the response,"
+ " restart the connector to try again", entry.getValue()));
// At this stage, we know we do not have the tablet-OpId, most probably because we lost the
// correct tablet list containing children from the list. We should remove the parent from the list
// and add its children.
GetTabletListToPollForCDCResponse getResp = syncClient.getTabletListToPollForCdc(
tableIdToTable.get(entry.getKey()), streamId, entry.getKey(), entry.getValue());

LOGGER.info("Response size while getting chilren for tablet {}: {}", entry.getValue(),
getResp.getTabletCheckpointPairListSize());

for (TabletCheckpointPair pair : getResp.getTabletCheckpointPairList()) {
addTabletIfNotPresent(tabletPairList, pair, entry.getKey(), offsetContext, schemaNeeded);
}

// Pair<String, String> entryToBeDeleted = getEntryToDelete(tabletPairList, entry.getValue());
elementsToBeRemoved.add(entry);
LOGGER.info("Tablet {} will be removed later from the list before polling", entry.getValue());

continue;
}

// If we are getting a term and index as -1 and -1 from the server side it means
Expand All @@ -366,14 +381,13 @@ protected void getChanges2(ChangeEventSourceContext context,
opId = YugabyteDBOffsetContext.streamingStartLsn();
}

// For streaming, we do not want any colocated information and want to process the tables
// based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same.
YBPartition p = new YBPartition(entry.getKey(), entry.getValue(), false /* colocated */);
offsetContext.initSourceInfo(p, this.connectorConfig, opId);
// We can initialise the explicit checkpoint for this tablet to the value returned by
// the cdc_service through the 'GetTabletListToPollForCDC' API
tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint());
schemaNeeded.put(p.getId(), Boolean.TRUE);
initializePartitionsAndOffsets(entry.getKey() /* tableId */, entry.getValue() /* tablet ID */,
offsetContext, tabletToExplicitCheckpoint, schemaNeeded, opId);
}

for (Pair<String, String> entry : elementsToBeRemoved) {
tabletPairList.remove(entry);
LOGGER.info("Removed original entry for the tablet {} from list as it has been split", entry.getValue());
}

// This will contain the tablet ID mapped to the number of records it has seen
Expand Down Expand Up @@ -755,19 +769,41 @@ else if (message.isDDLMessage()) {
// If there are retries left, perform them after the specified delay.
LOGGER.warn("Error while trying to get the changes from the server; will attempt retry {} of {} after {} milli-seconds. Exception: {}",
retryCount, connectorConfig.maxConnectorRetries(), connectorConfig.connectorRetryDelayMs(), e);

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
}
catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();

if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}
}

/**
* Initialize the partition and offsets for the given tablet to get them ready for streaming.
* @param tableId table UUID
* @param tabletId tablet UUID
* @param offsetContext the {@link YugabyteDBOffsetContext}
* @param tabletToExplicitCheckpoint map containing tabletId-explicitCheckpoint mapping
* @param schemaNeeded map with information whether to get the schema for a tablet
* @param opId the OpId to initialize the partitions with
*/
private void initializePartitionsAndOffsets(String tableId, String tabletId, YugabyteDBOffsetContext offsetContext,
Map<String, CdcSdkCheckpoint> tabletToExplicitCheckpoint,
Map<String, Boolean> schemaNeeded, OpId opId) {
// For streaming, we do not want any colocated information and want to process the tables
// based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same.
YBPartition p = new YBPartition(tableId, tabletId, false /* colocated */);

offsetContext.initSourceInfo(p, this.connectorConfig, opId);

// We can initialise the explicit checkpoint for this tablet to the value returned by
// the cdc_service through the 'GetTabletListToPollForCDC' API
tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint());
schemaNeeded.put(p.getId(), Boolean.TRUE);
}

private void probeConnectionIfNeeded() throws SQLException {
// CDCSDK Find out why it fails.
// if (connectionProbeTimer.hasElapsed()) {
Expand Down Expand Up @@ -1101,19 +1137,29 @@ private GetTabletListToPollForCDCResponse getTabletListResponseWithRetry(
splitTabletId, retryCount, connectorConfig.maxConnectorRetries(), connectorConfig.connectorRetryDelayMs());
LOGGER.warn("Stacktrace", e);

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
}
catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep while pausing to get the children tablets for parent {} interrupted", splitTabletId);
LOGGER.warn("Exception for interruption", ie);
Thread.currentThread().interrupt();
if (e instanceof RecoverableException) {
LOGGER.warn("Retrying with a new YBClient");
this.syncClient = YBClientUtils.getYbClient(connectorConfig);
}

pauseBeforeRetryingError();
}
}

// In ideal scenarios, this should NEVER be returned from this function.
return null;
}

/**
* Pause the flow before moving further to retry.
*/
private void pauseBeforeRetryingError() {
try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
}
}
}
Loading