Skip to content

Commit

Permalink
[Hotfix][S3-Redshift/CDC] Fix cdc write redshift error (apache#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Sep 22, 2023
1 parent f6b43a9 commit 5cb9a6a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
Expand All @@ -47,12 +52,12 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig

private final C sourceConfig;
private final List<TableId> alreadyProcessedTables;
private final List<SnapshotSplit> remainingSplits;
private final Queue<SnapshotSplit> remainingSplits;
private final Map<String, SnapshotSplit> assignedSplits;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
private boolean assignerCompleted;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final Deque<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;

private ChunkSplitter chunkSplitter;
Expand Down Expand Up @@ -115,12 +120,12 @@ private SnapshotSplitAssigner(
this.context = context;
this.sourceConfig = context.getSourceConfig();
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitCompletedOffsets = splitCompletedOffsets;
this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables);
this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits);
this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets);
this.assignerCompleted = assignerCompleted;
this.remainingTables = new LinkedList<>(remainingTables);
this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
Expand Down Expand Up @@ -211,11 +216,15 @@ public SnapshotPhaseState snapshotState(long checkpointId) {
SnapshotPhaseState state =
new SnapshotPhaseState(
alreadyProcessedTables,
remainingSplits,
remainingSplits.isEmpty()
? Collections.emptyList()
: new ArrayList<>(remainingSplits),
assignedSplits,
splitCompletedOffsets,
assignerCompleted,
remainingTables,
remainingTables.isEmpty()
? Collections.emptyList()
: new ArrayList<>(remainingTables),
isTableIdCaseSensitive,
true);
// we need a complete checkpoint before mark this assigner to be completed, to wait for all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -58,12 +58,17 @@ public static CommitterResource createResource(S3RedshiftConf conf) {
}

private static ExecutorService createCommitWorker(int workerSize) {
return new ThreadPoolExecutor(
0,
workerSize,
30L,
TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("s3-redshift-commit-worker-%d").build());
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
workerSize,
workerSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setNameFormat("s3-redshift-commit-worker-%d")
.build());
executor.allowCoreThreadTimeOut(true);
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.redshift.commit.S3RedshiftSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConf;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
Expand Down Expand Up @@ -115,7 +117,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
SinkWriter.Context context) throws IOException {
return new S3RedshiftChangelogWriter(
writeStrategy,
newWriteStrategy(),
hadoopConf,
context,
jobId,
Expand All @@ -128,7 +130,7 @@ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) throws IOException {
return new S3RedshiftChangelogWriter(
writeStrategy,
newWriteStrategy(),
hadoopConf,
context,
jobId,
Expand Down Expand Up @@ -161,4 +163,12 @@ public void handleSaveMode(DataSaveMode saveMode) {
saveModeHandler.handle(saveMode);
}
}

private WriteStrategy newWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
writeStrategy.setFileSystemUtils(fileSystemUtils);
return writeStrategy;
}
}

0 comments on commit 5cb9a6a

Please sign in to comment.