Skip to content

Commit

Permalink
GH-2059: adminTimeout property on @embeddedkafka
Browse files Browse the repository at this point in the history
* GH-2059: expose adminTimeout property in EmbeddedKafka annotation

* GH-2059: remove unused import

* GH-2059: change since version param in added methods

Resolves #2059
  • Loading branch information
breader124 authored and garyrussell committed Mar 29, 2022
1 parent da0b820 commit 5650491
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
* @author Elliot Kennedy
* @author Nakul Mishra
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 2.2
*/
Expand All @@ -114,7 +115,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
*/
public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10);
public static final int DEFAULT_ADMIN_TIMEOUT = 10;

public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;

Expand Down Expand Up @@ -176,7 +177,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {

private int[] kafkaPorts;

private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT);

private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT;

Expand Down Expand Up @@ -257,33 +258,24 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) {
}

/**
* Set an explicit port for the embedded Zookeeper.
* @param port the port.
* @return the {@link EmbeddedKafkaBroker}.
* Set the system property with this name to the list of broker addresses.
* @param brokerListProperty the brokerListProperty to set
* @return this broker.
* @since 2.3
*/
public EmbeddedKafkaBroker zkPort(int port) {
this.zkPort = port;
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
this.brokerListProperty = brokerListProperty;
return this;
}
/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* Default 30 seconds.
* @param adminTimeout the timeout.
* @since 2.2
*/
public void setAdminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
}

/**
* Set the system property with this name to the list of broker addresses.
* @param brokerListProperty the brokerListProperty to set
* @return this broker.
* Set an explicit port for the embedded Zookeeper.
* @param port the port.
* @return the {@link EmbeddedKafkaBroker}.
* @since 2.3
*/
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
this.brokerListProperty = brokerListProperty;
public EmbeddedKafkaBroker zkPort(int port) {
this.zkPort = port;
return this;
}

Expand All @@ -305,6 +297,27 @@ public void setZkPort(int zkPort) {
this.zkPort = zkPort;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* @param adminTimeout the timeout.
* @return the {@link EmbeddedKafkaBroker}
* @since 2.8.5
*/
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
return this;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* Default 10 seconds.
* @param adminTimeout the timeout.
* @since 2.2
*/
public void setAdminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
}

/**
* Set connection timeout for the client to the embedded Zookeeper.
* @param zkConnectionTimeout the connection timeout,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 2.3
*
Expand Down Expand Up @@ -127,7 +128,8 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
.zkPort(embedded.zookeeperPort())
.kafkaPorts(ports)
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout());
.zkSessionTimeout(embedded.zkSessionTimeout())
.adminTimeout(embedded.adminTimeout());
Properties properties = new Properties();

for (String pair : embedded.brokerProperties()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@
* @author Gary Russell
* @author Sergio Lourenco
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 1.3
*
Expand Down Expand Up @@ -172,5 +173,12 @@
*/
int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT;

/**
* Timeout in seconds for admin operations (e.g. topic creation, close).
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ADMIN_TIMEOUT}
* @since 2.8.5
*/
int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;

}

0 comments on commit 5650491

Please sign in to comment.