Skip to content

Commit

Permalink
IGNORE command
Browse files Browse the repository at this point in the history
  • Loading branch information
pylover committed May 22, 2018
1 parent d7a8462 commit cbe9110
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
8 changes: 8 additions & 0 deletions easyq/client.py
Expand Up @@ -71,6 +71,14 @@ async def pull(self, queue, callback):
handlers = self.handlers.setdefault(queue, set())
handlers.add(callback)

async def ignore(self, queue, callback):
handlers = self.handlers.setdefault(queue, set())
if callback not in handlers:
raise ValueError(f'Invalid callback: {callback}')

handlers.remove(callback)
self.transport.write(b'IGNORE %s;\n' % queue)

async def dispatch(self, message, queue):
handlers = self.handlers.get(queue)
if handlers:
Expand Down
7 changes: 7 additions & 0 deletions easyq/queuemanager.py
Expand Up @@ -12,6 +12,10 @@ class AlreadySubscribedError(Exception):
pass


class NotSubscribedError(Exception):
pass


class Queue:

def __init__(self, name):
Expand All @@ -33,6 +37,9 @@ def subscribe(self, protocol):
self.subscriptors.append(protocol)

def unsubscribe(self, protocol):
if protocol not in self.subscriptors:
raise NotSubscribedError()

logger.info(f'Queue {self.name.decode()} was ignored by {protocol.identity}')
self.subscriptors.remove(protocol)

Expand Down
23 changes: 15 additions & 8 deletions easyq/server.py
Expand Up @@ -5,13 +5,7 @@
from .authentication import authenticate, initialize as initialize_authentication
from .configuration import settings
from .logging import getlogger
from .queuemanager import getqueue, dispatcher, AlreadySubscribedError


"""
-> IGNORE queue1
"""
from .queuemanager import getqueue, dispatcher, AlreadySubscribedError, NotSubscribedError


logger = getlogger('PROTO')
Expand All @@ -28,6 +22,7 @@ class Patterns:
login = regex(b'^LOGIN (?P<credentials>.+)$')
push = regex(b'^PUSH (?P<message>.+)(?:\s|\n)INTO (?P<queue>[0-9a-zA-Z\._:-]+)$')
pull = regex(b'^PULL FROM (?P<queue>[0-9a-zA-Z\._:-]+)$')
ignore = regex(b'^IGNORE (?P<queue>[0-9a-zA-Z\._:-]+)$')

def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
Expand Down Expand Up @@ -111,7 +106,15 @@ async def pull(self, queue):
try:
getqueue(queue).subscribe(self)
except AlreadySubscribedError:
self.transport.write(b'ERROR: QUEUE %s IS ALREASY SUBSCRIBED;\n' % queue)
self.transport.write(b'ERROR: QUEUE %s IS ALREADY SUBSCRIBED;\n' % queue)

async def ignore(self, queue):
try:
getqueue(queue).unsubscribe(self)
except NotSubscribedError:
self.transport.write(b'ERROR: QUEUE %s IS NOT SUBSCRIBED;\n' % queue)



async def process_command(self, command):
logger.debug(f'Processing command: {command.decode()} by {self.identity}')
Expand All @@ -123,6 +126,10 @@ async def process_command(self, command):
if m is not None:
return await self.pull(**m.groupdict())

m = self.Patterns.ignore.match(command)
if m is not None:
return await self.ignore(**m.groupdict())

logger.debug(f'Invalid command: {command}')
self.transport.write(b'ERROR: Invalid command: %s;\n' % command)

Expand Down
29 changes: 24 additions & 5 deletions easyq/tests/test_pull.py
Expand Up @@ -57,19 +57,38 @@ async def test_pull(self):
async def message_received(queue, message):
messages.append(message)

async def error(client_, error):
errors.append(error)

client.onerror = error

await client.pull(b'q1', message_received)
await client.push(b'q1', b'Hello')
await asyncio.sleep(1.1)
self.assertEqual([b'Hello'], messages)
self.assertEqual([], errors)

# pulling twice!
async def error(client_, error):
errors.append(error)

client.onerror = error
await client.pull(b'q1', message_received)
await asyncio.sleep(1.1)
self.assertEqual([b'ERROR: QUEUE q1 IS ALREASY SUBSCRIBED'], errors)
self.assertEqual([b'ERROR: QUEUE q1 IS ALREADY SUBSCRIBED'], errors)

# Unsunscribing
await client.ignore(b'q1', message_received)
messages = []
errors = []
await client.push(b'q1', b'Hello')
await asyncio.sleep(1.1)
self.assertEqual([], messages)
self.assertEqual([], errors)

# Ignoring twice!
client.handlers[b'q1'] = [message_received]
await client.ignore(b'q1', message_received)
await asyncio.sleep(1.1)
self.assertEqual([b'ERROR: QUEUE q1 IS NOT SUBSCRIBED'], errors)




if __name__ == '__main__':
Expand Down

0 comments on commit cbe9110

Please sign in to comment.