Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
Reformat code.
  • Loading branch information
mp911de committed Sep 7, 2020
1 parent 934f042 commit f515a73
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 88 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `-1` disables the cache. Any other value specifies the cache size.
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `0` disables the cache. Any other value specifies the cache size.
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(Optional)_
| `schema` | The search path to set. _(Optional)_
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
Expand All @@ -90,7 +91,6 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
| `tcpNoDelay` | Enabled/disable TCP NoDelay. Disabled by default. _(Optional)_
| `tcpKeepAlive` | Enabled/disable TCP KeepAlive. Disabled by default. _(Optional)_
| `tcpLoopResources`| TCP LoopResources. _(Optional)_

**Programmatic Configuration**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public final class PostgresqlConnectionConfiguration {

private final String host;

@Nullable
private final LoopResources loopResources;

private final Map<String, String> options;

private final CharSequence password;
Expand All @@ -82,20 +85,18 @@ public final class PostgresqlConnectionConfiguration {

private final String socket;

private final String username;

private final SSLConfig sslConfig;

private final boolean tcpKeepAlive;

private final boolean tcpNoDelay;

private final LoopResources tcpLoopResources;
private final String username;

private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable Map<String, String> options, @Nullable CharSequence password,
int port, int preparedStatementCacheQueries, @Nullable String schema, @Nullable String socket, boolean tcpKeepAlive, boolean tcpNoDelay,
String username, SSLConfig sslConfig, LoopResources tcpLoopResources) {
ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable LoopResources loopResources,
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, int preparedStatementCacheQueries, @Nullable String schema,
@Nullable String socket, SSLConfig sslConfig, boolean tcpKeepAlive, boolean tcpNoDelay, String username) {
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
this.autodetectExtensions = autodetectExtensions;
this.connectTimeout = connectTimeout;
Expand All @@ -104,6 +105,7 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
this.fetchSize = fetchSize;
this.forceBinary = forceBinary;
this.host = host;
this.loopResources = loopResources;
this.options = options == null ? new LinkedHashMap<>() : new LinkedHashMap<>(options);

if (schema != null && !schema.isEmpty()) {
Expand All @@ -114,11 +116,10 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
this.port = port;
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
this.socket = socket;
this.username = Assert.requireNonNull(username, "username must not be null");
this.sslConfig = sslConfig;
this.tcpKeepAlive = tcpKeepAlive;
this.tcpNoDelay = tcpNoDelay;
this.tcpLoopResources = tcpLoopResources;
this.username = Assert.requireNonNull(username, "username must not be null");
}

/**
Expand All @@ -141,9 +142,11 @@ public String toString() {
", fetchSize=" + this.fetchSize +
", forceBinary='" + this.forceBinary + '\'' +
", host='" + this.host + '\'' +
", loopResources='" + this.loopResources + '\'' +
", options='" + this.options + '\'' +
", password='" + obfuscate(this.password != null ? this.password.length() : 0) + '\'' +
", port=" + this.port +
", socket=" + this.socket +
", tcpKeepAlive=" + this.tcpKeepAlive +
", tcpNoDelay=" + this.tcpNoDelay +
", username='" + this.username + '\'' +
Expand Down Expand Up @@ -192,6 +195,11 @@ String getRequiredHost() {
return host;
}

@Nullable
LoopResources getLoopResources() {
return this.loopResources;
}

Map<String, String> getOptions() {
return Collections.unmodifiableMap(this.options);
}
Expand Down Expand Up @@ -253,10 +261,6 @@ SSLConfig getSslConfig() {
return this.sslConfig;
}

LoopResources getTcpLoopResources() {
return this.tcpLoopResources;
}

private static String obfuscate(int length) {

StringBuilder builder = new StringBuilder();
Expand Down Expand Up @@ -285,7 +289,7 @@ public static final class Builder {
@Nullable
private String database;

private List<Extension> extensions = new ArrayList<>();
private final List<Extension> extensions = new ArrayList<>();

private ToIntFunction<String> fetchSize = sql -> NO_LIMIT;

Expand Down Expand Up @@ -332,7 +336,7 @@ public static final class Builder {
private boolean tcpNoDelay = false;

@Nullable
private LoopResources tcpLoopResources = null;
private LoopResources loopResources = null;

@Nullable
private String username;
Expand Down Expand Up @@ -383,9 +387,8 @@ public PostgresqlConnectionConfiguration build() {
}

return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
this.host, this.options, this.password, this.port, this.preparedStatementCacheQueries, this.schema, this.socket, this.tcpKeepAlive, this.tcpNoDelay, this.username,
this.createSslConfig()
, this.tcpLoopResources);
this.host, this.loopResources, this.options, this.password, this.port, this.preparedStatementCacheQueries, this.schema, this.socket, this.createSslConfig(), this.tcpKeepAlive,
this.tcpNoDelay, this.username);
}

/**
Expand Down Expand Up @@ -492,6 +495,18 @@ public Builder host(String host) {
return this;
}

/**
* Configure {@link LoopResources}.
*
* @param loopResources the {@link LoopResources}
* @return this {@link Builder}
* @since 0.8.5
*/
public Builder loopResources(LoopResources loopResources) {
this.loopResources = Assert.requireNonNull(loopResources, "loopResources must not be null");
return this;
}

/**
* Configure connection initialization parameters.
* <p>
Expand Down Expand Up @@ -692,18 +707,6 @@ public Builder username(String username) {
return this;
}

/**
* Configure TCP {@link LoopResources}.
*
* @param loopResources the {@link LoopResources}
* @return this {@link Builder}
* @since 1.0.0
*/
public Builder tcpLoopResources(LoopResources loopResources) {
this.tcpLoopResources = Assert.requireNonNull(loopResources, "tcpLoopResources must not be null");
return this;
}

@Override
public String toString() {
return "Builder{" +
Expand All @@ -715,6 +718,7 @@ public String toString() {
", fetchSize='" + this.fetchSize + '\'' +
", forceBinary='" + this.forceBinary + '\'' +
", host='" + this.host + '\'' +
", loopResources='" + this.loopResources + '\'' +
", parameters='" + this.options + '\'' +
", password='" + obfuscate(this.password != null ? this.password.length() : 0) + '\'' +
", port=" + this.port +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.ReactorNettyClient;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
Expand Down Expand Up @@ -79,7 +79,7 @@ public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configurati
this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
this.endpoint = createSocketAddress(configuration);

ConnectionSettings options = new ConnectionSettings(configuration.getConnectTimeout(), configuration.isTcpKeepAlive(), configuration.isTcpNoDelay());
ConnectionSettings options = new ConnectionSettings(configuration.getConnectTimeout(), configuration.isTcpKeepAlive(), configuration.isTcpNoDelay(), configuration.getLoopResources());
this.clientFactory = sslConfig -> ReactorNettyClient.connect(ConnectionProvider.newConnection(), this.endpoint, options, sslConfig).cast(Client.class);
this.extensions = getExtensions(configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
*/
public static final Option<Boolean> FORCE_BINARY = Option.valueOf("forceBinary");

/**
* Event {@link LoopResources}.
*
* @since 0.8.5
*/
public static final Option<LoopResources> LOOP_RESOURCES = Option.valueOf("loopResources");

/**
* Connection options which are applied once after the connection has been created.
*/
public static final Option<Map<String, String>> OPTIONS = Option.valueOf("options");

/**
* Driver option value.
*/
Expand All @@ -74,6 +86,12 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
*/
public static final String LEGACY_POSTGRESQL_DRIVER = "postgres";

/**
* Determine the number of queries that are cached in each connection.
* The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
*/
public static final Option<Integer> PREPARED_STATEMENT_CACHE_QUERIES = Option.valueOf("preparedStatementCacheQueries");

/**
* Schema search path (alias for "currentSchema").
*/
Expand Down Expand Up @@ -140,24 +158,6 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
*/
public static final Option<Boolean> TCP_NODELAY = Option.valueOf("tcpNoDelay");

/**
* TCP {@link LoopResources}.
*
* @since 1.0.0
*/
public static final Option<LoopResources> TCP_LOOP_RESOURCES = Option.valueOf("tcpLoopResources");

/**
* Determine the number of queries that are cached in each connection.
* The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
*/
public static final Option<Integer> PREPARED_STATEMENT_CACHE_QUERIES = Option.valueOf("preparedStatementCacheQueries");

/**
* Connection options which are applied once after the connection has been created.
*/
public static final Option<Map<String, String>> OPTIONS = Option.valueOf("options");

/**
* Returns a new {@link PostgresqlConnectionConfiguration.Builder} configured with the given {@link ConnectionFactoryOptions}.
*
Expand Down Expand Up @@ -209,6 +209,7 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
mapper.from(DATABASE).to(builder::database);
mapper.from(FETCH_SIZE).map(OptionMapper::toInteger).to(builder::fetchSize);
mapper.from(FORCE_BINARY).map(OptionMapper::toBoolean).to(builder::forceBinary);
mapper.from(LOOP_RESOURCES).to(builder::loopResources);
mapper.from(OPTIONS).map(PostgresqlConnectionFactoryProvider::convertToMap).to(builder::options);
mapper.from(PASSWORD).to(builder::password);
mapper.from(PORT).map(OptionMapper::toInteger).to(builder::port);
Expand All @@ -219,7 +220,6 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
});
mapper.from(TCP_KEEPALIVE).map(OptionMapper::toBoolean).to(builder::tcpKeepAlive);
mapper.from(TCP_NODELAY).map(OptionMapper::toBoolean).to(builder::tcpNoDelay);
mapper.from(TCP_LOOP_RESOURCES).to(builder::tcpLoopResources);
builder.username(options.getRequiredValue(USER));

return builder;
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/io/r2dbc/postgresql/client/ConnectionSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public final class ConnectionSettings {
private final boolean tcpNoDelay;

@Nullable
private final LoopResources tcpLoopResources;
private final LoopResources loopResources;

public ConnectionSettings(@Nullable Duration connectTimeout, boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable LoopResources tcpLoopResources) {
public ConnectionSettings(@Nullable Duration connectTimeout, boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable LoopResources loopResources) {
this.tcpKeepAlive = tcpKeepAlive;
this.tcpNoDelay = tcpNoDelay;
this.connectTimeout = connectTimeout;
this.tcpLoopResources = tcpLoopResources;
this.loopResources = loopResources;
}

@Nullable
Expand All @@ -58,12 +58,17 @@ boolean isTcpNoDelay() {
return this.tcpNoDelay;
}

public boolean hasTcpLoopResources() {
return this.tcpLoopResources != null;
boolean hasLoopResources() {
return this.loopResources != null;
}

LoopResources getTcpLoopResources() {
return this.tcpLoopResources;

LoopResources getRequiredLoopResources() {

if (!hasLoopResources()) {
throw new IllegalStateException("No LoopResources configured");
}

return this.loopResources;
}

}
29 changes: 18 additions & 11 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -346,17 +346,18 @@ public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable
/**
* Create a new frame processor connected to a given host.
*
* @param host the host to connect to
* @param port the port to connect to
* @param connectTimeout connect timeout
* @param sslConfig SSL configuration
* @param tcpLoopResources tcp loop resources
* @param host the host to connect to
* @param port the port to connect to
* @param connectTimeout connect timeout
* @param sslConfig SSL configuration
* @param loopResources tcp loop resources
* @throws IllegalArgumentException if {@code host} is {@code null}
* @since 0.8.5
*/
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, @Nullable SSLConfig sslConfig, @Nullable LoopResources tcpLoopResources) {
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, @Nullable SSLConfig sslConfig, @Nullable LoopResources loopResources) {
Assert.requireNonNull(host, "host must not be null");

ConnectionSettings settings = new ConnectionSettings(connectTimeout, false, false, tcpLoopResources);
ConnectionSettings settings = new ConnectionSettings(connectTimeout, false, false, loopResources);
return connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), settings, sslConfig);
}

Expand All @@ -377,11 +378,13 @@ public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProv
TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress);

if (!(socketAddress instanceof InetSocketAddress)) {
tcpClient = tcpClient.runOn(new SocketLoopResources(), true);
tcpClient = tcpClient.runOn(new SocketLoopResources(connectionSettings.hasLoopResources() ? connectionSettings.getRequiredLoopResources() : TcpResources.get()), true);
} else {
if (connectionSettings.hasTcpLoopResources()) {
tcpClient = tcpClient.runOn(connectionSettings.getTcpLoopResources());

if (connectionSettings.hasLoopResources()) {
tcpClient = tcpClient.runOn(connectionSettings.getRequiredLoopResources());
}

tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, connectionSettings.isTcpKeepAlive());
tcpClient = tcpClient.option(ChannelOption.TCP_NODELAY, connectionSettings.isTcpNoDelay());
}
Expand Down Expand Up @@ -597,7 +600,11 @@ static class SocketLoopResources implements LoopResources {
epoll = epollCheck;
}

private final LoopResources delegate = TcpResources.get();
private final LoopResources delegate;

public SocketLoopResources(LoopResources delegate) {
this.delegate = delegate;
}

@SuppressWarnings("unchecked")
private static Class<? extends Channel> findClass(String className) {
Expand Down
Loading

0 comments on commit f515a73

Please sign in to comment.