Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[grpc] Using gRPC multiprocessing. #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ venv.bak/

# mypy
.mypy_cache/

*_pb2.py
*_pb2_grpc.py
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
grpcio>=1.14.2
grpcio>=1.14.2 --no-binary grpcio
grpcio-tools>=1.14.1
50 changes: 37 additions & 13 deletions run_example_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,44 @@

from service import registry

logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s")
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - "
"%(name)s - %(message)s")
log = logging.getLogger("run_example_service")


def main():
parser = argparse.ArgumentParser(description="Run services")
parser.add_argument("--no-daemon", action="store_false", dest="run_daemon", help="do not start the daemon")
parser.add_argument("--no-daemon",
action="store_false",
dest="run_daemon",
help="do not start the daemon")
parser.add_argument("--daemon-config",
dest="daemon_config",
help="Path of daemon configuration file, without config it won't be started",
required=False
)
parser.add_argument("--ssl", action="store_true", dest="run_ssl", help="start the daemon with SSL")
required=False)
parser.add_argument("--ssl",
action="store_true",
dest="run_ssl",
help="start the daemon with SSL")
parser.add_argument("--mp",
help="number of concurrent processes",
metavar="NUMBER_OF_PROCESSES",
default=1,
type=int,
required=False)
args = parser.parse_args()
root_path = pathlib.Path(__file__).absolute().parent

# All services modules go here
service_modules = ["service.example_service"]

# Call for all the services listed in service_modules
all_p = start_all_services(root_path, service_modules, args.run_daemon, args.daemon_config, args.run_ssl)
all_p = start_all_services(root_path,
service_modules,
args.run_daemon,
args.daemon_config,
args.run_ssl,
args.mp)

# Continuous checking all subprocess
try:
Expand All @@ -46,21 +63,24 @@ def main():
raise


def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl):
def start_all_services(cwd, service_modules,
run_daemon, daemon_config, run_ssl, mp):
"""
Loop through all service_modules and start them.
For each one, an instance of Daemon "snetd" is created.
snetd will start with configs from "snetd.config.json"
"""
all_p = []
for i, service_module in enumerate(service_modules):
for _, service_module in enumerate(service_modules):
service_name = service_module.split(".")[-1]
log.info("Launching {} on port {}".format(str(registry[service_name]), service_module))
all_p += start_service(cwd, service_module, run_daemon, daemon_config, run_ssl)
log.info("Launching {} on port {}".format(service_module,
str(registry[service_name])))
all_p += start_service(cwd, service_module,
run_daemon, daemon_config, run_ssl, mp)
return all_p


def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl):
def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl, mp):
"""
Starts SNET Daemon ("snetd") and the python module of the service
at the passed gRPC port.
Expand All @@ -80,13 +100,17 @@ def add_ssl_configs(conf):
if daemon_config:
all_p.append(start_snetd(str(cwd), daemon_config))
else:
for idx, config_file in enumerate(glob.glob("./snetd_configs/*.json")):
for _, config_file in enumerate(glob.glob("./snetd_configs/*.json")):
if run_ssl:
add_ssl_configs(config_file)
all_p.append(start_snetd(str(cwd), config_file))
service_name = service_module.split(".")[-1]
grpc_port = registry[service_name]["grpc"]
p = subprocess.Popen([sys.executable, "-m", service_module, "--grpc-port", str(grpc_port)], cwd=str(cwd))
p = subprocess.Popen([sys.executable,
"-m", service_module,
"--grpc-port", str(grpc_port),
"--mp", str(mp)],
cwd=str(cwd))
all_p.append(p)
return all_p

Expand Down
29 changes: 0 additions & 29 deletions service/common.py

This file was deleted.

117 changes: 96 additions & 21 deletions service/example_service.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import sys
from concurrent import futures
import contextlib
import datetime
import logging
import multiprocessing
import time
import socket
import sys

import argparse
import os
import grpc
import concurrent.futures as futures

import service.common
from service import registry

# Importing the generated codes from buildproto.sh
import service.service_spec.example_service_pb2_grpc as grpc_bt_grpc
from service.service_spec.example_service_pb2 import Result

logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s")
log = logging.getLogger("example_service")
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s]"
" - %(name)s - %(message)s")
_LOGGER = logging.getLogger("example_service")

_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT

"""
Simple arithmetic service to test the Snet Daemon (gRPC), dApp and/or Snet-CLI.
Expand Down Expand Up @@ -41,16 +52,16 @@
value: 924.0
"""


# Create a class to be added to the gRPC server
# derived from the protobuf codes.
class CalculatorServicer(grpc_bt_grpc.CalculatorServicer):
def __init__(self):
self.pid = os.getpid()
self.a = 0
self.b = 0
self.result = 0
# Just for debugging purpose.
log.debug("CalculatorServicer created")
_LOGGER.debug("[{}] CalculatorServicer created".format(self.pid))

# The method that will be exposed to the snet-cli call command.
# request: incoming data
Expand All @@ -64,7 +75,10 @@ def add(self, request, context):
self.result = Result()

self.result.value = self.a + self.b
log.debug("add({},{})={}".format(self.a, self.b, self.result.value))
_LOGGER.debug("[{}] add({},{})={}".format(self.pid,
self.a,
self.b,
self.result.value))
return self.result

def sub(self, request, context):
Expand All @@ -73,7 +87,10 @@ def sub(self, request, context):

self.result = Result()
self.result.value = self.a - self.b
log.debug("sub({},{})={}".format(self.a, self.b, self.result.value))
_LOGGER.debug("[{}] sub({},{})={}".format(self.pid,
self.a,
self.b,
self.result.value))
return self.result

def mul(self, request, context):
Expand All @@ -82,7 +99,10 @@ def mul(self, request, context):

self.result = Result()
self.result.value = self.a * self.b
log.debug("mul({},{})={}".format(self.a, self.b, self.result.value))
_LOGGER.debug("[{}] mul({},{})={}".format(self.pid,
self.a,
self.b,
self.result.value))
return self.result

def div(self, request, context):
Expand All @@ -91,10 +111,21 @@ def div(self, request, context):

self.result = Result()
self.result.value = self.a / self.b
log.debug("div({},{})={}".format(self.a, self.b, self.result.value))
_LOGGER.debug("[{}] div({},{})={}".format(self.pid,
self.a,
self.b,
self.result.value))
return self.result


def wait_forever(server):
try:
while True:
time.sleep(_ONE_DAY.total_seconds())
except KeyboardInterrupt:
server.stop(None)


# The gRPC serve function.
#
# Params:
Expand All @@ -103,17 +134,61 @@ def div(self, request, context):
#
# Add all your classes to the server here.
# (from generated .py files by protobuf compiler)
def serve(max_workers=10, port=7777):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
def run_server(grpc_port=7777):
options = (('grpc.so_reuseport', 1),)
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,),
options=options)
grpc_bt_grpc.add_CalculatorServicer_to_server(CalculatorServicer(), server)
server.add_insecure_port("[::]:{}".format(port))
return server
server.add_insecure_port("[::]:{}".format(grpc_port))
server.start()
wait_forever(server)


@contextlib.contextmanager
def reserve_port(grpc_port=7777):
"""Find and reserve a port for all subprocesses to use."""
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
raise RuntimeError("Failed to set SO_REUSEPORT.")
sock.bind(('', grpc_port))
try:
yield sock.getsockname()[1]
finally:
sock.close()

def main():
""" Runs the gRPC server to communicate with the SNET Daemon. """
parser = argparse.ArgumentParser(prog=__file__)
service_name = os.path.splitext(os.path.basename(__file__))[0]
parser.add_argument("--grpc-port",
help="port to bind gRPC service to",
default=registry[service_name]['grpc'],
type=int,
required=False)
parser.add_argument("--mp",
help="number of concurrent processes",
metavar="NUMBER_OF_PROCESSES",
default=1,
type=int,
required=False)
args = parser.parse_args()

num_processes = _PROCESS_COUNT if args.mp > _PROCESS_COUNT else args.mp
with reserve_port(args.grpc_port) as port:
sys.stdout.flush()
workers = []
for _ in range(num_processes):
# NOTE: It is imperative that the worker subprocesses be forked before
# any gRPC servers start up. See
# https://github.com/grpc/grpc/issues/16001 for more details.
worker = multiprocessing.Process(target=run_server, args=(port,))
worker.start()
workers.append(worker)
for worker in workers:
worker.join()


if __name__ == "__main__":
"""
Runs the gRPC server to communicate with the Snet Daemon.
"""
parser = service.common.common_parser(__file__)
args = parser.parse_args(sys.argv[1:])
service.common.main_loop(serve, args)
main()
6 changes: 4 additions & 2 deletions test_example_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
test_flag = True

# Example Service - Arithmetic
endpoint = input("Endpoint (localhost:{}): ".format(registry["example_service"]["grpc"])) if not test_flag else ""
endpoint = input("Endpoint (localhost:{}): ".format(
registry["example_service"]["grpc"])) if not test_flag else ""
if endpoint == "":
endpoint = "localhost:{}".format(registry["example_service"]["grpc"])
endpoint = "localhost:{}".format(
registry["example_service"]["grpc"])

grpc_method = input("Method (add|sub|mul|div): ") if not test_flag else "mul"
a = float(input("Number 1: ") if not test_flag else "12")
Expand Down