ReactorNettyTcpClient constructor with callback to initialize TcpClient [SPR-17523] #22055

Closed
spring-projects-issues opened this issue Nov 20, 2018 · 7 comments
Labels
in: web type: enhancement type: regression

Comments

Collaborator

@spring-projects-issues spring-projects-issues commented Nov 20, 2018

Gabriel Dogaru opened SPR-17523 and commented

This is the same issue as #17057

The solution there no longer applies because that constructor was removed in commit ffbc75a#diff-11144739384955df1f8f38cbcde8d95b

 


Affects: 5.1.2

Reference URL: #17057

Issue Links:

  • #20933 Upgrade to Reactor Netty 0.8
  • #17057 Spring Websockets Broker relay supporting a cluster of STOMP endpoint addresses

Referenced from: commits 24848ec

Collaborator Author

@spring-projects-issues spring-projects-issues commented Nov 20, 2018

Rossen Stoyanchev commented

The Reactor Netty config API changed from 0.7 to 0.8. The builder was replaced by an immutable pattern in which config methods return a new instance of TcpClient.

You still have the constructor taking a TcpClient, but that requires taking over resource management, which could be done in 5.1+ with the ReactorResourceFactory. Still, I get the point that an option to provide a supplier of addresses without taking over resource management is now missing.

I'll add a constructor with those semantics. Something like this:

public ReactorNettyTcpClient(Function<TcpClient, TcpClient> configurer, ReactorNettyCodec<P> codec) {
    // ...
}

So you could then:

new ReactorNettyTcpClient(client -> client.addressSupplier(...), codec);

I'll also update the sample in the docs.
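
To make the immutable pattern and the proposed `Function<TcpClient, TcpClient>` hook concrete, here is a minimal sketch that does not depend on Reactor Netty. `SketchClient` and `SketchRelayClient` are hypothetical stand-ins invented for illustration; they are not the real `TcpClient` or `ReactorNettyTcpClient`, and the default host, port and timeout are assumptions.

```java
import java.net.InetSocketAddress;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for an immutable client: every config method
// returns a new instance and leaves the receiver unchanged.
final class SketchClient {
    private final Supplier<InetSocketAddress> addressSupplier;
    private final int timeoutMillis;

    private SketchClient(Supplier<InetSocketAddress> addressSupplier, int timeoutMillis) {
        this.addressSupplier = addressSupplier;
        this.timeoutMillis = timeoutMillis;
    }

    // Assumed defaults, chosen only for this sketch.
    static SketchClient create() {
        return new SketchClient(() -> InetSocketAddress.createUnresolved("localhost", 61613), 30_000);
    }

    SketchClient addressSupplier(Supplier<InetSocketAddress> supplier) {
        return new SketchClient(supplier, timeoutMillis);
    }

    SketchClient timeoutMillis(int millis) {
        return new SketchClient(addressSupplier, millis);
    }

    InetSocketAddress nextAddress() {
        return addressSupplier.get();
    }

    int timeoutMillis() {
        return timeoutMillis;
    }
}

// Hypothetical owner: it creates (and would manage) the base client itself
// and only lets the caller customize it through the configurer function.
final class SketchRelayClient {
    private final SketchClient client;

    SketchRelayClient(Function<SketchClient, SketchClient> configurer) {
        this.client = configurer.apply(SketchClient.create());
    }

    SketchClient client() {
        return client;
    }
}
```

The point of the function-based constructor is that the caller never creates the client or its resources; it only maps the instance it is handed to a customized one.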


@PaulGobin PaulGobin commented Oct 29, 2020

I think I got it:

private ReactorNettyTcpClient<byte[]> createTcpClient()
{
	final List<InetSocketAddress> addressList = new ArrayList<>();

	addressList.add(new InetSocketAddress("192.168.0.2", 61613));
	addressList.add(new InetSocketAddress("192.168.0.3", 61613));
	addressList.add(new InetSocketAddress("192.168.0.4", 61613));
	addressList.add(new InetSocketAddress(StompBrokerRelayHost, StompBrokerRelayPort));
	final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);

	return new ReactorNettyTcpClient<>(client -> client.remoteAddress(() -> addresses.get()), new StompReactorNettyCodec());
}
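
`RoundRobinList` is not a JDK or Spring class (an implementation is posted later in this thread). As a possible alternative, the same rotation can be expressed as a plain `Supplier<InetSocketAddress>` built on an `AtomicInteger`, which avoids holding a lock. This is only a sketch; the class name `RoundRobinAddressSupplier` is made up for illustration.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: hands out the configured addresses in rotation.
final class RoundRobinAddressSupplier implements Supplier<InetSocketAddress> {
    private final List<InetSocketAddress> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddressSupplier(List<InetSocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        // Defensive copy so later changes to the caller's list have no effect.
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public InetSocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }
}
```

An instance could then be passed wherever the snippet above passes `() -> addresses.get()`, assuming the target method accepts a supplier of socket addresses.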


@sharunthomas sharunthomas commented Nov 5, 2020

Does the above solution work with Spring Boot 1.5.x?


@PaulGobin PaulGobin commented Nov 5, 2020

I don't believe so, but I would recommend upgrading to 2.3.4 if you can.


@sharunthomas sharunthomas commented Nov 5, 2020

But I cannot see a method called .setTcpClient() in my MessageBrokerRegistry class.

registry.enableStompBrokerRelay("/queue/","/topic/")
                    .setRelayHost(hostname)
                    .setRelayPort(port)
                    .setClientLogin(username)
                    .setClientPasscode(password)
                    .setAutoStartup(true);

So I cannot add all the hosts in the RabbitMQ cluster. It would be a big help if you could comment on my situation.

Dependencies(build.gradle):

dependencies {
//    developmentOnly("org.springframework.boot:spring-boot-devtools")
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compile('org.springframework.boot:spring-boot-starter-web')
    //compile('org.springframework.boot:spring-boot-starter-security')
    //compile('org.springframework.boot:spring-boot-devtools')

    compile 'mysql:mysql-connector-java:5.1.44'
    jooqRuntime 'mysql:mysql-connector-java:5.1.44'
    //Apache commons libraries
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.1'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
    compile group: 'org.apache.commons', name: 'commons-email', version: '1.4'
    compile group: 'commons-beanutils', name: 'commons-beanutils', version: '1.9.1'
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
    compile group: 'commons-lang', name: 'commons-lang', version: '2.6'
    compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.2'
    compile group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.4'
    compile group: 'com.needstreet', name: 'v4.commons', version: '1.7'
    compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.7.0'
    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.7.0'
    //compile group: 'io.springfox', name: 'springfox-bean-validators', version: '2.7.0'
    compile 'org.jsoup:jsoup:1.12.1'

    compile group: 'org.springframework.data', name: 'spring-data-envers', version: '1.0.6.RELEASE'
    compile("org.springframework.boot:spring-boot-starter-amqp")
// https://mvnrepository.com/artifact/org.thymeleaf/thymeleaf-spring4
//    compile group: 'org.thymeleaf', name: 'thymeleaf-spring4', version: '2.1.4.RELEASE'

    compile group: 'org.springframework.boot', name: 'spring-boot-starter-thymeleaf', version: '1.5.9.RELEASE'


    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompile group: 'org.assertj', name: 'assertj-core', version: '3.5.2'
    compile 'com.razorpay:razorpay-java:1.3.4'
    compile 'com.google.code.gson:gson:2.6.2'

//    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
    compile group: 'org.atmosphere', name: 'atmosphere-runtime', version: '2.4.14'
    compile group: 'javax.inject', name: 'javax.inject', version: '1'
    compile group: 'com.auth0', name: 'java-jwt', version: '3.2.0'
//    compile group: 'org.apache.ignite', name: 'ignite-core', version: '2.1.4'
//    compile group: 'org.apache.ignite', name: 'ignite-core', version: '2.4.0'
//    compile group: 'com.tokbox', name: 'opentok-server-sdk', version: '4.2.0'
    compile group: 'com.tokbox', name: 'opentok-server-sdk', version: '4.3.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security'
    compile group: 'org.springframework.security', name: 'spring-security-jwt'
    compile group: 'org.springframework.security.oauth', name: 'spring-security-oauth2'

//    compile("org.springframework.boot:spring-boot-devtools")
//    testCompile('com.h2database:h2')
    compile 'com.amazonaws:aws-java-sdk-s3'
//    compile "org.flywaydb:flyway-core:5.0.7"
    compile('org.springframework.boot:spring-boot-starter-jooq')
    compile('org.springframework.boot:spring-boot-starter-groovy-templates')
    compile('org.springframework.cloud:spring-cloud-starter-aws-messaging')

    //netflix zuul sdk
   //compile('org.springframework.cloud:spring-cloud-starter-netflix-zuul')
    //compile group: 'com.marcosbarbero.cloud', name: 'spring-cloud-zuul-ratelimit', version: '1.7.5.RELEASE'

    //aws ses sdk
    compile('com.amazonaws:aws-java-sdk-ses')
    compile('com.amazonaws:aws-java-sdk-sns:1.11.267')
    compile group: 'com.github.fge', name: 'jackson-coreutils', version: '1.0'
    compile("org.springframework.boot:spring-boot-starter-cache")
    compile group: 'com.googlecode.libphonenumber', name: 'libphonenumber', version: '8.10.1'
    compile group: 'javax.mail', name: 'mail', version: '1.4.1'
    compile('javax.servlet:jstl')
    compile('org.apache.tomcat.embed:tomcat-embed-jasper')
    compile("com.google.apis:google-api-services-calendar:v3-rev224-1.22.0")
    compile group: "com.twilio.sdk", name: "twilio", version: "7.17.+"
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-mail'
//    compile 'com.mixpanel:mixpanel-java:1.4.3'
//    compile 'org.json:json:20151123'
    compile group: 'com.maxmind.geoip2', name: 'geoip2', version: '2.13.0'
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-csv
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-csv', version: '2.9.6'
    compile "com.stripe:stripe-java:18.7.0"
    compile "org.springframework.boot:spring-boot-starter-websocket"
    implementation "com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:20191001.1"
    compile('org.springframework.session:spring-session-jdbc')


    compile group: 'com.google.firebase', name: 'firebase-admin', version: '6.12.2'

    //mailchimp client sdk dependencies.
    compile group: 'com.github.banana-j', name: 'bananaj', version: '0.6.2'

    // https://mvnrepository.com/artifact/org.apache.commons/commons-compress
    compile group: 'org.apache.commons', name: 'commons-compress', version: '1.20'



    compile group: 'io.projectreactor', name: 'reactor-core', version: '2.0.6.RELEASE'
    compile group: 'io.projectreactor', name: 'reactor-net', version: '2.0.6.RELEASE'
    compile group: 'io.netty', name: 'netty-all', version: '4.1.22.Final'
//    compile group: 'io.projectreactor.spring', name: 'reactor-spring-context', version: '2.0.5.RELEASE'
}



Member

@snicoll snicoll commented Nov 5, 2020

@sharunthomas as mentioned in the guidelines for contributing, we prefer to use GitHub issues only for bugs and enhancements. For questions, please follow up on StackOverflow. Note also that Spring Boot 2.0.x is EOL.


@PaulGobin PaulGobin commented Nov 6, 2020

@sharunthomas
This is how I do it:

@Override
public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry)
{
	StompBrokerRelayRegistration broker = messageBrokerRegistry.enableStompBrokerRelay(ChatConstants.__BROKER_PREDICATE_TOPIC, ChatConstants.__BROKER_PREDICATE_QUEUE);
	// Supply the cluster addresses through a custom TcpClient instead of a single relay host and port.
	broker.setTcpClient(createTcpClient());
	// broker.setRelayHost(StompBrokerRelayHost);
	// broker.setRelayPort(StompBrokerRelayPort);
	broker.setSystemLogin(brokerUser);
	broker.setSystemPasscode(brokerPassword);
	broker.setClientLogin(stompClientUser);
	broker.setClientPasscode(stompClientPassword);
}

/**
 * Builds a TCP client that cycles through the configured STOMP broker hosts.
 *
 * @return the TCP client used by the broker relay
 */
private ReactorNettyTcpClient<byte[]> createTcpClient()
{
	final List<InetSocketAddress> addressList = new ArrayList<>();
	// StompBrokerRelayHostCluster is a comma-separated list of broker host names.
	String[] StompBrokerRelayHosts = StringUtils.split(StompBrokerRelayHostCluster, ",");
	for (String host : StompBrokerRelayHosts)
	{
		log.info("**** ADDING STOMP BROKER [" + host + "] TO THE CLUSTER LIST. ****");
		addressList.add(new InetSocketAddress(StringUtils.trim(host), StompBrokerRelayPort));
	}
	final RoundRobinList<InetSocketAddress> addresses = new RoundRobinList<>(addressList);
	return new ReactorNettyTcpClient<>(client -> client.remoteAddress(() -> addresses.get()), new StompReactorNettyCodec());
}

/********************************************/
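import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public class RoundRobinList<T> {

	private Iterator<T> iterator;
	private final List<T> elements;

	public RoundRobinList(Collection<T> elements)
	{
		if (elements.isEmpty())
		{
			throw new IllegalArgumentException("elements must not be empty");
		}
		// Copy so that later changes to the caller's collection cannot invalidate the iterator.
		this.elements = new ArrayList<>(elements);
		iterator = this.elements.iterator();
	}

	// Returns the next element, wrapping around to the first one after the last.
	public synchronized T get()
	{
		if (!iterator.hasNext())
		{
			iterator = elements.iterator();
		}
		return iterator.next();
	}

	public int size()
	{
		return elements.size();
	}
}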
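
The host-list parsing in `createTcpClient()` above appears to rely on Apache Commons Lang's `StringUtils` (an assumption based on the `split` and `trim` calls). A JDK-only variant might look like the sketch below. `BrokerAddresses` is a hypothetical helper name, and the use of `InetSocketAddress.createUnresolved` is a design choice of this sketch: unlike the `InetSocketAddress(String, int)` constructor, it does not attempt a DNS lookup when the address object is created.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1, host2,host3" into socket addresses.
final class BrokerAddresses {
    private BrokerAddresses() {
    }

    static List<InetSocketAddress> parse(String commaSeparatedHosts, int port) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String token : commaSeparatedHosts.split(",")) {
            String host = token.trim();
            // Skip blank entries such as the one produced by "a,,b".
            if (!host.isEmpty()) {
                result.add(InetSocketAddress.createUnresolved(host, port));
            }
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("no broker hosts configured");
        }
        return result;
    }
}
```

The resulting list could be handed to `RoundRobinList` in place of the manually built `addressList`. Whether unresolved addresses suit a given setup depends on how the client resolves host names at connect time, so treat that part as something to verify.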
