Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.1] - 2021-07-17
### Changed
- Modified Confluent archive to follow new standards
- Stopped using reactive Lettuce
- Upgraded various dependencies

## [1.2.0] - 2021-02-13
### Added
- Handle Redis cluster topology changes on the fly
Expand Down
2 changes: 2 additions & 0 deletions lombok.config
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
lombok.builder.className = Builder
lombok.log.fieldname = LOG
54 changes: 27 additions & 27 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<packaging>jar</packaging>

<name>kafka-connect-redis</name>
<description>Kafka Connect source and sink connector for Redis</description>
<name>Kafka Redis Connector (Sink and Source)</name>
<description>Kafka sink and source connector for Redis</description>
<url>https://github.com/jaredpetersen/kafka-connect-redis</url>

<licenses>
Expand Down Expand Up @@ -62,69 +62,57 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
<version>2.8.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.0.2.RELEASE</version>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.2</version>
<version>6.1.4.RELEASE</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.30</version>
<artifactId>slf4j-simple</artifactId>
<version>1.7.31</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.7.1</version>
<version>5.7.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<version>3.11.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.2</version>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.2</version>
<version>1.15.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -218,8 +206,9 @@
<goal>kafka-connect</goal>
</goals>
<configuration>
<title>Kafka Connect Redis</title>
<documentationUrl>${project.url}/blob/main/README.md</documentationUrl>
<name>redis-connector</name>
<title>Redis Connector (Sink and Source)</title>
<documentationUrl>${project.url}</documentationUrl>
<ownerLogo>docs/logos/jaredpetersen-logo.png</ownerLogo>
<ownerUsername>jaredpetersen</ownerUsername>
<ownerType>user</ownerType>
Expand All @@ -228,6 +217,17 @@
<supportProviderName>Open Source Community</supportProviderName>
<supportSummary>Support provided through community involvement.</supportSummary>
<supportUrl>${project.issueManagement.url}</supportUrl>
<fileSets>
<fileSet>
<directory>${project.basedir}</directory>
<outputDirectory>doc</outputDirectory>
<includes>
<include>README.md</include>
<include>LICENSE.md</include>
<include>CHANGELOG.md</include>
</includes>
</fileSet>
</fileSets>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<componentTypes>
<componentType>source</componentType>
Expand All @@ -252,7 +252,7 @@
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>${project.name}-${project.version}</finalName>
<finalName>${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,31 @@
import io.github.jaredpetersen.kafkaconnectredis.sink.config.RedisSinkConfig;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.RecordConverter;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.Writer;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisCommand;
import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import java.util.Collection;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/**
* Kafka Connect Task for Kafka Connect Redis Sink.
*/
@Slf4j
public class RedisSinkTask extends SinkTask {
private static final RecordConverter RECORD_CONVERTER = new RecordConverter();

private RedisClient redisStandaloneClient;
private StatefulRedisConnection<String, String> redisStandaloneConnection;

Expand All @@ -34,10 +36,6 @@ public class RedisSinkTask extends SinkTask {

private Writer writer;

private static final RecordConverter RECORD_CONVERTER = new RecordConverter();

private static final Logger LOG = LoggerFactory.getLogger(RedisSinkTask.class);

@Override
public String version() {
return VersionUtil.getVersion();
Expand Down Expand Up @@ -67,14 +65,14 @@ public void start(final Map<String, String> props) {

this.redisClusterConnection = this.redisClusterClient.connect();

final RedisClusterReactiveCommands<String, String> redisClusterCommands = this.redisClusterConnection.reactive();
final RedisClusterCommands<String, String> redisClusterCommands = this.redisClusterConnection.sync();
this.writer = new Writer(redisClusterCommands);
}
else {
this.redisStandaloneClient = RedisClient.create(config.getRedisUri());
this.redisStandaloneConnection = this.redisStandaloneClient.connect();

final RedisReactiveCommands<String, String> redisStandaloneCommands = this.redisStandaloneConnection.reactive();
final RedisCommands<String, String> redisStandaloneCommands = this.redisStandaloneConnection.sync();
this.writer = new Writer(redisStandaloneCommands);
}
}
Expand All @@ -88,14 +86,27 @@ public void put(final Collection<SinkRecord> records) {
LOG.info("writing {} record(s) to redis", records.size());
LOG.debug("records: {}", records);

Flux
.fromIterable(records)
.flatMapSequential(RECORD_CONVERTER::convert)
.onErrorMap(error -> new ConnectException("failed to convert record", error))
.flatMapSequential(redisCommand -> this.writer.write(redisCommand))
.onErrorMap(error -> new ConnectException("failed to write record", error))
.then()
.block();
for (SinkRecord record : records) {
put(record);
}
}

/**
 * Convert a single Kafka record into a Redis command and write it to Redis.
 * Any failure in either step is surfaced as a {@link ConnectException} so the
 * Connect framework can apply its retry/error-handling policy.
 *
 * @param record sink record to convert and persist
 * @throws ConnectException if the record cannot be converted or written
 */
private void put(SinkRecord record) {
  // Conversion failures and write failures are wrapped separately so the
  // exception message pinpoints which stage broke.
  RedisCommand command;
  try {
    command = RECORD_CONVERTER.convert(record);
  }
  catch (Exception cause) {
    throw new ConnectException("failed to convert record", cause);
  }

  try {
    this.writer.write(command);
  }
  catch (Exception cause) {
    throw new ConnectException("failed to write record", cause);
  }
}

@Override
Expand Down
Loading