Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/Cassandra.IntegrationTests/ShardAwareOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Cassandra.Connections.Control;
using Cassandra.IntegrationTests.TestBase;
using Cassandra.IntegrationTests.TestClusterManagement;
using Cassandra.SessionManagement;
using NUnit.Framework;

namespace Cassandra.IntegrationTests
{
[TestFixture]
public class ShardAwareOptionsTests : TestGlobals
{
private ITestCluster _realCluster;

[TearDown]
public void TestTearDown()
{
TestClusterManager.TryRemove();
_realCluster = null;
}

[Test]
public void Should_Connect_To_Shard_Aware_Cluster()
{
_realCluster = TestClusterManager.CreateNew();
var cluster = ClusterBuilder()
.WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(22000).SetConnectTimeoutMillis(60000))
.AddContactPoint(_realCluster.InitialContactPoint)
.Build();
var session = cluster.Connect();
IInternalCluster internalCluster = cluster;
var controlConnection = (ControlConnection)internalCluster.GetControlConnection();
Assert.IsTrue(controlConnection.IsShardAware());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//
//
// Copyright (C) DataStax Inc.
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -33,5 +33,10 @@ public Task ApplySupportedOptionsAsync(IConnection connection)
{
return TaskHelper.Completed;
}

public ShardingInfo GetShardingInfo()
{
return null;
}
}
}
9 changes: 9 additions & 0 deletions src/Cassandra/Connections/Control/ControlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -897,5 +897,14 @@ public Task ScheduleAllKeyspacesRefreshAsync(bool processNow)
? _eventDebouncer.HandleEventAsync(@event, true)
: _eventDebouncer.ScheduleEventAsync(@event, false);
}

public bool IsShardAware()
{
if (_supportedOptionsInitializer.GetShardingInfo() == null)
{
return false;
}
return _supportedOptionsInitializer.GetShardingInfo().ScyllaNrShards > 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//
//
// Copyright (C) DataStax Inc.
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,5 +20,6 @@ namespace Cassandra.Connections.Control
internal interface ISupportedOptionsInitializer
{
Task ApplySupportedOptionsAsync(IConnection connection);
ShardingInfo GetShardingInfo();
}
}
93 changes: 88 additions & 5 deletions src/Cassandra/Connections/Control/SupportedOptionsInitializer.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//
//
// Copyright (C) DataStax Inc.
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -28,6 +28,16 @@ internal class SupportedOptionsInitializer : ISupportedOptionsInitializer

private readonly Metadata _metadata;

private const string ScyllaShard = "SCYLLA_SHARD";
private const string ScyllaNrShards = "SCYLLA_NR_SHARDS";
private const string ScyllaPartitioner = "SCYLLA_PARTITIONER";
private const string ScyllaShardingAlgorithm = "SCYLLA_SHARDING_ALGORITHM";
private const string ScyllaShardingIgnoreMSB = "SCYLLA_SHARDING_IGNORE_MSB";
private const string ScyllaShardAwarePort = "SCYLLA_SHARD_AWARE_PORT";
private const string ScyllaShardAwarePortSSL = "SCYLLA_SHARD_AWARE_PORT_SSL";

private ShardingInfo _shardingInfo;

public SupportedOptionsInitializer(Metadata metadata)
{
_metadata = metadata;
Expand All @@ -49,6 +59,12 @@ public async Task ApplySupportedOptionsAsync(IConnection connection)
}

ApplyProductTypeOption(supportedResponse.Output.Options);
ApplyScyllaShardingOption(supportedResponse.Output.Options);
}

public ShardingInfo GetShardingInfo()
{
return _shardingInfo;
}

private void ApplyProductTypeOption(IDictionary<string, string[]> options)
Expand All @@ -68,5 +84,72 @@ private void ApplyProductTypeOption(IDictionary<string, string[]> options)
_metadata.SetProductTypeAsDbaas();
}
}

private void ApplyScyllaShardingOption(IDictionary<string, string[]> options)
{
if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaShard, out var scyllaShard))
{
return;
}
if (scyllaShard.Length <= 0)
{
return;
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaNrShards, out var scyllaNrShards))
{
return;
}
if (scyllaNrShards.Length <= 0)
{
return;
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaPartitioner, out var scyllaPartitioner))
{
return;
}
if (scyllaPartitioner.Length <= 0)
{
return;
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaShardingAlgorithm, out var scyllaShardingAlgorithm))
{
return;
}
if (scyllaShardingAlgorithm.Length <= 0)
{
return;
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaShardingIgnoreMSB, out var scyllaShardingIgnoreMSB))
{
return;
}
if (scyllaShardingIgnoreMSB.Length <= 0)
{
return;
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaShardAwarePort, out var scyllaShardAwarePort))
{
scyllaShardAwarePort = new string[] { "0" };
}

if (!options.TryGetValue(SupportedOptionsInitializer.ScyllaShardAwarePortSSL, out var scyllaShardAwarePortSSL))
{
scyllaShardAwarePortSSL = new string[] { "0" };
}

_shardingInfo = ShardingInfo.Create(
scyllaShard[0],
scyllaNrShards[0],
scyllaPartitioner[0],
scyllaShardingAlgorithm[0],
scyllaShardingIgnoreMSB[0],
scyllaShardAwarePort[0],
scyllaShardAwarePortSSL[0]);
}
}
}
}
45 changes: 45 additions & 0 deletions src/Cassandra/ShardingInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
namespace Cassandra
{
/// <summary>
/// Represents Scylla connection options as sent in SUPPORTED
/// frame.
/// </summary>
public class ShardingInfo
{
public int ScyllaShard { get; }
public int ScyllaNrShards { get; }
public string ScyllaPartitioner { get; }
public string ScyllaShardingAlgorithm { get; }
public ulong ScyllaShardingIgnoreMSB { get; }
public ulong ScyllaShardAwarePort { get; }
public ulong ScyllaShardAwarePortSSL { get; }

private ShardingInfo(int scyllaShard, int scyllaNrShards, string scyllaPartitioner,
string scyllaShardingAlgorithm, ulong scyllaShardingIgnoreMSB,
ulong scyllaShardAwarePort, ulong scyllaShardAwarePortSSL)
{
ScyllaShard = scyllaShard;
ScyllaNrShards = scyllaNrShards;
ScyllaPartitioner = scyllaPartitioner;
ScyllaShardingAlgorithm = scyllaShardingAlgorithm;
ScyllaShardingIgnoreMSB = scyllaShardingIgnoreMSB;
ScyllaShardAwarePort = scyllaShardAwarePort;
ScyllaShardAwarePortSSL = scyllaShardAwarePortSSL;
}

public static ShardingInfo Create(string scyllaShard, string scyllaNrShards, string scyllaPartitioner,
string scyllaShardingAlgorithm, string scyllaShardingIgnoreMSB,
string scyllaShardAwarePort, string scyllaShardAwarePortSSL)
{
return new ShardingInfo(
int.Parse(scyllaShard),
int.Parse(scyllaNrShards),
scyllaPartitioner,
scyllaShardingAlgorithm,
ulong.Parse(scyllaShardingIgnoreMSB),
ulong.Parse(scyllaShardAwarePort),
ulong.Parse(scyllaShardAwarePortSSL)
);
}
}
}