Skip to content

Commit

Permalink
Merge pull request #7 from ltn22/pytest_implementation_2
Browse files Browse the repository at this point in the history
Squeleton pytest implementation
  • Loading branch information
tkerdonc committed Dec 7, 2017
2 parents eaeda08 + e128da9 commit e49db3e
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 26 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
omit = python/test/*
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
14 changes: 14 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
language: python
python:
- 2.7
- 3.5
- 3.6
- nightly
install:
- pip install pytest-cov
- pip install coveralls
script:
- py.test --cov=python --cov-branch python/test/

after_success:
- coveralls
9 changes: 1 addition & 8 deletions python/Parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def parser(self, packet):
print ('argument type', type(packet))

# The "IP_version" field is pulled apart
firstByte = unpack('!BBHHBBQQQQHHHHBBH', packet)
firstByte = unpack('!BBHHBBQQQQHHHHBBH', packet[:52])
print(firstByte)
self.header_fields["IPv6.version", 1] = [firstByte[0] >> 4, 4, 'fixed']
self.header_fields["IPv6.trafficClass", 1] = [(firstByte[0] & 0x0F) << 4 | (firstByte[1] & 0xF0) >> 4, 8, 'fixed']
Expand Down Expand Up @@ -137,10 +137,3 @@ def parser(self, packet):
raise ValueError("error in CoAP option parsing")

return self.header_fields, None

# #ipv6 = bytearray(b'`\x12\x34\x56\x00\x1e\x11\x1e\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x162\x163\x00\x1e\x00\x00A\x02\x00\x01\n\xb3foo\xff\x84\x01\x82  &Ehello')
# ipv6 = bytearray(b'`\x12\x34\x56\x00\x1e\x11\x1e\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x162\x163\x00\x1e\x00\x00A\x02\x00\x01\n\xb3foo\x03bar\x06ABCD==Fk=eth0\xff\x84\x01\x82 &Ehello')
#
# p = Parser()
# f = p.parser(ipv6)
# p.dump()
18 changes: 0 additions & 18 deletions python/RuleMngt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from Parser import Parser
import re
import struct

Expand Down Expand Up @@ -250,20 +249,3 @@ def FindRuleFromHeader(self, headers, direction):
# ["CoAP.Uri-Query", 1, "up", "k=", "MSB(16)", "LSB"],
# ["CoAP.Option-End", 1, "up", 0xFF, "equal", "not-sent"]
# ]}
#
#
# ipv6 = bytearray(b'`\x00\x00\x00\x00-\x11\x1e\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x162\x163\x00-\x00\x00A\x02\x00\x01\x82\xb3foo\x03bar\x06ABCD==Fk=eth0\xff\x82\x19\x0bd\x1a\x00\x01\x8e\x96')
#
# p = Parser()
# f, data = p.parser(ipv6)
#
# RM = RuleManager()
# RM.addRule(rule_coap0)
# RM.addRule(rule_coap1)
#
# print("=====")
# print("F", f)
# print (len(f))
#
# print ("rule = ", RM.FindRuleFromHeader(f, "up"))
# print ("rule = ", RM.FindRuleFromID(1))
Empty file added python/__init__.py
Empty file.
Empty file added python/test/__init__.py
Empty file.
46 changes: 46 additions & 0 deletions python/test/test_BitBuffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from .. import BitBuffer

def test___init__():
buf = BitBuffer.BitBuffer()
assert (len(buf._buf) == 0)
assert (buf._bit_index == 0)

def test___init__with_value():
bitValue = b'1011'
buf = BitBuffer.BitBuffer(bitValue)
assert(len(buf._buf) == len(bitValue))
assert(buf._bit_index == 0)

#def test_add_bit():
# buf = BitBuffer.BitBuffer()
# assert(buf._buf == bytearray(b''))
# buf.add_bit(0)#
# assert(buf._buf == bytearray(b'0'))

#def test_next_bit():
# buf = BitBuffer.BitBuffer(b'01')
# b = buf.next_bit()
# assert(b == 0)
# b = buf.next_bit()
# assert(b == 1)
# b = buf.next_bit()
# assert(b == 0)

#def test_add_byte():
# buf = BitBuffer.BitBuffer(b'01010000')
# byte = 0x1
# buf.add_byte(byte)
# print buf._buf
# assert (False)

#def test_add_bytes():
# buf = BitBuffer.BitBuffer(b'01')
# assert (True)

def test_buffer():
buf = BitBuffer.BitBuffer(b'01')
assert (buf.buffer() == b'01')

#def test_size():
# buf = BitBuffer.BitBuffer(b'01')
# assert (buf.size() == 2)
45 changes: 45 additions & 0 deletions python/test/test_Compressor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from .. import Compressor
from .. import BitBuffer

def test___init__():
comp = Compressor.Compressor(None)

assert(len(comp.context) == 0)
assert( "not-sent" in comp.CompressionActions)
assert( "value-sent" in comp.CompressionActions)
assert( "mapping-sent" in comp.CompressionActions)
assert( "LSB" in comp.CompressionActions)
assert( "compute-length" in comp.CompressionActions)
assert( "compute-checksum" in comp.CompressionActions)


def test_CA_notSent():
comp = Compressor.Compressor(None)
assert(comp.CA_notSent('', '', '', 0, 0, 0) == None)

# No length taken into account ?
#def test_CA_valueSent_str():
# comp = Compressor.Compressor(None)
# value = '01001'
# buf = BitBuffer.BitBuffer()
# comp.CA_valueSent(buf, '', value, 0, 0, 0)
# assert(buf._buf == value)

#def test_CA_valueSent_int():
# comp = Compressor.Compressor(None)
# buf = BitBuffer.BitBuffer()
# value = int('1001101', 2)
# comp.CA_valueSent(buf, b'', value, 4*16, 0, 0)
# assert(buf._buf == value)

#def test_CA_mappingSent():
# comp = Compressor.Compressor(None)
# TV = [1, 2, 34]
# FV = 2
# buf = BitBuffer.BitBuffer()
# comp.CA_mappingSent(buf, TV, FV, 0, 0, 0)

def test_CA_LSB():
assert(True)
def test_apply ():
assert(True)
160 changes: 160 additions & 0 deletions python/test/test_Decompressor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from .. import Decompressor
from .. import BitBuffer
from .. import RuleMngt

def test___init__():
dec = Decompressor.Decompressor(None)
actionEntries = [
"not-sent",
"value-sent",
"mapping-sent",
"LSB",
"compute-length",
"compute-checksum",
]
for a in actionEntries:
assert(a in dec.DecompressionActions)

fieldSizeEntries = [
"IPv6.version",
"IPv6.trafficClass",
"IPv6.flowLabel",
"IPv6.payloadLength",
"IPv6.nextHeader",
"IPv6.hopLimit",
"IPv6.checksum",
"IPv6.prefixES",
"IPv6.iidES",
"IPv6.prefixLA",
"IPv6.iidLA",
"UDP.PortES",
"UDP.PortLA",
"UDP.length",
"UDP.checksum",
"CoAP.version",
"CoAP.type",
"CoAP.tokenLength",
"CoAP.code",
"CoAP.messageID",
"CoAP.token",
"CoAP.Uri-Path",
"CoAP.Content-Format",
"CoAP.Uri-Query",
"CoAP.Option-End",
]
for f in fieldSizeEntries:
assert(f in dec.field_size)

#def test_DA_notSent():
# buf = BitBuffer.BitBuffer()
# dec = Decompressor.Decompressor(None)
# TV = 1
# dec.DA_notSent(buf, [], TV, 4, 'fixed', '', '')
# assert(len(buf._buf) == 0)

#Uncomment when 'buff' typo is fixed in implem
#def test_DA_valueSent_fixed():
# buf = BitBuffer.BitBuffer()
# dec = Decompressor.Decompressor(None)
# headers = BitBuffer.BitBuffer('0101')
# length = 4
# dec.DA_valueSent(buf, headers, '', length, 'fixed', '', 'direct')
#
# assert(buf._buf == headers._buf)


#Uncomment when 'opt_num' error is fixed (uninitialized)
#def test_DA_valueSent_variable():
# buf = BitBuffer.BitBuffer()
# dec = Decompressor.Decompressor(None)
# headers = BitBuffer.BitBuffer('1101')
# length = 8
# dec.DA_valueSent(buf, headers, '', length, 'variable', '', {'CoAPOption':11})
#
# assert(buf._buf == headers._buf)


#Review when bitbuffer fully tested
#def test_DA_mappingSent():
# TV = [0, 2, 1]
# buf = BitBuffer.BitBuffer()
# headers = BitBuffer.BitBuffer(b'01')
# dec = Decompressor.Decompressor(None)
# dec.DA_mappingSent(buf, headers, TV, 2, "fixed", "", "")
# assert(buf._buf == 2)

#Review when append thing in Decompressor is fixed
#def test_DA_LSB_fixed_str():
# TV = b'1101'
# buf = BitBuffer.BitBuffer()
# headers = BitBuffer.BitBuffer(b'010101')
# dec = Decompressor.Decompressor(None)
# dec.DA_LSB(buf, headers, TV, 8, 'fixed', 4, '')
# assert(buf._buf == '11010101')


#Review when bitbuffer fully tested
#def test_DA_LSB_fixed_int():
# TV = 12 # 1100
# buf = BitBuffer.BitBuffer()
# headers = BitBuffer.BitBuffer(b'0101')
# dec = Decompressor.Decompressor(None)
# dec.DA_LSB(buf, headers, TV, 8, 'fixed', 4, '')
# assert(str(buf._buf) == '11000101')

def test_DA_computeLength():
buf = BitBuffer.BitBuffer()
headers = BitBuffer.BitBuffer(b'12')
dec = Decompressor.Decompressor(None)
dec.DA_computeLength(buf, headers, '', '', '', '', '' )
assert(buf._buf == bytearray(b'\xFF\xFF'))

def test_DA_computeChecksum():
buf = BitBuffer.BitBuffer()
headers = BitBuffer.BitBuffer(b'12')
dec = Decompressor.Decompressor(None)
dec.DA_computeChecksum(buf, headers, '', '', '', '', '' )
assert(buf._buf == bytearray(b'\xCC\xCC'))

# This is just for example, the assertion should be done compared to a bytearray build from compressor information
def test_apply ():
rule_coap1 = {"ruleid" : 1,
"content" : [["IPv6.version", 1, "bi", 6, "equal", "not-sent"],
["IPv6.trafficClass", 1, "bi", 0x00, "equal", "not-sent"],
["IPv6.flowLabel", 1, "bi", 0x000000, "equal", "not-sent"],
["IPv6.payloadLength",1, "bi", None, "ignore", "compute-length"],
["IPv6.nextHeader", 1, "bi", 17, "equal", "not-sent"],
["IPv6.hopLimit", 1, "bi", 30, "ignore", "not-sent"],
["IPv6.prefixES", 1, "bi", 0xFE80000000000000, "equal", "not-sent"],
["IPv6.iidES", 1, "bi", 0x0000000000000001, "equal", "not-sent"],
["IPv6.prefixLA", 1, "bi", [0x2001066073010001,
0x2001123456789012,
0x2001123456789013,
0xFE80000000000000],"match-mapping", "mapping-sent"],
["IPv6.iidLA", 1, "bi", 0x0000000000000002, "equal", "not-sent"],
["UDP.PortES", 1, "bi", 5682, "equal", "not-sent"],
["UDP.PortLA", 1, "bi", 5683, "equal", "not-sent"],
["UDP.length", 1, "bi", None, "ignore", "compute-length"],
["UDP.checksum", 1, "bi", None, "ignore", "compute-checksum"],
["CoAP.version", 1, "bi", 1, "equal", "not-sent"],
["CoAP.type", 1, "up", 0, "equal", "not-sent"],
["CoAP.type", 1, "dw", 2, "equal", "not-sent"],
["CoAP.tokenLength", 1, "bi", 1, "equal", "not-sent"],
["CoAP.code", 1, "up", 2, "equal", "not-sent"],
["CoAP.code", 1, "dw", [69, 132], "match-mapping", "mapping-sent"],
["CoAP.messageID", 1, "bi", 1, "MSB(12)", "LSB"],
["CoAP.token", 1, "bi", 0x80, "MSB(4)", "LSB"],
["CoAP.Uri-Path", 1, "up", "foo", "equal", "not-sent"],
["CoAP.Uri-Path", 2, "up", "bar", "equal", "not-sent"],
["CoAP.Uri-Path", 3, "up", None, "ignore", "value-sent"],
["CoAP.Uri-Query", 1, "up", "k=", "MSB(16)", "LSB"],
["CoAP.Option-End", 1, "up", 0xFF, "equal", "not-sent"]
]}
compressed = bytearray(b'\xde\x40') # 11 bits
RM = RuleMngt.RuleManager()
RM.addRule(rule_coap1)
rule = RM.FindRuleFromID(1)
dec = Decompressor.Decompressor(RM)

header, length = dec.apply (compressed, rule, "dw")
assert(header == bytearray(b'`\x00\x00\x00\xff\xff\x11\x1e\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x162\x163\xff\xff\xcc\xccaE\x00\x0f\x82'))
25 changes: 25 additions & 0 deletions python/test/test_Parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .. import Parser

def test___init__():
parser = Parser.Parser()
assert(len(parser.payload) == 0)
assert(len(parser.header_fields) == 0)

def test_dump(capsys):
parser = Parser.Parser()
optionName = "optName"
fieldPos = 1
optionValue = "optval"
parser.header_fields = {(optionName, fieldPos):optionValue}
parser.dump()
out, err = capsys.readouterr()
assert(optionName in out)
assert(str(fieldPos) in out)
assert(optionValue in out)

def test_parser():
ipv6 = bytearray(b'`\x12\x34\x56\x00\x1e\x11\x1e\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x162\x163\x00\x1e\x00\x00A\x02\x00\x01\n\xb3foo\x03bar\x06ABCD==Fk=eth0\xff\x84\x01\x82 &Ehello')
parser = Parser.Parser()
headers, payload = parser.parser(ipv6)

assert(payload == b'\x84\x01\x82 &Ehello')

0 comments on commit e49db3e

Please sign in to comment.