Skip to content

Commit

Permalink
Fixing the existing tests of the queue consumer to work for the
Browse files Browse the repository at this point in the history
management command.  Still need to improve coverage of the consumer.
  • Loading branch information
coleifer committed Jun 2, 2011
1 parent 71b5f95 commit 94b24f3
Showing 1 changed file with 31 additions and 51 deletions.
82 changes: 31 additions & 51 deletions djutils/tests/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

from django.conf import settings
from django.contrib.auth.models import User
from django.core.management.base import CommandError

from djutils.queue.bin.consumer import QueueDaemon
from djutils.management.commands.queue_consumer import Command as QueueConsumer
from djutils.queue.decorators import crontab, queue_command, periodic_command
from djutils.queue.queue import QueueCommand, PeriodicQueueCommand, QueueException, invoker
from djutils.queue.registry import registry
from djutils.test import TestCase

from djutils.utils.helpers import ObjectDict

class DummyThreadQueue():
"""A replacement for the stdlib Queue.Queue"""
Expand All @@ -22,28 +23,18 @@ def put(self, message):
def join(self):
pass

class TestQueueDaemon(QueueDaemon):

class TestQueueConsumer(QueueConsumer):
"""Subclass of the consumer for test purposes"""
def start(self):
self.run()

def stop(self):
raise

def get_logger(self):
def get_logger(self, verbosity):
return logging.getLogger('djutils.tests.queue.logger')

def initialize_threads(self):
self._threads = []
self._error = threading.Event()
def initialize_options(self, options):
super(TestQueueConsumer, self).initialize_options(options)

self._queue = DummyThreadQueue()


class Options(dict):
def __getattr__(self, name):
return self[name]


class UserCommand(QueueCommand):
def execute(self):
user, old_email, new_email = self.data
Expand Down Expand Up @@ -82,14 +73,14 @@ def setUp(self):
settings.QUEUE_ALWAYS_EAGER = False

self.dummy = User.objects.create_user('username', 'user@example.com', 'password')
self.consumer_options = Options(
pidfile='',
self.consumer_options = ObjectDict(
logfile='',
delay=.1,
backoff=2,
max_delay=.4,
no_periodic=False,
threads=2,
verbosity=1,
)
invoker.flush()

Expand Down Expand Up @@ -299,55 +290,44 @@ def test_periodic_command_enqueueing(self):
self.assertEqual(User.objects.filter(username='thirty').count(), 1)

def test_daemon_initialization(self):
daemon = TestQueueDaemon(self.consumer_options)
consumer = TestQueueConsumer()

db_name = 'testqueue'

self.assertEqual(daemon.pidfile, '/var/run/djutils-%s.pid' % db_name)
self.assertEqual(daemon.logfile, '/var/log/djutils-%s.log' % db_name)
self.assertEqual(daemon.delay, 0.1)
self.assertEqual(daemon.max_delay, 0.4)
self.assertEqual(daemon.backoff_factor, 2)
self.assertEqual(daemon.periodic_commands, True)
self.assertEqual(daemon.threads, 2)
consumer.initialize_options(self.consumer_options)

self.consumer_options['pidfile'] = '/var/run/custom.pid'
self.consumer_options['logfile'] = '/var/log/custom.log'

daemon = TestQueueDaemon(self.consumer_options)
self.assertEqual(daemon.pidfile, '/var/run/custom.pid')
self.assertEqual(daemon.logfile, '/var/log/custom.log')
self.assertEqual(consumer.logfile, '/var/log/djutils-%s.log' % db_name)
self.assertEqual(consumer.delay, 0.1)
self.assertEqual(consumer.max_delay, 0.4)
self.assertEqual(consumer.backoff_factor, 2)
self.assertEqual(consumer.periodic_commands, True)
self.assertEqual(consumer.threads, 2)

daemon_factory = lambda options: TestQueueDaemon(options)
self.consumer_options['logfile'] = '/var/log/custom.log'
consumer.initialize_options(self.consumer_options)

self.consumer_options['backoff'] = 0.5
self.assertRaises(ValueError, daemon_factory, self.consumer_options)
self.assertRaises(CommandError, consumer.initialize_options, self.consumer_options)

self.consumer_options['backoff'] = 2
self.consumer_options['threads'] = 0
self.assertRaises(ValueError, daemon_factory, self.consumer_options)

self.consumer_options['threads'] = 1
other_daemon = daemon_factory(self.consumer_options)
self.assertRaises(CommandError, consumer.initialize_options, self.consumer_options)

def test_daemon_delay(self):
daemon = TestQueueDaemon(self.consumer_options)

# start up some worker threads
daemon.initialize_threads()
daemon.start_workers()

def test_consumer_delay(self):
consumer = TestQueueConsumer()
consumer.initialize_options(self.consumer_options)

# processing when there is no message will sleep
start = time.time()
daemon.process_message()
consumer.process_message()
end = time.time()

# make sure it slept the initial amount
self.assertTrue(.09 < end - start < .11)

# try processing another message -- will delay longer
start = time.time()
daemon.process_message()
consumer.process_message()
end = time.time()

self.assertTrue(.19 < end - start < .21)
Expand All @@ -359,14 +339,14 @@ def test_daemon_delay(self):
self.assertEqual(dummy.email, 'user@example.com')

# processing the message will reset the delay to initial state
daemon.process_message()
consumer.process_message()

# make sure the command was executed
dummy = User.objects.get(username='username')
self.assertEqual(dummy.email, 'decor@ted.com')

# make sure the delay was reset
self.assertEqual(daemon.delay, .1)
self.assertEqual(consumer.delay, .1)

def test_daemon_multithreading(self):
pass
Expand Down

0 comments on commit 94b24f3

Please sign in to comment.