Skip to content

Commit

Permalink
[demorunner] exec_and_wait ported
Browse files Browse the repository at this point in the history
  • Loading branch information
hmaciasc committed Jun 20, 2023
1 parent f3361ce commit b10b60d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 72 deletions.
94 changes: 45 additions & 49 deletions ipol_demo/modules/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def __init__(self):
os.makedirs(self.dl_extras_dir, exist_ok=True)
os.makedirs(self.demo_extras_main_dir, exist_ok=True)

base_url = os.environ["IPOL_URL"]
base_url = os.environ.get("IPOL_URL", "http://" + socket.getfqdn())
demorunners_path = cherrypy.config["demorunners_file"]
policy = cherrypy.config["dispatcher_policy"]
self.dispatcher = dispatcher.Dispatcher(
Expand Down Expand Up @@ -1527,7 +1527,7 @@ def ensure_compilation(self, dr_name, demo_id, ddl_build):
"""
Ensure that the source codes of the demo are all updated and compiled correctly
"""
build_data = {"demo_id": demo_id, "ddl_build": json.dumps(ddl_build)}
build_data = {"ddl_build": ddl_build}
ssh_response, _ = self.post(
"api/demoinfo/get_ssh_keys", "post", data={"demo_id": demo_id}
)
Expand All @@ -1536,28 +1536,20 @@ def ensure_compilation(self, dr_name, demo_id, ddl_build):
build_data["ssh_key.public"] = ssh_response["pubkey"]
build_data["ssh_key.private"] = ssh_response["privkey"]

url = f"api/demorunner/{dr_name}/ensure_compilation"
dr_response, _ = self.post(url, "post", data=build_data)
url = f"api/demorunner/{dr_name}/compilations/{demo_id}"
dr_response, dr_response_code = self.post(
url, "post", data=json.dumps(build_data)
)

# Read the JSON response from the DR
try:
demorunner_response = dr_response.json()
except Exception as ex:
dr_response_content = Core.get_response_error_or_content(dr_response)
error_message = "**An internal error has occurrred in the demo system, sorry for the inconvenience.\
The IPOL Team has been notified and will fix the issue as soon as possible**. Bad format in the response \
from DR server {} in demo #{}. {} - {}".format(
dr_name, demo_id, dr_response_content, ex
)
self.logger.exception(error_message)
raise IPOLEnsureCompilationError(error_message)
demorunner_response = dr_response.json()

# Check if the compilation was successful
if demorunner_response["status"] != "OK":
if dr_response_code != 201:
print("Compilation error in demo #{}".format(demo_id))
# Add the compilation failure info into the exception
buildlog = demorunner_response.get("buildlog", "")
demorunner_message = demorunner_response["message"]
demorunner_message = demorunner_response["detail"]
error_message = "DR={}, {} - {}".format(
dr_name, buildlog, demorunner_message
)
Expand Down Expand Up @@ -1684,7 +1676,7 @@ def execute_experiment(
dr_name,
demo_id,
key,
params,
parameters,
inputs_names,
ddl_run,
ddl_general,
Expand All @@ -1693,50 +1685,57 @@ def execute_experiment(
"""
Execute the experiment in the given DR.
"""
params = {**params}
parameters = {**parameters}
for i, input_item in inputs_names.items():
params[f"orig_input_{i}"] = input_item["origin"]
params[f"input_{i}"] = input_item["converted"]
parameters[f"orig_input_{i}"] = input_item["origin"]
parameters[f"input_{i}"] = input_item["converted"]

userdata = {
"demo_id": demo_id,
"key": key,
"params": json.dumps(params),
"ddl_run": ddl_run,
}

if "timeout" in ddl_general:
userdata["timeout"] = ddl_general["timeout"]

i = 0
inputs = {}
for root, _, files in os.walk(work_dir):
for file in files:
files = []
for root, _, wd_files in os.walk(work_dir):
for file in wd_files:
path = os.path.join(root, file)
relative_path = path.replace(work_dir, "./")
fd = open(path, "rb")
inputs[f"inputs.{i}"] = (relative_path, fd, "application/octet-stream")
i += 1
files.append(
(
"files",
(
file,
open(path, "rb"),
"application/octet-stream",
),
)
)

url = f"api/demorunner/{dr_name}/exec_and_wait"
resp, status_code = self.post(url, "post", data=userdata, files=inputs)
url = f"api/demorunner/{dr_name}/exec_and_wait/{demo_id}"
dr_response, status_code = self.post(
url,
"post",
params=userdata,
data={"parameters": json.dumps(parameters)},
files=files,
)

if status_code != 200:
demo_state = self.get_demo_metadata(demo_id)["state"].lower()
error = "IPOLDemorunnerUnresponsive"
website_message = (
f"Demorunner {dr_name} not responding (error {resp.status_code})"
f"Demorunner {dr_name} not responding (error {dr_response.status_code})"
)
raise IPOLDemoRunnerResponseError(website_message, demo_state, key, error)

zipcontent = io.BytesIO(resp.content)
zipcontent = io.BytesIO(dr_response.content)
zip = zipfile.ZipFile(zipcontent)
zip.extractall(work_dir)

try:
demorunner_response = json.load(
open(os.path.join(work_dir, "exec_info.json"))
)
dr_response = json.load(open(os.path.join(work_dir, "exec_info.json")))
except Exception as ex:
error_message = "**An internal error has occurred in the demo system, sorry for the inconvenience.\
The IPOL team has been notified and will fix the issue as soon as possible**. Bad format in the response from DR server {} in demo #{}. - {}".format(
Expand All @@ -1745,7 +1744,7 @@ def execute_experiment(
self.logger.exception(error_message)
raise IPOLExecutionError(error_message, error_message)

if demorunner_response["status"] != "OK":
if status_code != 200:
print("DR answered KO for demo #{}".format(demo_id))

try:
Expand All @@ -1754,8 +1753,8 @@ def execute_experiment(
demo_state = "<???>"

# Message for the web interface
error_msg = (demorunner_response["algo_info"]["error_message"]).strip()
error = demorunner_response.get("error", "").strip()
error_msg = (dr_response["algo_info"]["error_message"]).strip()
error = dr_response.get("error", "").strip()

# Prepare a message for the website.
website_message = "DR={}\n{}".format(dr_name, error_msg)
Expand Down Expand Up @@ -1789,9 +1788,9 @@ def execute_experiment(

algo_info_dic = self.read_algo_info(work_dir)
for name in algo_info_dic:
demorunner_response["algo_info"][name] = algo_info_dic[name]
dr_response["algo_info"][name] = algo_info_dic[name]

demorunner_response["work_url"] = (
dr_response["work_url"] = (
os.path.join(
"/api/core/",
self.shared_folder_rel,
Expand All @@ -1802,7 +1801,7 @@ def execute_experiment(
+ "/"
)

return demorunner_response
return dr_response

@cherrypy.expose
def run(self, **kwargs):
Expand Down Expand Up @@ -2110,20 +2109,17 @@ def post(api_url, method=None, **kwargs):
General purpose function to make http requests.
"""
base_url = os.environ.get("IPOL_URL", "http://" + socket.getfqdn())
headers = {"Content-Type": "application/json; charset=UTF-8"}
if method == "get":
response = requests.get(f"{base_url}/{api_url}")
return response.json(), response.status_code
elif method == "put":
response = requests.put(f"{base_url}/{api_url}", **kwargs, headers=headers)
response = requests.put(f"{base_url}/{api_url}", **kwargs)
return response.json(), response.status_code
elif method == "post":
response = requests.post(f"{base_url}/{api_url}", **kwargs)
return response, response.status_code
elif method == "delete":
response = requests.delete(
f"{base_url}/{api_url}", **kwargs, headers=headers
)
response = requests.delete(f"{base_url}/{api_url}", **kwargs)
return response, response.status_code
else:
assert False
Expand Down
6 changes: 3 additions & 3 deletions ipol_demo/modules/demorunner/Tools/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import zipfile
from subprocess import Popen

from .error import IPOLCompileError, VirtualEnvError
from .error import VirtualEnvError

TIME_FMT = "%a, %d %b %Y %H:%M:%S %Z"

Expand Down Expand Up @@ -53,7 +53,7 @@ def download(url, fname, username=None, password=None):
url_info = url_handle.info()

if "last-modified" not in url_info:
raise IPOLCompileError(
raise Exception(
"Incomplete HTTP response. Missing 'last-modified' \
HTTP header. Hint: do not use GitHub, GitLab, or Dropbox as a file server."
)
Expand Down Expand Up @@ -92,7 +92,7 @@ def extract(fname, target):

# Report bad path in case of starting with "/" or containing ".."
if any([os.path.isabs(f) or ".." in f for f in content]):
raise IPOLCompileError(
raise Exception(
'Build Zip contains forbidden paths. It can not extract on/from a parent directory like "../".'
)
else:
Expand Down
4 changes: 2 additions & 2 deletions ipol_demo/modules/demorunner/Tools/run_demo_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def set_demo_id(self, demo_id):

# -----------------------------------------------------------------------------
def get_demo_id(self):
return self.demo_id
return str(self.demo_id)

# -----------------------------------------------------------------------------
# set the algorihtm parameters as a dictionnary
Expand Down Expand Up @@ -95,7 +95,7 @@ def get_MATLAB_path(self):

# ------------------- demoextras functions ------------------------------------
def set_share_demoExtras_dirs(self, share_demoExtras_dir, demo_id):
self.main_demoExtras_Folder = os.path.join(share_demoExtras_dir, demo_id)
self.main_demoExtras_Folder = os.path.join(share_demoExtras_dir, str(demo_id))

def get_demoExtras_main_folder(self):
return self.main_demoExtras_Folder
Expand Down
38 changes: 20 additions & 18 deletions ipol_demo/modules/demorunner/demorunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from string import Template
from subprocess import PIPE, Popen
from threading import Lock
from typing import Annotated
from typing import Annotated, List

import Tools.build as build
import Tools.run_demo_base as run_demo_base
Expand All @@ -47,6 +47,7 @@
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, Response
from pydantic import BaseSettings
from starlette.responses import StreamingResponse
from Tools.run_demo_base import IPOLTimeoutError


Expand Down Expand Up @@ -425,8 +426,7 @@ def ensure_compilation(demo_id: int, build_data: dict) -> None:
message = f"INTERNAL ERROR in ensure_compilation. {ex}"
logger.exception(f"INTERNAL ERROR in ensure_compilation, demo {demo_id}")

response = {"message": message}
raise HTTPException(status_code=500, detail=response)
raise HTTPException(status_code=500, detail=message)


@private_route.post("/compilations", status_code=201)
Expand Down Expand Up @@ -454,7 +454,7 @@ def delete_compilation(demo_id: int) -> None:
"""
Remove compilation folder if exists
"""
compilation_path = os.path.join(settings.main_bin_dir, demo_id)
compilation_path = os.path.join(settings.main_bin_dir, str(demo_id))
if os.path.isdir(compilation_path):
shutil.rmtree(compilation_path)

Expand Down Expand Up @@ -539,7 +539,7 @@ def variable_substitution(ddl_run: str, demo_id: int, params: dict) -> str:
"""
Replace the variables with its values and return the command to be executed
"""
params["demoextras"] = os.path.join(settings.share_demoExtras_dir, demo_id)
params["demoextras"] = os.path.join(settings.share_demoExtras_dir, str(demo_id))
params["matlab_path"] = settings.MATLAB_path
params["bin"] = get_bin_dir(demo_id)
params["virtualenv"] = get_bin_dir(demo_id) + "venv"
Expand All @@ -550,7 +550,7 @@ def get_bin_dir(demo_id: int) -> str:
"""
Returns the directory with the peer-reviewed author programs
"""
return os.path.join(settings.main_bin_dir, demo_id, "bin/")
return os.path.join(settings.main_bin_dir, f"{demo_id}bin/")


def read_workdir_file(work_dir: str, filename: str) -> str:
Expand All @@ -565,21 +565,20 @@ def read_workdir_file(work_dir: str, filename: str) -> str:
return content


@app.post("/exec_and_wait/{demo_id}", status_code=201)
@app.post("/exec_and_wait/{demo_id}", status_code=200)
async def exec_and_wait(
response: Response,
demo_id: int,
key: str,
ddl_run: str,
timeout: int,
params: Annotated[str, Body()],
files: Annotated[list[UploadFile], File()],
) -> dict:
parameters: Annotated[str, Body()],
files: Annotated[List[UploadFile], File()],
timeout: int = settings.default_timeout,
) -> UploadFile:
"""
Run the algorithm
"""
print(ddl_run, key, timeout, params, files)
params = params
parameters = json.loads(parameters)

work_dir_handle = tempfile.TemporaryDirectory()
work_dir = work_dir_handle.name
Expand All @@ -591,10 +590,10 @@ async def exec_and_wait(
os.makedirs(dirname, exist_ok=True)
open(path, "wb").write(input.file.read())

path_with_the_binaries = os.path.join(settings.main_bin_dir, demo_id + "/")
path_with_the_binaries = os.path.join(settings.main_bin_dir, f"{demo_id}/")
res_data = {}
res_data["key"] = key
res_data["params"] = params
res_data["params"] = parameters
res_data["algo_info"] = {}
# run the algorithm
try:
Expand All @@ -611,7 +610,7 @@ async def exec_and_wait(
work_dir,
path_with_the_binaries,
ddl_run,
params,
parameters,
res_data,
timeout,
)
Expand Down Expand Up @@ -683,9 +682,12 @@ async def exec_and_wait(
zip.write(path, in_zip_path)

# send the zip as the reply to the request
response.headers["Content-Type"] = "application/zip"
fd.seek(0)
return fd.read()
return StreamingResponse(
fd,
media_type="application/zip",
headers={"Content-Disposition": "attachment;filename=uploaded_files.zip"},
)


def read_authorized_patterns() -> list:
Expand Down

0 comments on commit b10b60d

Please sign in to comment.