Skip to content

Commit

Permalink
Replaces deprecations
Browse files Browse the repository at this point in the history
  • Loading branch information
spencergibb committed May 20, 2020
1 parent c128605 commit 1bf5765
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 54 deletions.
Expand Up @@ -57,8 +57,8 @@
import org.springframework.boot.rsocket.netty.NettyRSocketServer;
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -175,14 +175,14 @@ public MetadataExtractorBrokerSocketAcceptor metadataExtractorBrokerSocketAccept
}

private static RSocketServerFactory getRSocketServerFactory(ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryProcessor> processors, AbstractBrokerProperties properties) {
ObjectProvider<RSocketServerCustomizer> processors, AbstractBrokerProperties properties) {
NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
factory.setResourceFactory(resourceFactory);
factory.setTransport(getTransport(properties));
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getAddress()).to(factory::setAddress);
map.from(properties.getPort()).to(factory::setPort);
factory.setSocketFactoryProcessors(processors.orderedStream().collect(Collectors
factory.setRSocketServerCustomizers(processors.orderedStream().collect(Collectors
.toList()));
return factory;
}
Expand Down Expand Up @@ -243,7 +243,7 @@ public ClusterBrokerProperties clusterBrokerProperties() {
public RSocketServerBootstrap clusterRSocketServerBootstrap(
ClusterBrokerProperties properties,
ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryProcessor> processors,
ObjectProvider<RSocketServerCustomizer> processors,
ClusterSocketAcceptor clusterSocketAcceptor) {
RSocketServerFactory serverFactory = getRSocketServerFactory(resourceFactory, processors, properties);
return new BrokerRSocketServerBootstrap(properties, serverFactory, clusterSocketAcceptor);
Expand All @@ -266,7 +266,7 @@ public TcpBrokerProperties tcpBrokerProperties() {
public RSocketServerBootstrap tcpRSocketServerBootstrap(
TcpBrokerProperties properties,
ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryProcessor> processors,
ObjectProvider<RSocketServerCustomizer> processors,
BrokerSocketAcceptor brokerSocketAcceptor) {
RSocketServerFactory serverFactory = getRSocketServerFactory(resourceFactory, processors, properties);
return new BrokerRSocketServerBootstrap(properties, serverFactory, brokerSocketAcceptor);
Expand All @@ -288,7 +288,7 @@ public WebsocketBrokerProperties websocketBrokerProperties() {
public RSocketServerBootstrap websocketRSocketServerBootstrap(
WebsocketBrokerProperties properties,
ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryProcessor> processors,
ObjectProvider<RSocketServerCustomizer> processors,
BrokerSocketAcceptor brokerSocketAcceptor) {
RSocketServerFactory serverFactory = getRSocketServerFactory(resourceFactory, processors, properties);
return new BrokerRSocketServerBootstrap(properties, serverFactory, brokerSocketAcceptor);
Expand Down
Expand Up @@ -19,11 +19,11 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.routing.broker.config.BrokerProperties;
import io.rsocket.routing.broker.config.BrokerProperties.AbstractAcceptor;
Expand Down Expand Up @@ -125,7 +125,7 @@ private Mono<RSocketRequester> connect(AbstractAcceptor connection, Object data,
if (metadata != null) {
builder.setupMetadata(metadata, MimeTypes.ROUTING_FRAME_MIME_TYPE);
}
return builder.rsocketFactory(rsocketFactory -> rsocketFactory
return builder.rsocketConnector(rSocketConnector -> rSocketConnector
.acceptor((setup, sendingSocket) -> Mono.just(rSocket)))
// TODO: other types?
.connectTcp(connection.getHost(), connection.getPort());
Expand All @@ -140,7 +140,7 @@ private Mono<RSocketRequester> connect(AbstractAcceptor connection, Object data,
private void setupRSocket() {
ConnectionSetupPayload connectionSetupPayload = getConnectionSetupPayload();
SocketAcceptor responder = this.messageHandler.responder();
responder.accept(connectionSetupPayload, new AbstractRSocket() {
responder.accept(connectionSetupPayload, new RSocket() {
}).subscribe(rSocket -> {
this.rSocket = rSocket;
});
Expand All @@ -160,7 +160,7 @@ private ConnectionSetupPayload getConnectionSetupPayload() {
ByteBuf setup = SetupFrameCodec.encode(byteBufAllocator, false, 1, 1,
MESSAGE_RSOCKET_COMPOSITE_METADATA.getString(),
MimeTypes.ROUTING_FRAME_MIME_TYPE.toString(), setupPayload);
return ConnectionSetupPayload.create(setup);
return new DefaultConnectionSetupPayload(setup);
}

}
Expand Up @@ -26,10 +26,10 @@
import io.netty.buffer.CompositeByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.CompositeMetadataFlyweight;
import io.rsocket.metadata.TaggingMetadataFlyweight;
import io.rsocket.metadata.CompositeMetadataCodec;
import io.rsocket.metadata.TaggingMetadataCodec;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.routing.broker.spring.MimeTypes;
import io.rsocket.routing.common.Id;
Expand All @@ -45,6 +45,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -116,7 +117,8 @@ static ByteBuf encodeRouteSetup(RSocketStrategies strategies, Id routeId, String
ByteBuf routeSetup = RouteSetupFlyweight
.encode(ByteBufAllocator.DEFAULT, routeId, serviceName, tags);

CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE.toString());
CompositeByteBuf composite = encodeComposite(routeSetup, MimeTypes.ROUTING_FRAME_MIME_TYPE
.toString());
return composite;
}

Expand All @@ -126,13 +128,14 @@ static ByteBuf encodeAddress(RSocketStrategies strategies, Id originRouteId, Str
ByteBuf address = AddressFlyweight
.encode(ByteBufAllocator.DEFAULT, originRouteId, Tags.empty(), tags);

CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE.toString());
CompositeByteBuf composite = encodeComposite(address, MimeTypes.ROUTING_FRAME_MIME_TYPE
.toString());
return composite;
}

private static CompositeByteBuf encodeComposite(ByteBuf byteBuf, String mimeType) {
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
CompositeMetadataFlyweight
CompositeMetadataCodec
.encodeAndAddMetadata(composite, ByteBufAllocator.DEFAULT,
mimeType, byteBuf);
return composite;
Expand All @@ -158,7 +161,7 @@ public static class Ping
implements Ordered, ApplicationListener<ApplicationReadyEvent> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private RSocketStrategies strategies;

Expand Down Expand Up @@ -189,11 +192,11 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
ByteBuf metadata = encodeRouteSetup(strategies, id, "ping");
Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, metadata);

pongFlux = RSocketFactory.connect().frameDecoder(PayloadDecoder.ZERO_COPY)
pongFlux = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY)
.metadataMimeType(COMPOSITE_MIME_TYPE.toString())
.setupPayload(setupPayload)//.addRequesterPlugin(interceptor)
.transport(TcpClientTransport.create(port)) // proxy
.start().log("startPing" + id)
.connect(TcpClientTransport.create(port)) // proxy
.log("startPing" + id)
.flatMapMany(socket -> doPing(take, socket)).cast(String.class)
.doOnSubscribe(o -> {
if (logger.isDebugEnabled()) {
Expand All @@ -209,7 +212,7 @@ public void onApplicationEvent(ApplicationReadyEvent event) {

MonoProcessor<Void> onClose = MonoProcessor.create();

startDaemonThread("ping"+id, onClose);
startDaemonThread("ping" + id, onClose);
}

Publisher<? extends String> doPing(Integer take, RSocket socket) {
Expand Down Expand Up @@ -277,12 +280,13 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
// meterRegistry, Tag.of("component", "pong"));

ByteBuf metadata = encodeRouteSetup(strategies, routeId, "pong");
RSocketFactory.connect().metadataMimeType(COMPOSITE_MIME_TYPE.toString())
.setupPayload(
DefaultPayload.create(EMPTY_BUFFER, metadata))
/*.addRequesterPlugin(interceptor)*/.acceptor(this::accept)
.transport(TcpClientTransport.create(port)) // proxy
.start().block();
RSocketConnector.create().metadataMimeType(COMPOSITE_MIME_TYPE.toString())
.setupPayload(DefaultPayload.create(EMPTY_BUFFER, metadata))
/*.addRequesterPlugin(interceptor)*/
.acceptor((setup, sendingSocket) -> Mono
.just(accept(sendingSocket)))
.connect(TcpClientTransport.create(port)) // proxy
.block();

MonoProcessor<Void> onClose = MonoProcessor.create();

Expand Down Expand Up @@ -346,20 +350,21 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
//ByteBuf metadata = encodeRouteSetup(strategies, id, "ping");
Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, EMPTY_BUFFER);

Flux<String> flux = RSocketFactory.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
Flux<String> flux = RSocketConnector.create()
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.metadataMimeType(COMPOSITE_MIME_TYPE.toString())
.setupPayload(setupPayload)
.transport(TcpClientTransport.create(port)) // proxy
.start().log("startClusterClient" + id)
.flatMapMany(socket -> doHello(socket)).cast(String.class)
.connect(TcpClientTransport.create(port)) // proxy
.log("startClusterClient" + id)
.flatMapMany(this::doHello).cast(String.class)
.doOnSubscribe(o -> {
if (logger.isDebugEnabled()) {
logger.debug("ClusterClient doOnSubscribe");
}
});

boolean subscribe = env.getProperty("ClusterClient.subscribe", Boolean.class, true);
boolean subscribe = env
.getProperty("ClusterClient.subscribe", Boolean.class, true);

if (subscribe) {
flux.subscribe();
Expand All @@ -369,7 +374,7 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
Publisher<? extends String> doHello(RSocket socket) {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
"World " + id);
ByteBuf routingMetadata = TaggingMetadataFlyweight
ByteBuf routingMetadata = TaggingMetadataCodec
.createRoutingMetadata(ByteBufAllocator.DEFAULT,
Collections.singletonList("hello")).getContent();
CompositeByteBuf composite = encodeComposite(routingMetadata, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.function.Function;

import io.netty.util.concurrent.FastThreadLocal;
import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;
import io.rsocket.routing.broker.RSocketIndex;
import io.rsocket.routing.broker.RoutingTable;
Expand Down Expand Up @@ -146,7 +145,7 @@ private Mono<RSocket> loadbalance(List<RSocket> rSockets, Tags tags) {
}
response.onComplete(new CompletionContext(Status.DISCARD));
// TODO: return empty?
return new AbstractRSocket() { };
return new RSocket() { };
});
}
}
Expand Up @@ -18,14 +18,13 @@

import java.util.Collection;

import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;

/**
* RSocket implementation that will broadcast payloads to a
* collection of RSockets.
*/
public abstract class BroadcastRSocket extends AbstractRSocket {
public abstract class BroadcastRSocket implements RSocket {

public abstract Collection<? extends RSocket> getRSockets();

Expand Down
Expand Up @@ -18,10 +18,8 @@

import java.util.function.Function;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.routing.broker.locator.RSocketLocator;
import io.rsocket.routing.common.Tags;
import org.reactivestreams.Publisher;
Expand All @@ -31,7 +29,7 @@
/**
* Routes received requests to the correct routable destination.
*/
public class RoutingRSocket extends AbstractRSocket implements ResponderRSocket {
public class RoutingRSocket implements RSocket {

private final RSocketLocator rSocketLocator;
private final Function<Payload, Tags> tagsExtractor;
Expand Down Expand Up @@ -120,17 +118,4 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
});
}

@Override
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
try {
Tags tags = tagsExtractor.apply(payload);
Mono<RSocket> located = rSocketLocator.apply(tags);

return located.flatMapMany(rSocket -> rSocket.requestChannel(Flux.from(payloads).skip(1).startWith(payload))
.onErrorResume(e -> Flux.error(new RuntimeException("TODO", e))));
} catch (Throwable e) {
payload.release();
return Flux.error(new RuntimeException("TODO: fill out values", e)); //TODO:
}
}
}

0 comments on commit 1bf5765

Please sign in to comment.