Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add cross-partition short-read-protection support
  • Loading branch information
jasonstack committed Sep 13, 2017
1 parent 6d77ace commit e706952
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion consistency_test.py
Expand Up @@ -821,7 +821,7 @@ def test_13747(self):
# prior to CASSANDRA-13747 this would cause an assertion in short read protection code
node2.start(wait_other_notice=True)
stmt = SimpleStatement("SELECT DISTINCT token(id), id FROM test.test;",
consistency_level = ConsistencyLevel.ALL)
consistency_level=ConsistencyLevel.ALL)
result = list(session.execute(stmt))
assert_length_equal(result, 5)

Expand Down Expand Up @@ -962,6 +962,52 @@ def short_read_quorum_delete_test(self):
node3.stop(wait_other_notice=True)
assert_none(session, "SELECT * FROM t WHERE id = 0 LIMIT 1", cl=ConsistencyLevel.QUORUM)

@since('3.0')
def short_read_partitions_delete_with_limit_test(self):
self._short_read_partitions_delete_test(use_limit=True)

@since('3.0')
def short_read_partitions_delete_with_fetchsize_test(self):
self._short_read_partitions_delete_test(use_limit=False)

def _short_read_partitions_delete_test(self, use_limit=True):
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
cluster.populate(2).start(wait_other_notice=True)
node1, node2 = self.cluster.nodelist()

session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 2)
session.execute("CREATE TABLE t (k int, c int, PRIMARY KEY(k, c)) WITH read_repair_chance = 0.0")

# we write 1 and 2 in a partition: all nodes get it.
session.execute(SimpleStatement("INSERT INTO t (k, c) VALUES (1, 1) USING TIMESTAMP 1", consistency_level=ConsistencyLevel.ALL))
session.execute(SimpleStatement("INSERT INTO t (k, c) VALUES (2, 1) USING TIMESTAMP 2", consistency_level=ConsistencyLevel.ALL))

# we delete partition 1: only node 1 gets it.
node2.flush()
node2.stop(wait_other_notice=True)
session = self.patient_cql_connection(node1, 'ks', consistency_level=ConsistencyLevel.ONE)
session.execute(SimpleStatement("DELETE FROM t USING TIMESTAMP 3 WHERE k = 1"))
node2.start(wait_other_notice=True)

# we delete partition 2: only node 2 gets it.
node1.flush()
node1.stop(wait_other_notice=True)
session = self.patient_cql_connection(node2, 'ks', consistency_level=ConsistencyLevel.ONE)
session.execute(SimpleStatement("DELETE FROM t USING TIMESTAMP 4 WHERE k = 2"))
node1.start(wait_other_notice=True)

# read from both nodes
if use_limit:
session = self.patient_cql_connection(node1, 'ks', consistency_level=ConsistencyLevel.ALL)
assert_none(session, "SELECT * FROM t LIMIT 1")
else:
session = self.patient_cql_connection(node1, 'ks', consistency_level=ConsistencyLevel.ALL)
session.default_fetch_size = 1
assert_none(session, "SELECT * FROM t")

def readrepair_test(self):
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
Expand Down

0 comments on commit e706952

Please sign in to comment.