Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.
This repository was archived by the owner on Mar 30, 2023. It is now read-only.

Spring Kafka Integration: Consumer code is not getting invoked. #57

@ghost

Description

Hi,

I am building an application using Spring MVC and Spring Integration Kafka. When a URL is hit, the code produces messages, and once a message has been sent I want my consumer code to be invoked so that it can consume the message. The first part, sending the messages to the broker, works fine, but the second part, consuming the messages automatically, does not work.

  1. Spring MVC / Spring Integration version is 4.1.5.RELEASE.
  2. spring-integration-kafka version is 1.2.0.RELEASE.

Pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <!-- Maven build descriptor for the KafKaWithSpringMVC web application (packaged as a WAR). -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>KafKaWithSpringMVC</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>Spring MVC Tutorial</name>
    <url>http://maven.apache.org</url>
    <properties>
        <!-- Single version property shared by every dependency below that references ${spring.version}. -->
        <spring.version>4.1.5.RELEASE</spring.version>
    </properties>
    <dependencies>
        <!-- Servlet API: scope "provided", so it is not bundled into the WAR. -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spring Framework core, web and MVC modules. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- Spring Integration Kafka adapter; the transitive artifacts listed under <exclusions> are left out. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>1.2.0.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- NOTE(review): this Spring Integration module reuses ${spring.version}, the same property
             used for the org.springframework artifacts above; confirm that is the intended
             Spring Integration release. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>${spring.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- Logging: commons-logging API (used by the Java classes) and the SLF4J log4j12 artifact. -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> 
            <version>0.8.2.1</version> </dependency> -->
    </dependencies>
    <build>
        <finalName>
        KafKaWithSpringMVC
    </finalName>
    </build>
</project>

Producer Code:

package com;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
/**
 * MVC controller that publishes a fixed batch of test messages to the
 * {@code inputToKafka} channel each time {@code /producer} is requested.
 *
 * <p>NOTE(review): {@code @EnableWebMvc} is normally declared on a
 * {@code @Configuration} class rather than on a controller. It is kept here so
 * that the application's MVC setup is not changed by this class.
 */
@Controller
@EnableWebMvc
public class ProducerController {
    private static final Log LOG = LogFactory.getLog(ProducerController.class);

    /** Kafka topic every test message is addressed to. */
    private static final String TOPIC = "testtopic";

    /**
     * Partition every test message is addressed to.
     * NOTE(review): this presumably requires the topic to have at least four
     * partitions (ids 0-3); verify against the broker's topic configuration.
     */
    private static final int PARTITION_ID = 3;

    /** Number carried by the first test message (inclusive). */
    private static final int FIRST_MESSAGE_NUMBER = 30;

    /** Upper bound of the test message numbers (exclusive). */
    private static final int END_MESSAGE_NUMBER = 53;

    @Autowired
    private MessageChannel inputToKafka;

    /**
     * Sends the test messages and renders the {@code producer} view.
     *
     * @param name  optional request parameter, defaults to {@code "World"}; exposed to the view as {@code name}
     * @param model model populated for the view
     * @return the logical view name {@code "producer"}
     */
    @RequestMapping("/producer")
    public String sendMessage(@RequestParam(value = "name", required = false, defaultValue = "World") String name,
            Model model) {
        model.addAttribute("name", name);
        for (int i = FIRST_MESSAGE_NUMBER; i < END_MESSAGE_NUMBER; i++) {
            // The return value only reports whether the channel accepted the message. In the
            // accompanying XML the channel is a queue polled by the Kafka outbound adapter, so
            // "true" means queued, not necessarily delivered to the broker.
            boolean accepted = inputToKafka.send(
                    MessageBuilder.withPayload("Message: " + i)
                            .setHeader(KafkaHeaders.TOPIC, TOPIC)
                            .setHeader(KafkaHeaders.PARTITION_ID, PARTITION_ID)
                            .build());
            System.out.println("Message send: " + accepted);
            if (accepted) {
                LOG.info("message sent " + i + "--");
            } else {
                // Previously a rejected message was still logged as "sent".
                LOG.warn("message " + i + " was not accepted by channel inputToKafka");
            }
        }
        return "producer";
    }
}

Consumer Code:

package com;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
/**
 * Message handler invoked with the batches polled from Kafka; prints every
 * received message to standard output.
 */
@Component
public class ConsumerController {
    private static final Log LOG = LogFactory.getLog(ConsumerController.class);

    /**
     * Charset used to decode raw {@code byte[]} payloads. Fully qualified so that
     * no additional import is required.
     * NOTE(review): presumably matches the encoding used by the producer's
     * encoder; confirm.
     */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Processes one polled batch.
     *
     * <p>The batch is keyed by topic name, then by partition id, and holds the
     * list of payloads read from that partition. Payload elements are accepted as
     * arbitrary objects: when a value decoder such as a String decoder is
     * configured on the consumer, the elements are presumably already decoded
     * (e.g. {@code String}) rather than {@code byte[]}, and iterating them as
     * {@code byte[]} would fail with a {@code ClassCastException}.
     *
     * @param msgs batch of messages grouped by topic and partition; {@code null}
     *             or empty batches are ignored
     */
    public void processMessage(Map<String, ? extends Map<Integer, ? extends List<?>>> msgs) {
        if (msgs == null) {
            return;
        }
        for (Map.Entry<String, ? extends Map<Integer, ? extends List<?>>> topicEntry : msgs.entrySet()) {
            LOG.debug("Topic:" + topicEntry.getKey());
            // Work against the Map interface; the concrete map type is up to the caller.
            Map<Integer, ? extends List<?>> partitions = topicEntry.getValue();
            if (partitions == null) {
                continue;
            }
            for (Map.Entry<Integer, ? extends List<?>> partitionEntry : partitions.entrySet()) {
                LOG.debug("p:" + partitionEntry.getKey());
                List<?> payloads = partitionEntry.getValue();
                if (payloads == null) {
                    continue;
                }
                for (Object payload : payloads) {
                    handle(toText(payload));
                }
            }
        }
    }

    /** Prints a single message; a failure is logged with its cause and does not stop the batch. */
    private void handle(String message) {
        LOG.debug("Message: " + message);
        try {
            System.out.println("Message received: " + message);
        } catch (Exception e) {
            LOG.error(String.format("Failed to process message %s", message), e);
        }
    }

    /** Converts a payload to text: {@code byte[]} is decoded, anything else uses {@code String.valueOf}. */
    private static String toText(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, PAYLOAD_CHARSET);
        }
        return String.valueOf(payload);
    }
}

Kafka Producer configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <!-- Producer side: messages sent to "inputToKafka" are buffered in a queue and forwarded
         to Kafka by the outbound channel adapter below. -->
    <int:channel id="inputToKafka">
        <int:queue />
    </int:channel>
    <!-- Polls "inputToKafka" with a fixed delay of 1000 ms on "taskExecutor" and hands each
         message to the producer context. -->
    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext"
        channel="inputToKafka" >
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
            receive-timeout="0" task-executor="taskExecutor" />
    </int-kafka:outbound-channel-adapter>
    <!-- Thread pool used by the poller above. -->
    <task:executor id="taskExecutor" pool-size="5"
        keep-alive="120" queue-capacity="10000" />
    <!-- Producer settings for topic "testtopic": broker at localhost:9092, String keys and
         values, both encoded by the "encoder" bean. -->
    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="localhost:9092" key-class-type="java.lang.String"
                key-encoder="encoder" value-class-type="java.lang.String" 
                value-encoder="encoder" partitioner="partitioner" topic="testtopic"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
    <!-- Encoder and partitioner beans referenced by the producer configuration. -->
    <bean id="encoder"
        class="org.springframework.integration.kafka.serializer.common.StringEncoder" />
    <bean id="partitioner"
        class="org.springframework.integration.kafka.support.DefaultPartitioner" />
</beans>

Kafka Consumer configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:stream="http://www.springframework.org/schema/integration/stream"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <!-- Consumer side: the inbound adapter polls Kafka and publishes each batch to
         "inputFromKafka"; the outbound channel adapter further down invokes
         consumerController.processMessage with it. -->
    <int:channel id="inputFromKafka">
        <int:dispatcher task-executor="kafkaMessageExecutor" />
    </int:channel>
    <!-- Thread pool on which messages from "inputFromKafka" are dispatched. -->
    <task:executor id="kafkaMessageExecutor" pool-size="0-10"
        keep-alive="120" queue-capacity="500" />
    <!-- ZooKeeper connection settings. The session timeout was 80, which is far below the
         6000 connection timeout used here; it is raised to 6000 so the consumer's ZooKeeper
         session is not requested with an unrealistically short lifetime. -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="localhost:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
    <!-- Polls the consumer context every 2000 ms and sends the result to "inputFromKafka". -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="inputFromKafka">
        <int:poller fixed-delay="2000" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
    <!-- Extra Kafka consumer properties passed to the consumer context. -->
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <!-- Delivers every message on "inputFromKafka" to consumerController.processMessage. -->
    <int:outbound-channel-adapter channel="inputFromKafka"
        ref="consumerController" method="processMessage" />
    <!-- Consumer group "default1" reading topic "testtopic" with 3 streams. Keys and values are
         decoded by the String decoder bean below.
         NOTE(review): with a String value decoder the payload lists presumably contain String
         objects rather than byte[]; confirm the handler method accepts that. -->
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" max-messages="5000" key-decoder="deccoder"
                value-decoder="deccoder">
                <int-kafka:topic id="testtopic" streams="3" />
            </int-kafka:consumer-configuration>
            <!-- <int-kafka:consumer-configuration group-id="default2" max-messages="50"> 
                <int-kafka:topic id="test2" streams="4"/> </int-kafka:consumer-configuration> 
                <int-kafka:consumer-configuration group-id="default3" max-messages="10"> 
                <int-kafka:topic-filter pattern="regextopic.*" streams="4" exclude="false"/> 
                </int-kafka:consumer-configuration> -->
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
    <!-- The bean id spelling "deccoder" is kept as-is because it is referenced by that name above. -->
    <bean id="deccoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
</beans>

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions