Skip to content

Commit

Permalink
Nothing
Browse files Browse the repository at this point in the history
  • Loading branch information
pylover committed May 22, 2018
1 parent f639bf3 commit 5bf1e9f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
30 changes: 23 additions & 7 deletions easyq/queuemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ async def dispatch(self, message):
await protocol.dispatch(self.name, message)


def getqueue(name) -> Queue:
if name not in queues:
queues[name] = Queue(name)
logger.info(f'Queue {name.decode()} just created.')
return queues[name]


async def dispatcher(name, intervals=.5, messages_per_queue=5):
logger = getlogger(name)
cycle = 0
Expand All @@ -80,3 +73,26 @@ async def dispatcher(name, intervals=.5, messages_per_queue=5):
except asyncio.CancelledError:
logger.info(f'Terminating on cycle: {cycle}')


def getqueue(name) -> Queue:
if name not in queues:
queues[name] = Queue(name)
logger.info(f'Queue {name.decode()} just created.')
return queues[name]


def subscribe(name, protocol):
getqueue(name).subscribe(protocol)


def unsubscribe(name, protocol):
getqueue(name).unsubscribe(protocol)


def unsubscribe_all(protocol):
for queue in queues.values():
try:
queue.unsubscribe(protocol)
except NotSubscribedError:
pass

9 changes: 5 additions & 4 deletions easyq/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from .authentication import authenticate, initialize as initialize_authentication
from .configuration import settings
from .logging import getlogger
from .queuemanager import getqueue, dispatcher, AlreadySubscribedError, NotSubscribedError
from .queuemanager import getqueue, dispatcher, AlreadySubscribedError, NotSubscribedError, \
unsubscribe_all, subscribe, unsubscribe


logger = getlogger('PROTO')
Expand All @@ -31,10 +32,10 @@ def connection_made(self, transport):

def connection_lost(self, exc):
logger.info(f'Connection lost: {self.peername}')
# FIXME: remove from all queues subscriptions

def eof_received(self):
logger.debug(f'EOF Received: {self.peername}')
unsubscribe_all(self)
self.transport.close()

def data_received(self, data):
Expand Down Expand Up @@ -104,13 +105,13 @@ async def push(self, message, queue):

async def pull(self, queue):
try:
getqueue(queue).subscribe(self)
subscribe(queue, self)
except AlreadySubscribedError:
self.transport.write(b'ERROR: QUEUE %s IS ALREADY SUBSCRIBED;\n' % queue)

async def ignore(self, queue):
try:
getqueue(queue).unsubscribe(self)
unsubscribe(queue, self)
except NotSubscribedError:
self.transport.write(b'ERROR: QUEUE %s IS NOT SUBSCRIBED;\n' % queue)

Expand Down

0 comments on commit 5bf1e9f

Please sign in to comment.