-
-
Notifications
You must be signed in to change notification settings - Fork 565
Description
Hi! 👋
In my project, I heavily rely on websockets
for exchanging synchronization data and it works like a charm. ❤️
Especially, there is a local websocket server running as a background service to which several user-facing apps connect to. The reason for this is to have only one outgoing websocket connection from this background service to a remote server distributing sync data to other clients.
Now I need centralized logging, so that logs of individual apps can end up in one file.
Since I already have a websocket server running in the background, it might be cool to have a WebsocketHandler
similar to SocketHandler
already shipping with the logging
module.
I know that for this, a websocket connection is a bit over the top as it is only used one-way (logs from user-facing app to the background websocket server), but otherwise I would need to manage a separate TCP Server.
For testing, I used a slightly modified version of the example script for an echo server from the docs:
🐍 Server Script
#!/usr/bin/env python
import asyncio
from websockets.server import serve
async def print_message(websocket):
async for message in websocket:
print(message)
async def main():
async with serve(print_message, "localhost", 8000):
await asyncio.Future() # run forever
try:
asyncio.run(main())
except KeyboardInterrupt:
print("closing")
My criteria for a good WebsocketHandler
are:
- graceful shutdown on program exit
- graceful handling of connection interruption (i.e. killing the server) and start sending logs again upon reconnect
- handling of graceful server shutdown
I tried to implement such a handler in two ways, however, I am not really happy with both:
Threading Client API
I basically re-used the implementation of SocketHandler
🐍 WebsocketThreadingHandler Class
import logging
import pickle
import struct
import time
from websockets.sync.client import connect
class WebsocketThreadingHandler(logging.Handler):
def __init__(self, uri: str):
self.uri = uri
self.sock = None
self.retryTime = None
#
# Exponential backoff parameters.
#
self.retryStart = 1.0
self.retryMax = 30.0
self.retryFactor = 2.0
super().__init__()
def makePickle(self, record):
"""
Pickles the record in binary format with a length prefix, and
returns it ready for transmission across the socket.
"""
ei = record.exc_info
if ei:
# just to get traceback text into record.exc_text ...
dummy = self.format(record)
# See issue #14436: If msg or args are objects, they may not be
# available on the receiving end. So we convert the msg % args
# to a string, save it as msg and zap the args.
d = dict(record.__dict__)
d["msg"] = record.getMessage()
d["args"] = None
d["exc_info"] = None
# Issue #25685: delete 'message' if present: redundant with 'msg'
d.pop("message", None)
s = pickle.dumps(d, 1)
slen = struct.pack(">L", len(s))
return slen + s
def createSocket(self):
"""
Try to create a socket, using an exponential backoff with
a max retry time. Thanks to Robert Olson for the original patch
(SF #815911) which has been slightly refactored.
"""
now = time.time()
# Either retryTime is None, in which case this
# is the first time back after a disconnect, or
# we've waited long enough.
if self.retryTime is None:
attempt = True
else:
attempt = now >= self.retryTime
if attempt:
try:
print("try create socket")
self.sock = connect(self.uri) # <-- here, the recv_events_thread starts
print("create socket")
self.retryTime = None # next time, no delay before trying
except OSError:
# Creation failed, so set the retry time and return.
if self.retryTime is None:
self.retryPeriod = self.retryStart
else:
self.retryPeriod = self.retryPeriod * self.retryFactor
if self.retryPeriod > self.retryMax:
self.retryPeriod = self.retryMax
self.retryTime = now + self.retryPeriod
def send(self, s):
if self.sock is None:
self.createSocket()
if self.sock:
try:
self.sock.send(s)
print("emit")
except (Exception, KeyboardInterrupt):
self.closeSocket()
def emit(self, record):
"""This gets called on every new log record."""
try:
s = self.makePickle(record)
self.send(s)
except Exception:
self.handleError(record)
def closeSocket(self):
print("close socket")
self.sock.close()
self.sock = None
def handleError(self, record):
print("HANDLE ERROR")
with self.lock:
if self.sock:
self.closeSocket()
else:
super().handleError(record)
def close(self):
with self.lock:
if self.sock:
self.closeSocket()
super().close()
print("close handler")
Problem
I need to press Ctrl+C
twice since the ClientConnection.recv_events_thread
is not exiting on the first one, so it is failing the first of my criteria. In order to overcome this, I have to explicitly call ClientConnection.close()
after catching the KeyboardInterrupt
exception:
🐍 Client Script
import logging
from time import sleep
log = logging.getLogger(__name__)
websocket_handler = WebsocketThreadingHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)
try:
while True:
log.debug("DEBUG")
sleep(1)
log.info("INFO")
sleep(1)
log.warning("WARNING")
sleep(1)
log.error("ERROR")
sleep(1)
log.critical("CRITICAL")
sleep(1)
except KeyboardInterrupt:
websocket_handler.sock.close() # <-- necessary for only one `Ctrl+C`, but not elegant
exit()
Sans-I/O API
Again I basically wrote it like a SocketHandler
, but this time there is no listening thread. Hence, no need to press Ctrl+C
twice.
I followed the docs as good as I could to implement all the rules for a working connection:
🐍 WebsocketSansIOHandler Class
import logging
from logging.handlers import SocketHandler
import socket
from threading import Thread
from time import sleep
from websockets.client import ClientProtocol
from websockets.uri import parse_uri
class WebsocketSansIOHandler(SocketHandler):
def __init__(self, uri: str):
self.uri = parse_uri(uri)
self.events = list()
super().__init__(self.uri.host, self.uri.port)
def makeSocket(self):
print("make new socket")
# open TCP connection
self.sock = super().makeSocket()
sock = self.sock
# TODO: perform TLS handshake
# if self.uri.secure:
# ...
# init protocol
# TODO: Here or in __init__?
# it seems that otherwise messages don't get sent
# on reconnect after a BrokenPipeError
self.protocol = ClientProtocol(self.uri)
protocol = self.protocol
# send handshake request
print("handshake")
request = protocol.connect()
protocol.send_request(request)
self.send_data()
# receive data
self.receive_data()
# raise reason if handshake failed
if protocol.handshake_exc is not None:
self.reset_socket()
raise protocol.handshake_exc
return sock
def send_data(self):
try:
for data in self.protocol.data_to_send():
if data:
print("send data")
self.sock.sendall(data)
else:
# half-close TCP connection, i.e. close the write side
print("close write side")
self.sock.shutdown(socket.SHUT_WR)
except OSError:
self.reset_socket()
def receive_data(self):
try:
data = self.sock.recv(65536)
except OSError: # socket closed
data = b""
if data:
print("receive data")
self.protocol.receive_data(data)
self.process_events_received()
self.check_close_expected()
# necessary because `websockets` responds to ping frames,
# close frames, and incorrect inputs automatically
self.send_data()
else:
print("receive EOF")
self.protocol.receive_eof()
self.check_close_expected
self.close_socket()
def handleError(self, record):
print("HANDLE ERROR")
if self.closeOnError and self.sock:
self.close_socket()
else:
logging.Handler.handleError(self, record)
def check_close_expected(self):
# TODO: run in separate thread
if self.protocol.close_expected():
print("close expected")
t = Thread(target=self.close_socket, kwargs=dict(delay=10))
t.run()
def process_events_received(self):
# do something with the events,
# first event is handshake response
print("process events received")
events = self.protocol.events_received()
if events:
print("adding new events")
self.events.extend(events)
def close_socket(self, delay=None):
print("close socket")
if delay is not None:
print("add delay", delay)
sleep(delay)
self.protocol.send_close()
self.send_data()
self.reset_socket()
def reset_socket(self):
if self.sock is not None:
print("reset socket")
self.sock.close()
self.sock = None
self.protocol = None
def send(self, s):
if self.sock is None:
self.createSocket()
if self.sock:
try:
self.protocol.send_binary(s)
self.send_data()
except Exception as exc:
print(exc)
self.close_socket()
def close(self):
with self.lock:
if self.sock:
self.close_socket()
print("close handler")
logging.Handler.close(self)
Problem
It fulfills all my criteria. However, while puzzling all the protocol pieces together, I got really confused and now I don't know whether my implementation is good enough or needs to be improved.
🐍 Client Script
import logging
from time import sleep
log = logging.getLogger(__name__)
websocket_handler = WebsocketSansIOHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)
try:
while True:
log.debug("DEBUG")
sleep(1)
log.info("INFO")
sleep(1)
log.warning("WARNING")
sleep(1)
log.error("ERROR")
sleep(1)
log.critical("CRITICAL")
sleep(1)
except KeyboardInterrupt:
# no need to explicitly close the websocket connection
exit()
(I am sorry for the wall of text ...)
What is your opinion on that?