Skip to content

Commit

Permalink
[ISSUE apache#4379] Enable manually commit offset in rocketmq source …
Browse files Browse the repository at this point in the history
…connector
  • Loading branch information
xwm1992 committed Aug 17, 2023
1 parent e03d291 commit 08b1915
Show file tree
Hide file tree
Showing 18 changed files with 413 additions and 32 deletions.
Expand Up @@ -19,6 +19,8 @@

import org.apache.eventmesh.connector.kafka.sink.config.KafkaSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand All @@ -41,7 +43,7 @@ public class KafkaSinkConnector implements Sink {

private KafkaSinkConfig sinkConfig;

private Properties props = new Properties();
private final Properties props = new Properties();
Producer<String, String> producer;

@Override
Expand All @@ -67,6 +69,25 @@ public void init(Config config) {
producer = new KafkaProducer<>(props);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)connectorContext;
this.sinkConfig = (KafkaSinkConfig) sinkConnectorContext.getSinkConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sinkConfig.getConnectorConfig().getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getKeyConverter());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getValueConverter());
props.put(ProducerConfig.ACKS_CONFIG, sinkConfig.getConnectorConfig().getAck());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, sinkConfig.getConnectorConfig().getMaxRequestSize());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, sinkConfig.getConnectorConfig().getBufferMemory());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, sinkConfig.getConnectorConfig().getBatchSize());
props.put(ProducerConfig.LINGER_MS_CONFIG, sinkConfig.getConnectorConfig().getLingerMs());
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, sinkConfig.getConnectorConfig().getRequestTimeoutMs());
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, sinkConfig.getConnectorConfig().getMaxInFightRequestsPerConnection());
props.put(ProducerConfig.RETRIES_CONFIG, sinkConfig.getConnectorConfig().getRetries());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, sinkConfig.getConnectorConfig().getCompressionType());
producer = new KafkaProducer<>(props);
}

@Override
public void start() throws Exception {
}
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.eventmesh.connector.kafka.source.config.KafkaSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
Expand Down Expand Up @@ -66,6 +68,23 @@ public void init(Config config) throws Exception {
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext)connectorContext;
this.sourceConfig = (KafkaSourceConfig) sourceConnectorContext.getSourceConfig();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceConfig.getConnectorConfig().getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, sourceConfig.getConnectorConfig().getKeyConverter());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, sourceConfig.getConnectorConfig().getValueConverter());
props.put(ConsumerConfig.GROUP_ID_CONFIG, sourceConfig.getConnectorConfig().getGroupID());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, sourceConfig.getConnectorConfig().getEnableAutoCommit());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, sourceConfig.getConnectorConfig().getMaxPollRecords());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, sourceConfig.getConnectorConfig().getAutoCommitIntervalMS());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sourceConfig.getConnectorConfig().getSessionTimeoutMS());
this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut();
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
}

@Override
public void start() throws Exception {
kafkaConsumer.subscribe(Collections.singleton(sourceConfig.getConnectorConfig().getTopic()));
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.eventmesh.connector.openfunction.sink.config.OpenFunctionSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -49,6 +51,14 @@ public void init(Config config) throws Exception {
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for openfunction source connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)connectorContext;
this.sinkConfig = (OpenFunctionSinkConfig) sinkConnectorContext.getSinkConfig();
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void start() throws Exception {
isRunning = true;
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.eventmesh.connector.openfunction.source.config.OpenFunctionSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -51,6 +53,14 @@ public void init(Config config) throws Exception {
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext)connectorContext;
// init config for openfunction source connector
this.sourceConfig = (OpenFunctionSourceConfig) sourceConnectorContext.getSourceConfig();
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void start() throws Exception {

Expand Down
Expand Up @@ -21,6 +21,8 @@

import org.apache.eventmesh.connector.pulsar.sink.config.PulsarSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -59,6 +61,19 @@ public void init(Config config) throws Exception {
.create();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for pulsar source connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)connectorContext;
this.sinkConfig = (PulsarSinkConfig) sinkConnectorContext.getSinkConfig();
PulsarClient client = PulsarClient.builder()
.serviceUrl(sinkConfig.getConnectorConfig().getServiceUrl())
.build();
producer = client.newProducer()
.topic(sinkConfig.getConnectorConfig().getTopic())
.create();
}

@Override
public void start() throws Exception {
}
Expand Down
Expand Up @@ -20,6 +20,8 @@

import org.apache.eventmesh.connector.pulsar.source.config.PulsarSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
Expand Down Expand Up @@ -66,6 +68,19 @@ public void init(Config config) throws Exception {
.subscribe();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext)connectorContext;
this.sourceConfig = (PulsarSourceConfig) sourceConnectorContext.getSourceConfig();
PulsarClient client = PulsarClient.builder()
.serviceUrl(sourceConfig.getConnectorConfig().getServiceUrl())
.build();
consumer = client.newConsumer()
.topic(sourceConfig.connectorConfig.getTopic())
.subscriptionName(sourceConfig.getPubSubConfig().getGroup())
.subscribe();
}

@Override
public void start() throws Exception {
}
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
import org.apache.eventmesh.connector.redis.sink.config.RedisSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -59,6 +61,16 @@ public void init(Config config) throws Exception {
this.redissonClient = Redisson.create(redisConfig);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)connectorContext;
this.sinkConfig = (RedisSinkConfig) sinkConnectorContext.getSinkConfig();
org.redisson.config.Config redisConfig = new org.redisson.config.Config();
redisConfig.useSingleServer().setAddress(sinkConfig.connectorConfig.getServer());
redisConfig.setCodec(CloudEventCodec.getInstance());
this.redissonClient = Redisson.create(redisConfig);
}

@Override
public void start() throws Exception {
this.topic = redissonClient.getTopic(sinkConfig.connectorConfig.getTopic());
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
import org.apache.eventmesh.connector.redis.source.config.RedisSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -63,6 +65,17 @@ public void init(Config config) throws Exception {
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext)connectorContext;
this.sourceConfig = (RedisSourceConfig) sourceConnectorContext.getSourceConfig();
org.redisson.config.Config redisConfig = new org.redisson.config.Config();
redisConfig.useSingleServer().setAddress(sourceConfig.connectorConfig.getServer());
redisConfig.setCodec(CloudEventCodec.getInstance());
this.redissonClient = Redisson.create(redisConfig);
this.queue = new LinkedBlockingQueue<>(1000);
}

@Override
public void start() throws Exception {
this.topic = redissonClient.getTopic(sourceConfig.connectorConfig.getTopic());
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.eventmesh.connector.rocketmq.sink.config.RocketMQSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -51,6 +53,16 @@ public void init(Config config) throws Exception {
producer.setNamesrvAddr(sinkConfig.getConnectorConfig().getNameServer());
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for rocketmq source connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
this.sinkConfig = (RocketMQSinkConfig) sinkConnectorContext.getSinkConfig();
producer.setProducerGroup(sinkConfig.getPubSubConfig().getGroup());
producer.setNamesrvAddr(sinkConfig.getConnectorConfig().getNameServer());
}


@Override
public void start() throws Exception {
producer.start();
Expand Down

0 comments on commit 08b1915

Please sign in to comment.