Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Cassandra.IntegrationTests/Core/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
return _cluster.AllHosts();
return _cluster.AllHosts().Select(h => new HostShard(h, -1));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ public HostDistance Distance(Host host)
return _parent.Distance(host);
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
var plan = _parent.NewQueryPlan(keyspace, query);
return plan.Where(h => !_disallowed.Contains(h.Address));
return plan.Where(h => !_disallowed.Contains(h.Host.Address));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Cassandra.IntegrationTests/Core/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
return _childPolicy.NewQueryPlan(keyspace, query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
var hosts = _cluster.AllHosts().ToArray();
foreach (var addr in _addresses)
{
var host = hosts.Single(h => h.Address.Address.ToString() == addr);
Interlocked.Increment(ref _hostYielded);
yield return host;
yield return new HostShard(host, -1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void TokenMap_Should_RebuildTokenMap_When_NodeIsDecommissioned()
sessionNotSync.ChangeKeyspace(keyspaceName);
sessionSync.ChangeKeyspace(keyspaceName);

ICollection<Host> replicasSync = null;
ICollection<Host> replicasNotSync = null;
ICollection<HostShard> replicasSync = null;
ICollection<HostShard> replicasNotSync = null;

TestHelper.RetryAssert(() =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public void Token_Aware_Uses_Keyspace_From_Statement_To_Determine_Replication(bo
// Get the replicas
var replicas = cluster.GetReplicas(ks, routingKey);
Assert.AreEqual(metadataSync ? 2 : 1, replicas.Count);
CollectionAssert.AreEquivalent(replicas.Select(h => h.Address), coordinators);
CollectionAssert.AreEquivalent(replicas.Select(h => h.Host.Address), coordinators);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
var queryPlan = new List<Host>();
var queryPlan = new List<HostShard>();
var allHosts = _cluster.AllHosts();
foreach (var host in _hosts)
{
queryPlan.Add(allHosts.Single(h => h.Address.ToString() == host));
queryPlan.Add(new HostShard(allHosts.Single(h => h.Address.ToString() == host), -1));
}
return queryPlan;
}
Expand Down
61 changes: 61 additions & 0 deletions src/Cassandra.IntegrationTests/ScyllaTabletTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Collections.Generic;
using Cassandra.IntegrationTests.TestBase;
using Cassandra.IntegrationTests.TestClusterManagement;
using NUnit.Framework;
Expand Down Expand Up @@ -66,5 +67,65 @@ public void CorrectTabletMapTest()
Assert.IsTrue(tablets.Count > 0, "Make sure tablets are present in the tablet set");
}
}

[Test]
public void CorrectShardUsingTabletsInTracingTest()
{
_realCluster = TestClusterManager.CreateNew();
var cluster = ClusterBuilder()
.WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(22000).SetConnectTimeoutMillis(60000))
.AddContactPoint(_realCluster.InitialContactPoint)
.Build();
var _session = cluster.Connect();

var rf = 1;
_session.Execute("DROP KEYSPACE IF EXISTS tablettest");
_session.Execute($"CREATE KEYSPACE tablettest WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': '{rf}'}}");
_session.Execute("CREATE TABLE tablettest.t (pk text, ck text, v text, PRIMARY KEY (pk, ck))");

var populateStatement = _session.Prepare("INSERT INTO tablettest.t (pk, ck, v) VALUES (?, ?, ?)");
//Insert 50 rows to ensure that the tablet map is populated correctly
for (int i = 0; i < 50; i++)
{
_session.Execute(populateStatement.Bind(i.ToString(), "ck" + i, "v" + i));
}

for (int i = 0; i < 50; i++)
{
_session.Execute(populateStatement.Bind(i.ToString(), "ck" + i, "v" + i));
}
}

private void VerifyCorrectShardSingleRow(ISession _session, string pk, string ck, string v)
{
var prepared = _session.Prepare("SELECT pk, ck, v FROM tablettest.t WHERE pk=? AND ck=?");
var result = _session.Execute(prepared.Bind(pk, ck).EnableTracing());

var row = result.First();
Assert.IsNotNull(row);
Assert.AreEqual(pk, row.GetValue<string>("pk"));
Assert.AreEqual(ck, row.GetValue<string>("ck"));
Assert.AreEqual(v, row.GetValue<string>("v"));

var executionInfo = result.Info;
var trace = executionInfo.QueryTrace;
bool anyLocal = false;
var shardSet = new HashSet<string>();
foreach (var eventItem in trace.Events)
{
Trace.TraceInformation(" {0} - {1} - [{2}] - {3}",
eventItem.SourceElapsedMicros,
eventItem.Source,
eventItem.ThreadName,
eventItem.Description);
shardSet.Add(eventItem.ThreadName);
if (eventItem.Description.Contains("querying locally"))
{
anyLocal = true;
}
}
Assert.IsTrue(shardSet.Count == 1);
Assert.IsTrue(anyLocal);
}
}
}
6 changes: 3 additions & 3 deletions src/Cassandra.Tests/BaseUnitTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public HostDistance Distance(Host host)
return _distance;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
return new[]
{
new Host(new IPEndPoint(101L, 9042), ReconnectionPolicy),
new Host(new IPEndPoint(102L, 9042), ReconnectionPolicy)
new HostShard(new Host(new IPEndPoint(101L, 9042), ReconnectionPolicy), -1),
new HostShard(new Host(new IPEndPoint(102L, 9042), ReconnectionPolicy), -1)
};
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/Cassandra.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,12 @@ public HostDistance Distance(Host host)
return _distances[host.Address.Address.ToString()];
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
return _cluster.AllHosts().OrderBy(h => Guid.NewGuid().GetHashCode()).Take(_distances.Count);
return _cluster.AllHosts()
.OrderBy(h => Guid.NewGuid().GetHashCode())
.Take(_distances.Count)
.Select(h => new HostShard(h, -1));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Cassandra.Tests/ExecutionProfiles/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,9 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
return _cluster.AllHosts();
return _cluster.AllHosts().Select(h => new HostShard(h, -1));
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/Cassandra.Tests/ExecutionProfiles/RequestHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ public HostDistance Distance(Host host)
return HostDistance.Local;
}

public IEnumerable<Host> NewQueryPlan(string keyspace, IStatement query)
public IEnumerable<HostShard> NewQueryPlan(string keyspace, IStatement query)
{
Interlocked.Increment(ref Count);
return new List<Host>
return new List<HostShard>
{
new Host(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9042), contactPoint: null),
new Host(new IPEndPoint(IPAddress.Parse("127.0.0.2"), 9042), contactPoint: null) // 2 hosts for speculative execution policy
new HostShard(new Host(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9042), contactPoint: null), -1),
new HostShard(new Host(new IPEndPoint(IPAddress.Parse("127.0.0.2"), 9042), contactPoint: null), -1) // 2 hosts for speculative execution policy
};
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Cassandra.Tests/Policies/DefaultLoadBalancingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void Should_Yield_Preferred_Host_First()
var hosts = lbp.NewQueryPlan(null, statement);
CollectionAssert.AreEqual(
new[] { "201.0.0.0:9042", "101.0.0.0:9042", "102.0.0.0:9042" },
hosts.Select(h => h.Address.ToString()));
hosts.Select(h => h.Host.Address.ToString()));
}

[Test]
Expand All @@ -45,7 +45,7 @@ public void Should_Yield_Child_Hosts_When_No_Preferred_Host_Defined()
var hosts = lbp.NewQueryPlan(null, statement);
CollectionAssert.AreEqual(
new[] { "101.0.0.0:9042", "102.0.0.0:9042" },
hosts.Select(h => h.Address.ToString()));
hosts.Select(h => h.Host.Address.ToString()));
}

[Test]
Expand Down
52 changes: 26 additions & 26 deletions src/Cassandra.Tests/PoliciesUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public void RoundRobinIsCyclicTest()
foreach (var host in hostList)
{
//Check that each host appears only once
Assert.AreEqual(1, firstRound.Where(h => h.Equals(host)).Count());
Assert.AreEqual(1, firstRound.Where(h => h.Host.Equals(host)).Count());
}

//test the same but in the following times
var followingRounds = new List<Host>();
var followingRounds = new List<HostShard>();
for (var i = 0; i < 10; i++)
{
followingRounds.AddRange(policy.NewQueryPlan(null, new SimpleStatement()));
Expand Down Expand Up @@ -98,7 +98,7 @@ public void RoundRobinIsCyclicTestInParallel()
{
//Slow down to try to execute it at the same time
Thread.Sleep(50);
resultingHosts.Add(h);
resultingHosts.Add(h.Host);
}
Assert.AreEqual(hostLength, resultingHosts.Count);
Assert.AreEqual(hostLength, resultingHosts.Distinct().Count());
Expand Down Expand Up @@ -143,19 +143,19 @@ public void DCAwareRoundRobinPolicyNeverHitsRemoteWhenSet()
var firstRound = balancedHosts.ToList();

//Returns only local hosts
Assert.AreEqual(hostLength - 2, firstRound.Count(h => h.Datacenter == "local"));
Assert.AreEqual(0, firstRound.Count(h => h.Datacenter != "local"));
Assert.AreEqual(hostLength - 2, firstRound.Count(h => h.Host.Datacenter == "local"));
Assert.AreEqual(0, firstRound.Count(h => h.Host.Datacenter != "local"));

//following rounds: test it multiple times
var followingRounds = new List<Host>();
var followingRounds = new List<HostShard>();
for (var i = 0; i < 10; i++)
{
followingRounds.AddRange(policy.NewQueryPlan(null, new SimpleStatement()).ToList());
}
Assert.AreEqual(10 * (hostLength - 2), followingRounds.Count);

//Check that there aren't remote nodes.
Assert.AreEqual(0, followingRounds.Count(h => h.Datacenter != "local"));
Assert.AreEqual(0, followingRounds.Count(h => h.Host.Datacenter != "local"));
}

[Test]
Expand Down Expand Up @@ -194,11 +194,11 @@ public void DCAwareRoundRobinYieldsRemoteNodesAtTheEnd()
var h = hosts[i];
if (i < localHostsLength)
{
Assert.AreEqual(localDc, h.Datacenter);
Assert.AreEqual(localDc, h.Host.Datacenter);
}
else
{
Assert.AreNotEqual(localDc, h.Datacenter);
Assert.AreNotEqual(localDc, h.Host.Datacenter);
}
}
};
Expand Down Expand Up @@ -278,11 +278,11 @@ public void DCAwareRoundRobinPolicyTestInParallel()
.Where(g => g.Count() > 1)
.Select(y => y.Key)
.Count());
firstHosts.Add(hosts[0]);
firstHosts.Add(hosts[0].Host);
//Add to the general list
foreach (var h in hosts)
{
allHosts.Add(h);
allHosts.Add(h.Host);
}
};

Expand Down Expand Up @@ -373,7 +373,7 @@ public void DCAwareRoundRobinPolicyWithNodesChanging()
policy.Initialize(clusterMock.Object);

var hostYielded = new ConcurrentBag<IEnumerable<Host>>();
Action action = () => hostYielded.Add(policy.NewQueryPlan(null, null).ToList());
Action action = () => hostYielded.Add(policy.NewQueryPlan(null, null).ToList().Select(h => h.Host));

//Invoke without nodes changing
TestHelper.ParallelInvoke(action, 100);
Expand Down Expand Up @@ -496,7 +496,7 @@ public void TokenAwarePolicyReturnsLocalReplicasFirst()
//The host at with address == k || address == k + n
var address = TestHelper.GetLastAddressByte(h);
return address == i || address == i + n;
}).ToList();
}).Select(h => new HostShard(h, -1)).ToList();
})
.Verifiable();

Expand All @@ -509,7 +509,7 @@ public void TokenAwarePolicyReturnsLocalReplicasFirst()
//5 local hosts + 2 remote hosts
Assert.AreEqual(7, hosts.Count);
//local replica first
Assert.AreEqual(1, TestHelper.GetLastAddressByte(hosts[0]));
Assert.AreEqual(1, TestHelper.GetLastAddressByte(hosts[0].Host));
clusterMock.Verify();

//key for host :::2 and :::5
Expand All @@ -518,11 +518,11 @@ public void TokenAwarePolicyReturnsLocalReplicasFirst()
hosts = policy.NewQueryPlan(null, new SimpleStatement().SetRoutingKey(k)).ToList();
Assert.AreEqual(7, hosts.Count);
//local replicas first
CollectionAssert.AreEquivalent(new[] { 2, 5 }, hosts.Take(2).Select(TestHelper.GetLastAddressByte));
CollectionAssert.AreEquivalent(new[] { 2, 5 }, hosts.Take(2).Select(h => TestHelper.GetLastAddressByte(h.Host)));
//next should be local nodes
Assert.AreEqual("dc1", hosts[2].Datacenter);
Assert.AreEqual("dc1", hosts[3].Datacenter);
Assert.AreEqual("dc1", hosts[4].Datacenter);
Assert.AreEqual("dc1", hosts[2].Host.Datacenter);
Assert.AreEqual("dc1", hosts[3].Host.Datacenter);
Assert.AreEqual("dc1", hosts[4].Host.Datacenter);
clusterMock.Verify();
}

Expand Down Expand Up @@ -557,7 +557,7 @@ public void TokenAwarePolicyRoundRobinsOnLocalReplicas()
//The host at with address == k and the next one
var address = TestHelper.GetLastAddressByte(h);
return address == i || address == i + 1;
}).ToList();
}).Select(h => new HostShard(h, -1)).ToList();
})
.Verifiable();

Expand All @@ -571,7 +571,7 @@ public void TokenAwarePolicyRoundRobinsOnLocalReplicas()
Action action = () =>
{
var h = policy.NewQueryPlan(null, new SimpleStatement().SetRoutingKey(k)).First();
firstHosts.Add(h);
firstHosts.Add(h.Host);
};
TestHelper.ParallelInvoke(action, times);
Assert.AreEqual(times, firstHosts.Count);
Expand Down Expand Up @@ -605,16 +605,16 @@ public void TokenAwarePolicyReturnsChildHostsIfNoRoutingKey()
//No routing key
var hosts = policy.NewQueryPlan(null, new SimpleStatement()).ToList();
//2 localhosts
Assert.AreEqual(2, hosts.Count(h => policy.Distance(h) == HostDistance.Local));
Assert.AreEqual("dc1", hosts[0].Datacenter);
Assert.AreEqual("dc1", hosts[1].Datacenter);
Assert.AreEqual(2, hosts.Count(h => policy.Distance(h.Host) == HostDistance.Local));
Assert.AreEqual("dc1", hosts[0].Host.Datacenter);
Assert.AreEqual("dc1", hosts[1].Host.Datacenter);
clusterMock.Verify();
//No statement
hosts = policy.NewQueryPlan(null, null).ToList();
//2 localhosts
Assert.AreEqual(2, hosts.Count(h => policy.Distance(h) == HostDistance.Local));
Assert.AreEqual("dc1", hosts[0].Datacenter);
Assert.AreEqual("dc1", hosts[1].Datacenter);
Assert.AreEqual(2, hosts.Count(h => policy.Distance(h.Host) == HostDistance.Local));
Assert.AreEqual("dc1", hosts[0].Host.Datacenter);
Assert.AreEqual("dc1", hosts[1].Host.Datacenter);
clusterMock.Verify();
}

Expand Down
Loading