Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.mongodb.connection.AsyncTransportSettings;
import com.mongodb.connection.NettyTransportSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.io.IOException;
Expand All @@ -34,16 +36,26 @@
*/
public final class StreamFactoryHelper {

public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
TransportSettings transportSettings = settings.getTransportSettings();
public static StreamFactoryFactory getSyncStreamFactoryFactory(
@Nullable final TransportSettings transportSettings,
final InetAddressResolver inetAddressResolver) {

if (transportSettings == null) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
return new StreamFactoryFactory() {
@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, sslSettings);
}

@Override
public void close() {
//NOP
}
};
} else if (transportSettings instanceof AsyncTransportSettings) {
throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
.create(socketSettings, settings.getSslSettings());
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
Expand Down Expand Up @@ -75,7 +87,7 @@ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClien
}
}

private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
public static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
final NettyTransportSettings transportSettings) {
return NettyStreamFactoryFactory.builder()
.applySettings(transportSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
import com.mongodb.client.MongoIterable;
import com.mongodb.client.SynchronousContextProvider;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.SocketSettings;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
Expand All @@ -60,7 +60,7 @@
import static com.mongodb.client.internal.Crypts.createCrypt;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactoryFactory;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation;
Expand All @@ -75,14 +75,21 @@ public final class MongoClientImpl implements MongoClient {
private final MongoDriverInformation mongoDriverInformation;
private final MongoClusterImpl delegate;
private final AtomicBoolean closed;
private final StreamFactoryFactory streamFactoryFactory;

public MongoClientImpl(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) {
this(createCluster(settings, mongoDriverInformation), mongoDriverInformation, settings, null);
this(mongoDriverInformation, settings, null);
}

public MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation,
private MongoClientImpl(final MongoDriverInformation mongoDriverInformation,
final MongoClientSettings settings,
@Nullable final OperationExecutor operationExecutor) {
@Nullable final OperationExecutor operationExecutor) {

this.streamFactoryFactory = getSyncStreamFactoryFactory(settings.getTransportSettings(), getInetAddressResolver(settings));
StreamFactory streamFactory = getStreamFactory(streamFactoryFactory, settings, false);
StreamFactory heartbeatStreamFactory = getStreamFactory(streamFactoryFactory, settings, true);
Cluster cluster = createCluster(settings, mongoDriverInformation, streamFactory, heartbeatStreamFactory);

this.settings = notNull("settings", settings);
this.mongoDriverInformation = mongoDriverInformation;
AutoEncryptionSettings autoEncryptionSettings = settings.getAutoEncryptionSettings();
Expand Down Expand Up @@ -114,6 +121,13 @@ public void close() {
}
delegate.getServerSessionPool().close();
delegate.getCluster().close();
if (streamFactoryFactory != null) {
try {
streamFactoryFactory.close();
} catch (Exception e) {
LOGGER.warn("Exception closing resource", e);
}
}
}
}

Expand Down Expand Up @@ -287,21 +301,24 @@ public ClientBulkWriteResult bulkWrite(
}

private static Cluster createCluster(final MongoClientSettings settings,
@Nullable final MongoDriverInformation mongoDriverInformation) {
@Nullable final MongoDriverInformation mongoDriverInformation,
final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory) {
notNull("settings", settings);
return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(),
settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(),
TimeoutSettings.create(settings), getStreamFactory(settings, false),
TimeoutSettings.createHeartbeatSettings(settings), getStreamFactory(settings, true),
TimeoutSettings.create(settings), streamFactory,
TimeoutSettings.createHeartbeatSettings(settings), heartbeatStreamFactory,
settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()),
settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(),
settings.getDnsClient());
}

private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) {
private static StreamFactory getStreamFactory(
final StreamFactoryFactory streamFactoryFactory,
final MongoClientSettings settings,
final boolean isHeartbeat) {
SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings();
InetAddressResolver inetAddressResolver = getInetAddressResolver(settings);
return getSyncStreamFactory(settings, inetAddressResolver, socketSettings);
return streamFactoryFactory.create(socketSettings, settings.getSslSettings());
}

public Cluster getCluster() {
Expand Down