Skip to content

Commit

Permalink
HermesGermany#292 Reverted changes to KafkaSenderImpl and added overl…
Browse files Browse the repository at this point in the history
…oaded method to KafkaFutureDecoupler to accept new return type of kafkaTemplate.send()
  • Loading branch information
jimbethancourt committed Jul 13, 2023
1 parent 22fa131 commit 19f702c
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private KafkaSenderImpl buildKafkaSender(KafkaEnvironmentConfig environment,
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(toMap(props));
return new KafkaSenderImpl(new KafkaTemplate<>(factory));
return new KafkaSenderImpl(new KafkaTemplate<>(factory), futureDecoupler);
}

private Properties buildKafkaProperties(KafkaEnvironmentConfig environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,30 @@ public <T> CompletableFuture<T> toCompletableFuture(KafkaFuture<T> future) {
}));
}

/**
* Returns a {@link CompletableFuture} which completes when the given {@link CompletableFuture} completes. If the
* <code>ListenableFuture</code> is already complete, a completed Future is returned. Otherwise, the returned Future
* completes on a Thread provided by a fresh <code>ExecutorService</code> of the <code>KafkaExecutorFactory</code>
* provided for this helper class.
*
* @param <T> Type of the value provided by the Future.
* @param completableFuture Future which may be complete, or which may complete on the Kafka Thread.
*
* @return A completable Future which may be already complete if the original Future already was complete, or which
* completes on a Thread decoupled from the Kafka Thread.
*/
public <T> CompletableFuture<T> toCompletableFuture(CompletableFuture<T> completableFuture) {
return toCompletableFuture(completableFuture, cb -> completableFuture.whenComplete((t, ex) -> {
if (ex != null) {
cb.onFailure(ex);
}
else {
cb.onSuccess(t);
}
}));
}


/**
* Returns a {@link CompletableFuture} which completes when the given {@link ListenableFuture} completes. If the
* <code>ListenableFuture</code> is already complete, a completed Future is returned. Otherwise, the returned Future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ public class KafkaSenderImpl implements KafkaSender {

private KafkaTemplate<String, String> kafkaTemplate;

public KafkaSenderImpl(KafkaTemplate<String, String> template) {
private KafkaFutureDecoupler futureDecoupler;

public KafkaSenderImpl(KafkaTemplate<String, String> template, KafkaFutureDecoupler futureDecoupler) {
this.kafkaTemplate = template;
this.futureDecoupler = futureDecoupler;
}

@Override
public CompletableFuture<Void> send(String topic, String key, String message) {
return kafkaTemplate.send(new ProducerRecord<>(topic, key, message)).thenApply(o -> null);
return futureDecoupler.toCompletableFuture(kafkaTemplate.send(new ProducerRecord<>(topic, key, message)))
.thenApply(o -> null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@

class KafkaSenderImplTest {

private static ThreadFactory tfDecoupled = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "decoupled-" + System.currentTimeMillis());
}
};

private static KafkaExecutorFactory executorFactory = () -> {
return Executors.newSingleThreadExecutor(tfDecoupled);
};
@Test
void testSendDecoupling() throws Exception {
KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(executorFactory);

@SuppressWarnings("unchecked")
Producer<String, String> producer = mock(Producer.class);
Expand All @@ -45,12 +56,12 @@ void testSendDecoupling() throws Exception {

KafkaTemplate<String, String> template = new KafkaTemplate<String, String>(factory);

KafkaSenderImpl sender = new KafkaSenderImpl(template);
KafkaSenderImpl sender = new KafkaSenderImpl(template, decoupler);

StringBuilder threadName = new StringBuilder();

sender.send("a", "b", "c").thenApply(o -> threadName.append(Thread.currentThread().getName())).get();
assertFalse(threadName.toString().startsWith("decoupled-"));
assertTrue(threadName.toString().startsWith("decoupled-"));
}

}

0 comments on commit 19f702c

Please sign in to comment.