Skip to content

Commit

Permalink
Implement non-blocking pdns remote backend unix socket listener
Browse files Browse the repository at this point in the history
Use select() instead of socket.makefiles()
  • Loading branch information
cvaroqui committed May 26, 2023
1 parent c758e29 commit 59fd7dd
Showing 1 changed file with 40 additions and 42 deletions.
82 changes: 40 additions & 42 deletions opensvc/daemon/dns.py
Expand Up @@ -10,6 +10,7 @@
import shutil
import json
import re
import select
import time

import foreign.six as six
Expand All @@ -18,7 +19,7 @@
from utilities.net.ipaddress import ip_address
from utilities.storage import Storage
from utilities.naming import split_path
from utilities.string import bdecode
from utilities.string import bdecode, bencode
from utilities.lazy import lazy

PTR_SUFFIX = ".in-addr.arpa."
Expand Down Expand Up @@ -138,44 +139,54 @@ def do(self):
conn.close()

def handle_client(self, conn):
# todo: change implementation to avoid socket.makefile with non blocking mode
conn.settimeout(self.sock_tmo)
cr = conn.makefile("r", **MAKEFILE_KWARGS)
cw = conn.makefile("w", **MAKEFILE_KWARGS)
conn.setblocking(False)
try:
self._handle_client(conn, cr, cw)
self._handle_client(conn)
except Exception as exc:
self.log.exception(exc)
finally:
try:
cr.close()
except socket.error:
pass
try:
cw.close()
except socket.error:
pass
try:
conn.close()
except socket.error:
pass
sys.exit(0)

def _handle_client(self, conn, cr, cw):
def recv(self, conn):
if six.PY3:
sep = b"\n"
else:
sep = "\n"
chunks = []
buff_size = 4096
while True:
try:
data = cr.readline()
except socket.timeout as exc:
ready = select.select([conn], [], [conn], 1)
if ready[0]:
chunk = conn.recv(1024)
else:
break
except socket.error as exc:
self.log.info("%s", exc)
if ready[2]:
break
if len(data) == 0:
#self.log.info("no more data")
if not chunk:
break
chunks.append(chunk)
if chunk.endswith(sep):
break
if six.PY3:
data = b"".join(chunks)
else:
data = "".join(chunks)
return data

def _handle_client(self, conn):
while True:
if self.stopped():
self.log.info("stop event received (handler thread)")
break

data = self.recv(conn)

if not data:
continue

self.log.debug("received %s", data)

try:
Expand All @@ -185,31 +196,20 @@ def _handle_client(self, conn, cr, cw):
self.log.error(exc)
data = None

if self.stopped():
self.log.info("stop event received (handler thread)")
break

if data is None or not isinstance(data, dict):
continue

try:
result = self.router(data)
except Exception as exc:
self.log.error("dns request: %s => handler error: %s", data, exc)
return {"error": "unexpected backend error", "result": False}
result = {"error": "unexpected backend error", "result": False}
data = ""
if result is not None:
message = json.dumps(result) + "\n"
try:
cw.write(message)
cw.flush()
except socket.error as exc:
if exc.errno != errno.EPIPE:
raise
self.log.info("client died (broken pipe)")
break
self.log.debug("replied %s", message)
message_len = len(message)
self.stats.sessions.tx += message_len
message = bencode(json.dumps(result) + "\n")
self.log.debug("reply %s", message)
conn.sendall(message)
self.stats.sessions.tx += len(message)

#########################################################################
#
Expand All @@ -230,8 +230,6 @@ def router(self, data):
if not hasattr(self, fname):
return {"error": "action not supported", "result": False}
result = getattr(self, fname)(data.get("parameters", {}))
if result == []:
return False
return {"result": result}

def action_initialize(self, parameters):
Expand Down

0 comments on commit 59fd7dd

Please sign in to comment.