diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index afd5daee392..f0e0176c144 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -32,6 +32,7 @@ import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.protocol.LwtInfo; +import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; import com.datastax.oss.driver.internal.core.session.DefaultSession; @@ -61,9 +62,6 @@ public class DriverChannel { static final AttributeKey CLUSTER_NAME_KEY = AttributeKey.valueOf("cluster_name"); static final AttributeKey>> OPTIONS_KEY = AttributeKey.valueOf("options"); - static final AttributeKey SHARDING_INFO_KEY = - AttributeKey.valueOf("sharding_info"); - static final AttributeKey LWT_INFO_KEY = AttributeKey.valueOf("lwt_info"); @SuppressWarnings("RedundantStringConstructorCall") static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE"); @@ -78,6 +76,7 @@ public class DriverChannel { private final ProtocolVersion protocolVersion; private final AtomicBoolean closing = new AtomicBoolean(); private final AtomicBoolean forceClosing = new AtomicBoolean(); + private ProtocolFeatureStore featureStore; DriverChannel( EndPoint endPoint, @@ -148,18 +147,35 @@ public Map> getOptions() { return channel.attr(OPTIONS_KEY).get(); } + public ProtocolFeatureStore getSupportedFeatures() { + if (featureStore != null) { + return featureStore; + } + + ProtocolFeatureStore fromChannel = ProtocolFeatureStore.loadFromChannel(channel); + if (fromChannel == null) { + return ProtocolFeatureStore.Empty; + } + // Features can't be renegotiated. + // Once features is populated into channel it is enough to update cache and no need to + // invalidate it further. + + featureStore = fromChannel; + return featureStore; + } + public int getShardId() { - return channel.hasAttr(SHARDING_INFO_KEY) ? channel.attr(SHARDING_INFO_KEY).get().shardId : 0; + ConnectionShardingInfo info = getSupportedFeatures().getShardingInfo(); + return info != null ? info.shardId : 0; } public ShardingInfo getShardingInfo() { - return channel.hasAttr(SHARDING_INFO_KEY) - ? channel.attr(SHARDING_INFO_KEY).get().shardingInfo - : null; + ConnectionShardingInfo info = getSupportedFeatures().getShardingInfo(); + return info != null ? info.shardingInfo : null; } public LwtInfo getLwtInfo() { - return channel.attr(LWT_INFO_KEY).get(); + return getSupportedFeatures().getLwtFeatureInfo(); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 95cbf1e200b..248b51c89f1 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -23,8 +23,6 @@ */ package com.datastax.oss.driver.internal.core.channel; -import static com.datastax.oss.driver.internal.core.channel.DriverChannel.LWT_INFO_KEY; - import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.InvalidKeyspaceException; import com.datastax.oss.driver.api.core.ProtocolVersion; @@ -40,12 +38,9 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder; import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder; -import com.datastax.oss.driver.internal.core.protocol.LwtInfo; +import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder; import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; -import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; -import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; -import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.ProtocolUtils; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; import com.datastax.oss.protocol.internal.Message; @@ -96,8 +91,7 @@ class ProtocolInitHandler extends ConnectInitHandler { private String logPrefix; private ChannelHandlerContext ctx; private final boolean querySupportedOptions; - private LwtInfo lwtInfo; - private TabletInfo tabletInfo; + private ProtocolFeatureStore featureStore; /** * @param querySupportedOptions whether to send OPTIONS as the first message, to request which @@ -192,11 +186,8 @@ Message getRequest() { return request = Options.INSTANCE; case STARTUP: Map startupOptions = new HashMap<>(context.getStartupOptions()); - if (lwtInfo != null) { - lwtInfo.addOption(startupOptions); - } - if (tabletInfo != null && tabletInfo.isEnabled()) { - TabletInfo.addOption(startupOptions); + if (featureStore != null) { + featureStore.populateStartupOptions(startupOptions); } return request = new Startup(startupOptions); case GET_CLUSTER_NAME: @@ -229,15 +220,8 @@ void onResponse(Message response) { if (step == Step.OPTIONS && response instanceof Supported) { channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options); Supported res = (Supported) response; - ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(res.options); - if (shardingInfo != null) { - channel.attr(DriverChannel.SHARDING_INFO_KEY).set(shardingInfo); - } - lwtInfo = LwtInfo.parseLwtInfo(res.options); - if (lwtInfo != null) { - channel.attr(LWT_INFO_KEY).set(lwtInfo); - } - tabletInfo = TabletInfo.parseTabletInfo(res.options); + featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options); + featureStore.storeInChannel(channel); step = Step.STARTUP; send(); } else if (step == Step.STARTUP && response instanceof Ready) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java index 5ac8abc2e53..2058e8d317f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java @@ -37,7 +37,7 @@ public boolean isLwt(int flags) { return (flags & mask) == mask; } - public static LwtInfo parseLwtInfo(Map> supported) { + public static LwtInfo loadFromSupportedOptions(Map> supported) { if (!supported.containsKey(SCYLLA_LWT_ADD_METADATA_MARK_KEY)) { return null; } @@ -67,7 +67,7 @@ public static LwtInfo parseLwtInfo(Map> supported) { return new LwtInfo((int) mask); } - public void addOption(Map options) { + public void populateStartupOptions(Map options) { options.put(SCYLLA_LWT_ADD_METADATA_MARK_KEY, Integer.toString(mask)); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java new file mode 100644 index 00000000000..1d026d10dd0 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -0,0 +1,62 @@ +package com.datastax.oss.driver.internal.core.protocol; + +import edu.umd.cs.findbugs.annotations.NonNull; +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import java.util.List; +import java.util.Map; + +public class ProtocolFeatureStore { + private static final AttributeKey CHANNEL_KEY = + AttributeKey.valueOf("protocol_feature_store"); + + private final LwtInfo lwtInfo; + private final ShardingInfo.ConnectionShardingInfo shardingInfo; + private final TabletInfo tabletInfo; + + public static final ProtocolFeatureStore Empty = new ProtocolFeatureStore(null, null, null); + + ProtocolFeatureStore( + LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo, TabletInfo tabletInfo) { + this.lwtInfo = lwtInfo; + this.shardingInfo = shardingInfo; + this.tabletInfo = tabletInfo; + } + + public LwtInfo getLwtFeatureInfo() { + return lwtInfo; + } + + public ShardingInfo.ConnectionShardingInfo getShardingInfo() { + return shardingInfo; + } + + public TabletInfo getTabletFeatureInfo() { + return tabletInfo; + } + + public static ProtocolFeatureStore parseSupportedOptions( + @NonNull Map> options) { + LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options); + ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options); + TabletInfo tabletInfo = TabletInfo.loadFromSupportedOptions(options); + return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo); + } + + public void populateStartupOptions(@NonNull Map options) { + if (lwtInfo != null) { + lwtInfo.populateStartupOptions(options); + } + if (tabletInfo != null && tabletInfo.isEnabled()) { + TabletInfo.populateStartupOptions(options); + } + } + + public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) { + return channel.attr(ProtocolFeatureStore.CHANNEL_KEY).get(); + } + + public void storeInChannel(@NonNull Channel channel) { + channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java index 8c33e803fd5..09ca42c9295 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java @@ -19,7 +19,7 @@ public boolean isEnabled() { return enabled; } - public static TabletInfo parseTabletInfo(Map> supported) { + public static TabletInfo loadFromSupportedOptions(Map> supported) { List values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY); return new TabletInfo( values != null @@ -27,7 +27,7 @@ public static TabletInfo parseTabletInfo(Map> supported) { && values.get(0).equals(SCYLLA_TABLETS_STARTUP_OPTION_VALUE)); } - public static void addOption(Map options) { + public static void populateStartupOptions(Map options) { options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE); } }