Skip to content

Commit

Permalink
rev ports work also for other protos in 4th layer
Browse files Browse the repository at this point in the history
git-svn-id: http://svn.umitproject.org/svnroot/umit/branch/UMPA@5218 1105ee14-b0fa-0310-85a5-ff3eed429ff7
  • Loading branch information
bxsx committed Aug 11, 2009
1 parent 3607a86 commit bd2aa1e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
46 changes: 41 additions & 5 deletions tests/a_unit/test_extensions/test_models.py
Expand Up @@ -24,7 +24,7 @@

import umit.umpa
import umit.umpa.sniffing
from umit.umpa.protocols import IP, TCP
from umit.umpa.protocols import IP, TCP, UDP
from umit.umpa.extensions import models
from umit.umpa.utils.exceptions import UMPAException
from tests.utils import SendPacket
Expand All @@ -38,9 +38,10 @@ def __init__(self, filter, device, queue):
self._device = device
self._queue = queue

class RevPortsThread(SniffThread):
class RevPortsTCPThread(SniffThread):
def run(self):
pkt = umit.umpa.sniffing.sniff(2, device=self._device, filter=self._filter)
pkt = umit.umpa.sniffing.sniff(2, device=self._device,
filter=self._filter)
try:
assert pkt[0].ip.src == "127.0.0.1"
assert pkt[0].ip.dst == "127.0.0.1"
Expand All @@ -53,6 +54,22 @@ def run(self):
except Exception, e:
self._queue.put(e)

class RevPortsUDPThread(SniffThread):
def run(self):
pkt = umit.umpa.sniffing.sniff(2, device=self._device,
filter=self._filter)
try:
assert pkt[0].ip.src == "127.0.0.1"
assert pkt[0].ip.dst == "127.0.0.1"
assert pkt[0].udp.srcport == 80
assert pkt[0].udp.dstport == 0
assert pkt[1].ip.src == "127.0.0.1"
assert pkt[1].ip.dst == "127.0.0.1"
assert pkt[1].udp.srcport == 0
assert pkt[1].udp.dstport == 80
except Exception, e:
self._queue.put(e)

class RevHostsThread(SniffThread):
def run(self):
pkt = umit.umpa.sniffing.sniff(2, device=self._device, filter=self._filter)
Expand All @@ -79,15 +96,34 @@ class TestModels(object):
pass

class TestReact(TestModels):
def test_revports(self):
def test_revports_tcp(self):
# use queue to communicate between threads
# py.test doesn't catch assertions from threads by itself
queue = Queue.Queue()

th = SendPacket(umit.umpa.Packet(IP(), TCP(srcport=80, dstport=0)))
th.start()

th2 = RevPortsThread("host 127.0.0.1 and port 80", "lo", queue)
th2 = RevPortsTCPThread("host 127.0.0.1 and port 80", "lo", queue)
th2.start()
models.react(1, filter="host 127.0.0.1 and port 80", device="lo",
revports=True)
th.join()
th2.join()
if not queue.empty():
err = queue.get()
raise AssertionError(err)
queue.join()

def test_revports_udp(self):
# use queue to communicate between threads
# py.test doesn't catch assertions from threads by itself
queue = Queue.Queue()

th = SendPacket(umit.umpa.Packet(IP(), UDP(srcport=80, dstport=0)))
th.start()

th2 = RevPortsUDPThread("host 127.0.0.1 and port 80", "lo", queue)
th2.start()
models.react(1, filter="host 127.0.0.1 and port 80", device="lo",
revports=True)
Expand Down
9 changes: 6 additions & 3 deletions umit/umpa/extensions/models.py
Expand Up @@ -53,9 +53,12 @@ def revhosts(pkt):
return pkt

def revports(pkt):
tmp = pkt.tcp.dstport
pkt.tcp.dstport = pkt.tcp.srcport
pkt.tcp.srcport = tmp
for proto in pkt.protos:
if proto.layer == 4:
tmp = proto.dstport
proto.dstport = proto.srcport
proto.srcport = tmp
break
return pkt

def forwardfunc(pkt, fwd):
Expand Down

0 comments on commit bd2aa1e

Please sign in to comment.