From 7b224969794114af1cb845d27fe8db80342f6907 Mon Sep 17 00:00:00 2001 From: arturgontijo Date: Tue, 13 Aug 2019 16:05:18 -0300 Subject: [PATCH 1/2] [grpc] Using gRPC multiprocessing. --- .gitignore | 3 ++ requirements.txt | 2 +- run_example_service.py | 38 ++++++++++----- service/common.py | 29 ------------ service/example_service.py | 97 +++++++++++++++++++++++++++++--------- test_example_service.py | 6 ++- 6 files changed, 111 insertions(+), 64 deletions(-) delete mode 100644 service/common.py diff --git a/.gitignore b/.gitignore index 894a44c..1630138 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,6 @@ venv.bak/ # mypy .mypy_cache/ + +*_pb2.py +*_pb2_grpc.py diff --git a/requirements.txt b/requirements.txt index 915391b..03aa603 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -grpcio>=1.14.2 +grpcio>=1.14.2 --no-binary grpcio grpcio-tools>=1.14.1 \ No newline at end of file diff --git a/run_example_service.py b/run_example_service.py index 77a18e2..3202e4e 100644 --- a/run_example_service.py +++ b/run_example_service.py @@ -11,19 +11,25 @@ from service import registry -logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s") +logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - " + "%(name)s - %(message)s") log = logging.getLogger("run_example_service") def main(): parser = argparse.ArgumentParser(description="Run services") - parser.add_argument("--no-daemon", action="store_false", dest="run_daemon", help="do not start the daemon") + parser.add_argument("--no-daemon", + action="store_false", + dest="run_daemon", + help="do not start the daemon") parser.add_argument("--daemon-config", dest="daemon_config", help="Path of daemon configuration file, without config it won't be started", - required=False - ) - parser.add_argument("--ssl", action="store_true", dest="run_ssl", help="start the daemon with SSL") + required=False) + parser.add_argument("--ssl", + action="store_true", + dest="run_ssl", + help="start the daemon with SSL") args = parser.parse_args() root_path = pathlib.Path(__file__).absolute().parent @@ -31,7 +37,11 @@ def main(): service_modules = ["service.example_service"] # Call for all the services listed in service_modules - all_p = start_all_services(root_path, service_modules, args.run_daemon, args.daemon_config, args.run_ssl) + all_p = start_all_services(root_path, + service_modules, + args.run_daemon, + args.daemon_config, + args.run_ssl) # Continuous checking all subprocess try: @@ -53,10 +63,12 @@ def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl) snetd will start with configs from "snetd.config.json" """ all_p = [] - for i, service_module in enumerate(service_modules): + for _, service_module in enumerate(service_modules): service_name = service_module.split(".")[-1] - log.info("Launching {} on port {}".format(str(registry[service_name]), service_module)) - all_p += start_service(cwd, service_module, run_daemon, daemon_config, run_ssl) + log.info("Launching {} on port {}".format(str(registry[service_name]), + service_module)) + all_p += start_service(cwd, service_module, + run_daemon, daemon_config, run_ssl) return all_p @@ -80,13 +92,17 @@ def add_ssl_configs(conf): if daemon_config: all_p.append(start_snetd(str(cwd), daemon_config)) else: - for idx, config_file in enumerate(glob.glob("./snetd_configs/*.json")): + for _, config_file in enumerate(glob.glob("./snetd_configs/*.json")): if run_ssl: add_ssl_configs(config_file) all_p.append(start_snetd(str(cwd), config_file)) service_name = service_module.split(".")[-1] grpc_port = registry[service_name]["grpc"] - p = subprocess.Popen([sys.executable, "-m", service_module, "--grpc-port", str(grpc_port)], cwd=str(cwd)) + p = subprocess.Popen([ + sys.executable, + "-m", service_module, + "--grpc-port", str(grpc_port)], + cwd=str(cwd)) all_p.append(p) return all_p diff --git a/service/common.py b/service/common.py deleted file mode 100644 index 21aa4e5..0000000 --- a/service/common.py +++ /dev/null @@ -1,29 +0,0 @@ -import argparse -import os.path -import time - -from service import registry - - -def common_parser(script_name): - parser = argparse.ArgumentParser(prog=script_name) - service_name = os.path.splitext(os.path.basename(script_name))[0] - parser.add_argument("--grpc-port", - help="port to bind gRPC service to", - default=registry[service_name]['grpc'], - type=int, - required=False) - return parser - - -# From gRPC docs: -# Because start() does not block you may need to sleep-loop if there is nothing -# else for your code to do while serving. -def main_loop(grpc_handler, args): - server = grpc_handler(port=args.grpc_port) - server.start() - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - server.stop(0) diff --git a/service/example_service.py b/service/example_service.py index 97dd523..9bce775 100644 --- a/service/example_service.py +++ b/service/example_service.py @@ -1,18 +1,29 @@ -import sys +from concurrent import futures +import contextlib +import datetime import logging +import multiprocessing +import time +import socket +import sys +import argparse +import os.path import grpc -import concurrent.futures as futures -import service.common +from service import registry # Importing the generated codes from buildproto.sh import service.service_spec.example_service_pb2_grpc as grpc_bt_grpc from service.service_spec.example_service_pb2 import Result -logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s") -log = logging.getLogger("example_service") +logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s]" + " - %(name)s - %(message)s") +_LOGGER = logging.getLogger("example_service") +_ONE_DAY = datetime.timedelta(days=1) +_PROCESS_COUNT = multiprocessing.cpu_count() +_THREAD_CONCURRENCY = _PROCESS_COUNT """ Simple arithmetic service to test the Snet Daemon (gRPC), dApp and/or Snet-CLI. @@ -41,7 +52,6 @@ value: 924.0 """ - # Create a class to be added to the gRPC server # derived from the protobuf codes. class CalculatorServicer(grpc_bt_grpc.CalculatorServicer): @@ -50,7 +60,7 @@ def __init__(self): self.b = 0 self.result = 0 # Just for debugging purpose. - log.debug("CalculatorServicer created") + _LOGGER.debug("CalculatorServicer created") # The method that will be exposed to the snet-cli call command. # request: incoming data @@ -64,7 +74,7 @@ def add(self, request, context): self.result = Result() self.result.value = self.a + self.b - log.debug("add({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("add({},{})={}".format(self.a, self.b, self.result.value)) return self.result def sub(self, request, context): @@ -73,7 +83,7 @@ def sub(self, request, context): self.result = Result() self.result.value = self.a - self.b - log.debug("sub({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("sub({},{})={}".format(self.a, self.b, self.result.value)) return self.result def mul(self, request, context): @@ -82,7 +92,7 @@ def mul(self, request, context): self.result = Result() self.result.value = self.a * self.b - log.debug("mul({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("mul({},{})={}".format(self.a, self.b, self.result.value)) return self.result def div(self, request, context): @@ -91,10 +101,18 @@ def div(self, request, context): self.result = Result() self.result.value = self.a / self.b - log.debug("div({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("div({},{})={}".format(self.a, self.b, self.result.value)) return self.result +def wait_forever(server): + try: + while True: + time.sleep(_ONE_DAY.total_seconds()) + except KeyboardInterrupt: + server.stop(None) + + # The gRPC serve function. # # Params: @@ -103,17 +121,54 @@ def div(self, request, context): # # Add all your classes to the server here. # (from generated .py files by protobuf compiler) -def serve(max_workers=10, port=7777): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) +def run_server(grpc_port=7777): + options = (('grpc.so_reuseport', 1),) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,), + options=options) grpc_bt_grpc.add_CalculatorServicer_to_server(CalculatorServicer(), server) - server.add_insecure_port("[::]:{}".format(port)) - return server + server.add_insecure_port("[::]:{}".format(grpc_port)) + server.start() + wait_forever(server) + + +@contextlib.contextmanager +def reserve_port(grpc_port=7777): + """Find and reserve a port for all subprocesses to use.""" + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(('', grpc_port)) + try: + yield sock.getsockname()[1] + finally: + sock.close() + +def main(): + """ Runs the gRPC server to communicate with the SNET Daemon. """ + parser = argparse.ArgumentParser(prog=__file__) + service_name = os.path.splitext(os.path.basename(__file__))[0] + parser.add_argument("--grpc-port", + help="port to bind gRPC service to", + default=registry[service_name]['grpc'], + type=int, + required=False) + args = parser.parse_args(sys.argv[1:]) + with reserve_port(args.grpc_port) as port: + _LOGGER.debug("Binding to port '%s'", port) + sys.stdout.flush() + workers = [] + for _ in range(_PROCESS_COUNT): + # NOTE: It is imperative that the worker subprocesses be forked before + # any gRPC servers start up. See + # https://github.com/grpc/grpc/issues/16001 for more details. + worker = multiprocessing.Process(target=run_server, args=(port,)) + worker.start() + workers.append(worker) + for worker in workers: + worker.join() if __name__ == "__main__": - """ - Runs the gRPC server to communicate with the Snet Daemon. - """ - parser = service.common.common_parser(__file__) - args = parser.parse_args(sys.argv[1:]) - service.common.main_loop(serve, args) + main() diff --git a/test_example_service.py b/test_example_service.py index bdda5ae..dc2bb5d 100644 --- a/test_example_service.py +++ b/test_example_service.py @@ -16,9 +16,11 @@ test_flag = True # Example Service - Arithmetic - endpoint = input("Endpoint (localhost:{}): ".format(registry["example_service"]["grpc"])) if not test_flag else "" + endpoint = input("Endpoint (localhost:{}): ".format( + registry["example_service"]["grpc"])) if not test_flag else "" if endpoint == "": - endpoint = "localhost:{}".format(registry["example_service"]["grpc"]) + endpoint = "localhost:{}".format( + registry["example_service"]["grpc"]) grpc_method = input("Method (add|sub|mul|div): ") if not test_flag else "mul" a = float(input("Number 1: ") if not test_flag else "12") From 5bcfdde2f4fb8b1a7d7d4d9e21eddf3d5309062a Mon Sep 17 00:00:00 2001 From: arturgontijo Date: Wed, 14 Aug 2019 08:06:22 -0300 Subject: [PATCH 2/2] [grpc] Number of processes through arguments. --- run_example_service.py | 30 +++++++++++++++++++----------- service/example_service.py | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/run_example_service.py b/run_example_service.py index 3202e4e..92f0424 100644 --- a/run_example_service.py +++ b/run_example_service.py @@ -30,6 +30,12 @@ def main(): action="store_true", dest="run_ssl", help="start the daemon with SSL") + parser.add_argument("--mp", + help="number of concurrent processes", + metavar="NUMBER_OF_PROCESSES", + default=1, + type=int, + required=False) args = parser.parse_args() root_path = pathlib.Path(__file__).absolute().parent @@ -41,7 +47,8 @@ def main(): service_modules, args.run_daemon, args.daemon_config, - args.run_ssl) + args.run_ssl, + args.mp) # Continuous checking all subprocess try: @@ -56,7 +63,8 @@ def main(): raise -def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl): +def start_all_services(cwd, service_modules, + run_daemon, daemon_config, run_ssl, mp): """ Loop through all service_modules and start them. For each one, an instance of Daemon "snetd" is created. @@ -65,14 +73,14 @@ def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl) all_p = [] for _, service_module in enumerate(service_modules): service_name = service_module.split(".")[-1] - log.info("Launching {} on port {}".format(str(registry[service_name]), - service_module)) + log.info("Launching {} on port {}".format(service_module, + str(registry[service_name]))) all_p += start_service(cwd, service_module, - run_daemon, daemon_config, run_ssl) + run_daemon, daemon_config, run_ssl, mp) return all_p -def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl): +def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl, mp): """ Starts SNET Daemon ("snetd") and the python module of the service at the passed gRPC port. @@ -98,11 +106,11 @@ def add_ssl_configs(conf): all_p.append(start_snetd(str(cwd), config_file)) service_name = service_module.split(".")[-1] grpc_port = registry[service_name]["grpc"] - p = subprocess.Popen([ - sys.executable, - "-m", service_module, - "--grpc-port", str(grpc_port)], - cwd=str(cwd)) + p = subprocess.Popen([sys.executable, + "-m", service_module, + "--grpc-port", str(grpc_port), + "--mp", str(mp)], + cwd=str(cwd)) all_p.append(p) return all_p diff --git a/service/example_service.py b/service/example_service.py index 9bce775..aed133c 100644 --- a/service/example_service.py +++ b/service/example_service.py @@ -8,7 +8,7 @@ import sys import argparse -import os.path +import os import grpc from service import registry @@ -56,11 +56,12 @@ # derived from the protobuf codes. class CalculatorServicer(grpc_bt_grpc.CalculatorServicer): def __init__(self): + self.pid = os.getpid() self.a = 0 self.b = 0 self.result = 0 # Just for debugging purpose. - _LOGGER.debug("CalculatorServicer created") + _LOGGER.debug("[{}] CalculatorServicer created".format(self.pid)) # The method that will be exposed to the snet-cli call command. # request: incoming data @@ -74,7 +75,10 @@ def add(self, request, context): self.result = Result() self.result.value = self.a + self.b - _LOGGER.debug("add({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] add({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def sub(self, request, context): @@ -83,7 +87,10 @@ def sub(self, request, context): self.result = Result() self.result.value = self.a - self.b - _LOGGER.debug("sub({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] sub({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def mul(self, request, context): @@ -92,7 +99,10 @@ def mul(self, request, context): self.result = Result() self.result.value = self.a * self.b - _LOGGER.debug("mul({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] mul({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result def div(self, request, context): @@ -101,7 +111,10 @@ def div(self, request, context): self.result = Result() self.result.value = self.a / self.b - _LOGGER.debug("div({},{})={}".format(self.a, self.b, self.result.value)) + _LOGGER.debug("[{}] div({},{})={}".format(self.pid, + self.a, + self.b, + self.result.value)) return self.result @@ -154,12 +167,19 @@ def main(): default=registry[service_name]['grpc'], type=int, required=False) - args = parser.parse_args(sys.argv[1:]) + parser.add_argument("--mp", + help="number of concurrent processes", + metavar="NUMBER_OF_PROCESSES", + default=1, + type=int, + required=False) + args = parser.parse_args() + + num_processes = _PROCESS_COUNT if args.mp > _PROCESS_COUNT else args.mp with reserve_port(args.grpc_port) as port: - _LOGGER.debug("Binding to port '%s'", port) sys.stdout.flush() workers = [] - for _ in range(_PROCESS_COUNT): + for _ in range(num_processes): # NOTE: It is imperative that the worker subprocesses be forked before # any gRPC servers start up. See # https://github.com/grpc/grpc/issues/16001 for more details.