Skip to content
This repository has been archived by the owner on Nov 23, 2020. It is now read-only.

Commit

Permalink
don't remove the file descriptor, serve from monitor too, fixes #262
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Dec 17, 2016
1 parent 4cf38ae commit ee56c3c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 50 deletions.
75 changes: 25 additions & 50 deletions pulsar/apps/socket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ class EchoServerProtocol(pulsar.ProtocolConsumer):
from math import log
from random import lognormvariate
from functools import partial
import asyncio
try:
import ssl
except ImportError: # pragma nocover
Expand Down Expand Up @@ -224,7 +223,6 @@ async def monitor_start(self, monitor):
number of workers to 0.
'''
cfg = self.cfg
loop = monitor._loop
if (not pulsar.platform.has_multiProcessSocket or
cfg.concurrency == 'thread'):
cfg.set('workers', 0)
Expand All @@ -243,34 +241,20 @@ async def monitor_start(self, monitor):
cfg.key_file)
# First create the sockets
try:
server = await loop.create_server(asyncio.Protocol, *address)
server = await self.create_server(monitor, address)
except socket.error as e:
raise ImproperlyConfigured(e)
raise ImproperlyConfigured(e) from None
else:
self.monitor_sockets(monitor, server.sockets)

def monitor_sockets(self, monitor, sockets):
addresses = []
loop = monitor._loop
for sock in sockets:
address = sock.getsockname()
addresses.append(address)
self.logger.debug('Created socket %r', address)
fd = sock.fileno()
try:
loop.remove_reader(fd)
except NotImplementedError:
pass
monitor.sockets = sockets
self.cfg.addresses = addresses
monitor.servers[self.name] = server
self.cfg.addresses = server.addresses

def actorparams(self, monitor, params):
params['sockets'] = monitor.sockets
params['sockets'] = monitor.servers[self.name].sockets

async def worker_start(self, worker, exc=None):
'''Start the worker by invoking the :meth:`create_server` method.
'''
if not exc:
if not exc and self.name not in worker.servers:
server = await self.create_server(worker)
server.bind_event('stop', lambda _, **kw: worker.stop())
worker.servers[self.name] = server
Expand Down Expand Up @@ -298,23 +282,26 @@ def server_factory(self, *args, **kw):
return TcpServer(*args, **kw)

# INTERNALS
async def create_server(self, worker):
async def create_server(self, worker, address=None):
'''Create the Server which will listen for requests.
:return: a :class:`.TcpServer`.
'''
sockets = worker.sockets
sockets = worker.sockets if not address else None
cfg = self.cfg
max_requests = cfg.max_requests
if max_requests:
max_requests = int(lognormvariate(log(max_requests), 0.2))
server = self.server_factory(self.protocol_factory(),
worker._loop,
sockets=sockets,
max_requests=max_requests,
keep_alive=cfg.keep_alive,
name=self.name,
logger=self.logger)
server = self.server_factory(
self.protocol_factory(),
worker._loop,
sockets=sockets,
address=address,
max_requests=max_requests,
keep_alive=cfg.keep_alive,
name=self.name,
logger=self.logger
)
for event in ('connection_made', 'pre_request', 'post_request',
'connection_lost'):
callback = getattr(cfg, event)
Expand Down Expand Up @@ -356,43 +343,37 @@ async def monitor_start(self, monitor):
number of workers to 0.
'''
cfg = self.cfg
loop = monitor._loop
if (not pulsar.platform.has_multiProcessSocket or
cfg.concurrency == 'thread'):
cfg.set('workers', 0)
if not cfg.address:
raise pulsar.ImproperlyConfigured('Could not open a socket. '
'No address to bind to')
address = parse_address(self.cfg.address)
# First create the sockets
transport, _ = await loop.create_datagram_endpoint(
asyncio.DatagramProtocol, address)
sock = transport.get_extra_info('socket')
transport._sock = DummySock()
transport.close()
self.monitor_sockets(monitor, [sock])

def actorparams(self, monitor, params):
params.update({'sockets': monitor.sockets})
server = await self.create_server(monitor, address)
monitor.servers[self.name] = server
self.cfg.addresses = server.addresses

def server_factory(self, *args, **kw):
'''By default returns a new :class:`.DatagramServer`.
'''
return DatagramServer(*args, **kw)

# INTERNALS
async def create_server(self, worker):
async def create_server(self, worker, address=None):
'''Create the Server which will listen for requests.
:return: the server obtained from :meth:`server_factory`.
'''
sockets = worker.sockets if not address else None
cfg = self.cfg
max_requests = cfg.max_requests
if max_requests:
max_requests = int(lognormvariate(log(max_requests), 0.2))
server = self.server_factory(self.protocol_factory(),
worker._loop,
sockets=worker.sockets,
sockets=sockets,
address=address,
max_requests=max_requests,
name=self.name,
logger=self.logger)
Expand All @@ -403,9 +384,3 @@ async def create_server(self, worker):
server.bind_event(event, callback)
await server.create_endpoint()
return server


class DummySock:

def close(self):
pass
23 changes: 23 additions & 0 deletions pulsar/async/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,15 @@ def address(self):
if self._server is not None:
return self._server.sockets[0].getsockname()

@property
def addresses(self):
return [sock.getsockname() for sock in self.sockets or ()]

@property
def sockets(self):
if self._server is not None:
return self._server.sockets

async def start_serving(self, backlog=100, sslcontext=None):
"""Start serving.
Expand Down Expand Up @@ -778,6 +787,20 @@ def __init__(self, protocol_factory, loop=None, address=None,
max_requests=max_requests, logger=logger)
self._params = {'address': address, 'sockets': sockets}

@property
def addresses(self):
return [sock.getsockname() for sock in self.sockets or ()]

@property
def sockets(self):
sockets = []
if self._transports is not None:
for t in self._transports:
sock = t.get_extra_info('socket')
if sock:
sockets.append(sock)
return sockets

async def create_endpoint(self, **kw):
"""create the server endpoint.
Expand Down

0 comments on commit ee56c3c

Please sign in to comment.