stop the command but not kill all the processes #3
Do you use the latest commit in the branch? If true, maybe you can try the following version. Here I have introduced `subprocess.Popen` in place of `subprocess.run`, plus some signal handling for interrupts:

```python
# -*- coding: utf-8 -*-
# @Time    : 2021/3/6
# @Author  : Lart Pang
# @GitHub  : https://github.com/lartpang
import argparse
import os
import signal
import subprocess
import time
from enum import Enum
from multiprocessing import Manager, Pool, freeze_support
from queue import Queue

import yaml


class STATUS(Enum):
    WAITING = 0
    RUNNING = 1
    DONE = 2
    FAILED = 3


def worker(cmd: str, gpu_ids: str, queue: Queue, job_id: int, done_jobs: dict):
    job_identifier = f"[Job-{job_id}:GPU-{gpu_ids}]"
    # Set the environment variables for the subprocess.
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = gpu_ids
    # subprocess.run(cmd, shell=True, check=True, env=env)
    # Use subprocess.Popen instead of subprocess.run.
    with subprocess.Popen(cmd, shell=True, env=env) as sub_proc:
        try:
            print(f"{job_identifier} Executing {cmd}...")
            sub_proc.wait()
            done_jobs[job_id] = STATUS.DONE
        except Exception as e:
            print(f"{job_identifier} Command '{cmd}' failed: {e}")
            sub_proc.terminate()
            done_jobs[job_id] = STATUS.FAILED
    # Return the GPU resources to the queue.
    for gpu in gpu_ids.split(","):
        queue.put(gpu)
    print(f"{job_identifier} Release GPU {gpu_ids}...")


def get_args():
    # fmt: off
    parser = argparse.ArgumentParser()
    parser.add_argument("--gpu-pool", nargs="+", type=int, default=[0], help="The pool containing all ids of your gpu devices.")
    parser.add_argument("--max-workers", type=int, help="The max number of the workers.")
    parser.add_argument("--cmd-pool", type=str, required=True, help="The path of the yaml containing all cmds.")
    parser.add_argument("--interval-for-waiting-gpu", type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.")
    parser.add_argument("--interval-for-loop", type=int, default=1, help="In seconds, the interval for looping.")
    # fmt: on
    args = parser.parse_args()
    if args.max_workers is None:
        args.max_workers = len(args.gpu_pool)
    return args


def init_worker():
    # Workers ignore SIGINT; the main process decides how to shut them down.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def main():
    args = get_args()
    print("[YOUR CONFIG]\n" + str(args))

    with open(args.cmd_pool, mode="r", encoding="utf-8") as f:
        jobs = yaml.safe_load(f)
    assert isinstance(jobs, (tuple, list)), jobs
    print("[YOUR CMDS]\n" + "\n\t".join([str(job) for job in jobs]))

    manager = Manager()
    # A cross-process shared queue that tracks the currently available GPU ids.
    available_gpus = manager.Queue()
    for i in args.gpu_pool:
        available_gpus.put(str(i))

    # A cross-process shared dict that tracks the status of each job.
    done_jobs = manager.dict()
    for job_id, job_info in enumerate(jobs):
        if job_info["num_gpus"] > len(args.gpu_pool):
            raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.")
        done_jobs[job_id] = STATUS.WAITING

    # Register the signal handler before creating the pool so that cleanup can
    # run when an interrupt signal is received.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(processes=args.max_workers, initializer=init_worker)
    # Restore the original signal handler in the main process.
    signal.signal(signal.SIGINT, original_sigint_handler)

    try:
        # Loop until all commands have been processed.
        while not all([status is STATUS.DONE for status in done_jobs.values()]):
            for job_id, job_info in enumerate(jobs):
                if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]:
                    continue
                # else: STATUS.WAITING, STATUS.FAILED

                # job_name = job_info["name"]
                command = job_info["command"]
                num_gpus = job_info["num_gpus"]
                num_available_gpus = available_gpus.qsize()

                # If enough GPU resources are available, run the command.
                if num_gpus <= num_available_gpus:
                    done_jobs[job_id] = STATUS.RUNNING
                    # Take the required GPU ids from the queue.
                    gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)])
                    # Run the given command asynchronously; the worker updates the job status itself.
                    pool.apply_async(worker, args=(command, gpu_ids, available_gpus, job_id, done_jobs))
                else:
                    # Not enough GPUs: skip this command and retry later.
                    print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_available_gpus}).")
                    # Wait for a while before checking again.
                    time.sleep(args.interval_for_waiting_gpu)
            # Wait for a while before the next pass over the job list.
            time.sleep(args.interval_for_loop)
        # Close the pool and wait for all tasks to finish.
        pool.close()
    except KeyboardInterrupt:
        print("[CAUGHT KEYBOARDINTERRUPT, TERMINATING WORKERS!]")
        pool.terminate()
    finally:
        pool.join()
        manager.shutdown()
        print("[ALL COMMANDS HAVE BEEN COMPLETED!]")


if __name__ == "__main__":
    freeze_support()
    main()
```
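For reference, the file passed to `--cmd-pool` is just a YAML list of jobs. Below is a minimal sketch of such a file (the file name, job names, and commands are hypothetical; the script only reads the `command` and `num_gpus` keys, and `name` is kept for readability):

```yaml
# demo_jobs.yaml -- a hypothetical cmd-pool file.
# Each entry needs the keys the script reads: "command" and "num_gpus".
- name: exp1
  command: "python train.py --lr 0.01"
  num_gpus: 1
- name: exp2
  command: "python train.py --lr 0.001"
  num_gpus: 2
```

Assuming the script above were saved as `runit.py` (an assumed file name), it would be launched with something like `python runit.py --gpu-pool 0 1 2 --max-workers 3 --cmd-pool demo_jobs.yaml`.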
Hi, I tried the dev branch and found that it works well: now I can use Ctrl+C to close all the processes related to the jobs.
I am also wondering if we could set up each job with just one line in the config file. Since I sometimes need to run hundreds of tasks, it would be much easier for me to generate the commands for the jobs by copy and paste.
You can try this form, as in lines 16 to 17 of commit 8957feb:

```yaml
- {name: job1, command: "python /home/jwq/Compressor/main.py epochs=300 field='[\"temperature\"]'", num_gpus: 1}
```
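Since each flow-style entry fits on a single line, a config with hundreds of similar jobs can indeed be generated by copy and paste, one line per job. A hedged sketch (the commands here are placeholders):

```yaml
- {name: job1, command: "python main.py task=1", num_gpus: 1}
- {name: job2, command: "python main.py task=2", num_gpus: 1}
- {name: job3, command: "python main.py task=3", num_gpus: 1}
```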
Thanks, I will try it :) I am wondering about the point of num_gpus. It seems like RunIt holds a queue of jobs, and once a job is done, the released GPUs are reassigned to the waiting jobs in the queue. Do I understand that correctly?
Yes. Here's a simple flowchart, drafted with GPT-4, that outlines the core process:

```mermaid
graph TD
    A[Start] --> B[Read Configuration and Command Pool]
    B --> C[Initialize Shared Resources]
    C -->|Maximum number of requirements met| D[Loop Until All Jobs Done]
    D --> E[Check Available GPUs]
    E -->|Enough GPUs| F[Run Job in Separate Process]
    E -->|Not Enough GPUs| G[Wait and Retry]
    F --> H[Job Completes]
    F --> I[Job Fails]
    H --> J[Update Job Status and Return GPUs]
    I --> J
    G --> D
    J -->|All Jobs Done| K[End]
    C -->|Maximum number of requirements not met| L[Terminate Workers]
    L --> M[Shutdown Manager and Join Pool]
    M --> K
```
Thanks for the positive feedback, I'll push the latest version to the main branch!
It seems amazing.
1. Could a user run a list of jobs even if the GPU pool cannot run them all at the same time? Since I saw "wait and retry" and "return GPUs", it seems like RunIt will smartly run some jobs first and then run the remaining jobs based on the utilization of the GPU pool.
2. Could I give every job the same name? Considering that I run hundreds of small jobs, it's inconvenient to rename them one by one.
3. When we say "Interrupted", does it mean an interruption by Ctrl+C? And will it shut down all the running jobs and kill the remaining jobs (that is, the whole RunIt process), or just one job?
4. Would you consider adding a GUI for RunIt, like nvitop, which I believe would be fancier and easier for monitoring jobs?
Thank you for your contribution. It's a really good tool, considering its convenience for researchers :)
Sometimes I run similar tasks, which means I know how much memory each task will take. If all GPUs are occupied but still have some memory left that could run another task, there is some waste. So I'm wondering whether there is any threshold in RunIt to identify if a GPU is idle, busy, or in a third state of "could still do more work".
Yes.
Yes, and as you can see below, job status is tracked by the job index rather than by the job name.
(I tweaked the flowchart above to make it more intuitive.)

```python
done_jobs = manager.dict()
for job_id, job_info in enumerate(jobs):
    if job_info["num_gpus"] > len(args.gpu_pool):
        raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.")
    done_jobs[job_id] = STATUS.WAITING
```

Thanks for the suggestion; the script itself came from a need in my own day-to-day research to run a series of different deep-learning experiments. A TUI interface similar to nvitop is really worth trying, though.
Currently, just one task is placed on any one GPU.
Thanks for the reply. RunIt is based on Python, right? I will see if I can understand the code and make some changes when I'm free. Thank you.
Yes. For the version on that commit, I usually run RunIt twice with two different configs, since I know each of my tasks will only take 10% of one GPU's memory, which lets me run 2X jobs on the same number of GPUs.
Yes, there is room for improvement in this tool.
@lartpang Hi, hope this finds you well! I am now running multiple small tasks on my GPUs. Since my tasks are stable and I know the memory they need, to achieve the best efficiency I usually run RunIt twice. I'm wondering whether you have recently added the module that lets one GPU run multiple tasks simultaneously. Thanks.
Hi @BitCalSaul, thanks for your idea.
Hi, @BitCalSaul
Thanks, I will have a try. BTW, there may be a typo in your README; it should be runit_based_on_memory.py. I also have a question regarding the two arguments, "interval-for-waiting-gpu" and "interval-for-loop". I understand the first as the time interval between consecutive checks to determine whether a specific GPU has available memory exceeding the set memory threshold. For the second, does it refer to the time interval between consecutive jobs? I'm not entirely clear on its purpose. Could you please clarify?
I have finished the test, thanks for your new feature. Now I can change the config to make full use of my GPUs. I have two follow-up questions:
For the first question, we can use the
Hi @lartpang, thank you for your reply. I remember you solved the second question before; please check here: #3 (comment). Is this situation different now? I frequently use the code from commit 056b1cf because it fits my needs well. However, I've encountered a small issue: even when the gpu-pool argument is specified, RunIt still occasionally runs my jobs on GPUs that I haven't assigned to the pool. This is problematic because sometimes my labmate runs code with dynamic memory usage, so even if RunIt checks a GPU and sees enough memory available at the start, the situation can change quickly, leading to out-of-memory errors. Could you please take a look at this?
@BitCalSaul
I didn't get the point. What kind of strategy do you mean? I thought the gpu-pool argument would have handled this, but it did not.
It is about this:
About this, hmm, maybe you can try the new script in the latest commit: https://github.com/lartpang/RunIt/blob/727543d503f6ccab5766947817a100d951b90795/runit_based_on_detected_memory.py
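That script is not reproduced in this thread, but the idea behind memory-based detection might look like the following sketch (an assumption about the approach, not the actual implementation): ask nvidia-smi for the free memory of every GPU and treat only those above a threshold as available.

```python
# A minimal sketch of memory-based GPU selection (an assumption, not the
# actual runit_based_on_detected_memory.py): query free memory per GPU via
# nvidia-smi and keep only the GPUs above a user-defined threshold.
import subprocess


def free_memory_per_gpu() -> dict:
    """Return {gpu_index: free_memory_in_MiB} as reported by nvidia-smi."""
    out = subprocess.run(
        ["nvidia-smi", "--query-gpu=index,memory.free", "--format=csv,noheader,nounits"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout
    free = {}
    for line in out.strip().splitlines():
        index, free_mib = line.split(",")
        free[int(index)] = int(free_mib)
    return free


if __name__ == "__main__":
    # Treat a GPU as available if it has at least 4 GiB free (arbitrary threshold).
    available = [i for i, mib in free_memory_per_gpu().items() if mib >= 4096]
    print(f"GPUs with enough free memory: {available}")
```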
Thanks for your clarification!
Hi, thanks for your new features. RunIt is smart since it can detect real-time memory usage, and now my GPUs can be fully used :) That's exactly what I want! BTW, I kill my processes on the GPU manually since Ctrl+C won't work. I am wondering whether you have any recommended shortcut for killing the GPU processes of a specific user. I have found similar questions, but their solutions did not work for me.
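One common way to do this (a hedged sketch, not something proposed in this thread) is to list the PIDs that currently hold GPU compute contexts with nvidia-smi and then terminate only the ones owned by the current user:

```python
# A hedged sketch: terminate all GPU compute processes owned by the current user.
# It relies only on standard nvidia-smi/ps output; adjust the signal as needed.
import getpass
import os
import signal
import subprocess

# PIDs of every process currently holding a GPU compute context.
pids = subprocess.run(
    ["nvidia-smi", "--query-compute-apps=pid", "--format=csv,noheader"],
    capture_output=True,
    text=True,
    check=True,
).stdout.split()

current_user = getpass.getuser()
for pid in pids:
    # Look up the owner of each PID and only touch our own processes.
    owner = subprocess.run(
        ["ps", "-o", "user=", "-p", pid], capture_output=True, text=True
    ).stdout.strip()
    if owner == current_user:
        print(f"Terminating PID {pid} owned by {current_user}")
        os.kill(int(pid), signal.SIGTERM)
```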
I run RunIt in tmux or zsh. I found that even though I stop the RunIt command with Ctrl+C, the job processes are still there, holding GPU memory at 0% utilization, and I have to kill them one by one.
I'm wondering how to kill all the processes in a quick way.