Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,54 +232,55 @@ public void every_statement_should_deliver_tablet_info() {
// Preparation of the statements without KS will fail on the session with no ks specified
continue;
}
CqlSession session = sessionEntry.getValue().get();
// Empty out tablets information
if (session.getMetadata().getTabletMap().isPresent()) {
session
.getMetadata()
.getTabletMap()
.get()
.removeByKeyspace(CqlIdentifier.fromCql(KEYSPACE_NAME));
}
Statement stmt;
try {
stmt = stmtEntry.getValue().apply(session);
} catch (Exception e) {
RuntimeException ex =
new RuntimeException(
try (CqlSession session = sessionEntry.getValue().get()) {
// Empty out tablets information
if (session.getMetadata().getTabletMap().isPresent()) {
session
.getMetadata()
.getTabletMap()
.get()
.removeByKeyspace(CqlIdentifier.fromCql(KEYSPACE_NAME));
}
Statement stmt;
try {
stmt = stmtEntry.getValue().apply(session);
} catch (Exception e) {
RuntimeException ex =
new RuntimeException(
String.format(
"Failed to build statement %s on session %s",
stmtEntry.getKey(), sessionEntry.getKey()));
ex.addSuppressed(e);
throw ex;
}
try {
if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo(session, stmt)) {
testErrors.add(
String.format(
"Failed to build statement %s on session %s",
"Statement %s on session %s got no tablet info",
stmtEntry.getKey(), sessionEntry.getKey()));
ex.addSuppressed(e);
throw ex;
}
try {
if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo(session, stmt)) {
continue;
}
} catch (Exception e) {
testErrors.add(
String.format(
"Statement %s on session %s got no tablet info",
"Failed to execute statement %s on session %s: %s",
stmtEntry.getKey(), sessionEntry.getKey(), e));
continue;
}
if (!waitSessionLearnedTabletInfo(session)) {
testErrors.add(
String.format(
"Statement %s on session %s did not trigger session tablets update",
stmtEntry.getKey(), sessionEntry.getKey()));
continue;
}
} catch (Exception e) {
testErrors.add(
String.format(
"Failed to execute statement %s on session %s: %s",
stmtEntry.getKey(), sessionEntry.getKey(), e));
continue;
}
if (!waitSessionLearnedTabletInfo(session)) {
testErrors.add(
String.format(
"Statement %s on session %s did not trigger session tablets update",
stmtEntry.getKey(), sessionEntry.getKey()));
continue;
}
if (!checkIfRoutedProperly(session, stmt)) {
testErrors.add(
String.format(
"Statement %s on session %s was routed to different nodes",
stmtEntry.getKey(), sessionEntry.getKey()));
if (!checkIfRoutedProperly(session, stmt)) {
testErrors.add(
String.format(
"Statement %s on session %s was routed to different nodes",
stmtEntry.getKey(), sessionEntry.getKey()));
}
}
}
}
Expand All @@ -293,45 +294,47 @@ public void every_statement_should_deliver_tablet_info() {

@Test
public void should_receive_each_tablet_exactly_once() {
CqlSession session =
CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build();
int counter = 0;
PreparedStatement preparedStatement = session.prepare(STMT_INSERT);
for (int i = 1; i <= QUERIES; i++) {
if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) {
counter++;
try (CqlSession session =
CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build()) {
PreparedStatement preparedStatement = session.prepare(STMT_INSERT);
for (int i = 1; i <= QUERIES; i++) {
if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) {
counter++;
}
}
Assert.assertEquals(INITIAL_TABLETS, counter);
assertSessionTabletMapIsFilled(session);
}
Assert.assertEquals(INITIAL_TABLETS, counter);
assertSessionTabletMapIsFilled(session);
session.close();

session = CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build();
counter = 0;
preparedStatement = session.prepare(STMT_SELECT);
for (int i = 1; i <= QUERIES; i++) {
if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) {
counter++;

try (CqlSession session =
CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build()) {
counter = 0;
PreparedStatement preparedStatement = session.prepare(STMT_SELECT);
for (int i = 1; i <= QUERIES; i++) {
if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) {
counter++;
}
}
}

LOG.debug("Ran first set of queries");
LOG.debug("Ran first set of queries");

// With enough queries we should hit a wrong node for each tablet exactly once.
Assert.assertEquals(INITIAL_TABLETS, counter);
assertSessionTabletMapIsFilled(session);
// With enough queries we should hit a wrong node for each tablet exactly once.
Assert.assertEquals(INITIAL_TABLETS, counter);
assertSessionTabletMapIsFilled(session);

// All tablet information should be available by now (unless for some reason cluster did sth on
// its own)
// We should not receive any tablet payloads now, since they are sent only on mismatch.
for (int i = 1; i <= QUERIES; i++) {
// All tablet information should be available by now (unless for some reason cluster did sth
// on its own). We should not receive any tablet payloads now, since they are sent only on
// mismatch.
for (int i = 1; i <= QUERIES; i++) {

ResultSet rs = session.execute(preparedStatement.bind(i, i));
Map<String, ByteBuffer> payload = rs.getExecutionInfo().getIncomingPayload();
ResultSet rs = session.execute(preparedStatement.bind(i, i));
Map<String, ByteBuffer> payload = rs.getExecutionInfo().getIncomingPayload();

if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
throw new RuntimeException(
"Received non empty payload with tablets routing information: " + payload);
if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
throw new RuntimeException(
"Received non empty payload with tablets routing information: " + payload);
}
}
}
}
Expand Down
Loading