Skip to content

Commit

Permalink
Add test for Observer interrupter
Browse files Browse the repository at this point in the history
  • Loading branch information
irl committed Oct 20, 2016
1 parent 743c160 commit 4f2094d
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions tests/test_observer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@

import logging
import queue
import multiprocessing as mp
import threading

import nose

from pathspider.base import SHUTDOWN_SENTINEL
from pathspider.base import QUEUE_SIZE
from pathspider.observer import Observer
from pathspider.observer import simple_observer

def _test_observer(lturi):
Expand Down Expand Up @@ -49,3 +52,24 @@ def test_observer_icmp_ttl_flowcount():
def test_observer_icmp_unreachable_flowcount():
lturi = "pcap:tests/testdata/icmp_unreachable.pcap"
assert _test_observer(lturi) == 2

def test_observer_shutdown():
flowqueue = mp.Queue(QUEUE_SIZE)
observer_shutdown_queue = mp.Queue(QUEUE_SIZE)

observer = Observer("pcap:tests/testdata/random.pcap")
observer_process = mp.Process(
args=(flowqueue,
observer_shutdown_queue),
target=observer.run_flow_enqueuer,
name='observer',
daemon=True)
observer_process.start()

observer_shutdown_queue.put(True)

assert flowqueue.get(True, timeout=3) == SHUTDOWN_SENTINEL

observer_process.join(3)

assert not observer_process.is_alive()

0 comments on commit 4f2094d

Please sign in to comment.