diff --git a/lisa/util/__init__.py b/lisa/util/__init__.py index 0aea60fef9..e33c2be545 100644 --- a/lisa/util/__init__.py +++ b/lisa/util/__init__.py @@ -48,7 +48,7 @@ # source - # https://github.com/django/django/blob/stable/1.3.x/django/core/validators.py#L45 __url_pattern = re.compile( - r"^(?:http|s?ftp)s?://" # http:// or https:// + r"^(?:http|https|sftp|ftp)://" # http:// or https:// r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)" r"+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|" # ...domain r"localhost|" # localhost... @@ -601,31 +601,51 @@ def is_valid_url(url: str, raise_error: bool = True) -> bool: return is_url -def is_valid_source_code_package( +def _raise_or_log_failure(log: "Logger", raise_error: bool, failure_msg: str) -> bool: + if raise_error: + raise LisaException(failure_msg) + else: + log.debug(failure_msg) + return False + + +# big function to check the parts of a url +# allow raising exceptions or log and return a bool +# allows checks for: +# expected domains +# protocols (require https, sftp, etc) +# filenames (pattern matching) +def check_url( + log: "Logger", source_url: str, - expected_package_name_pattern: Pattern[str], + expected_filename_pattern: Optional[Pattern[str]] = None, allowed_protocols: Optional[List[str]] = None, expected_domains: Optional[List[str]] = None, + raise_error: bool = False, ) -> bool: # avoid using a mutable default parameter if not allowed_protocols: allowed_protocols = [ "https", - "sftp", ] # first, check if it's a url. - if not is_valid_url(url=source_url, raise_error=False): + failure_msg = f"{source_url} is not a valid URL, check your arguments." + if not ( + is_valid_url(url=source_url, raise_error=False) + or _raise_or_log_failure(log, raise_error, failure_msg) + ): return False # NOTE: urllib might not work as you'd expect. # It doesn't throw on lots of things you wouldn't expect to be urls. # You must verify the parts on your own, some of them may be empty, some null. # check: https://docs.python.org/3/library/urllib.parse.html#url-parsing - + failure_msg = f"urlparse failed to parse url {source_url}, check your arguments." try: parts = urlparse(source_url) except ValueError: - return False + if not _raise_or_log_failure(log, raise_error, failure_msg): + return False # ex: from https://www.com/path/to/file.tar # scheme : https @@ -634,23 +654,53 @@ def is_valid_source_code_package( # get the filename from the path portion of the url file_path = parts.path.split("/")[-1] - full_match = expected_package_name_pattern.match(file_path) - if not full_match: - return False + full_match = None + # check we can match against the filename + if expected_filename_pattern: + full_match = expected_filename_pattern.match(file_path) + failure_msg = ( + f"File at {source_url} did not match pattern " + "{expected_package_name_pattern.pattern}." + ) + if not full_match: + if not _raise_or_log_failure(log, raise_error, failure_msg): + return False # check the expected domain is correct if present valid_netloc = not expected_domains or any( [domain.endswith(parts.netloc) for domain in expected_domains] ) + failure_msg = ( + f"net location of url {source_url} did not match " + f"expected domains { ','.join(expected_domains) } " + ) + if not (valid_netloc or _raise_or_log_failure(log, raise_error, failure_msg)): + return False - # optional but default is check access is via sftp/https - valid_scheme = any([parts.scheme == x for x in allowed_protocols]) - return ( - valid_scheme - and parts.netloc != "" - and valid_netloc - and (full_match.group(0) == file_path) + # Check the protocol (aka scheme) in the url + # default is check access is via https + failure_msg = ( + f"URL {source_url} uses an invalid protocol " + "or net location! Check url argument." ) + valid_scheme = any([parts.scheme == x for x in allowed_protocols]) + valid_netloc_and_scheme = valid_scheme and parts.netloc != "" and valid_netloc + if not ( + valid_netloc_and_scheme or _raise_or_log_failure(log, raise_error, failure_msg) + ): + return False + # finally verify the full match we found matches the actual filename + # avoids an accidental partial match + if expected_filename_pattern and full_match: + path_matches = full_match.group(0) == file_path + failure_msg = ( + f"File at url {source_url} failed to match" + f" pattern {expected_filename_pattern.pattern}." + ) + if not (path_matches or _raise_or_log_failure(log, raise_error, failure_msg)): + return False + + return True def filter_ansi_escape(content: str) -> str: diff --git a/microsoft/testsuites/dpdk/rdma_core.py b/microsoft/testsuites/dpdk/rdma_core.py index 5d1efcdf65..c43389e9c2 100644 --- a/microsoft/testsuites/dpdk/rdma_core.py +++ b/microsoft/testsuites/dpdk/rdma_core.py @@ -9,7 +9,7 @@ from lisa import Node from lisa.operating_system import Debian, Fedora, Suse from lisa.tools import Git, Make, Pkgconfig, Tar, Wget -from lisa.util import LisaException, SkippedException, is_valid_source_code_package +from lisa.util import LisaException, SkippedException, check_url class RdmaCoreManager: @@ -77,9 +77,10 @@ def _check_source_install(self) -> None: ) # finally, validate what we have looks reasonable and cool - is_valid_package = is_valid_source_code_package( + is_valid_package = check_url( + self.node.log, source_url=self._rdma_core_source, - expected_package_name_pattern=self._source_pattern, + expected_path_pattern=self._source_pattern, allowed_protocols=["https"], expected_domains=[ "visualstudio.com",