In [None]:
#|hide
#|default_exp core

# core

> Fast scripts using daemon mode

In [None]:
#|export
from concurrent.futures import ProcessPoolExecutor
from contextlib import redirect_stdout
from io import StringIO
from multiprocessing import get_context
from socketserver import TCPServer, StreamRequestHandler
import time

from fastcore.meta import *
from fastcore.script import *
from fastcore.utils import *

from nbprocess.clean import nbprocess_clean

In [None]:
#|hide
from nbprocess.showdoc import *

In [None]:
from fastcore.net import *

In [None]:
#|export
def _handle(func, data):
    argv = data.decode().strip()
    sys.argv = [func.__name__] + (argv.split(' ') if argv else [])
    print('sys.argv:', sys.argv)
    with redirect_stdout(StringIO()) as s: func()
    return s.getvalue().encode()

In [None]:
#|export
class DaemonHandler(StreamRequestHandler):
    "Request handler that reads one line from the client and replies with the result of the server's `func`"
    def handle(self):
        "Read a request line, run `_handle` on it in the server's pool, and write the result back"
        srv = self.server
        line = self.rfile.readline().strip()
        print("{} wrote:".format(self.client_address[0]))
        print('data:', line)
        # `.result()` blocks this handler until the submitted call has finished
        out = srv.pool.submit(_handle, srv.func, line).result()
        print('result:', out)
        self.wfile.write(out)

In [None]:
#|export
class DaemonServer(TCPServer):
    "A `TCPServer` that holds `func` and a fork-based process pool for its request handler to use"
    @delegates(TCPServer)
    def __init__(self, server_address, RequestHandlerClass, func, timeout=None, **kwargs):
        "Store `func` and `timeout`, then initialise the underlying `TCPServer`, forwarding `kwargs` to it"
        # Set before `super().__init__`, which binds and activates the server by default
        self.func,self.timeout = func,timeout
        # Forward `kwargs` (e.g. `bind_and_activate`) so the options advertised through
        # `delegates` actually take effect instead of being silently dropped
        super().__init__(server_address, RequestHandlerClass, **kwargs)

    def server_activate(self):
        "Create the worker pool, then start listening"
        self.pool = ProcessPoolExecutor(mp_context=get_context('fork')) # TODO: make ctx configurable?
        super().server_activate()

    def server_close(self):
        "Shut down the worker pool, if one was created, and close the server"
        # `pool` only exists once `server_activate` has run
        if hasattr(self,'pool'): self.pool.shutdown()
        super().server_close()

    def handle_timeout(self):
        "Report the timeout and return `True`"
        print('timed out')
        return True

In [None]:
def send_recv(addr, s='', encoding='utf-8'):
    "Send `s` plus a newline to the server at `addr` (a `(host, port)` pair) and return its decoded reply"
    host,port = addr
    with start_client(port, host) as client:
        client.sendall((s+'\n').encode(encoding))
        # A single `recv(1024)` truncates longer replies and can cut a multi-byte character in
        # half, so keep reading until `recv` returns b''. This relies on the server closing the
        # connection once it has replied.
        chunks = []
        while True:
            chunk = client.recv(1024)
            if not chunk: break
            chunks.append(chunk)
    return b''.join(chunks).decode(encoding)

In [None]:
def _func(): return 'hello world!'

In [None]:
addr = 'localhost',9999
@threaded
def _f():
    "Serve exactly one request, then close the server"
    # Use `DaemonHandler`, the handler defined in this notebook; `PoolHandler` is not defined
    # anywhere here and raises NameError on a fresh kernel
    with DaemonServer(addr, DaemonHandler, _func) as srv: srv.handle_request()
_f()
time.sleep(0.2) # wait for server to start

send_recv(addr)

127.0.0.1 wrote:
data: b''
sys.argv: ['_func']
result: b''


''

In [None]:
#|export
if __name__ == '__main__':
    # Serve `nbprocess_clean` on localhost:9999 with a 3 second server timeout
    with DaemonServer(('localhost',9999), DaemonHandler, nbprocess_clean, timeout=3) as srv:
        # Keep handling requests until `handle_request` returns something truthy;
        # `DaemonServer.handle_timeout` returns True for that purpose
        while True:
            if srv.handle_request(): break

OSError: [Errno 48] Address already in use

In [None]:
#|export
@call_parse#(nested=True)
def fastdaemon():
    "Fast scripts using daemon mode"
    print(f'Called fastdaemon, with: {locals()}')
    with ProcessPoolExecutor() as pool:
        # NOTE(review): `add1` is not defined in the visible part of this notebook — confirm where it comes from
        result = pool.submit(add1, 0).result()
        # NOTE(review): `_processes` is a private attribute of the executor; this looks like debugging output
        print(pool._processes)
    return result

## Export -

In [None]:
#|hide
#|eval: false
# NOTE(review): presumably writes this notebook's `#|export` cells out to the library module — confirm against the nbprocess docs
from nbprocess.doclinks import nbprocess_export
nbprocess_export()