101 changes: 89 additions & 12 deletions pyemu/utils/os_utils.py
@@ -19,6 +19,7 @@
from contextlib import contextmanager
import json
import uuid
import concurrent.futures

import numpy as np
import pandas as pd
@@ -419,12 +420,35 @@ def start_workers(
procs = []
worker_dirs = []
if ppw_function is not None:
args = (os.path.join(worker_dir, pst_rel_path), hostname, port)

# Start worker processes via a thread pool for faster deployment
def create_and_start_worker(worker_id):
p = mp.Process(target=ppw_function, args=args, kwargs=ppw_kwargs)
p.daemon = True
p.start()
if verbose:
print("Started worker {0} (PID: {1})".format(worker_id, p.pid))
return p

# Use ThreadPoolExecutor to start the processes in parallel; each started
# process is appended to procs exactly once, when its future completes below
max_concurrent_starts = num_workers # could be capped, e.g. min(num_workers, 300), to limit concurrent starts
with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_starts) as executor:
# Submit all worker creation tasks
future_to_id = {
executor.submit(create_and_start_worker, i): i
for i in range(num_workers)
}

# Collect started processes
for future in concurrent.futures.as_completed(future_to_id):
worker_id = future_to_id[future]
try:
p = future.result()
procs.append(p)
except Exception as e:
print("Error starting worker {0}: {1}".format(worker_id, e))
raise

else:
tcp_arg = "{0}:{1}".format(hostname, port)
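For reference, here is a minimal standalone sketch (not part of the diff) of the start-up pattern used above: mp.Process.start() spends most of its time in OS-level spawn overhead, so issuing the start calls from a thread pool lets many workers launch concurrently while the parent simply collects the process handles. The names dummy_worker and start_many are illustrative.

import multiprocessing as mp
import concurrent.futures

def dummy_worker(worker_id):
    # stand-in for ppw_function; a real worker would connect to the PEST++ master here
    print("worker {0} running".format(worker_id))

def start_many(n_workers=8):
    def start_one(i):
        p = mp.Process(target=dummy_worker, args=(i,))
        p.daemon = True
        p.start()
        return p

    procs = []
    # the threads only issue the blocking start calls; the work itself runs in the child processes
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = {pool.submit(start_one, i): i for i in range(n_workers)}
        for fut in concurrent.futures.as_completed(futures):
            procs.append(fut.result())
    return procs

if __name__ == "__main__":
    for p in start_many(4):
        p.join()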
@@ -480,15 +504,66 @@ def start_workers(
master_p.wait()
time.sleep(0.5) # a few cycles to let the workers end gracefully

# More efficient graceful termination for ppw_function workers
if ppw_function is not None:
termination_timeout = 10 # seconds

if verbose:
print("Initiating graceful shutdown of {0} workers...".format(len(procs)))

# First, try graceful termination
for p in procs:
if p.is_alive():
try:
p.terminate() # Send SIGTERM
except Exception:
pass

# Wait for processes to terminate gracefully
start_time = time.time()
remaining_procs = procs[:] # Create a copy

while remaining_procs and (time.time() - start_time) < termination_timeout:
still_alive = []
for p in remaining_procs:
if p.is_alive():
still_alive.append(p)
else:
try:
p.join(timeout=0.1) # Clean up zombie processes
except Exception:
pass

remaining_procs = still_alive
if remaining_procs:
time.sleep(0.1)

# Force kill any remaining processes
if remaining_procs:
if verbose:
print("Force killing {0} remaining workers...".format(len(remaining_procs)))
for p in remaining_procs:
try:
p.kill() # Send SIGKILL
p.join(timeout=1)
except Exception:
pass
else:
# kill any remaining workers (non-ppw_function case)
for p in procs:
p.kill()
# Wait for all processes to finish (this waits for sweep to finish, but pre/post/model subprocs may take longer)
for p in procs:
if ppw_function is not None:
# For ppw_function processes, we already handled termination above
# Just ensure they're cleaned up
if p.is_alive():
try:
p.join(timeout=1)
except Exception:
pass
else:
# For regular worker processes
p.wait()
if cleanup:
cleanit = 0
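The shutdown added above follows a standard escalation: SIGTERM first, a bounded wait, then SIGKILL for stragglers. A condensed sketch of that pattern in isolation (helper name and timeout are illustrative, not part of the diff):

import time

def shutdown_workers(procs, timeout=10.0):
    # polite phase: SIGTERM gives workers a chance to clean up
    for p in procs:
        if p.is_alive():
            p.terminate()

    # bounded wait for the whole group
    deadline = time.time() + timeout
    for p in procs:
        p.join(timeout=max(0.0, deadline - time.time()))

    # escalation: SIGKILL anything that ignored SIGTERM
    for p in procs:
        if p.is_alive():
            p.kill()
            p.join(timeout=1.0)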
@@ -499,9 +574,9 @@ def start_workers(
if os.path.exists(d):
success = _try_remove_existing(d, forgive=True)
if success:
removed.add(d) # Fixed: use add() instead of update()
else:
removed.add(d) # Fixed: use add() instead of update()
if cleanit > 100:
break
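The add()-versus-update() change above matters because set.update() iterates its argument, so passing a directory name as a string adds its individual characters rather than the name itself:

removed = set()
removed.update("worker_0")   # update() iterates the string
print(removed)               # e.g. {'w', 'o', 'r', 'k', 'e', '_', '0'}

removed = set()
removed.add("worker_0")      # add() stores the whole name as one element
print(removed)               # {'worker_0'}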

@@ -1133,6 +1208,8 @@ def reserved_port(self):
self.logger.warning(f"Error releasing port {port}: {e}")




if __name__ == "__main__":
host = "localhost"
port = PortManager().get_available_port()
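To put the __main__ snippet in context, here is a hedged usage sketch of driving start_workers with a Python worker function. The keyword names follow the parameters visible in this diff (worker_dir, pst_rel_path, num_workers, port, ppw_function, ppw_kwargs); exe_rel_path as a positional argument is assumed from the existing pyemu API, and my_ppw_function plus the paths are hypothetical:

from pyemu.utils import os_utils

def my_ppw_function(pst_path, host, port):
    # hypothetical worker entry point, matching the (pst path, hostname, port)
    # args built in the diff; a real one would run the model and talk to the
    # PEST++ master at host:port
    print("worker using {0}, master at {1}:{2}".format(pst_path, host, port))

if __name__ == "__main__":
    port = os_utils.PortManager().get_available_port()
    os_utils.start_workers(
        "template",            # worker_dir (hypothetical template directory)
        "pestpp-ies",          # exe_rel_path (assumed; workers run ppw_function instead)
        "pest.pst",            # pst_rel_path
        num_workers=10,
        port=port,
        ppw_function=my_ppw_function,
        ppw_kwargs={},
    )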