Skip to content

Commit

Permalink
Implement high availability cluster functionality
Browse files Browse the repository at this point in the history
[resolves pgjdbc#120]
  • Loading branch information
kressi committed May 29, 2022
1 parent 6d2c1a8 commit cd7fac5
Show file tree
Hide file tree
Showing 20 changed files with 1,456 additions and 171 deletions.
13 changes: 13 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ClientSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;

public interface ClientSupplier {

Mono<Client> connect(SocketAddress endpoint, ConnectionSettings settings);

}
31 changes: 31 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;
import java.util.Map;
import java.util.function.Function;

public interface ConnectionStrategy {

Mono<Client> connect();

ConnectionStrategy withOptions(Map<String, String> options);

interface ComposableConnectionStrategy extends ConnectionStrategy {

default <T extends ConnectionStrategy> T chainIf(boolean guard, Function<ComposableConnectionStrategy, T> nextStrategyProvider, Class<T> klass) {
return guard ? nextStrategyProvider.apply(this) : klass.cast(this);
}

ComposableConnectionStrategy withAddress(SocketAddress address);

ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);

ComposableConnectionStrategy withOptions(Map<String, String> options);

}

}
47 changes: 47 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategyFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.r2dbc.postgresql;

import io.netty.channel.unix.DomainSocketAddress;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
import io.r2dbc.postgresql.client.SingleHostConfiguration;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

public class ConnectionStrategyFactory {

public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
SSLConfig sslConfig = configuration.getSslConfig();
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
return new DefaultConnectionStrategy(address, clientSupplier, configuration, configuration.getConnectionSettings(), configuration.getOptions())
.chainIf(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(configuration, strategy), ConnectionStrategy.ComposableConnectionStrategy.class)
.chainIf(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(createSocketAddress(multiHostConfiguration), configuration, strategy), ConnectionStrategy.class);
}

private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
if (!configuration.isUseSocket()) {
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
}
return DomainSocketFactory.getDomainSocketAddress(configuration);
}

static class DomainSocketFactory {
private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
return new DomainSocketAddress(configuration.getRequiredSocket());
}
}

private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
}
return addressList;
}

}
79 changes: 79 additions & 0 deletions src/main/java/io/r2dbc/postgresql/DefaultConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.StartupMessageFlow;
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.Map;

public class DefaultConnectionStrategy implements ConnectionStrategy.ComposableConnectionStrategy {

private final SocketAddress address;

private final ClientSupplier clientSupplier;

private final PostgresqlConnectionConfiguration configuration;

private final ConnectionSettings connectionSettings;

private final Map<String, String> options;

DefaultConnectionStrategy(
@Nullable SocketAddress address,
ClientSupplier clientSupplier,
PostgresqlConnectionConfiguration configuration,
ConnectionSettings connectionSettings,
@Nullable Map<String, String> options
) {
this.address = address;
this.clientSupplier = clientSupplier;
this.configuration = configuration;
this.connectionSettings = connectionSettings;
this.options = options;
}

@Override
public Mono<Client> connect() {
Assert.requireNonNull(this.address, "address must not be null");
return this.clientSupplier.connect(this.address, this.connectionSettings)
.delayUntil(client -> StartupMessageFlow
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), this.options)
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
}

@Override
public ComposableConnectionStrategy withAddress(SocketAddress address) {
return new DefaultConnectionStrategy(address, this.clientSupplier, this.configuration, this.connectionSettings, this.options);
}

@Override
public ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings) {
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, connectionSettings, this.options);
}

@Override
public ComposableConnectionStrategy withOptions(Map<String, String> options) {
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, this.connectionSettings, options);
}

protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
if (PasswordAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
} else if (SASLAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
} else {
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
}
}

}
195 changes: 195 additions & 0 deletions src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.spi.IsolationLevel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static io.r2dbc.postgresql.TargetServerType.ANY;
import static io.r2dbc.postgresql.TargetServerType.MASTER;
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;

public class MultiHostConnectionStrategy implements ConnectionStrategy {

private final List<SocketAddress> addresses;

private final PostgresqlConnectionConfiguration configuration;

private final ComposableConnectionStrategy connectionStrategy;

private final MultiHostConfiguration multiHostConfiguration;

private final Map<SocketAddress, HostSpecStatus> statusMap;

MultiHostConnectionStrategy(List<SocketAddress> addresses, PostgresqlConnectionConfiguration configuration, ComposableConnectionStrategy connectionStrategy) {
this.addresses = addresses;
this.configuration = configuration;
this.connectionStrategy = connectionStrategy;
this.multiHostConfiguration = this.configuration.getMultiHostConfiguration();
this.statusMap = new ConcurrentHashMap<>();
}

@Override
public Mono<Client> connect() {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
TargetServerType targetServerType = this.multiHostConfiguration.getTargetServerType();
return this.tryConnect(targetServerType)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
return Mono.empty();
})
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
? this.tryConnect(MASTER)
: Mono.empty()))
.switchIfEmpty(Mono.error(() -> {
Throwable error = exceptionRef.get();
if (error == null) {
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType.getValue()), null);
} else {
return error;
}
}));
}

@Override
public ConnectionStrategy withOptions(Map<String, String> options) {
return new MultiHostConnectionStrategy(this.addresses, this.configuration, this.connectionStrategy.withOptions(options));
}

private Mono<Client> tryConnect(TargetServerType targetServerType) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
return Mono.empty();
}))
.next()
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
? Mono.error(exceptionRef.get())
: Mono.empty()));
}

private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
? HostSpecStatus.ok(candidate)
: oldStatus;
}

private static Mono<Boolean> isPrimaryServer(Client client, PostgresqlConnectionConfiguration configuration) {
PostgresqlConnection connection = new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE,
StatementCache.fromPreparedStatementCacheQueries(client, configuration.getPreparedStatementCacheQueries()), IsolationLevel.READ_UNCOMMITTED, configuration);
return connection.createStatement("show transaction_read_only")
.execute()
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
.map(s -> s.equalsIgnoreCase("off"))
.next();
}

private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
return Flux.create(sink -> {
Predicate<Long> needsRecheck = updated -> System.currentTimeMillis() > updated + this.multiHostConfiguration.getHostRecheckTime().toMillis();
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
if (this.multiHostConfiguration.isLoadBalanceHosts()) {
Collections.shuffle(addresses);
}
boolean addressEmitted = false;
for (SocketAddress address : addresses) {
HostSpecStatus currentStatus = this.statusMap.get(address);
if (currentStatus == null || needsRecheck.test(currentStatus.updated) || targetServerType.allowStatus(currentStatus.hostStatus)) {
sink.next(address);
addressEmitted = true;
}
}
if (!addressEmitted) {
// if no candidate matches the requirement or all of them are in unavailable status, try all the hosts
for (SocketAddress address : addresses) {
sink.next(address);
}
}
sink.complete();
});
}

private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate) {
return Mono.create(sink -> this.connectionStrategy.withAddress(candidate).connect().subscribe(client -> {
this.statusMap.compute(candidate, (a, oldStatus) -> evaluateStatus(candidate, oldStatus));
if (targetServerType == ANY) {
sink.success(client);
return;
}
isPrimaryServer(client, this.configuration).subscribe(
isPrimary -> {
if (isPrimary) {
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
} else {
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
}
if (isPrimary && targetServerType == MASTER) {
sink.success(client);
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
sink.success(client);
} else {
client.close().subscribe(v -> sink.success(), sink::error, sink::success, Context.of(sink.contextView()));
}
},
sink::error, () -> {}, Context.of(sink.contextView()));
}, sink::error, () -> {}, Context.of(sink.contextView())));
}

enum HostStatus {
CONNECT_FAIL,
CONNECT_OK,
PRIMARY,
STANDBY
}

private static class HostSpecStatus {

public final SocketAddress address;

public final HostStatus hostStatus;

public final long updated;

private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
this.address = address;
this.hostStatus = hostStatus;
this.updated = System.currentTimeMillis();
}

public static HostSpecStatus fail(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
}

public static HostSpecStatus ok(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
}

public static HostSpecStatus primary(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.PRIMARY);
}

public static HostSpecStatus standby(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.STANDBY);
}
}
}

0 comments on commit cd7fac5

Please sign in to comment.