Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
Function timeout support for python runtime
Browse files Browse the repository at this point in the history
Added support for function execution timeout in python runtime.

Change-Id: I85fd0096e4129a33c63f886f068f39b2bac3f6ea
Task: 24280
Story: 2002174
  • Loading branch information
heychirag authored and lingxiankong committed Sep 7, 2018
1 parent e99d027 commit 05c7461
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 12 deletions.
2 changes: 1 addition & 1 deletion devstack/settings
Expand Up @@ -22,7 +22,7 @@ QINLING_CONF_FILE=${QINLING_CONF_DIR}/qinling.conf
QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json
QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling}
QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/function/packages}
QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python3-runtime:0.0.1}
QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python3-runtime:0.0.2}
QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1}
QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.2}

Expand Down
2 changes: 1 addition & 1 deletion qinling_tempest_plugin/config.py
Expand Up @@ -40,7 +40,7 @@
'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the qinling service."),
cfg.StrOpt("python_runtime_image",
default="openstackqinling/python3-runtime:0.0.1",
default="openstackqinling/python3-runtime:0.0.2",
help="The Python runtime being used in the tests."),
cfg.StrOpt("nodejs_runtime_image",
default="openstackqinling/nodejs-runtime:0.0.1",
Expand Down
Expand Up @@ -14,5 +14,5 @@
import time


def main(seconds=5, **kwargs):
def main(seconds=4, **kwargs):
time.sleep(seconds)
30 changes: 27 additions & 3 deletions qinling_tempest_plugin/tests/api/test_executions.py
Expand Up @@ -339,7 +339,7 @@ def test_python_execution_process_number(self):
)

@decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b')
def test_python_execution_memory_limit_non_image(self):
def test_python_execution_memory_limit(self):
"""In this case, the following steps are taken:
1. Create a function that requires ~80M memory to run.
Expand Down Expand Up @@ -390,7 +390,7 @@ def test_python_execution_memory_limit_non_image(self):
self.assertEqual(4, output)

@decorators.idempotent_id('ed714f98-29fe-4e8d-b6ee-9730f92bddea')
def test_python_execution_cpu_limit_non_image(self):
def test_python_execution_cpu_limit(self):
"""In this case, the following steps are taken:
1. Create a function that takes some time to finish (calculating the
Expand All @@ -408,7 +408,7 @@ def test_python_execution_cpu_limit_non_image(self):
package = self.create_package(
name='python/test_python_cpu_limit.py'
)
function_id = self.create_function(package_path=package)
function_id = self.create_function(package_path=package, timeout=180)

# Invoke function
resp, body = self.client.create_execution(function_id)
Expand Down Expand Up @@ -486,3 +486,27 @@ def test_python_execution_public_connection(self):
self.assertEqual('success', body['status'])
result = json.loads(body['result'])
self.assertEqual(page_sha256, result['output'])

@decorators.idempotent_id('b05e3bac-b23f-11e8-9679-00224d6b7bc1')
def test_python_execution_timeout(self):
    """In this case, the following steps are taken:

    1. Create a function from the sleep package without passing a timeout.
    2. Invoke the function asking it to sleep for 7 seconds.
    3. Check that the execution is reported as 'failed', that its duration
       is greater than 5 and that its output carries the timeout message.
    """
    sleep_package = self.create_package(name='python/test_python_sleep.py')
    function_id = self.create_function(package_path=sleep_package)

    # Invoke function
    resp, body = self.client.create_execution(
        function_id, input='{"seconds": 7}')

    self.assertEqual(201, resp.status)
    # Register the cleanup before any further assertion can fail.
    self.addCleanup(
        self.client.delete_resource,
        'executions',
        body['id'],
        ignore_notfound=True
    )
    self.assertEqual('failed', body['status'])

    outcome = jsonutils.loads(body['result'])
    self.assertGreater(outcome['duration'], 5)
    self.assertIn('Function execution timeout', outcome['output'])
1 change: 1 addition & 0 deletions runtimes/python2/requirements.txt
Expand Up @@ -9,3 +9,4 @@ python-mistralclient>=3.1.0 # Apache-2.0
keystoneauth1>=2.21.0 # Apache-2.0
openstacksdk>=0.9.19
oslo.concurrency>=3.25.0 # Apache-2.0
psutil>=5.4.7 # BSD
28 changes: 24 additions & 4 deletions runtimes/python2/server.py
Expand Up @@ -27,13 +27,15 @@
from flask import Response
from keystoneauth1.identity import generic
from keystoneauth1 import session
import psutil
import requests

app = Flask(__name__)

DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
INVOKE_ERROR = "Function execution failed because of too much resource " \
"consumption"
TIMEOUT_ERROR = "Function execution timeout."


def _print_trace():
Expand Down Expand Up @@ -73,6 +75,17 @@ def _get_responce(output, duration, logs, success, code):
)


def _killtree(pid, including_parent=True):
    """Kill every descendant of a process and, optionally, the process.

    Processes that have already exited by the time they are looked up or
    killed are skipped silently: the caller decides to kill based on an
    earlier liveness check, so the target (or any of its children) may
    legitimately disappear in between.

    :param pid: PID of the process at the root of the tree.
    :param including_parent: When true (the default), the root process is
        killed as well, after its descendants.
    """
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # The root process is already gone, nothing left to kill.
        return

    for child in children:
        print("kill child %s" % child)
        try:
            child.kill()
        except psutil.NoSuchProcess:
            # The child exited on its own after being enumerated.
            pass

    if including_parent:
        print("kill parent %s" % parent)
        try:
            parent.kill()
        except psutil.NoSuchProcess:
            # The parent exited while its children were being killed.
            pass


def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
input, return_dict, rlimit):
"""This function is supposed to be running in a child process.
Expand Down Expand Up @@ -146,6 +159,7 @@ def execute():
auth_url = params.get('auth_url')
username = params.get('username')
password = params.get('password')
timeout = params.get('timeout')
zip_file_dir = '/var/qinling/packages/%s' % function_id
rlimit = {
'cpu': params['cpu'],
Expand Down Expand Up @@ -206,25 +220,31 @@ def execute():
return_dict['success'] = False
start = time.time()

# Run the function in a separate process to avoid messing up the log
# Run the function in a separate process to avoid messing up the log. If
# the timeout is reached, kill all the subprocesses.
p = Process(
target=_invoke_function,
args=(execution_id, zip_file_dir, function_module, function_method,
input.pop('__function_input', None), input, return_dict, rlimit)
)

timed_out = False
p.start()
p.join()
p.join(timeout)
if p.is_alive():
_killtree(p.pid)
timed_out = True

####################################################################
#
# Get execution output(log, duration, etc.)
# Get execution result(log, duration, etc.)
#
####################################################################
duration = round(time.time() - start, 3)

# Process was killed unexpectedly or finished with error.
if p.exitcode != 0:
output = INVOKE_ERROR
output = TIMEOUT_ERROR if timed_out else INVOKE_ERROR
success = False
else:
output = return_dict.get('result')
Expand Down
1 change: 1 addition & 0 deletions runtimes/python3/requirements.txt
Expand Up @@ -9,3 +9,4 @@ python-mistralclient>=3.1.0 # Apache-2.0
keystoneauth1>=2.21.0 # Apache-2.0
openstacksdk>=0.9.19
oslo.concurrency>=3.25.0 # Apache-2.0
psutil>=5.4.7 # BSD
23 changes: 21 additions & 2 deletions runtimes/python3/server.py
Expand Up @@ -27,13 +27,15 @@
from flask import Response
from keystoneauth1.identity import generic
from keystoneauth1 import session
import psutil
import requests

app = Flask(__name__)

DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
INVOKE_ERROR = "Function execution failed because of too much resource " \
"consumption"
TIMEOUT_ERROR = "Function execution timeout."


def _print_trace():
Expand Down Expand Up @@ -73,6 +75,17 @@ def _get_responce(output, duration, logs, success, code):
)


def _killtree(pid, including_parent=True):
    """Kill every descendant of a process and, optionally, the process.

    Processes that have already exited by the time they are looked up or
    killed are skipped silently: the caller decides to kill based on an
    earlier liveness check, so the target (or any of its children) may
    legitimately disappear in between.

    :param pid: PID of the process at the root of the tree.
    :param including_parent: When true (the default), the root process is
        killed as well, after its descendants.
    """
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # The root process is already gone, nothing left to kill.
        return

    for child in children:
        print("kill child %s" % child)
        try:
            child.kill()
        except psutil.NoSuchProcess:
            # The child exited on its own after being enumerated.
            pass

    if including_parent:
        print("kill parent %s" % parent)
        try:
            parent.kill()
        except psutil.NoSuchProcess:
            # The parent exited while its children were being killed.
            pass


def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
input, return_dict, rlimit):
"""This function is supposed to be running in a child process.
Expand Down Expand Up @@ -146,6 +159,7 @@ def execute():
auth_url = params.get('auth_url')
username = params.get('username')
password = params.get('password')
timeout = params.get('timeout')
zip_file_dir = '/var/qinling/packages/%s' % function_id
rlimit = {
'cpu': params['cpu'],
Expand Down Expand Up @@ -212,8 +226,13 @@ def execute():
args=(execution_id, zip_file_dir, function_module, function_method,
input.pop('__function_input', None), input, return_dict, rlimit)
)

timed_out = False
p.start()
p.join()
p.join(timeout)
if p.is_alive():
_killtree(p.pid)
timed_out = True

####################################################################
#
Expand All @@ -224,7 +243,7 @@ def execute():

# Process was killed unexpectedly or finished with error.
if p.exitcode != 0:
output = INVOKE_ERROR
output = TIMEOUT_ERROR if timed_out else INVOKE_ERROR
success = False
else:
output = return_dict.get('result')
Expand Down

0 comments on commit 05c7461

Please sign in to comment.