There are multiple ways to transfer data between Java applications, this library offers an easy way to connect with them using common methods.
Currently supporting the brokers:
- ActiveMQ using topic producers and consumers.
- Kafka using empty-key records to producers.
- NATS using subject subscription.
- PostgreSQL using
LISTEN
andNOTIFY
statement. - RabbitMQ using queue and consumer via exchange.
- Redis using publish and subscribe (also compatible with KeyDB).
- SQL polling (not a real broker, but can be used as one).
PostgreSQL and SQL are also compatible with Hikari.
Delivery4j contains the following artifacts:
delivery4j
- The main project.broker-activemq
- ActiveMQ broker.broker-kafka
- Kafka broker.broker-nats
- NATS broker.broker-postgresql
- PostgreSQL broker using plain Java connections.broker-postgresql-hikari
- PostgreSQL broker using Hikari library.broker-rabbitmq
- RabbitMQ broker.broker-redis
- Redis broker.broker-sql
- SQL broker using plain Java connections.broker-sql-hikari
- SQL broker using Hikari library.extension-caffeine
- Extension to detect and use Caffeine cache on MessageChannel.extension-guava
- Extension to detect and use Guava cache on MessageChannel.extension-log4j
- Extension to detect and use log4j logger on Broker instance.extension-slf4j
- Extension to detect and use slf4j logger on Broker instance.
build.gradle
repositories {
maven { url 'https://jitpack.io' }
}
dependencies {
implementation 'com.saicone.delivery4j:delivery4j:VERSION'
}
build.gradle.kts
repositories {
maven("https://jitpack.io")
}
dependencies {
implementation("com.saicone.delivery4j:delivery4j:VERSION")
}
pom.xml
<repositories>
<repository>
<id>Jitpack</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.saicone.delivery4j</groupId>
<artifactId>delivery4j</artifactId>
<version>VERSION</version>
<scope>compile</scope>
</dependency>
</dependencies>
How to use Delivery4j library.
Using brokers is pretty simple, you just need to create a broker instance (depending on the implementation) and set a consumer.
Broker broker = // Create instance from any implementation
// Subscribe to channels
broker.subscribe("hello:world", "myChannel1");
broker.setConsumer((channel, data) -> {
// do something
});
// Start connection
broker.start();
// Send data
byte[] data = ...;
broker.send("myChannel1", data);
Some brokers require to convert bytes to String and viceversa, Base64 is used by default.
Broker broker = // Create instance from any implementation
broker.setCodec(new ByteCodec<>() {
@Override
public @NotNull String encode(byte[] src) {
// convert bytes to String
}
@Override
public byte[] decode(@NotNull String src) {
// convert String to bytes
}
});
Some brokers have blocking operations or repetitive tasks, it's suggested to implement your own executor.
Broker broker = // Create instance from any implementation
broker.setExecutor(new DelayedExecutor<MyTaskObject>() {
@Override
public @NotNull MyTaskObject execute(@NotNull Runnable command) {
// run task and return itself
}
@Override
public @NotNull MyTaskObject execute(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
// run delayed task and return itself
}
@Override
public @NotNull MyTaskObject execute(@NotNull Runnable command, long delay, long period, @NotNull TimeUnit unit) {
// run repetitive task and return itself
}
@Override
public void cancel(@NotNull MyTaskObject unused) {
// cancel task
}
});
And also a logging instance to log information about connection and exceptions, by default it use the best available implementation.
It uses a number terminology for logging levels:
- Error
- Warning
- Information
- Debug
Broker broker = // Create instance from any implementation
broker.setLogger(new Broker.Logger() {
@Override
public void log(int level, @NotNull String msg) {
// log raw message
}
@Override
public void log(int level, @NotNull String msg, @NotNull Throwable throwable) {
// log raw message with throwable
}
});
Probably the reason why you are here, it's a simple usage of brokers to send and receive multi-line String messages.
First you need to extend AbstractMessenger and provide a broker.
public class Messenger extends AbstractMessenger {
@Override
protected Broker loadBroker() {
// Create instance from any implementation
}
}
And then use the Messenger.
Messenger messenger = new Messenger();
// Start connection
messenger.start();
// Send multi-line message to channel
messeger.send("myChannel1", "Hello", "World");
// Subscribe to channel
messenger.subscribe("myChannel1").consume((channel, lines) -> {
// do something
});
The subscribed message channels can have a cache instance to avoid receive outbound messages, by default it use the best available implementation.
Messenger messenger = new Messenger();
// Subscribe to channel
MessageChannel channel = messenger.subscribe("myChannel1").consume((channel, lines) -> {
// do something
});
// Cache message IDs
channel.cache(true);
// Cache with provided expiration
channel.cache(20, TimeUnit.SECONDS);
And also can have an end-to-end encryption.
Messenger messenger = new Messenger();
// Subscribe to channel
MessageChannel channel = messenger.subscribe("myChannel1").consume((channel, lines) -> {
// do something
});
// Your key
SecretKey key = ...;
channel.encryptor(Encryptor.of(key));