Skip to content

Commit

Permalink
feature/available-nodes (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
kerrychu authored Jan 23, 2024
1 parent 73e8f39 commit 80dc16a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 18 deletions.
32 changes: 32 additions & 0 deletions src/available_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import os
from dotenv import load_dotenv
from src.utils.subprocess_operations import (
get_piped_stdout,
stdout_to_records,
job_records_to_slack_message,
)
from src.hooks.slack import send_slack_message

load_dotenv()
SLACK_WEBHOOK = os.getenv("SLACK_GPU_JOBS_WEBHOOK")
PARTITIONS: list[str] = os.getenv("PARTITIONS").split(",")
HEADER = ["PARTITION", "AVAILABILITY", "TIMELIMIT", "NODES", "STATE", "NODELIST"]


def get_idle_nodes() -> str:
main_cmd = "sinfo -t idle"
piped_cmd = "grep gpu"
return get_piped_stdout(main_cmd, piped_cmd)


def monitor():
idle_nodes = get_idle_nodes()
idle_node_records = stdout_to_records(HEADER, idle_nodes)
header = "🟢Available nodes in Bunya\n\n"
filtered_idle_node_records = list(filter(lambda x: x["PARTITION"] in PARTITIONS, idle_node_records))
slack_message = job_records_to_slack_message(header, filtered_idle_node_records)
send_slack_message(slack_message, SLACK_WEBHOOK)


if __name__ == "__main__":
monitor()
5 changes: 3 additions & 2 deletions src/gpu_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from dotenv import load_dotenv
from src.utils.subprocess_operations import (
get_piped_stdout,
stdout_to_gpu_records,
stdout_to_records,
job_records_to_slack_message,
)
from src.hooks.slack import send_slack_message

load_dotenv()
SLACK_WEBHOOK = os.getenv("SLACK_GPU_JOBS_WEBHOOK")
GPU_USERS: list[str] = os.getenv("GPU_USERS").split(",")
HEADER = ["JOBID", "PARTITION", "NAME", "USER", "ST", "TIME", "NODES"]


def get_gpu_jobs() -> str:
Expand All @@ -20,7 +21,7 @@ def get_gpu_jobs() -> str:

def monitor():
gpu_jobs = get_gpu_jobs()
gpu_records = stdout_to_gpu_records(gpu_jobs)
gpu_records = stdout_to_records(HEADER, gpu_jobs)
header = f"🔉 Currently, there are {len(gpu_records)} running GPU Jobs on Bunya.\n Here are some from our lab.\n\n"
filtered_gpu_records = list(filter(lambda x: x["USER"] in GPU_USERS, gpu_records))
slack_message = job_records_to_slack_message(header, filtered_gpu_records)
Expand Down
6 changes: 3 additions & 3 deletions src/my_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from src.utils.subprocess_operations import (
stdout_to_job_records,
JOB_RECORDS,
RECORDS,
job_records_to_slack_message,
get_cmd_stdout,
)
Expand All @@ -23,7 +23,7 @@
SLACK_WEBHOOK = os.getenv("SLACK_JOB_WEBHOOK")


def list_my_job_records() -> JOB_RECORDS:
def list_my_job_records() -> RECORDS:
"""_summary_ list current jobs of mine
Returns:
Expand All @@ -33,7 +33,7 @@ def list_my_job_records() -> JOB_RECORDS:
return stdout_to_job_records(stdout)


def get_last_updated_job_records() -> Optional[JOB_RECORDS]:
def get_last_updated_job_records() -> Optional[RECORDS]:
if not os.path.exists(JOB_FOLDER):
os.mkdir(JOB_FOLDER)
return None
Expand Down
6 changes: 3 additions & 3 deletions src/utils/data_serialization.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import json
from .subprocess_operations import JOB_RECORDS
from .subprocess_operations import RECORDS


def write_job_records_to_json(records: JOB_RECORDS, filepath: str):
def write_job_records_to_json(records: RECORDS, filepath: str):
json_data = json.dumps(records, indent=2)
with open(filepath, "w") as outfile:
outfile.write(json_data)


def read_json_as_job_records(filepath: str) -> JOB_RECORDS:
def read_json_as_job_records(filepath: str) -> RECORDS:
f = open(filepath, "r")
return json.loads(f.read())
17 changes: 7 additions & 10 deletions src/utils/subprocess_operations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import subprocess
from typing import Optional

JOB_RECORD = dict[str, str]
JOB_RECORDS = list[JOB_RECORD]
QUOTA_RECORD = dict[str, str]
QUOTA_RECORDS = list[QUOTA_RECORD]
RECORD = dict[str, str]
RECORDS = list[RECORD]


def get_cmd_stdout(command: str) -> str:
Expand Down Expand Up @@ -44,7 +42,7 @@ def strip_empty_string(l: list[str]) -> list[str]:
return [x for x in l if x != ""]


def stdout_to_job_records(s: str) -> JOB_RECORDS:
def stdout_to_job_records(s: str) -> RECORDS:
s = s.strip()
s_list = s.split("\n")
headers = strip_empty_string(s_list[0].split(" "))
Expand All @@ -59,7 +57,7 @@ def stdout_to_job_records(s: str) -> JOB_RECORDS:
return job_records


def stdout_to_quota_records(s: str) -> QUOTA_RECORD:
def stdout_to_quota_records(s: str) -> RECORD:
s = s.strip()
s_list = s.split()
headers = [
Expand All @@ -78,21 +76,20 @@ def stdout_to_quota_records(s: str) -> QUOTA_RECORD:
return quota_record


def stdout_to_gpu_records(s: str) -> JOB_RECORDS:
def stdout_to_records(header: list[str], s: str) -> RECORDS:
s = s.strip()
s_list = s.split("\n")
headers = ["JOBID", "PARTITION", "NAME", "USER", "ST", "TIME", "NODES"]
data = [strip_empty_string(element.split(" ")) for element in s_list]
job_records = []
for l in data:
d = {}
for key, value in zip(headers, l):
for key, value in zip(header, l):
d[key] = value
job_records.append(d)
return job_records


def job_records_to_slack_message(header: str, job_records: JOB_RECORDS) -> str:
def job_records_to_slack_message(header: str, job_records: RECORDS) -> str:
slack_message = ""
slack_message += header
for job_record in job_records:
Expand Down

0 comments on commit 80dc16a

Please sign in to comment.