Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
Refactor connection creation from composeable ConnectionStrategy into composeable ConnectionFunctions and a parameter-less ConnectionStrategy that holds all connection target details.

Refactor SSL fallback into ConnectionFunction as SSL is part of the initial handshake. Move startup options into ConnectionSettings.

Simplify sink subscriptions into Flux composition for easier synchronization of closed connections. Add duration style parser.

Add license headers and since tags, update documentation.

[#120][resolves #474][#203]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Jun 1, 2022
1 parent 4390792 commit 2e0969e
Show file tree
Hide file tree
Showing 33 changed files with 1,271 additions and 509 deletions.
86 changes: 54 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This driver provides the following features:
* Login with username/password (MD5, SASL/SCRAM) or implicit trust
* SCRAM authentication
* Unix Domain Socket transport
* Connection Fail-over supporting multiple hosts
* TLS
* Explicit transactions
* Notifications
Expand Down Expand Up @@ -67,40 +68,44 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

**Supported ConnectionFactory Discovery Options**

| Option | Description
| ----------------- | -----------
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`).
| `driver` | Must be `postgresql`.
| `host` | Server hostname to connect to.
| `port` | Server port to connect to. Defaults to `5432`. _(Optional)_
| `socket` | Unix Domain Socket path to connect to as alternative to TCP. _(Optional)_
| `username` | Login username.
| `password` | Login password. _(Optional when using TLS Certificate authentication)_
| `database` | Database to select. _(Optional)_
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
| `compatibilityMode` | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to `false`. _(Optional)_
| `errorResponseLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
| `extensions` | Collection of `Extension` to provide additional extensions when creating a connection factory. Defaults to empty. _(Optional)_
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
| `lockWaitTimeout` | Lock wait timeout. _(Optional)_
| `noticeLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
| `preferAttachedBuffers` |Configure whether codecs should prefer attached data buffers. The default is `false`, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as `Json` to avoid memory leaks.
| Option | Description
|---------------------------------| -----------
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`).
| `driver` | Must be `postgresql`.
| `protocol` | Protocol specifier. Empty to use single-host operations. Supported: `failover` for multi-server failover operations. _(Optional)_
| `host` | Server hostname to connect to. May contain a comma-separated list of hosts with ports when using the `failover` protocol.
| `port` | Server port to connect to. Defaults to `5432`. _(Optional)_
| `socket` | Unix Domain Socket path to connect to as alternative to TCP. _(Optional)_
| `username` | Login username.
| `password` | Login password. _(Optional when using TLS Certificate authentication)_
| `database` | Database to select. _(Optional)_
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
| `compatibilityMode` | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to `false`. _(Optional)_
| `errorResponseLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
| `extensions` | Collection of `Extension` to provide additional extensions when creating a connection factory. Defaults to empty. _(Optional)_
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
| `hostRecheckTime` | Host status recheck time when using multi-server operations. Defaults to `10 seconds`. _(Optional)_
| `loadBalanceHosts` | Whether to shuffle the list of given hostnames before connect when using multi-server operations. Defaults to `true. _(Optional)_
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
| `lockWaitTimeout` | Lock wait timeout. _(Optional)_
| `noticeLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
| `preferAttachedBuffers` |Configure whether codecs should prefer attached data buffers. The default is `false`, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as `Json` to avoid memory leaks.
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `0` disables the cache. Any other value specifies the cache size.
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(
Optional)_
| `schema` | The search path to set. _(Optional)_
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
| `sslRootCert` | Path to SSL CA certificate in PEM format. Can be also a resource path. _(Optional)_
| `sslKey` | Path to SSL key for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
| `statementTimeout`| Statement timeout. _(Optional)_
| `tcpNoDelay` | Enable/disable TCP NoDelay. Enabled by default. _(Optional)_
| `tcpKeepAlive` | Enable/disable TCP KeepAlive. Disabled by default. _(Optional)_
| `schema` | The search path to set. _(Optional)_
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
| `sslRootCert` | Path to SSL CA certificate in PEM format. Can be also a resource path. _(Optional)_
| `sslKey` | Path to SSL key for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
| `statementTimeout` | Statement timeout. _(Optional)_
| `targetServerType` | Type of server to use when using multi-host operations. Supported values: `ANY`, `PRIMARY`, `SECONDARY`, `PREFER_SECONDARY`. Defaults to `ANY`. _(Optional)_
| `tcpNoDelay` | Enable/disable TCP NoDelay. Enabled by default. _(Optional)_
| `tcpKeepAlive` | Enable/disable TCP KeepAlive. Disabled by default. _(Optional)_

**Programmatic Configuration**

Expand Down Expand Up @@ -167,6 +172,23 @@ If you'd rather like the latest snapshots of the upcoming major version, use our
</repository>
```

## Connection Fail-over

To support simple connection fail-over it is possible to define multiple endpoints (host and port pairs) in the connection url separated by commas. The driver will try once to connect to each of them
in order until the connection succeeds. If none succeeds a normal connection exception is thrown. Make sure to specify the `failover` protocol.

The syntax for the connection url is:

```
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3
```

For example an application can create two connection pools. One data source is for writes, another for reads. The write pool limits connections only to a primary node:

```
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primary.
```

## Cursors

R2DBC Postgres supports both, the [simple](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4)
Expand Down
13 changes: 0 additions & 13 deletions src/main/java/io/r2dbc/postgresql/ClientSupplier.java

This file was deleted.

46 changes: 46 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ConnectionFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;

/**
* Interface defining a function how to connect to a single {@link SocketAddress endpoint} applying {@link ConnectionSettings}.
* <p>A connection function is a low-level utility whose result is a valid {@link Client} object. Connection functions may perform multiple connection attempts (e.g. SSL handshake downgrading).
* Topology discovery is a higher-level concept that is typically encapsulated as part of a {@link ConnectionStrategy}.
*
* @see ConnectionStrategy
* @since 1.0
*/
@FunctionalInterface
public interface ConnectionFunction {

/**
* Establish a connection to the given {@link SocketAddress endpoint} applying {@link ConnectionSettings}.
*
* @param endpoint the endpoint to connect to
* @param settings the settings to apply
* @return a mono that connects to the given endpoint upon subscription
* @throws IllegalArgumentException if {@code socketAddress} or {@code settings} is {@code null}
*/
Mono<Client> connect(SocketAddress endpoint, ConnectionSettings settings);

}
52 changes: 31 additions & 21 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;
import java.util.Map;
import java.util.function.Function;

/**
* Interface defining a connection strategy on how to obtain a Postgres {@link Client} object.
* <p>
* Typically, connection strategies use a {@link ConnectionFunction} and are configured with a connection endpoint to establish a client connection to the target server as the {@link #connect()}
* method does not take any parameters.
*
* @see ConnectionFunction
* @since 1.0
*/
@FunctionalInterface
public interface ConnectionStrategy {

/**
* Establish a connection to a target server that is determined by this connection strategy.
*
* @return a mono that initiates the connection upon subscription.
*/
Mono<Client> connect();

ConnectionStrategy withOptions(Map<String, String> options);

interface ComposableConnectionStrategy extends ConnectionStrategy {

default <T extends ConnectionStrategy> T chainIf(boolean guard, Function<ComposableConnectionStrategy, T> nextStrategyProvider, Class<T> klass) {
return guard ? nextStrategyProvider.apply(this) : klass.cast(this);
}

ComposableConnectionStrategy withAddress(SocketAddress address);

ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);

ComposableConnectionStrategy withOptions(Map<String, String> options);

}

}
61 changes: 53 additions & 8 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategyFactory.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql;

import io.netty.channel.unix.DomainSocketAddress;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
Expand All @@ -11,36 +29,63 @@
import java.util.ArrayList;
import java.util.List;

public class ConnectionStrategyFactory {
/**
* Factory methods to obtain a {@link ConnectionStrategy} object.
*
* @since 1.0
*/
final class ConnectionStrategyFactory {

/**
* Create a {@link ConnectionStrategy} that is able to connect to the specified {@link PostgresqlConnectionConfiguration configuration}.
*
* @param connectionFunction the raw connection function to use to create a {@link Client}. The connection function is enhanced during the connect phase to perform a handshake with the database.
* @param configuration the configuration object
* @return the connection strategy to use.
*/
public static ConnectionStrategy getConnectionStrategy(ConnectionFunction connectionFunction, PostgresqlConnectionConfiguration configuration, ConnectionSettings connectionSettings) {
return doGetConnectionStrategy(new SingleHostConnectionFunction(connectionFunction, configuration), configuration, connectionSettings);
}

private static ConnectionStrategy doGetConnectionStrategy(ConnectionFunction connectionFunction, PostgresqlConnectionConfiguration configuration, ConnectionSettings connectionSettings) {

public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
SSLConfig sslConfig = configuration.getSslConfig();
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
return new DefaultConnectionStrategy(address, clientSupplier, configuration, configuration.getConnectionSettings(), configuration.getOptions())
.chainIf(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(configuration, strategy), ConnectionStrategy.ComposableConnectionStrategy.class)
.chainIf(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(createSocketAddress(multiHostConfiguration), configuration, strategy), ConnectionStrategy.class);
if (!SSLMode.DISABLE.equals(sslConfig.getSslMode())) {
connectionFunction = new SslFallbackConnectionFunction(sslConfig, connectionFunction);
}

MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
if (multiHostConfiguration != null) {
return new MultiHostConnectionStrategy(connectionFunction, createSocketAddress(multiHostConfiguration), configuration, connectionSettings);
}

return new SingleHostConnectionStrategy(connectionFunction, createSocketAddress(configuration.getRequiredSingleHostConfiguration()), connectionSettings);
}

private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
if (!configuration.isUseSocket()) {
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
}

return DomainSocketFactory.getDomainSocketAddress(configuration);
}

static class DomainSocketFactory {

private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
return new DomainSocketAddress(configuration.getRequiredSocket());
}

}

private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {

List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());

for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
}

return addressList;
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/r2dbc/postgresql/DisabledStatementCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

class DisabledStatementCache implements StatementCache {

static final DisabledStatementCache INSTANCE = new DisabledStatementCache();

private static final String UNNAMED_STATEMENT_NAME = "";

DisabledStatementCache() {
Expand Down

0 comments on commit 2e0969e

Please sign in to comment.