Skip to content

Commit

Permalink
Merge pull request #366 from vantage6/feature/print-task-input-in-nod…
Browse files Browse the repository at this point in the history
…e-logs

Add algorithm input to node logs if it is serialized as pickle
  • Loading branch information
bartvanb committed Nov 16, 2022
2 parents 3185ce8 + d8226d0 commit e52f223
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions vantage6-node/vantage6/node/docker/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
to be cleaned at some point. """
import logging
import os
import pickle
import docker.errors

from enum import Enum
Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(self, image: str, vpn_manager: VPNManager, node_name: str,

self.container = None
self.status_code = None
self.docker_input = None

self.labels = {
f"{APPNAME}-type": "algorithm",
Expand Down Expand Up @@ -190,9 +192,8 @@ def run(self, docker_input: bytes, tmp_vol_name: str, token: str,
self._make_task_folders()

# prepare volumes
self.volumes = self._prepare_volumes(
docker_input, tmp_vol_name, token
)
self.docker_input = docker_input
self.volumes = self._prepare_volumes(tmp_vol_name, token)
self.log.debug(f"volumes: {self.volumes}")

# setup environment variables
Expand Down Expand Up @@ -257,9 +258,21 @@ def _run_algorithm(self) -> List[Dict]:
algo_image_name=self.image
)

# try reading docker input
deserialized_input = None
if self.docker_input:
try:
deserialized_input = pickle.loads(self.docker_input)
except Exception:
pass

# attempt to run the image
try:
self.log.info(f"Run docker image={self.image}")
if deserialized_input:
self.log.info(f"Run docker image {self.image} with input "
f"{deserialized_input}")
else:
self.log.info(f"Run docker image {self.image}")
self.container = self.docker.containers.run(
self.image,
detach=True,
Expand Down Expand Up @@ -303,15 +316,12 @@ def _make_task_folders(self) -> None:
os.makedirs(self.task_folder_path, exist_ok=True)
self.output_file = os.path.join(self.task_folder_path, "output")

def _prepare_volumes(self, docker_input: bytes, tmp_vol_name: str,
token: str) -> Dict:
def _prepare_volumes(self, tmp_vol_name: str, token: str) -> Dict:
"""
Generate docker volumes required to run the algorithm
Parameters
----------
docker_input: bytes
Input that can be read by docker container
tmp_vol_name: str
Name of temporary docker volume assigned to the algorithm
token: str
Expand All @@ -322,13 +332,13 @@ def _prepare_volumes(self, docker_input: bytes, tmp_vol_name: str,
Dict:
Volumes to support running the algorithm
"""
if isinstance(docker_input, str):
docker_input = docker_input.encode('utf8')
if isinstance(self.docker_input, str):
self.docker_input = self.docker_input.encode('utf8')

# Create I/O files & token for the algorithm container
self.log.debug("prepare IO files in docker volume")
io_files = [
('input', docker_input),
('input', self.docker_input),
('output', b''),
('token', token.encode("ascii")),
]
Expand Down

0 comments on commit e52f223

Please sign in to comment.