Skip to content

Commit

Permalink
Refactor the daemon to use select on all sockets for maximum throughp…
Browse files Browse the repository at this point in the history
…ut and robustness
  • Loading branch information
kovidgoyal committed Mar 10, 2013
1 parent f3c62bc commit 070d328
Showing 1 changed file with 80 additions and 49 deletions.
129 changes: 80 additions & 49 deletions powerline-daemon
Expand Up @@ -7,15 +7,14 @@ __license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

import socket, os, argparse, errno, sys, select
import socket, os, argparse, errno, sys, select, time

from powerline.shell import ShellPowerline
from powerline.lib import parsedotval

uid = os.getuid()
# Use the abstract namespace for sockets rather than the filesystem (Available
# in linux)
address = '\0powerline-ipc-%d'%uid
address = '\0powerline-ipc-%d'%os.getuid()

parser = argparse.ArgumentParser()
parser.add_argument('ext', nargs=1)
Expand All @@ -29,10 +28,15 @@ parser.add_argument('-t', '--theme_option', metavar='THEME.KEY.KEY=VALUE', type=
parser.add_argument('-p', '--config_path', metavar='PATH')
parser.add_argument('--cwd', metavar='PATH')

EOF = b'EOF\0\0'

def render(args):
if args.cwd:
odir = os.getcwd()
os.chdir(args.cwd)
try:
os.chdir(args.cwd)
except EnvironmentError:
pass # cwd does not exist/No permissions to execute it
try:
powerline = ShellPowerline(args)
return powerline.renderer.render(width=args.width, side=args.side)
Expand All @@ -49,12 +53,17 @@ def eintr_retry_call(func, *args, **kwargs):
continue
raise

def do_read(conn):
def do_read(conn, timeout=2.0):
''' Read data from the client. If the client fails to send data within
timeout seconds, abort. '''
read = []
end_time = time.time() + timeout
while not read or not read[-1].endswith(b'\0\0'):
r, w, e = select.select((conn,), (), (conn,), 1.0)
r, w, e = select.select((conn,), (), (conn,), timeout)
if e:
return
if time.time() > end_time:
return
if not r:
continue
x = eintr_retry_call(conn.recv, 4096)
Expand All @@ -64,54 +73,78 @@ def do_read(conn):
break
return b''.join(read)

def handle_client(conn):
def do_write(conn, result):
try:
try:
req = do_read(conn)
except Exception:
return
if req:
if req == b'EOF\0\0':
eintr_retry_call(conn.sendall, result+b'\0')
except Exception:
pass

def do_render(req):
try:
args = [x for x in req.split(b'\0') if x]
args = parser.parse_args(args)
ans = render(args)
except (Exception, SystemExit) as e:
ans = str(e)
if isinstance(ans, type(u'')):
ans = ans.encode('utf-8')
return ans

def do_one(sock, read_sockets, write_sockets, result_map):
r, w, e = select.select(
tuple(read_sockets)+(sock,), tuple(write_sockets),
tuple(read_sockets)+tuple(write_sockets)+(sock,),
60.0)

if sock in e:
# We cannot accept any more connections, so we exit
raise SystemExit(1)

for s in e:
# Discard all broken connections to clients
s.close()
read_sockets.discard(s)
write_sockets.discard(s)

for s in r:
if s == sock:
# A client wants to connect
conn, _ = eintr_retry_call(sock.accept)
read_sockets.add(conn)
else:
# A client has sent some data
read_sockets.discard(s)
req = do_read(s)
if req == EOF:
raise SystemExit(0)
try:
args = [x for x in req.split(b'\0') if x]
args = parser.parse_args(args)
ans = render(args)
except (Exception, SystemExit) as e:
ans = str(e)
if isinstance(ans, type(u'')):
ans = ans.encode('utf-8')
while True:
r, w, e = select.select((), (conn,), (conn,))
if e:
return
if w:
break
try:
eintr_retry_call(conn.sendall, ans+b'\0')
except Exception:
pass
finally:
conn.close()
elif req:
ans = do_render(req)
result_map[s] = ans
write_sockets.add(s)
else:
s.close()

for s in w:
# A client is ready to receive the result
write_sockets.discard(s)
result = result_map.pop(s)
try:
do_write(s, result)
finally:
s.close()

def main_loop(sock):
sock.listen(1)
sock.setblocking(0)

read_sockets, write_sockets = set(), set()
result_map = {}
try:
while True:
try:
r, w, e = select.select((sock,), (), (sock,), 2.0)
if e:
# An exception occurred on the socket, abort
raise SystemExit(1)
if not r:
continue
conn, _ = eintr_retry_call(sock.accept)
except socket.error:
continue
handle_client(conn)
do_one(sock, read_sockets, write_sockets, result_map)
except KeyboardInterrupt:
raise SystemExit(0)
return 0

def daemonize(stdin=os.devnull, stdout=os.devnull, stderr=os.devnull):
try:
Expand Down Expand Up @@ -174,13 +207,14 @@ def main():
except socket.error:
print ('No running daemon found')
return
sock.sendall(b'EOF\0\0')
sock.sendall(EOF)
sock.close()
print ('Kill command sent to daemon, if it does not die in a couple of seconds use kill to kill it')
return
sock = check_existing()
if sock is None:
print ('The daemon is already running.')
print ('The daemon is already running. Use %s -k to kill it.'%
os.path.basename(sys.argv[0]), file=sys.stderr)
raise SystemExit(1)

if args.foreground:
Expand All @@ -191,6 +225,3 @@ def main():
if __name__ == '__main__':
main()

if __name__ == '__main__':
main()

0 comments on commit 070d328

Please sign in to comment.