Skip to content

Commit

Permalink
tests(zab): covers CommitAndActivate, FollowerInfo & Revalidate
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Gutierrez S <rgs@twitter.com>
  • Loading branch information
Raul Gutierrez S committed Jul 10, 2015
1 parent b0c6456 commit 37eca6a
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 22 deletions.
Binary file not shown.
Binary file added zktraffic/tests/resources/zab_followerinfo.pcap
Binary file not shown.
Binary file added zktraffic/tests/resources/zab_revalidate.pcap
Binary file not shown.
72 changes: 62 additions & 10 deletions zktraffic/tests/test_zab.py
Expand Up @@ -22,11 +22,14 @@
from zktraffic.zab.quorum_packet import (
Ack,
Commit,
CommitAndActivate,
FollowerInfo,
PacketType,
Ping,
Proposal,
QuorumPacket,
Request,
Revalidate
)

from .common import get_full_path
Expand All @@ -35,6 +38,19 @@
LEADER_PORT = 20022


def run_sniffer(handler, pcapfile, port=LEADER_PORT):
sniffer = Sniffer(
iface="test",
port=port,
msg_cls=QuorumPacket,
handler=handler,
dump_bad_packet=False,
start=False
)

sniffer.run(offline=get_full_path(pcapfile))


class ZabTestCase(unittest.TestCase):
def test_basic(self):
payload = '%s%s%s' % (
Expand Down Expand Up @@ -65,16 +81,7 @@ def handler(message):
elif isinstance(message, Ping):
pings.append(message)

sniffer = Sniffer(
iface="test",
port=LEADER_PORT,
msg_cls=QuorumPacket,
handler=handler,
dump_bad_packet=False,
start=False
)

sniffer.run(offline=get_full_path('zab_request'))
run_sniffer(handler, "zab_request")

# requests
assert len(requests) == 3
Expand Down Expand Up @@ -120,3 +127,48 @@ def handler(message):
assert len(pings) == 57 # for such a short run, this numbers looks too high

assert pings[0].zxid_literal == "0x100000000"

def test_revalidate(self):
revalidates = []

def handler(message):
if isinstance(message, Revalidate):
revalidates.append(message)

run_sniffer(handler, "zab_revalidate")

assert len(revalidates) == 2
assert revalidates[0].zxid == -1
assert revalidates[0].session_id_literal == "0x3001df405ba0000"
assert revalidates[0].timeout == 10000

def test_commitandactivate(self):
commitandactivates = []

def handler(message):
if isinstance(message, CommitAndActivate):
commitandactivates.append(message)

run_sniffer(handler, "zab_commitandactivate")

assert len(commitandactivates) == 4 # 2 join + 2 leaving
assert commitandactivates[0].suggested_leader_id == 3
assert commitandactivates[0].zxid_literal == "0x10000000d"

def test_followerinfo(self):
followerinfos = []

def handler(message):
if isinstance(message, FollowerInfo):
followerinfos.append(message)

run_sniffer(handler, "zab_followerinfo")

assert len(followerinfos) == 2 # this is a 3 members (participants) cluster
assert followerinfos[0].config_version == 0

# Note that this is *not* the same version as in FLE (which is -65536 and a long)
assert followerinfos[0].protocol_version == 65536

assert followerinfos[0].sid == 2
assert followerinfos[0].zxid_literal == "0x0"
18 changes: 6 additions & 12 deletions zktraffic/zab/quorum_packet.py
Expand Up @@ -273,17 +273,15 @@ class Inform(Proposal):
class CommitAndActivate(QuorumPacket):
PTYPE = PacketType.COMMITANDACTIVATE
__slots__ = ("suggested_leader_id")
def __init__(self, timestamp, src, dst, ptype, zxid, length,
suggested_leader_id):
def __init__(self, timestamp, src, dst, ptype, zxid, length, suggested_leader_id):
super(CommitAndActivate, self).__init__(timestamp, src, dst, ptype, zxid, length)
self.suggested_leader_id = suggested_leader_id

@classmethod
def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset):
data_len, offset = read_number(data, offset)
suggested_leader_id, offset = read_long(data, offset)
return cls(timestamp, src, dst, ptype, zxid, len(data),
suggested_leader_id)
return cls(timestamp, src, dst, ptype, zxid, len(data), suggested_leader_id)


class NewLeader(QuorumPacket):
Expand Down Expand Up @@ -330,33 +328,29 @@ class ObserverInfo(FollowerInfo):
class LeaderInfo(QuorumPacket):
PTYPE = PacketType.LEADERINFO
__slots__ = ("protocol_version")
def __init__(self, timestamp, src, dst, ptype, zxid, length,
protocol_version):
def __init__(self, timestamp, src, dst, ptype, zxid, length, protocol_version):
super(LeaderInfo, self).__init__(timestamp, src, dst, ptype, zxid, length)
self.protocol_version = protocol_version

@classmethod
def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset):
data_len, offset = read_number(data, offset)
protocol_version, offset = read_number(data, offset)
return cls(timestamp, src, dst, ptype, zxid, len(data),
protocol_version)
return cls(timestamp, src, dst, ptype, zxid, len(data), protocol_version)


class AckEpoch(QuorumPacket):
PTYPE = PacketType.ACKEPOCH
__slots__ = ("epoch")
def __init__(self, timestamp, src, dst, ptype, zxid, length,
epoch):
def __init__(self, timestamp, src, dst, ptype, zxid, length, epoch):
super(AckEpoch, self).__init__(timestamp, src, dst, ptype, zxid, length)
self.epoch = epoch

@classmethod
def with_params(cls, timestamp, src, dst, ptype, zxid, data, offset):
data_len, offset = read_number(data, offset)
epoch, offset = read_number(data, offset)
return cls(timestamp, src, dst, ptype, zxid, len(data),
epoch)
return cls(timestamp, src, dst, ptype, zxid, len(data), epoch)


class InformAndActivate(Proposal):
Expand Down

0 comments on commit 37eca6a

Please sign in to comment.