Skip to content

Commit

Permalink
Refactor internal APIs to avoid requiring duplicate data in method/co…
Browse files Browse the repository at this point in the history
…nstructor params (#871)

JAVA-4471
  • Loading branch information
stIncMale committed Feb 7, 2022
1 parent 47c99b5 commit ffe64ec
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ public ClusterableServer getServer(final ServerAddress serverAddress) {
return serverTuple.server;
}


private final class DefaultServerDescriptionChangedListener implements ServerDescriptionChangedListener {
@Override
public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) {
onChange(event);
}
}

void onChange(final Collection<ServerAddress> newHosts) {
withLock(() -> {
if (isClosed()) {
Expand Down Expand Up @@ -167,7 +159,8 @@ void onChange(final Collection<ServerAddress> newHosts) {
});
}

private void onChange(final ServerDescriptionChangedEvent event) {
@Override
public void onChange(final ServerDescriptionChangedEvent event) {
withLock(() -> {
if (isClosed()) {
return;
Expand Down Expand Up @@ -347,7 +340,7 @@ private void addServer(final ServerAddress serverAddress) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Adding discovered server %s to client view of cluster", serverAddress));
}
ClusterableServer server = createServer(serverAddress, new DefaultServerDescriptionChangedListener());
ClusterableServer server = createServer(serverAddress);
addressToServerTupleMap.put(serverAddress, new ServerTuple(server, getConnectingServerDescription(serverAddress)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.mongodb.lang.Nullable;
import com.mongodb.selector.CompositeServerSelector;
import com.mongodb.selector.ServerSelector;
import org.bson.BsonTimestamp;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -91,8 +90,8 @@ abstract class BaseCluster implements Cluster {
}

@Override
public BsonTimestamp getClusterTime() {
return clusterClock.getClusterTime();
public ClusterClock getClock() {
return clusterClock;
}

@Override
Expand Down Expand Up @@ -392,9 +391,8 @@ private ServerSelector getCompositeServerSelector(final ServerSelector serverSel
}
}

protected ClusterableServer createServer(final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener) {
return serverFactory.create(this, serverAddress, serverDescriptionChangedListener, clusterClock);
protected ClusterableServer createServer(final ServerAddress serverAddress) {
return serverFactory.create(this, serverAddress);
}

private void throwIfIncompatible(final ClusterDescription curDescription) {
Expand Down
17 changes: 10 additions & 7 deletions driver-core/src/main/com/mongodb/internal/connection/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterId;
import com.mongodb.event.ServerDescriptionChangedEvent;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.lang.Nullable;
import com.mongodb.selector.ServerSelector;
import org.bson.BsonTimestamp;

import java.io.Closeable;

Expand Down Expand Up @@ -69,13 +69,9 @@ public interface Cluster extends Closeable {
ClusterDescription getCurrentDescription();

/**
* Get the last seen cluster time
*
* @since 3.8
* @return the last seen cluster time or null if not set
* Get the {@link ClusterClock} from which one may get the last seen cluster time.
*/
@Nullable
BsonTimestamp getClusterTime();
ClusterClock getClock();

ServerTuple selectServer(ServerSelector serverSelector);

Expand All @@ -99,4 +95,11 @@ public interface Cluster extends Closeable {
* @param action The action to {@linkplain Runnable#run() do}.
*/
void withLock(Runnable action);

/**
* This method allows {@link Server}s to notify the {@link Cluster} about changes in their state as per the
* <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst">
* Server Discovery And Monitoring</a> specification.
*/
void onChange(ServerDescriptionChangedEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import com.mongodb.connection.ServerSettings;

public interface ClusterableServerFactory {
ClusterableServer create(Cluster cluster, ServerAddress serverAddress,
ServerDescriptionChangedListener serverDescriptionChangedListener, ClusterClock clusterClock);
ClusterableServer create(Cluster cluster, ServerAddress serverAddress);

ServerSettings getSettings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory = new DefaultDnsSrvRecordMonitorFactory(clusterId, serverSettings);

if (clusterSettings.getMode() == ClusterConnectionMode.LOAD_BALANCED) {
ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(clusterId, serverSettings,
ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings, streamFactory, credential, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
serverApi);
return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
} else {
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(clusterId, clusterSettings, serverSettings,
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings,
streamFactory, heartbeatStreamFactory, credential, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import com.mongodb.MongoDriverInformation;
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.ServerId;
import com.mongodb.connection.ServerSettings;
Expand All @@ -38,8 +37,6 @@
import static java.util.Collections.emptyList;

public class DefaultClusterableServerFactory implements ClusterableServerFactory {
private final ClusterId clusterId;
private final ClusterSettings clusterSettings;
private final ServerSettings serverSettings;
private final ConnectionPoolSettings connectionPoolSettings;
private final InternalConnectionPoolSettings internalConnectionPoolSettings;
Expand All @@ -53,15 +50,13 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
@Nullable
private final ServerApi serverApi;

public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterSettings clusterSettings,
public DefaultClusterableServerFactory(
final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings,
final InternalConnectionPoolSettings internalConnectionPoolSettings,
final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory,
final MongoCredential credential, final CommandListener commandListener,
final String applicationName, final MongoDriverInformation mongoDriverInformation,
final List<MongoCompressor> compressorList, final @Nullable ServerApi serverApi) {
this.clusterId = clusterId;
this.clusterSettings = clusterSettings;
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
this.internalConnectionPoolSettings = internalConnectionPoolSettings;
Expand All @@ -76,27 +71,26 @@ public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterS
}

@Override
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ClusterClock clusterClock) {
ServerId serverId = new ServerId(clusterId, serverAddress);
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) {
ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress);
ClusterConnectionMode clusterMode = cluster.getSettings().getMode();
SameObjectProvider<SdamServerDescriptionManager> sdamProvider = SameObjectProvider.uninitialized();
ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings, clusterClock,
ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings, cluster.getClock(),
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterSettings.getMode(), heartbeatStreamFactory, null, applicationName,
new InternalStreamConnectionFactory(clusterMode, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), null, serverApi),
serverApi, sdamProvider);
ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterSettings.getMode(), streamFactory, credential, applicationName,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, commandListener, serverApi),
connectionPoolSettings, internalConnectionPoolSettings, sdamProvider);
ServerListener serverListener = singleServerListener(serverSettings);
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverDescriptionChangedListener,
serverListener, serverMonitor, connectionPool, clusterSettings.getMode());
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor,
connectionPool, clusterMode);
sdamProvider.initialize(sdam);
serverMonitor.start();
return new DefaultServer(serverId, clusterSettings.getMode(), connectionPool, new DefaultConnectionFactory(), serverMonitor,
sdam, serverListener, commandListener, clusterClock, true);
return new DefaultServer(serverId, clusterMode, connectionPool, new DefaultConnectionFactory(), serverMonitor,
sdam, serverListener, commandListener, cluster.getClock(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
final class DefaultSdamServerDescriptionManager implements SdamServerDescriptionManager {
private final Cluster cluster;
private final ServerId serverId;
private final ServerDescriptionChangedListener serverDescriptionChangedListener;
private final ServerListener serverListener;
private final ServerMonitor serverMonitor;
private final ConnectionPool connectionPool;
Expand All @@ -44,13 +43,11 @@ final class DefaultSdamServerDescriptionManager implements SdamServerDescription

DefaultSdamServerDescriptionManager(final Cluster cluster,
final ServerId serverId,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ServerListener serverListener, final ServerMonitor serverMonitor,
final ConnectionPool connectionPool,
final ClusterConnectionMode connectionMode) {
this.cluster = cluster;
this.serverId = assertNotNull(serverId);
this.serverDescriptionChangedListener = assertNotNull(serverDescriptionChangedListener);
this.serverListener = assertNotNull(serverListener);
this.serverMonitor = assertNotNull(serverMonitor);
this.connectionPool = assertNotNull(connectionPool);
Expand Down Expand Up @@ -113,7 +110,7 @@ private void updateDescription(final ServerDescription newDescription) {
if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
serverListener.serverDescriptionChanged(serverDescriptionChangedEvent);
}
serverDescriptionChangedListener.serverDescriptionChanged(serverDescriptionChangedEvent);
cluster.onChange(serverDescriptionChangedEvent);
}

private void handleException(final SdamIssue sdamIssue, final boolean beforeHandshake) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.event.ServerDescriptionChangedEvent;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;
import com.mongodb.selector.ServerSelector;
import org.bson.BsonTimestamp;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -159,7 +159,7 @@ private void init(final ClusterId clusterId, final ClusterableServerFactory serv
.address(host)
.build()),
settings, serverFactory.getSettings());
server = serverFactory.create(this, host, event -> { }, clusterClock);
server = serverFactory.create(this, host);

clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description, initialDescription));
}
Expand Down Expand Up @@ -196,9 +196,9 @@ public ClusterDescription getCurrentDescription() {
}

@Override
public BsonTimestamp getClusterTime() {
public ClusterClock getClock() {
isTrue("open", !isClosed());
return clusterClock.getClusterTime();
return clusterClock;
}

@Override
Expand Down Expand Up @@ -287,6 +287,11 @@ public void withLock(final Runnable action) {
fail();
}

@Override
public void onChange(final ServerDescriptionChangedEvent event) {
fail();
}

private void handleServerSelectionRequest(final ServerSelectionRequest serverSelectionRequest) {
assertTrue(initializationCompleted);
if (srvRecordResolvedToMultipleHosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.mongodb.ServerApi;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.ServerId;
import com.mongodb.connection.ServerSettings;
Expand All @@ -37,7 +36,6 @@

@ThreadSafe
public class LoadBalancedClusterableServerFactory implements ClusterableServerFactory {
private final ClusterId clusterId;
private final ServerSettings serverSettings;
private final ConnectionPoolSettings connectionPoolSettings;
private final InternalConnectionPoolSettings internalConnectionPoolSettings;
Expand All @@ -49,14 +47,13 @@ public class LoadBalancedClusterableServerFactory implements ClusterableServerFa
private final List<MongoCompressor> compressorList;
private final ServerApi serverApi;

public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final ServerSettings serverSettings,
public LoadBalancedClusterableServerFactory(final ServerSettings serverSettings,
final ConnectionPoolSettings connectionPoolSettings,
final InternalConnectionPoolSettings internalConnectionPoolSettings,
final StreamFactory streamFactory, final MongoCredential credential,
final CommandListener commandListener,
final String applicationName, final MongoDriverInformation mongoDriverInformation,
final List<MongoCompressor> compressorList, final ServerApi serverApi) {
this.clusterId = clusterId;
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
this.internalConnectionPoolSettings = internalConnectionPoolSettings;
Expand All @@ -70,17 +67,15 @@ public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final Ser
}

@Override
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ClusterClock clusterClock) {
ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(clusterId, serverAddress),
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) {
ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(cluster.getClusterId(), serverAddress),
new InternalStreamConnectionFactory(ClusterConnectionMode.LOAD_BALANCED, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, commandListener, serverApi),
connectionPoolSettings, internalConnectionPoolSettings, EmptyProvider.instance());
connectionPool.ready();

return new LoadBalancedServer(new ServerId(clusterId, serverAddress), connectionPool, new DefaultConnectionFactory(),
singleServerListener(serverSettings), clusterClock);
return new LoadBalancedServer(new ServerId(cluster.getClusterId(), serverAddress), connectionPool, new DefaultConnectionFactory(),
singleServerListener(serverSettings), cluster.getClock());
}

@Override
Expand Down

This file was deleted.

Loading

0 comments on commit ffe64ec

Please sign in to comment.