-
Notifications
You must be signed in to change notification settings - Fork 357
Description
I have a question and i read a lot of doc or forum but i dont find a concrete example.
I have one need
Client send telemetry datas periodically as an agent or deamon to server, (datas available from server saved in database...), and when server crash, client have to reconnect automatically on server.... (specific thing, server can send command to client to close it)
2 ways to do it,
1 create a periodical send fireAndForget data
2 more interesting (and what i try to do)
Client send setup connection to server, server can accept or refuse client, if accept ask to client for telemetry
with second option when server crash, client is not reconnecting when server restart. I tried to init connection another time when detect onError but work only one time
Expected Behavior
when server crash, client try to reconnect on server
Actual Behavior
client forget server and do nohintg more...
Steps to Reproduce
here the client code
@Slf4j
@Service
public class Client {
@Value("${application.telemetry.test.uuid}")
private String test;
@Value("${application.telemetry.server.host}")
private String host;
@Value("${application.telemetry.server.port}")
private Integer port;
public static Integer INTERVAL;
@Value("${application.telemetry.interval}")
public void setInterval(Integer interval) {
INTERVAL = interval;
}
private RSocketRequester rsocketRequester;
private RSocketRequester.Builder rsocketRequesterBuilder;
private RSocketStrategies strategies;
@Autowired
public Client(RSocketRequester.Builder builder, @Qualifier("rSocketStrategies") RSocketStrategies strategies) {
this.rsocketRequesterBuilder = builder;
this.strategies = strategies;
}
@EventListener(ApplicationReadyEvent.class)
public void initClient() {
final String client;
// (1)
if ("uuid".equals(test)) {
client = UUID.randomUUID().toString();
} else {
client = TelemetryUtils.getClientHostName();
}
log.info("Connecting using client ID:" + client);
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
this.rsocketRequester = rsocketRequesterBuilder.setupRoute("telemetry.identification").setupData(client)
.rsocketStrategies(strategies).rsocketConnector(connector -> connector.acceptor(responder).reconnect(Retry.indefinitely()))
.connectTcp(host, port).block();
this.rsocketRequester.rsocket().onClose().doOnError(error -> {
log.warn("Connection CLOSED");
})
.doFinally(consumer ->{
log.info("Client DISCONNECTED");
rsocketRequester.rsocket().dispose();
initClient();
}).subscribe();
while (true);
}client handler
@Slf4j
@Component
public class ClientHandler {
@MessageMapping("telemetry")
public Flux<TelemetryDto> statusUpdate(String status) {
log.info(status);
log.info("Start send telemetry");
return Flux.interval(Duration.ofSeconds(Client.INTERVAL)).map(index ->new TelemetryDto(TelemetryUtils.getProcessCpuLoad(),TelemetryUtils.getMemoryUsage(),TelemetryUtils.getActiveProcesses()))
.retry();
}
/**
* Used to execute command sent by server (for now only stop agent)
* @param command
*/
@MessageMapping("close")
public void message(String command){
log.info("new command coming : {}", command);
if("stop".equals(command)) {
log.info("stopping agent requiested");
Runtime.getRuntime().exit(0);
}else {
log.error("Command not implemented");
}
}
}And server code (PS i am using spring boot)
@Slf4j
@CrossOrigin("*")
@Controller
public class TelemetryController {
@Autowired
TelemetryRepository repository;
private final Map<String, RSocketRequester> CLIENTS = new HashMap<>();
@GetMapping("/clients/connected")
public ResponseEntity<?> getClientsConnected() {
log.debug("Get connected client list");
return ResponseEntity.ok(CLIENTS.keySet());
}
@GetMapping("/clients")
public ResponseEntity<?> getAllClients() {
log.debug("get all data registered");
return ResponseEntity.ok(repository.findAll());
}
@GetMapping("/clients/{id}")
public ResponseEntity<?> getClientData(@PathVariable(value = "id") String id) {
log.debug("get all data registered");
return ResponseEntity.ok(repository.findById(id));
}
/**
* used to close manually agent
*
* @param uuid
* @return
*/
@PostMapping("/clients/close/{id}")
public ResponseEntity<?> closeClient(@PathVariable(value = "id") String id) {
log.info("closing agent with id {}", id);
// send command to client requested
if (CLIENTS.get(id) != null) {
Mono<Void> call = CLIENTS.get(id).route("close").data("stop").send();
call.doOnSuccess(consumer -> {
log.info("Client {} closed",id);
CLIENTS.remove(id);
}).subscribe();
return ResponseEntity.noContent().build();
}
return ResponseEntity.notFound().build();
}
@PreDestroy
void shutdown() {
log.info("Detaching all remaining clients...");
CLIENTS.values().stream().forEach(requester -> requester.rsocket().dispose());
log.info("Shutting down.");
}
@MessageMapping("foo")
void call(RSocketRequester requester, @Payload String data) {
log.info(data);
}
@ConnectMapping("telemetry.identification")
void connectShellClientAndAskForTelemetry(RSocketRequester requester, @Payload String client) {
requester.rsocket().onClose().doFirst(() -> {
// Add all new clients to a client list
log.info("Client: {} CONNECTED.", client);
CLIENTS.put(client, requester);
}).doOnError(error -> {
// Warn when channels are closed by clients
log.warn("Channel to client {} CLOSED", client);
}).doFinally(consumer -> {
// Remove disconnected clients from the client list
CLIENTS.remove(client);
log.info("Client {} DISCONNECTED", client);
}).subscribe();
// Once connection confirmed, ask to send telemetry update
requester.route("telemetry").data("OPEN").retrieveFlux(TelemetryDto.class).doOnNext(s -> {
log.info("Client: {} inserting data", client, s.getCpuUsage());
// each time data incoming, update database
repository.save(
new Telemetry(client, s.getCpuUsage(), s.getMemoryUsed(), s.getProcesses(), LocalDateTime.now()));
}).retry().subscribe();
// manage channel close and disconnection
Hooks.onErrorDropped(error -> {
if ("Disposed".equals(error.getCause().getMessage())
|| error.getMessage().contains("ClosedChannelException"))
log.trace("client was stopped {}", client);
else
log.error("error while getting telemetry datas", error);
});
}
}```
## Possible Solution
Concrete example to achieve this case (i think common case)
## Your Environment
Spring boot rsocket 2.6.3
Spring boot
Java 11 or 15
Windows 10