diff --git a/zktraffic/tests/resources/zab_commitandactivate.pcap b/zktraffic/tests/resources/zab_commitandactivate.pcap new file mode 100644 index 0000000..7a20e4f Binary files /dev/null and b/zktraffic/tests/resources/zab_commitandactivate.pcap differ diff --git a/zktraffic/tests/resources/zab_followerinfo.pcap b/zktraffic/tests/resources/zab_followerinfo.pcap new file mode 100644 index 0000000..f868213 Binary files /dev/null and b/zktraffic/tests/resources/zab_followerinfo.pcap differ diff --git a/zktraffic/tests/resources/zab_revalidate.pcap b/zktraffic/tests/resources/zab_revalidate.pcap new file mode 100644 index 0000000..b80500e Binary files /dev/null and b/zktraffic/tests/resources/zab_revalidate.pcap differ diff --git a/zktraffic/tests/test_zab.py b/zktraffic/tests/test_zab.py index e4c6ec0..6bd5493 100644 --- a/zktraffic/tests/test_zab.py +++ b/zktraffic/tests/test_zab.py @@ -22,11 +22,14 @@ from zktraffic.zab.quorum_packet import ( Ack, Commit, + CommitAndActivate, + FollowerInfo, PacketType, Ping, Proposal, QuorumPacket, Request, + Revalidate ) from .common import get_full_path @@ -35,6 +38,19 @@ LEADER_PORT = 20022 +def run_sniffer(handler, pcapfile, port=LEADER_PORT): + sniffer = Sniffer( + iface="test", + port=port, + msg_cls=QuorumPacket, + handler=handler, + dump_bad_packet=False, + start=False + ) + + sniffer.run(offline=get_full_path(pcapfile)) + + class ZabTestCase(unittest.TestCase): def test_basic(self): payload = '%s%s%s' % ( @@ -65,16 +81,7 @@ def handler(message): elif isinstance(message, Ping): pings.append(message) - sniffer = Sniffer( - iface="test", - port=LEADER_PORT, - msg_cls=QuorumPacket, - handler=handler, - dump_bad_packet=False, - start=False - ) - - sniffer.run(offline=get_full_path('zab_request')) + run_sniffer(handler, "zab_request") # requests assert len(requests) == 3 @@ -120,3 +127,48 @@ def handler(message): assert len(pings) == 57 # for such a short run, this numbers looks too high assert pings[0].zxid_literal == "0x100000000" + + def test_revalidate(self): + revalidates = [] + + def handler(message): + if isinstance(message, Revalidate): + revalidates.append(message) + + run_sniffer(handler, "zab_revalidate") + + assert len(revalidates) == 2 + assert revalidates[0].zxid == -1 + assert revalidates[0].session_id_literal == "0x3001df405ba0000" + assert revalidates[0].timeout == 10000 + + def test_commitandactivate(self): + commitandactivates = [] + + def handler(message): + if isinstance(message, CommitAndActivate): + commitandactivates.append(message) + + run_sniffer(handler, "zab_commitandactivate") + + assert len(commitandactivates) == 4 # 2 join + 2 leaving + assert commitandactivates[0].suggested_leader_id == 3 + assert commitandactivates[0].zxid_literal == "0x10000000d" + + def test_followerinfo(self): + followerinfos = [] + + def handler(message): + if isinstance(message, FollowerInfo): + followerinfos.append(message) + + run_sniffer(handler, "zab_followerinfo") + + assert len(followerinfos) == 2 # this is a 3 members (participants) cluster + assert followerinfos[0].config_version == 0 + + # Note that this is *not* the same version as in FLE (which is -65536 and a long) + assert followerinfos[0].protocol_version == 65536 + + assert followerinfos[0].sid == 2 + assert followerinfos[0].zxid_literal == "0x0" diff --git a/zktraffic/zab/quorum_packet.py b/zktraffic/zab/quorum_packet.py index c1dc393..8def29b 100644 --- a/zktraffic/zab/quorum_packet.py +++ b/zktraffic/zab/quorum_packet.py @@ -273,8 +273,7 @@ class Inform(Proposal): class CommitAndActivate(QuorumPacket): PTYPE = PacketType.COMMITANDACTIVATE __slots__ = ("suggested_leader_id") - def __init__(self, timestamp, src, dst, ptype, zxid, length, - suggested_leader_id): + def __init__(self, timestamp, src, dst, ptype, zxid, length, suggested_leader_id): super(CommitAndActivate, self).__init__(timestamp, src, dst, ptype, zxid, length) self.suggested_leader_id = suggested_leader_id @@ -282,8 +281,7 @@ def __init__(self, timestamp, src, dst, ptype, zxid, length, def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset): data_len, offset = read_number(data, offset) suggested_leader_id, offset = read_long(data, offset) - return cls(timestamp, src, dst, ptype, zxid, len(data), - suggested_leader_id) + return cls(timestamp, src, dst, ptype, zxid, len(data), suggested_leader_id) class NewLeader(QuorumPacket): @@ -330,8 +328,7 @@ class ObserverInfo(FollowerInfo): class LeaderInfo(QuorumPacket): PTYPE = PacketType.LEADERINFO __slots__ = ("protocol_version") - def __init__(self, timestamp, src, dst, ptype, zxid, length, - protocol_version): + def __init__(self, timestamp, src, dst, ptype, zxid, length, protocol_version): super(LeaderInfo, self).__init__(timestamp, src, dst, ptype, zxid, length) self.protocol_version = protocol_version @@ -339,15 +336,13 @@ def __init__(self, timestamp, src, dst, ptype, zxid, length, def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset): data_len, offset = read_number(data, offset) protocol_version, offset = read_number(data, offset) - return cls(timestamp, src, dst, ptype, zxid, len(data), - protocol_version) + return cls(timestamp, src, dst, ptype, zxid, len(data), protocol_version) class AckEpoch(QuorumPacket): PTYPE = PacketType.ACKEPOCH __slots__ = ("epoch") - def __init__(self, timestamp, src, dst, ptype, zxid, length, - epoch): + def __init__(self, timestamp, src, dst, ptype, zxid, length, epoch): super(AckEpoch, self).__init__(timestamp, src, dst, ptype, zxid, length) self.epoch = epoch @@ -355,8 +350,7 @@ def __init__(self, timestamp, src, dst, ptype, zxid, length, def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset): data_len, offset = read_number(data, offset) epoch, offset = read_number(data, offset) - return cls(timestamp, src, dst, ptype, zxid, len(data), - epoch) + return cls(timestamp, src, dst, ptype, zxid, len(data), epoch) class InformAndActivate(Proposal):