Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow listening on UNIX sockets for HTTP listeners
Browse files Browse the repository at this point in the history
Signed-off-by: David Vo <david@vovo.id.au>
  • Loading branch information
auscompgeek committed Aug 20, 2020
1 parent 5eac0b7 commit 77e4eb4
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 128 deletions.
1 change: 1 addition & 0 deletions changelog.d/8103.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for listening on a named UNIX domain socket for HTTP interfaces. Contributed by David Vo.
30 changes: 23 additions & 7 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import synapse
from synapse.app import check_bind_error
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -142,24 +142,38 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)


def listen_metrics(bind_addresses, port):
def listen_metrics(socket_options):
"""
Start Prometheus metrics server.
"""
if not isinstance(socket_options, TcpListenerConfig):
logger.warning(
"Metrics listener only supports TCP, use an HTTP listener instead"
)
return

from synapse.metrics import RegistryProxy, start_http_server

for host in bind_addresses:
if socket_options.tls:
logger.warning("Ignoring 'tls' option for metrics listener")

port = socket_options.port
for host in socket_options.bind_addresses:
logger.info("Starting metrics listener on %s:%d", host, port)
start_http_server(port, addr=host, registry=RegistryProxy)


def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
def listen_tcp(socket_options, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
Returns:
list[twisted.internet.tcp.Port]: listening for TCP connections
"""
assert not socket_options.tls
bind_addresses = socket_options.bind_addresses
port = socket_options.port

r = []
for address in bind_addresses:
try:
Expand All @@ -170,15 +184,17 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
return r


def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
def listen_ssl(socket_options, factory, context_factory, reactor=reactor, backlog=50):
"""
Create an TLS-over-TCP socket for a port and several addresses
Returns:
list of twisted.internet.tcp.Port listening for TLS connections
"""
assert socket_options.tls
bind_addresses = socket_options.bind_addresses
port = socket_options.port

r = []
for address in bind_addresses:
try:
Expand Down
75 changes: 43 additions & 32 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
import contextlib
import logging
import os
import sys
from typing import Dict, Iterable, Optional, Set

Expand All @@ -37,7 +38,7 @@
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import (
Expand Down Expand Up @@ -486,15 +487,8 @@ class GenericWorkerServer(HomeServer):
DATASTORE_CLASS = GenericWorkerSlavedStore

def _listen_http(self, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses

assert listener_config.http_options is not None

site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port

# We always include a health resource.
resources = {"/health": HealthResource()}

Expand Down Expand Up @@ -590,43 +584,60 @@ def _listen_http(self, listener_config: ListenerConfig):

root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor(),
socket_options = listener_config.socket_options
site_tag = listener_config.http_options.tag

if isinstance(socket_options, TcpListenerConfig):
port = socket_options.port
if site_tag is None:
site_tag = port
site_type = "http"
else:
assert isinstance(socket_options, UnixListenerConfig)
port = None
socket_path = socket_options.path
if site_tag is None:
site_tag = os.path.basename(socket_path)
site_type = "unix"

site = SynapseSite(
"synapse.access.%s.%s" % (site_type, site_tag),
site_tag,
listener_config,
root_resource,
self.version_string,
)

logger.info("Synapse worker now listening on port %d", port)
if port is not None:
_base.listen_tcp(socket_options, site, reactor=self.get_reactor())
logger.info("Synapse worker now listening on port %d", port)
else:
self.get_reactor().listenUNIX(socket_path, site)
logger.info("Synapse worker now listening on socket %s", socket_path)

def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
_base.listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.warning("Manhole listener currently only supports TCP")
else:
_base.listen_tcp(
listener.socket_options,
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
),
)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
"Metrics listener configured, but enable_metrics is not True!"
)
else:
_base.listen_metrics(listener.bind_addresses, listener.port)
_base.listen_metrics(listener.socket_options)
else:
logger.warning("Unsupported listener type: %s", listener.type)

Expand Down
111 changes: 61 additions & 50 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import (
Expand Down Expand Up @@ -92,12 +92,7 @@ class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore

def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses
tls = listener_config.tls
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port
assert listener_config.http_options is not None

# We always include a health resource.
resources = {"/health": HealthResource()}
Expand Down Expand Up @@ -137,36 +132,46 @@ def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConf

root_resource = create_resource_tree(resources, root_resource)

if tls:
ports = listen_ssl(
bind_addresses,
port,
SynapseSite(
"synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d (TLS)", port)
socket_options = listener_config.socket_options
site_tag = listener_config.http_options.tag

if isinstance(socket_options, TcpListenerConfig):
port = socket_options.port
if site_tag is None:
site_tag = port
site_type = "https" if socket_options.tls else "http"
else:
ports = listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d", port)
assert isinstance(socket_options, UnixListenerConfig)
port = None
if site_tag is None:
site_tag = os.path.basename(socket_options.path)
site_type = "unix"

site = SynapseSite(
"synapse.access.%s.%s" % (site_type, site_tag),
site_tag,
listener_config,
root_resource,
self.version_string,
)

if port is not None:
if socket_options.tls:
ports = listen_ssl(
socket_options,
site,
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d (TLS)", port)

else:
ports = listen_tcp(socket_options, site, reactor=self.get_reactor())
logger.info("Synapse now listening on TCP port %d", port)

else:
ports = [self.get_reactor().listenUNIX(socket_options.path, site)]
logger.info("Synapse now listening on UNIX socket %s", socket_options.path)

return ports

Expand Down Expand Up @@ -295,31 +300,37 @@ def start_listening(self, listeners: Iterable[ListenerConfig]):
if listener.type == "http":
self._listening_services.extend(self._listener_http(config, listener))
elif listener.type == "manhole":
listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.warning("Manhole listener currently only supports TCP")
else:
listen_tcp(
listener.socket_options,
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
),
)
elif listener.type == "replication":
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.error(
"Replication configured to listen on a UNIX socket,"
" but only TCP is supported"
)
# XXX: should we straight up bail here?
continue
services = listen_tcp(
listener.bind_addresses,
listener.port,
ReplicationStreamProtocolFactory(self),
listener.socket_options, ReplicationStreamProtocolFactory(self),
)
for s in services:
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
"Metrics listener configured, but enable_metrics is not True!"
)
else:
_base.listen_metrics(listener.bind_addresses, listener.port)
_base.listen_metrics(listener.socket_options)
else:
# this shouldn't happen, as the listener type should have been checked
# during parsing
Expand Down
Loading

0 comments on commit 77e4eb4

Please sign in to comment.