Skip to content

Commit

Permalink
cmu,*: Fix a possible crash in error messages
Browse files Browse the repository at this point in the history
Custom types that are instantiated from str may not always
behave as str; in this case it caused a RecursionError instead...

Signed-off-by: David Weinehall <david.weinehall@intel.com>
  • Loading branch information
dweineha committed Apr 18, 2024
1 parent 3598c43 commit 37c3d8f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
12 changes: 8 additions & 4 deletions ansithemeprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ def __init__(self, string: str, themeref: str) -> None:
themeref (str): The reference to the formatting to use
"""
if not isinstance(string, str) or not isinstance(themeref, str):
raise TypeError("ANSIThemeStr only accepts (str, str)")
self.string = string
self.themeref = themeref
raise TypeError("ANSIThemeStr only accepts (str, str); "
f"received ({type(string)}, {type(themeref)})")
self.string = str(string)
self.themeref = str(themeref)

def __str__(self) -> str:
"""
Expand Down Expand Up @@ -83,7 +84,10 @@ def format(self, themeref: str) -> "ANSIThemeStr":
Returns:
(ANSIThemeStr): The ANSIThemeStr with new formatting applied
"""
self.themeref = themeref
if not isinstance(themeref, str):
raise TypeError("ANSIThemeStr().format() only accepts (str); "
f"received ({type(themeref)})")
self.themeref = str(themeref)
return self

def get_themeref(self) -> str:
Expand Down
3 changes: 3 additions & 0 deletions cmttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class FilePath(str):
def __init__(self, path: Union["FilePath", str, Path, PurePath]) -> None:
self.path = str(path)

def __str__(self) -> str:
return self.path

def basename(self) -> str:
"""
Returns the filename part of the FilePath.
Expand Down
70 changes: 41 additions & 29 deletions cmu
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ force_refresh_apis = False # pylint: disable=invalid-name

def init_kubernetes_client() -> None:
"""
Initialise the Kubernetes client
Initialise the Kubernetes client.
"""
global kh # pylint: disable=global-statement
global kh_cache # pylint: disable=global-statement
Expand All @@ -189,7 +189,7 @@ def init_kubernetes_client() -> None:
# pylint: disable-next=too-many-locals
def gather_cluster_info(**kwargs: Any) -> None:
"""
Gather information about the cluster necessary for running playbooks
Gather information about the cluster necessary for running playbooks.
Parameters:
**kwargs (dict[str, Any]): Keyword arguments
Expand Down Expand Up @@ -245,7 +245,7 @@ def gather_cluster_info(**kwargs: Any) -> None:
def format_timestamp(timestamp: datetime,
localtimezone: bool = False) -> List[Union[ThemeRef, ThemeStr]]:
"""
Takes datetime and formats it as a YYYY-MM-DD HH:MM:SS themearray
Takes datetime and formats it as a YYYY-MM-DD HH:MM:SS themearray.
Parameters:
timestamp (datetime): The timestamp
Expand All @@ -269,6 +269,18 @@ def format_timestamp(timestamp: datetime,

# pylint: disable-next=too-many-locals
def update_field_widths(field_dict: Dict, field_names: List[str], objects: List[Type]) -> int:
"""
Process the fields for a line; calcute how wide each field should be,
then return the total line length.
Parameters:
field_dict (dict): The dict containing the description of the field
field_names ([str]): The names of the fields to populate
objects ([InfoClass]): An InfoClass object with the data
for all fields in the field_names list
Returns:
(int): The total length of all the fields of a line
"""
linelen = 0
pos = 0

Expand Down Expand Up @@ -387,7 +399,7 @@ def __get_container_info(obj: Dict, container_type: str,
# pylint: disable-next=unused-argument,too-many-locals
def get_container_info(**kwargs: Any) -> List[Type]:
"""
Infogetter for Containers
Infogetter for Containers.
Parameters:
**kwargs (dict[str, Any]): Keyword arguments
Expand Down Expand Up @@ -469,7 +481,7 @@ def get_container_info(**kwargs: Any) -> List[Type]:
# pylint: disable-next=unused-argument,too-many-locals
def get_log_info(**kwargs: Any) -> List[Type]:
"""
Infogetter for logs
Infogetter for logs.
Parameters:
**kwargs (dict): Additional parameters
Expand Down Expand Up @@ -1964,7 +1976,7 @@ def logpad_files(obj: Dict, **kwargs: Any) -> List[List[Union[ThemeRef, ThemeStr

def logpad_formatted(obj: Dict, **kwargs: Any) -> List[List[Union[ThemeRef, ThemeStr]]]:
"""
Takes an object and dumps it using the specified format (if possible)
Takes an object and dumps it using the specified format (if possible).
Parameters:
obj (dict): The dict to dump
Expand Down Expand Up @@ -2063,13 +2075,13 @@ def get_cmt_log(obj: Dict, **kwargs: Any):

def logpad_yaml(obj: Dict, **kwargs: Any) -> List[List[Union[ThemeRef, ThemeStr]]]:
"""
Takes an object and dumps it as formatted YAML
Takes an object and dumps it as formatted YAML.
Parameters:
obj (dict): The dict to dump formatted as YAML
**kwargs (Dict): Additional parameters
**kwargs (dict[str, Any]): Keyword arguments
Returns:
list[themearray]: A list of themearrays
([themearray]): A list of themearrays
"""

try:
Expand Down Expand Up @@ -2337,7 +2349,7 @@ def generate_list_row(uip: UIProps, listpad: curses.window, data, field_list, yp
# noqa: E501 pylint: disable-next=too-many-locals,too-many-branches,too-many-statements,too-many-return-statements
def genericlistloop(stdscr: curses.window, view: str) -> Retval:
"""
Generic main loop for listviews
Generic main loop for listviews.
Parameters:
stdscr (curses.window): The curses window to operate on
Expand Down Expand Up @@ -4559,7 +4571,7 @@ def clusteroverviewloop(stdscr: curses.window, view):
# pylint: disable-next=too-many-locals,too-many-branches
def check_cni_updates(cni: str, current_version: str) -> str:
"""
Check whether there are newer versions of the CNI available
Check whether there are newer versions of the CNI available.
Parameters:
cni (str): The CNI
Expand Down Expand Up @@ -4638,7 +4650,7 @@ def check_cni_updates(cni: str, current_version: str) -> str:
def update_cni(stdscr: curses.window, cni: str, current_version: str, candidate_version: str,
dry_run: bool = False) -> bool:
"""
Update the CNI
Update the CNI.
Parameters:
stdscr (opaque): A curses stdscr reference
Expand Down Expand Up @@ -7452,8 +7464,8 @@ def containerinfoloop(stdscr: curses.window, container, kind, obj, **kwargs: Any
# pylint: disable-next=too-many-locals,too-many-branches
def do_command(stdscr: curses.window, **kwargs: Any) -> None:
"""
Execute a command on localhost
XXX: This should be renamed
Execute a command on localhost.
XXX: This should be renamed.
Parameters:
stdscr (opaque): A curses stdscr reference
Expand Down Expand Up @@ -7530,8 +7542,8 @@ def do_command(stdscr: curses.window, **kwargs: Any) -> None:
def executecommand(stdscr: curses.window,
obj: Dict, container, msg, command, waitforkeypress: bool) -> None:
"""
Executes a command in a container
XXX: This should be renamed
Executes a command in a container.
XXX: This should be renamed.
"""
try:
kubectl_path = secure_which(FilePath("/usr/bin/kubectl"),
Expand Down Expand Up @@ -8422,7 +8434,7 @@ def diff_resource_configuration(uip: UIProps, items, action, values, kind, **kwa

def format_commandline(args: List[str], implicit_command: bool = True) -> List[ANSIThemeStr]:
"""
Given a command line as an array of strings, format it as an array of ANSIThemeStr
Given a command line as an array of strings, format it as an array of ANSIThemeStr.
Parameters:
args ([str]): The commandline
Expand All @@ -8437,7 +8449,7 @@ def format_commandline(args: List[str], implicit_command: bool = True) -> List[A
for i, arg in enumerate(args):
# The first argument is the program name
if i == 0:
themearray += [ANSIThemeStr(arg, "programname")]
themearray += [ANSIThemeStr(f"{arg}", "programname")]
# This is an option
elif arg.startswith(("-")):
themearray += [ANSIThemeStr(f" {arg}", "option")]
Expand Down Expand Up @@ -8875,7 +8887,7 @@ def list_configuration_files(basedir):
# pylint: disable-next=too-many-locals
def populate_actionlist(**kwargs: Any) -> Tuple[List[Dict], List[Dict]]:
"""
Populate the list of actions based on context and policy
Populate the list of actions based on context and policy.
Parameters:
**kwargs (dict[str, Any]): Keyword arguments
Expand Down Expand Up @@ -9073,7 +9085,7 @@ def __populate_playbooklist(path: FilePath, action_list: Optional[Dict]) -> Opti
# pylint: disable-next=too-many-locals,too-many-branches,too-many-statements
def populate_playbooklist(**kwargs: Any) -> Tuple[List[Dict], Dict]:
"""
Populate the list of playbooks based on context and policy
Populate the list of playbooks based on context and policy.
Parameters:
**kwargs (dict[str, Any]): Keyword arguments
Expand Down Expand Up @@ -10355,10 +10367,10 @@ def setupui(stdscr: curses.window) -> None:
# pylint: disable-next=unused-argument
def list_namespaces(options: List[Tuple[str, str]], args: List[str]) -> None:
"""
List all available namespaces
List all available namespaces.
Parameters:
options (list[(str, str)]): List of opt, optarg
options ([(str, str)]): List of opt, optarg
args ([str]): Unused
"""
color = "auto"
Expand Down Expand Up @@ -10411,10 +10423,10 @@ def list_namespaces(options: List[Tuple[str, str]], args: List[str]) -> None:
# pylint: disable-next=unused-argument,disable-next=too-many-branches,too-many-statements
def list_views(options: List[Tuple[str, str]], args: List[str]) -> None:
"""
List available views and their supported fields and default sort column
List available views and their supported fields and default sort column.
Parameters:
options (list[(str, str)]): List of opt, optarg
options ([(str, str)]): List of opt, optarg
args ([str]): Unused
"""
viewfields = []
Expand Down Expand Up @@ -10512,7 +10524,7 @@ def list_views(options: List[Tuple[str, str]], args: List[str]) -> None:

def checkforview(arg: Union[str, Tuple[str, str]]) -> Optional[str]:
"""
Check whether a view exists
Check whether a view exists.
Parameters:
arg (str|(str, str)): Either the name of a view or a (kind, api_family) tuple
Expand All @@ -10531,7 +10543,7 @@ def checkforview(arg: Union[str, Tuple[str, str]]) -> Optional[str]:

def customise_listviews() -> None:
"""
Apply user-provided customisation to the list views
Apply user-provided customisation to the list views.
"""
for view, viewref in views.items():
if "skip" in viewref:
Expand Down Expand Up @@ -10592,10 +10604,10 @@ def customise_listviews() -> None:
# pylint: disable-next=too-many-branches,too-many-statements
def open_view(options: List[Tuple[str, str]], args: List[str]) -> None:
"""
Open the specified view
Open the specified view.
Parameters:
options (list[(str, str)]): List of opt, optarg
options ([(str, str)]): List of opt, optarg
args ([str]): The view to open,
followed by specifiers for object, namespace,
and--if applicable--container/configmap
Expand Down Expand Up @@ -10983,7 +10995,7 @@ COMMANDLINE = {
# pylint: disable-next=too-many-branches,too-many-statements
def main() -> None:
"""
Main function for the program
Main function for the program.
"""
# global auditlog # pylint: disable=global-statement
# global debuglog # pylint: disable=global-statement
Expand Down

0 comments on commit 37c3d8f

Please sign in to comment.