Skip to content

Commit

Permalink
adding function for getting piped command stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
kerrychu committed Jan 23, 2024
1 parent 49139f9 commit d674653
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
8 changes: 5 additions & 3 deletions quota_limit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from dotenv import load_dotenv
from utils.subprocess_operations import get_cmd_stdout
from utils.subprocess_operations import get_piped_stdout

load_dotenv()

Expand All @@ -9,8 +9,10 @@


def get_fileset_quota(project_id: str) -> str:
cmd = f"rquota | grep ${project_id}"
stdout: str = get_cmd_stdout(cmd)
main_cmd = "rquota"
piped_cmd = f"grep {project_id}"

stdout: str = get_piped_stdout(main_command=main_cmd, piped_command=piped_cmd)
return stdout


Expand Down
25 changes: 22 additions & 3 deletions utils/subprocess_operations.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
import subprocess
from typing import Optional

JOB_RECORDS = list[dict[str, str]]


def get_cmd_stdout(command: str) -> str:
return subprocess.run(
result = subprocess.run(
command.split(" "), capture_output=True, text=True, check=False
).stdout
)

if result.returncode == 0:
return result.stdout
else:
print(f"Command: {command} failed with error: {result.stderr}")


def get_piped_stdout(main_command: str, piped_command: str) -> Optional[str]:
initial_command_result = subprocess.run(main_command.split(), stdout=subprocess.PIPE, text=True, check=False)

if initial_command_result.returncode == 0:
initial_stdout = initial_command_result.stdout
piped_result = subprocess.run(piped_command.split(" "), input=initial_stdout, stdout=subprocess.PIPE, text=True, check=False)

if piped_result.returncode == 0:
return piped_result.stdout
else:
print(f"Command: {main_command} | {piped_command} failed with error: {initial_command_result.stderr}")


def strip_spaces(l: list[str]) -> list[str]:
Expand All @@ -20,7 +39,7 @@ def stdout_to_job_records(s: str) -> JOB_RECORDS:
data = [strip_spaces(element.split(" ")) for element in s_list[1:]]
job_records = []
for l in data:
d = dict()
d = {}
for key, value in zip(headers, l):
d[key] = value
job_records.append(d)
Expand Down

0 comments on commit d674653

Please sign in to comment.