Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions src/Cassandra.IntegrationTests/Core/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,18 @@ public void Session_Keyspace_Create_Case_Sensitive()
});
}

[Test]
public void Should_Create_The_Right_Amount_Of_Connections()
[TestCase(true)]
[TestCase(false)]
public void Should_Create_The_Right_Amount_Of_Connections(bool useShardAwareness)
{
var poolingOptions1 = new PoolingOptions().SetCoreConnectionsPerHost(HostDistance.Local, 3);
if (!useShardAwareness)
{
poolingOptions1.DisableShardAwareness();
}
var localCluster1 = GetNewTemporaryCluster(
builder => builder
.WithPoolingOptions(
new PoolingOptions()
.SetCoreConnectionsPerHost(HostDistance.Local, 3)));
.WithPoolingOptions(poolingOptions1));

var localSession1 = (IInternalSession)localCluster1.Connect();
var hosts1 = localCluster1.AllHosts().ToList();
Expand All @@ -191,12 +195,18 @@ public void Should_Create_The_Right_Amount_Of_Connections()
Thread.Sleep(2000);
var pool11 = localSession1.GetOrCreateConnectionPool(hosts1[0], HostDistance.Local);
var pool12 = localSession1.GetOrCreateConnectionPool(hosts1[1], HostDistance.Local);
Assert.That(pool11.OpenConnections, Is.EqualTo(3));
Assert.That(pool12.OpenConnections, Is.EqualTo(3));
var expectedConnections1 = useShardAwareness ? 4 : 3;
Assert.That(pool11.OpenConnections, Is.EqualTo(expectedConnections1));
Assert.That(pool12.OpenConnections, Is.EqualTo(expectedConnections1));

var poolingOptions2 = new PoolingOptions().SetCoreConnectionsPerHost(HostDistance.Local, 1);
if (!useShardAwareness)
{
poolingOptions2.DisableShardAwareness();
}
using (var localCluster2 = ClusterBuilder()
.AddContactPoint(TestCluster.InitialContactPoint)
.WithPoolingOptions(new PoolingOptions().SetCoreConnectionsPerHost(HostDistance.Local, 1))
.WithPoolingOptions(poolingOptions2)
.Build())
{
var localSession2 = (IInternalSession)localCluster2.Connect();
Expand All @@ -211,8 +221,9 @@ public void Should_Create_The_Right_Amount_Of_Connections()
Thread.Sleep(2000);
var pool21 = localSession2.GetOrCreateConnectionPool(hosts2[0], HostDistance.Local);
var pool22 = localSession2.GetOrCreateConnectionPool(hosts2[1], HostDistance.Local);
Assert.That(pool21.OpenConnections, Is.EqualTo(1));
Assert.That(pool22.OpenConnections, Is.EqualTo(1));
var expectedConnections2 = useShardAwareness ? 2 : 1;
Assert.That(pool21.OpenConnections, Is.EqualTo(expectedConnections2));
Assert.That(pool22.OpenConnections, Is.EqualTo(expectedConnections2));
}
}

Expand All @@ -232,7 +243,7 @@ public async Task Session_With_Host_Changing_Distance()
var builder = ClusterBuilder()
.AddContactPoint(TestCluster.InitialContactPoint)
.WithLoadBalancingPolicy(lbp)
.WithPoolingOptions(new PoolingOptions().SetCoreConnectionsPerHost(HostDistance.Local, 3))
.WithPoolingOptions(new PoolingOptions().SetCoreConnectionsPerHost(HostDistance.Local, 3).DisableShardAwareness())
.WithReconnectionPolicy(new ConstantReconnectionPolicy(1000));
var counter = 0;
using (var localCluster = builder.Build())
Expand Down
27 changes: 26 additions & 1 deletion src/Cassandra.IntegrationTests/ShardAwareOptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Cassandra.IntegrationTests.TestClusterManagement;
using Cassandra.SessionManagement;
using NUnit.Framework;
using System.Linq;

namespace Cassandra.IntegrationTests
{
Expand Down Expand Up @@ -31,5 +32,29 @@ public void Should_Connect_To_Shard_Aware_Cluster()
var controlConnection = (ControlConnection)internalCluster.GetControlConnection();
Assert.IsTrue(controlConnection.IsShardAware());
}

[TestCase(1)]
[TestCase(4)]
public void Should_Have_NrShards_Connections(int connectionsPerHost)
{
_realCluster = TestClusterManager.CreateNew();
var cluster = ClusterBuilder()
.WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(22000).SetConnectTimeoutMillis(60000))
.WithPoolingOptions(new PoolingOptions()
.SetCoreConnectionsPerHost(HostDistance.Local, connectionsPerHost))
.AddContactPoint(_realCluster.InitialContactPoint)
.Build();
var session = cluster.Connect();
IInternalSession internalSession = (IInternalSession)session;
var pools = internalSession.GetPools();
foreach (var kvp in pools)
{
var shardCount = 2;
var connectionsPerShard = connectionsPerHost / shardCount + (connectionsPerHost % shardCount > 0 ? 1 : 0);
Assert.AreEqual(shardCount * connectionsPerShard, kvp.Value.OpenConnections);
var shardGroups = kvp.Value.ConnectionsSnapshot.GroupBy(c => c.ShardID);
Assert.IsTrue(shardGroups.All(g => g.Count() == connectionsPerShard));
}
}
}
}
}
79 changes: 79 additions & 0 deletions src/Cassandra.IntegrationTests/ShardAwarenessTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;
using Cassandra.Connections.Control;
using Cassandra.IntegrationTests.TestBase;
using Cassandra.IntegrationTests.TestClusterManagement;
using Cassandra.SessionManagement;
using NUnit.Framework;

namespace Cassandra.IntegrationTests
{
[TestFixture]
public class ShardAwarenessTest : TestGlobals
{
private ITestCluster _realCluster;

[TearDown]
public void TestTearDown()
{
TestClusterManager.TryRemove();
_realCluster = null;
}

[Test]
public void CorrectShardInTracingTest()
{
_realCluster = TestClusterManager.CreateNew();
var cluster = ClusterBuilder()
.WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(22000).SetConnectTimeoutMillis(60000))
.AddContactPoint(_realCluster.InitialContactPoint)
.Build();
var _session = cluster.Connect();

_session.Execute("DROP KEYSPACE IF EXISTS shardawaretest");
_session.Execute("CREATE KEYSPACE shardawaretest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}");
_session.Execute("CREATE TABLE shardawaretest.t (pk text, ck text, v text, PRIMARY KEY (pk, ck))");

var populateStatement = _session.Prepare("INSERT INTO shardawaretest.t (pk, ck, v) VALUES (?, ?, ?)");
_session.Execute(populateStatement.Bind("a", "b", "c"));
_session.Execute(populateStatement.Bind("e", "f", "g"));
_session.Execute(populateStatement.Bind("100002", "f", "g"));

VerifyCorrectShardSingleRow(_session, "a", "b", "c", "shard 0");
VerifyCorrectShardSingleRow(_session, "e", "f", "g", "shard 0");
VerifyCorrectShardSingleRow(_session, "100002", "f", "g", "shard 1");
}

private void VerifyCorrectShardSingleRow(ISession _session, string pk, string ck, string v, string shard)
{
var prepared = _session.Prepare("SELECT pk, ck, v FROM shardawaretest.t WHERE pk=? AND ck=?");
var result = _session.Execute(prepared.Bind(pk, ck).EnableTracing());

var row = result.First();
Assert.IsNotNull(row);
Assert.AreEqual(pk, row.GetValue<string>("pk"));
Assert.AreEqual(ck, row.GetValue<string>("ck"));
Assert.AreEqual(v, row.GetValue<string>("v"));

var executionInfo = result.Info;
var trace = executionInfo.QueryTrace;
bool anyLocal = false;
foreach (var eventItem in trace.Events)
{
Trace.TraceInformation(" {0} - {1} - [{2}] - {3}",
eventItem.SourceElapsedMicros,
eventItem.Source,
eventItem.ThreadName,
eventItem.Description);
Assert.IsTrue(eventItem.ThreadName.StartsWith(shard));
if (eventItem.Description.Contains("querying locally"))
{
anyLocal = true;
}
}
Assert.IsTrue(anyLocal);
}
}
}
Loading