Skip to content

Commit

Permalink
Implement a concurrent.futures.Executor for pysqa
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Jul 16, 2023
1 parent 1a942dd commit eaf55fe
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 0 deletions.
Empty file added pysqa/executor/__init__.py
Empty file.
58 changes: 58 additions & 0 deletions pysqa/executor/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import sys

from pympipool import PoolExecutor
from pysqa.executor.helper import (
read_from_file,
deserialize,
apply_funct,
write_to_file,
serialize_result,
)


def execute_tasks(cores, cache_directory):
    """Worker loop: poll *cache_directory* for serialized task files, execute
    them on a pympipool.PoolExecutor and write serialized results back.

    Args:
        cores (int or str): number of worker cores; may arrive as a string
            when taken directly from the command line.
        cache_directory (str): directory with ``<key>.in.pl`` task files;
            results are written next to them as ``<key>.out.pl``.

    Note: runs forever; intended to be launched as a queued batch job.
    """
    tasks_in_progress_dict = {}
    with PoolExecutor(
        max_workers=int(cores),  # tolerate string input from sys.argv
        oversubscribe=False,
        enable_flux_backend=False,
        enable_slurm_backend=False,
        init_function=None,
        cwd=cache_directory,
        queue_adapter=None,
        queue_adapter_kwargs=None,
    ) as exe:
        while True:
            file_lst = os.listdir(cache_directory)
            for file_name_in in file_lst:
                key = file_name_in.split(".in.pl")[0]
                file_name_out = key + ".out.pl"
                if (
                    file_name_in.endswith(".in.pl")
                    and file_name_out not in file_lst
                    # the dict is keyed by task key, not by output file name;
                    # comparing against file_name_out resubmitted every poll
                    and key not in tasks_in_progress_dict
                ):
                    funct_dict = read_from_file(
                        file_name=os.path.join(cache_directory, file_name_in)
                    )
                    apply_dict = deserialize(funct_dict=funct_dict)
                    for k, v in apply_dict.items():
                        tasks_in_progress_dict[k] = exe.submit(
                            fn=v["fn"], args=v["args"], kwargs=v["kwargs"]
                        )
            # Write each finished result exactly once and drop the future, so
            # the output file is not rewritten on every poll iteration.  The
            # future already holds fn(*args, **kwargs) — do NOT feed it back
            # through apply_funct, which expects a call-spec dict.
            done_keys = [k for k, v in tasks_in_progress_dict.items() if v.done()]
            for k in done_keys:
                future = tasks_in_progress_dict.pop(k)
                write_to_file(
                    funct_dict=serialize_result(result_dict={k: future.result()}),
                    state="out",
                    cache_directory=cache_directory,
                )


if __name__ == "__main__":
    # Minimal hand-rolled argument parsing: expects "--cores N --path DIR".
    arguments_lst = sys.argv
    # sys.argv entries are strings; the worker count must be an integer.
    cores_arg = int(arguments_lst[arguments_lst.index("--cores") + 1])
    path_arg = arguments_lst[arguments_lst.index("--path") + 1]
    execute_tasks(cores=cores_arg, cache_directory=path_arg)
51 changes: 51 additions & 0 deletions pysqa/executor/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import queue
from concurrent.futures import Future, Executor
from threading import Thread

from pympipool import cancel_items_in_queue
from pysqa.executor.helper import (
reload_previous_futures,
find_executed_tasks,
serialize_funct,
write_to_file,
)


class FileExecutor(Executor):
    """concurrent.futures.Executor that hands tasks to a remote worker via
    serialized files in a shared cache directory.

    submit() pickles the call to ``<key>.in.pl``; a background watcher thread
    (find_executed_tasks) resolves the returned Future once the worker has
    written the matching ``<key>.out.pl`` file.
    """

    def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None):
        """
        Args:
            cwd (str/None): cache directory shared with the worker; defaults
                to the current working directory.
            queue_adapter: pysqa queue adapter used to submit the worker job.
            queue_adapter_kwargs (dict/None): forwarded to submit_job();
                expected to contain "cores" — TODO confirm against adapter.
        """
        self._task_queue = queue.Queue()
        self._memory_dict = {}
        # The previous default cwd=None crashed inside os.path.expanduser;
        # fall back to the current directory instead.
        self._cache_directory = os.path.abspath(
            os.path.expanduser(cwd if cwd is not None else ".")
        )
        # Make sure the cache directory exists before it is scanned below.
        os.makedirs(self._cache_directory, exist_ok=True)
        reload_previous_futures(
            future_queue=self._task_queue,
            future_dict=self._memory_dict,
            cache_directory=self._cache_directory,
        )
        self._process = Thread(
            target=find_executed_tasks,
            kwargs={
                "future_queue": self._task_queue,
                "cache_directory": self._cache_directory,
                "queue_adapter": queue_adapter,
                "queue_adapter_kwargs": queue_adapter_kwargs,
            },
        )
        self._process.start()

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) for remote execution.

        Identical calls hash to the same key, so resubmitting returns the
        already-registered Future without writing the input file again.
        """
        funct_dict = serialize_funct(fn, *args, **kwargs)
        key = list(funct_dict.keys())[0]
        if key not in self._memory_dict:
            self._memory_dict[key] = Future()
            # return value (list of written file names) is not needed here
            write_to_file(
                funct_dict=funct_dict, state="in", cache_directory=self._cache_directory
            )
            self._task_queue.put({key: self._memory_dict[key]})
        return self._memory_dict[key]

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop the watcher thread; optionally cancel still-queued futures."""
        if cancel_futures:
            cancel_items_in_queue(que=self._task_queue)
        # Sentinel message understood by the watcher loop.
        self._task_queue.put({"shutdown": True, "wait": wait})
        self._process.join()
108 changes: 108 additions & 0 deletions pysqa/executor/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import re
import queue
from concurrent.futures import Future

import hashlib
import cloudpickle


def get_hash(binary):
    """Return the hex MD5 digest of *binary*.

    Any ``/ipykernel_<id>/`` fragment is blanked out first, so the hash of a
    function pickled inside a Jupyter kernel stays deterministic across
    kernel restarts.
    """
    normalized = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
    return hashlib.md5(normalized).hexdigest()


def serialize_funct(fn, *args, **kwargs):
    """Pickle a function call and key it as ``<fn name><payload hash>``.

    Returns a single-entry dict ``{key: pickled_bytes}``.
    """
    payload = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
    key = fn.__name__ + get_hash(binary=payload)
    return {key: payload}


def get_file_name(name, state):
    """Build the cache file name ``<name>.<state>.pl`` (state: "in"/"out")."""
    return "{}.{}.pl".format(name, state)


def write_to_file(funct_dict, state, cache_directory):
    """Write every serialized entry of *funct_dict* into *cache_directory*.

    Each ``{key: binary}`` pair becomes a file ``<key>.<state>.pl``.
    Returns the list of file names created (without directory prefix).
    """
    written = []
    for key, binary in funct_dict.items():
        file_name = "{}.{}.pl".format(key, state)
        written.append(file_name)
        with open(os.path.join(cache_directory, file_name), "wb") as handle:
            handle.write(binary)
    return written


def read_from_file(file_name):
    """Read a cache file and return ``{key: binary_content}``.

    The key is the base file name up to its first ``.`` (so ``<key>.in.pl``
    yields ``<key>``).  os.path.basename is used instead of splitting on
    ``"/"`` so the helper also works with Windows path separators.
    """
    name = os.path.basename(file_name).split(".")[0]
    with open(file_name, "rb") as f:
        return {name: f.read()}


def deserialize(funct_dict):
    """Unpickle every value of *funct_dict*.

    Returns ``{}`` when a payload is truncated (EOFError), e.g. when the
    file is still being written by the other side.
    """
    result = {}
    try:
        for key, binary in funct_dict.items():
            result[key] = cloudpickle.loads(binary)
    except EOFError:
        return {}
    return result


def apply_funct(apply_dict):
    """Execute every deserialized call spec and return ``{key: result}``.

    Each value must be a dict with "fn", "args" and "kwargs" keys, as
    produced by serialize_funct/deserialize.  Calls the function directly
    instead of via the explicit ``__call__`` dunder.
    """
    return {k: v["fn"](*v["args"], **v["kwargs"]) for k, v in apply_dict.items()}


def serialize_result(result_dict):
    """Pickle every result value; returns ``{key: binary}``."""
    serialized = {}
    for key, value in result_dict.items():
        serialized[key] = cloudpickle.dumps(value)
    return serialized


def set_future(file_name, future):
    """Resolve *future* with the single result stored in *file_name*.

    Leaves the future untouched when deserialization yielded nothing
    (e.g. the file is still being written and deserialize returned {}).
    """
    result_values = list(
        deserialize(funct_dict=read_from_file(file_name=file_name)).values()
    )
    if len(result_values) == 1:
        future.set_result(result_values[0])


def reload_previous_futures(future_queue, future_dict, cache_directory):
    """Rebuild future bookkeeping from files left in *cache_directory*.

    For every ``<key>.in.pl`` a fresh Future is registered in *future_dict*;
    when the matching ``<key>.out.pl`` already exists the future is resolved
    immediately, otherwise it is queued so the watcher thread picks it up.
    """
    file_lst = os.listdir(cache_directory)
    for file_name in file_lst:
        if not file_name.endswith(".in.pl"):
            continue
        key = file_name.split(".in.pl")[0]
        future = Future()
        future_dict[key] = future
        file_name_out = key + ".out.pl"
        if file_name_out in file_lst:
            set_future(
                file_name=os.path.join(cache_directory, file_name_out),
                future=future,
            )
        else:
            future_queue.put({key: future})


def find_executed_tasks(
    future_queue, cache_directory, queue_adapter, queue_adapter_kwargs
):
    """Watcher loop run in a background thread by FileExecutor.

    Submits one remote worker job via *queue_adapter*, then polls
    *cache_directory* and resolves futures as soon as the worker writes the
    matching ``<key>.out.pl`` file.  Terminates when a "shutdown" message
    (sent by FileExecutor.shutdown) arrives on *future_queue*.
    """
    task_memory_dict = {}
    # The original bound a 1-tuple here (stray trailing comma), so a tuple
    # rather than the command string was passed to submit_job.
    command = (
        "python -m pysqa.executor --cores "
        + str(queue_adapter_kwargs["cores"])
        + " --path "
        + str(cache_directory)
    )
    queue_adapter.submit_job(
        working_directory=cache_directory, command=command, **queue_adapter_kwargs
    )
    while True:
        task_dict = {}
        file_lst = os.listdir(cache_directory)
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass
        # FileExecutor.shutdown() sends {"shutdown": True, "wait": ...};
        # previously that entry was stored as a future and crashed on .done().
        if "shutdown" in task_dict:
            break
        task_memory_dict.update(task_dict)
        for key, future in task_memory_dict.items():
            file_name_out = get_file_name(name=key, state="out")
            if not future.done() and file_name_out in file_lst:
                set_future(
                    file_name=os.path.join(cache_directory, file_name_out),
                    future=future,
                )

0 comments on commit eaf55fe

Please sign in to comment.