Skip to content

Commit

Permalink
Ensure the DLQ configuration from Kafka Connect is correctly handled …
Browse files Browse the repository at this point in the history
…(issue apache#835)
  • Loading branch information
orpiske committed Jan 8, 2021
1 parent 072b2e2 commit 0f708b7
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 1 deletion.
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand All @@ -51,6 +52,7 @@ public class CamelSinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);

private static final String LOCAL_URL = "direct:start";
private ErrantRecordReporter reporter;


private CamelKafkaConnectMain cms;
Expand All @@ -70,6 +72,10 @@ public void start(Map<String, String> props) {
Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);

if (context != null) {
reporter = context.errantRecordReporter();
}

try {
String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
Expand Down Expand Up @@ -175,7 +181,13 @@ public void put(Collection<SinkRecord> sinkRecords) {
producer.send(localEndpoint, exchange);

if (exchange.isFailed()) {
throw new ConnectException("Exchange delivery has failed!", exchange.getException());
if (reporter == null) {
LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
throw new ConnectException("Exchange delivery has failed!", exchange.getException());
}

LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
reporter.report(record, exchange.getException());
}
}
}
Expand Down
Expand Up @@ -63,6 +63,21 @@ public T withValueConverterClass(String converterClass) {
return (T) this;
}

/**
* This enables sending failed records to the DLQ. Note: it automatically configure other required/recommended
* options!
* @param topicName the DLQ topic name
* @return this object instance
*/
public T withDeadLetterQueueTopicName(String topicName) {
// There's no constant for the DLQ settings
connectorProps.put("errors.deadletterqueue.topic.name", topicName);
connectorProps.put("errors.deadletterqueue.topic.replication.factor", 1);
connectorProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, true);

return (T) this;
}

public TransformsConfigBuilder<T> withTransformsConfig(String name) {
return new TransformsConfigBuilder<>((T) this, getProperties(), name);
}
Expand Down
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.sjms2.sink;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Integration tests for the JMS sink with a DLQ configuration. This test forces a failure in the sink connector to
* ensure that the failed records are added to the DLQ configured in Kafka.
*/
@Testcontainers
public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkWithDLQJMSITCase.class);

private int received;
private final int expect = 10;
private int errors;
private final int expectedErrors = 1;

private Properties connectionProperties() {
Properties properties = new Properties();

properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
properties.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");

return properties;
}

@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
}

@BeforeEach
public void setUp() {
received = 0;
errors = 0;
}

private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) {
LOG.debug("Received: {}", record.value());
errors++;

if (errors >= expectedErrors) {
return false;
}

return true;
}

private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);

LOG.debug("Creating the consumer ...");


KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

for (int i = 0; i < expect; i++) {
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
}

LOG.debug("Created the consumer ... About to receive messages");
}


@Test
@Timeout(10)
public void testSendReceiveWithError() {
try {
Properties brokenProp = connectionProperties();

brokenProp.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");

ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withConnectionProperties(brokenProp)
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDeadLetterQueueTopicName("dlq-sink-topic");

runTest(connectorPropertyFactory);

KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consume("dlq-sink-topic", this::checkDqlRecord);

assertEquals(expectedErrors, errors, "Didn't process the expected amount of messages");

} catch (Exception e) {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
}

0 comments on commit 0f708b7

Please sign in to comment.