Skip to content

rubyzhang/pulsar-java-spring-boot-starter

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Spring boot starter for Apache Pulsar

Maven Central Release Build Status Test Coverage License: MIT Join the chat at https://gitter.im/pulsar-java-spring-boot-starter/community

Quick Start

Simple start consist only from 3 simple steps.

1. Add Maven dependency

<dependency>
  <groupId>io.github.majusko</groupId>
  <artifactId>pulsar-java-spring-boot-starter</artifactId>
  <version>${version}</version>
</dependency>

2. Configure Producer

Create your configuration class with all producers you would like to register.

@Configuration
public class TestProducerConfiguration {

    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
            .addProducer("my-topic", MyMsg.class)
            .addProducer("other-topic", String.class);
    }
}

Use registered producers by simply injecting the PulsarTemplate into your service.

@Service
class MyProducer {

	@Autowired
	private PulsarTemplate<MyMsg> producer;

	void send(MyMsg msg) {
		producer.send("my-topic", msg);
	}
}

3. Configure Consumer

Annotate your service method with @PulsarConsumer annotation.

@Service
class MyConsumer {
    
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(MyMsg msg) { 
        producer.send(TOPIC, msg); 
    }
}

4. Minimal Configuration

pulsar.service-url=pulsar://localhost:6650

Example project

Documentation

Configuration

Default configuration:

#PulsarClient
pulsar.service-url=pulsar://localhost:6650
pulsar.io-threads=10
pulsar.listener-threads=10
pulsar.enable-tcp-no-delay=false
pulsar.keep-alive-interval-sec=20
pulsar.connection-timeout-sec=10
pulsar.operation-timeout-sec=15
pulsar.starting-backoff-interval-ms=100
pulsar.max-backoff-interval-sec=10
pulsar.consumer-name-delimiter=
pulsar.namespace=default
pulsar.tenant=public

#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
pulsar.consumer.default.ack-timeout-ms=3000

Properties explained:

PulsarClient

  • pulsar.service-url - URL used to connect to pulsar cluster.
  • pulsar.io-threads - Number of threads to be used for handling connections to brokers.
  • pulsar.listener-threads - Set the number of threads to be used for message listeners/subscribers.
  • pulsar.enable-tcp-no-delay - Whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
  • pulsar.keep-alive-interval-sec - Keep alive interval for each client-broker-connection.
  • pulsar.connection-timeout-sec - duration of time to wait for a connection to a broker to be established. If the duration passes without a response from the broker, the connection attempt is dropped.
  • pulsar.operation-timeout-sec - Operation timeout.
  • pulsar.starting-backoff-interval-ms - Duration of time for a backoff interval (Retry algorithm).
  • pulsar.max-backoff-interval-sec - The maximum duration of time for a backoff interval (Retry algorithm).
  • pulsar.consumer-name-delimiter - Consumer names are connection of bean name and method with a delimiter. By default, there is no delimiter and words are connected together.
  • pulsar.namespace - Namespace separation. For example: app1/app2 OR dev/staging/prod. More in Namespaces docs.
  • pulsar.tenant - Pulsar multi-tenancy support. More in Multi Tenancy docs.

Consumer

  • pulsar.consumer.default.dead-letter-policy-max-redeliver-count - How many times should pulsar try to retry sending the message to consumer.
  • pulsar.consumer.default.ack-timeout-ms - How soon should be the message acked and how soon will dead letter mechanism try to retry to send the message.

Additional usages

1. PulsarMessage Wrapper

In case you need to access pulsar metadata you simply use PulsarMessage as a wrapper and data will be injected for you.

@Service
class MyConsumer {
    
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(PulsarMessage<MyMsg> myMsg) { 
        producer.send(TOPIC, msg.getValue()); 
    }
}

1. SpeL support

You can configure topic names in application.properties

my.custom.topic.name=foo
@PulsarConsumer(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}

Contributing

All contributors are welcome. If you never contributed to the open-source, start with reading the Github Flow.

Roadmap task

  1. Pick a task from simple roadmap in Projects section.
  2. Create a pull request with reference (url) to the task inside the Projects section.
  3. Rest and enjoy the great feeling of being a contributor.

Hotfix

  1. Create an issue
  2. Create a pull request with reference to the issue
  3. Rest and enjoy the great feeling of being a contributor.

About

Simple pulsar spring boot starter with annotation based consumer/producer registration.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Java 100.0%