From 7b27e593c7e91b876033f37c695581157e54342a Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 29 Jan 2024 22:07:21 -0800 Subject: [PATCH] AirbyteLib: Installation improvements and improved error handling (#34572) --- airbyte-lib/airbyte_lib/_executor.py | 310 +++++++++++++----- .../_factories/connector_factories.py | 76 +++-- airbyte-lib/airbyte_lib/exceptions.py | 12 +- airbyte-lib/airbyte_lib/registry.py | 73 +++-- airbyte-lib/airbyte_lib/source.py | 81 ++++- airbyte-lib/airbyte_lib/validate.py | 10 +- airbyte-lib/docs/generated/airbyte_lib.html | 46 ++- .../source-test/source_test/__init__.py | 0 .../tests/integration_tests/test_install.py | 23 ++ .../integration_tests/test_integration.py | 167 +++++++--- 10 files changed, 609 insertions(+), 189 deletions(-) create mode 100644 airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py create mode 100644 airbyte-lib/tests/integration_tests/test_install.py diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 20899f892006c..a43d56249163d 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -1,22 +1,23 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. from __future__ import annotations +import shlex import subprocess import sys from abc import ABC, abstractmethod -from contextlib import contextmanager +from contextlib import contextmanager, suppress from pathlib import Path -from typing import IO, TYPE_CHECKING, Any, NoReturn +from shutil import rmtree +from typing import IO, TYPE_CHECKING, Any, NoReturn, cast from airbyte_lib import exceptions as exc +from airbyte_lib.registry import ConnectorMetadata from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType if TYPE_CHECKING: from collections.abc import Generator, Iterable, Iterator - from airbyte_lib.registry import ConnectorMetadata - _LATEST_VERSION = "latest" @@ -24,22 +25,37 @@ class Executor(ABC): def __init__( self, - metadata: ConnectorMetadata, + *, + name: str | None = None, + metadata: ConnectorMetadata | None = None, target_version: str | None = None, ) -> None: - self.metadata = metadata - self.enforce_version = target_version is not None - if target_version is None or target_version == _LATEST_VERSION: - self.target_version = metadata.latest_available_version - else: - self.target_version = target_version + """Initialize a connector executor. + + The 'name' param is required if 'metadata' is None. + """ + if not name and not metadata: + raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.") + + self.name: str = name or cast(ConnectorMetadata, metadata).name # metadata is not None here + self.metadata: ConnectorMetadata | None = metadata + self.enforce_version: bool = target_version is not None + + self.reported_version: str | None = None + self.target_version: str | None = None + if target_version: + if metadata and target_version == _LATEST_VERSION: + self.target_version = metadata.latest_available_version + else: + self.target_version = target_version @abstractmethod def execute(self, args: list[str]) -> Iterator[str]: pass @abstractmethod - def ensure_installation(self) -> None: + def ensure_installation(self, *, auto_fix: bool = True) -> None: + _ = auto_fix pass @abstractmethod @@ -101,71 +117,150 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]: # If the exit code is not 0 or -15 (SIGTERM), raise an exception if exit_code not in (0, -15): - raise exc.AirbyteSubprocessFailedError(exit_code=exit_code) + raise exc.AirbyteSubprocessFailedError( + run_args=args, + exit_code=exit_code, + ) class VenvExecutor(Executor): def __init__( self, - metadata: ConnectorMetadata, + name: str | None = None, + *, + metadata: ConnectorMetadata | None = None, target_version: str | None = None, pip_url: str | None = None, - *, - install_if_missing: bool = False, + install_root: Path | None = None, ) -> None: - super().__init__(metadata, target_version) - self.install_if_missing = install_if_missing + """Initialize a connector executor that runs a connector in a virtual environment. + + Args: + name: The name of the connector. + metadata: (Optional.) The metadata of the connector. + target_version: (Optional.) The version of the connector to install. + pip_url: (Optional.) The pip URL of the connector to install. + install_root: (Optional.) The root directory where the virtual environment will be + created. If not provided, the current working directory will be used. + """ + super().__init__(name=name, metadata=metadata, target_version=target_version) # This is a temporary install path that will be replaced with a proper package # name once they are published. - # TODO: Replace with `f"airbyte-{self.metadata.name}"` - self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}" + # TODO: Replace with `f"airbyte-{self.name}"` + self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.name}" + self.install_root = install_root or Path.cwd() def _get_venv_name(self) -> str: - return f".venv-{self.metadata.name}" + return f".venv-{self.name}" + + def _get_venv_path(self) -> Path: + return self.install_root / self._get_venv_name() def _get_connector_path(self) -> Path: - return Path(self._get_venv_name(), "bin", self.metadata.name) + return self._get_venv_path() / "bin" / self.name def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: - result = subprocess.run(args, check=False) + result = subprocess.run( + args, + check=False, + stderr=subprocess.PIPE, + ) if result.returncode != 0: - raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError( - exit_code=result.returncode + raise exc.AirbyteSubprocessFailedError( + run_args=args, + exit_code=result.returncode, + log_text=result.stderr.decode("utf-8"), ) def uninstall(self) -> None: - venv_name = self._get_venv_name() - if Path(venv_name).exists(): - self._run_subprocess_and_raise_on_failure(["rm", "-rf", venv_name]) + if self._get_venv_path().exists(): + rmtree(str(self._get_venv_path())) + + self.reported_version = None # Reset the reported version from the previous installation def install(self) -> None: - venv_name = self._get_venv_name() - self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name]) + """Install the connector in a virtual environment. + + After installation, the installed version will be stored in self.reported_version. + """ + self._run_subprocess_and_raise_on_failure( + [sys.executable, "-m", "venv", str(self._get_venv_path())] + ) - pip_path = str(Path(venv_name) / "bin" / "pip") + pip_path = str(self._get_venv_path() / "bin" / "pip") - self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url]) + try: + self._run_subprocess_and_raise_on_failure( + args=[pip_path, "install", *shlex.split(self.pip_url)] + ) + except exc.AirbyteSubprocessFailedError as ex: + # If the installation failed, remove the virtual environment + # Otherwise, the connector will be considered as installed and the user may not be able + # to retry the installation. + with suppress(exc.AirbyteSubprocessFailedError): + self.uninstall() - def _get_installed_version(self) -> str: + raise exc.AirbyteConnectorInstallationError from ex + + # Assuming the installation succeeded, store the installed version + self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True) + + def _get_installed_version( + self, + *, + raise_on_error: bool = False, + recheck: bool = False, + ) -> str | None: """Detect the version of the connector installed. + Returns the version string if it can be detected, otherwise None. + + If raise_on_error is True, raise an exception if the version cannot be detected. + + If recheck if False and the version has already been detected, return the cached value. + In the venv, we run the following: > python -c "from importlib.metadata import version; print(version(''))" """ - venv_name = self._get_venv_name() - connector_name = self.metadata.name - return subprocess.check_output( - [ - Path(venv_name) / "bin" / "python", - "-c", - f"from importlib.metadata import version; print(version('{connector_name}'))", - ], - universal_newlines=True, - ).strip() + if not recheck and self.reported_version: + return self.reported_version + + connector_name = self.name + if not self.interpreter_path.exists(): + # No point in trying to detect the version if the interpreter does not exist + if raise_on_error: + raise exc.AirbyteLibInternalError( + message="Connector's virtual environment interpreter could not be found.", + context={ + "interpreter_path": self.interpreter_path, + }, + ) + return None + + try: + return subprocess.check_output( + [ + self.interpreter_path, + "-c", + f"from importlib.metadata import version; print(version('{connector_name}'))", + ], + universal_newlines=True, + ).strip() + except Exception: + if raise_on_error: + raise + + return None + + @property + def interpreter_path(self) -> Path: + return self._get_venv_path() / "bin" / "python" def ensure_installation( self, + *, + auto_fix: bool = True, ) -> None: """Ensure that the connector is installed in a virtual environment. @@ -176,48 +271,77 @@ def ensure_installation( Note: Version verification is not supported for connectors installed from a local path. """ - venv_name = f".venv-{self.metadata.name}" - venv_path = Path(venv_name) - if not venv_path.exists(): - if not self.install_if_missing: - raise exc.AirbyteConnectorNotFoundError( - message="Connector not available and venv does not exist.", - guidance=( - "Please ensure the connector is pre-installed or consider enabling " - "`install_if_missing=True`." - ), + # Store the installed version (or None if not installed) + if not self.reported_version: + self.reported_version = self._get_installed_version() + + original_installed_version = self.reported_version + + reinstalled = False + venv_name = f".venv-{self.name}" + if not self._get_venv_path().exists(): + if not auto_fix: + raise exc.AirbyteConnectorInstallationError( + message="Virtual environment does not exist.", + connector_name=self.name, + context={ + "venv_path": self._get_venv_path(), + }, + ) + + # If the venv path does not exist, install. + self.install() + reinstalled = True + + elif not self._get_connector_path().exists(): + if not auto_fix: + raise exc.AirbyteConnectorInstallationError( + message="Could not locate connector executable within the virtual environment.", + connector_name=self.name, context={ - "connector_name": self.metadata.name, - "venv_name": venv_name, + "connector_path": self._get_connector_path(), }, ) + + # If the connector path does not exist, uninstall and re-install. + # This is sometimes caused by a failed or partial installation. + self.uninstall() self.install() + reinstalled = True + + # By now, everything should be installed. Raise an exception if not. connector_path = self._get_connector_path() if not connector_path.exists(): - raise exc.AirbyteConnectorNotFoundError( - connector_name=self.metadata.name, + raise exc.AirbyteConnectorInstallationError( + message="Connector's executable could not be found within the virtual environment.", + connector_name=self.name, context={ - "venv_name": venv_name, + "connector_path": self._get_connector_path(), }, ) from FileNotFoundError(connector_path) if self.enforce_version: - installed_version = self._get_installed_version() - if installed_version != self.target_version: - # If the version doesn't match, reinstall - self.install() + version_after_reinstall: str | None = None + if self.reported_version != self.target_version: + if auto_fix and not reinstalled: + # If we haven't already reinstalled above, reinstall now. + self.install() + reinstalled = True + + if reinstalled: + version_after_reinstall = self.reported_version # Check the version again - version_after_install = self._get_installed_version() - if version_after_install != self.target_version: + if self.reported_version != self.target_version: raise exc.AirbyteConnectorInstallationError( - connector_name=self.metadata.name, + message="Connector's reported version does not match the target version.", + connector_name=self.name, context={ "venv_name": venv_name, "target_version": self.target_version, - "installed_version": installed_version, - "version_after_install": version_after_install, + "original_installed_version": original_installed_version, + "version_after_reinstall": version_after_reinstall, }, ) @@ -228,33 +352,69 @@ def execute(self, args: list[str]) -> Iterator[str]: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.metadata.name, SourceType.VENV, self.target_version) + return SourceTelemetryInfo( + name=self.name, + type=SourceType.VENV, + version=self.reported_version, + ) class PathExecutor(Executor): - def ensure_installation(self) -> None: + def __init__( + self, + name: str | None = None, + *, + path: Path, + target_version: str | None = None, + ) -> None: + """Initialize a connector executor that runs a connector from a local path. + + If path is simply the name of the connector, it will be expected to exist in the current + PATH or in the current working directory. + """ + self.path: Path = path + name = name or path.name + super().__init__(name=name, target_version=target_version) + + def ensure_installation( + self, + *, + auto_fix: bool = True, + ) -> None: + """Ensure that the connector executable can be found. + + The auto_fix parameter is ignored for this executor type. + """ + _ = auto_fix try: self.execute(["spec"]) except Exception as e: - raise exc.AirbyteConnectorNotFoundError( - connector_name=self.metadata.name, + # TODO: Improve error handling. We should try to distinguish between + # a connector that is not installed and a connector that is not + # working properly. + raise exc.AirbyteConnectorExecutableNotFoundError( + connector_name=self.name, ) from e def install(self) -> NoReturn: raise exc.AirbyteConnectorInstallationError( message="Connector cannot be installed because it is not managed by airbyte-lib.", - connector_name=self.metadata.name, + connector_name=self.name, ) def uninstall(self) -> NoReturn: raise exc.AirbyteConnectorInstallationError( message="Connector cannot be uninstalled because it is not managed by airbyte-lib.", - connector_name=self.metadata.name, + connector_name=self.name, ) def execute(self, args: list[str]) -> Iterator[str]: - with _stream_from_subprocess([self.metadata.name, *args]) as stream: + with _stream_from_subprocess([str(self.path), *args]) as stream: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.metadata.name, SourceType.LOCAL_INSTALL, version=None) + return SourceTelemetryInfo( + str(self.name), + SourceType.LOCAL_INSTALL, + version=self.reported_version, + ) diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 4dbe8c6f41f06..197ed61142c69 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -1,11 +1,13 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. from __future__ import annotations +import shutil +from pathlib import Path from typing import Any -from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor -from airbyte_lib.exceptions import AirbyteLibInputError -from airbyte_lib.registry import get_connector_metadata +from airbyte_lib import exceptions as exc +from airbyte_lib._executor import PathExecutor, VenvExecutor +from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata from airbyte_lib.source import Source @@ -15,7 +17,7 @@ def get_connector( pip_url: str | None = None, config: dict[str, Any] | None = None, *, - use_local_install: bool = False, + local_executable: Path | str | None = None, install_if_missing: bool = True, ) -> Source: """Get a connector by name and version. @@ -29,34 +31,58 @@ def get_connector( connector name. config: connector config - if not provided, you need to set it later via the set_config method. - use_local_install: whether to use a virtual environment to run the connector. If True, the - connector is expected to be available on the path (e.g. installed via pip). If False, - the connector will be installed automatically in a virtual environment. - install_if_missing: whether to install the connector if it is not available locally. This - parameter is ignored if use_local_install is True. + local_executable: If set, the connector will be assumed to already be installed and will be + executed using this path or executable name. Otherwise, the connector will be installed + automatically in a virtual environment. + install_if_missing: Whether to install the connector if it is not available locally. This + parameter is ignored when local_executable is set. """ - metadata = get_connector_metadata(name) - if use_local_install: + if local_executable: if pip_url: - raise AirbyteLibInputError( - message="Param 'pip_url' is not supported when 'use_local_install' is True." + raise exc.AirbyteLibInputError( + message="Param 'pip_url' is not supported when 'local_executable' is set." ) if version: - raise AirbyteLibInputError( - message="Param 'version' is not supported when 'use_local_install' is True." + raise exc.AirbyteLibInputError( + message="Param 'version' is not supported when 'local_executable' is set." ) - executor: Executor = PathExecutor( - metadata=metadata, - target_version=version, - ) - else: - executor = VenvExecutor( - metadata=metadata, - target_version=version, - install_if_missing=install_if_missing, - pip_url=pip_url, + if isinstance(local_executable, str): + if "/" in local_executable or "\\" in local_executable: + # Assume this is a path + local_executable = Path(local_executable).absolute() + else: + which_executable = shutil.which(local_executable) + if which_executable is None: + raise FileNotFoundError(local_executable) + local_executable = Path(which_executable).absolute() + + return Source( + name=name, + config=config, + executor=PathExecutor( + name=name, + path=local_executable, + ), ) + + metadata: ConnectorMetadata | None = None + try: + metadata = get_connector_metadata(name) + except exc.AirbyteConnectorNotRegisteredError: + if not pip_url: + # We don't have a pip url or registry entry, so we can't install the connector + raise + + executor = VenvExecutor( + name=name, + metadata=metadata, + target_version=version, + pip_url=pip_url, + ) + if install_if_missing: + executor.ensure_installation() + return Source( executor=executor, name=name, diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py index 3c6336d031033..934e936d4d489 100644 --- a/airbyte-lib/airbyte_lib/exceptions.py +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -174,6 +174,14 @@ class AirbyteConnectorRegistryError(AirbyteError): """Error when accessing the connector registry.""" +@dataclass +class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError): + """Connector not found in registry.""" + + connector_name: str | None = None + guidance = "Please double check the connector name." + + # Connector Errors @@ -184,8 +192,8 @@ class AirbyteConnectorError(AirbyteError): connector_name: str | None = None -class AirbyteConnectorNotFoundError(AirbyteConnectorError): - """Connector not found.""" +class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): + """Connector executable not found.""" class AirbyteConnectorInstallationError(AirbyteConnectorError): diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py index bd030a867ff00..a7faf64eb9195 100644 --- a/airbyte-lib/airbyte_lib/registry.py +++ b/airbyte-lib/airbyte_lib/registry.py @@ -3,6 +3,7 @@ import json import os +from copy import copy from dataclasses import dataclass from pathlib import Path @@ -12,32 +13,60 @@ from airbyte_lib.version import get_version +__cache: dict[str, ConnectorMetadata] | None = None + + +REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" +REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" + + @dataclass class ConnectorMetadata: name: str latest_available_version: str -_cache: dict[str, ConnectorMetadata] | None = None +def _get_registry_url() -> str: + if REGISTRY_ENV_VAR in os.environ: + return str(os.environ.get(REGISTRY_ENV_VAR)) -REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" + return REGISTRY_URL -def _update_cache() -> None: - global _cache - if os.environ.get("AIRBYTE_LOCAL_REGISTRY"): - with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f: - data = json.load(f) - else: +def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: + """Return the registry cache.""" + global __cache + if __cache and not force_refresh: + return __cache + + registry_url = _get_registry_url() + if registry_url.startswith("http"): response = requests.get( - REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"} + registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"} ) response.raise_for_status() data = response.json() - _cache = {} + else: + # Assume local file + with Path(registry_url).open() as f: + data = json.load(f) + + new_cache: dict[str, ConnectorMetadata] = {} + for connector in data["sources"]: name = connector["dockerRepository"].replace("airbyte/", "") - _cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) + new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) + + if len(new_cache) == 0: + raise exc.AirbyteLibInternalError( + message="Connector registry is empty.", + context={ + "registry_url": _get_registry_url(), + }, + ) + + __cache = new_cache + return __cache def get_connector_metadata(name: str) -> ConnectorMetadata: @@ -45,14 +74,20 @@ def get_connector_metadata(name: str) -> ConnectorMetadata: If the cache is empty, populate by calling update_cache. """ - if not _cache: - _update_cache() - if not _cache or name not in _cache: - raise exc.AirbyteLibInputError( - message="Connector name not found in registry.", - guidance="Please double check the connector name.", + cache = copy(_get_registry_cache()) + if not cache: + raise exc.AirbyteLibInternalError( + message="Connector registry could not be loaded.", + context={ + "registry_url": _get_registry_url(), + }, + ) + if name not in cache: + raise exc.AirbyteConnectorNotRegisteredError( + connector_name=name, context={ - "connector_name": name, + "registry_url": _get_registry_url(), + "available_connectors": sorted(cache.keys()), }, ) - return _cache[name] + return cache[name] diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index ec37e11791dd1..4db25b3afae09 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any import jsonschema +import yaml from airbyte_protocol.models import ( AirbyteCatalog, @@ -68,7 +69,13 @@ def __init__( name: str, config: dict[str, Any] | None = None, streams: list[str] | None = None, + *, + validate: bool = False, ) -> None: + """Initialize the source. + + If config is provided, it will be validated against the spec if validate is True. + """ self._processed_records = 0 self.executor = executor self.name = name @@ -79,7 +86,7 @@ def __init__( self._spec: ConnectorSpecification | None = None self._selected_stream_names: list[str] | None = None if config is not None: - self.set_config(config) + self.set_config(config, validate=validate) if streams is not None: self.set_streams(streams) @@ -102,8 +109,22 @@ def set_streams(self, streams: list[str]) -> None: ) self._selected_stream_names = streams - def set_config(self, config: dict[str, Any]) -> None: - self._validate_config(config) + def set_config( + self, + config: dict[str, Any], + *, + validate: bool = False, + ) -> None: + """Set the config for the connector. + + If validate is True, raise an exception if the config fails validation. + + If validate is False, validation will be deferred until check() or validate_config() + is called. + """ + if validate: + self.validate_config(config) + self._config_dict = config @property @@ -131,9 +152,13 @@ def _discover(self) -> AirbyteCatalog: log_text=self._last_log_messages, ) - def _validate_config(self, config: dict[str, Any]) -> None: - """Validate the config against the spec.""" + def validate_config(self, config: dict[str, Any] | None = None) -> None: + """Validate the config against the spec. + + If config is not provided, the already-set config will be validated. + """ spec = self._get_spec(force_refresh=False) + config = self._config if config is None else config jsonschema.validate(config, spec.connectionSpecification) def get_available_streams(self) -> list[str]: @@ -161,6 +186,21 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: log_text=self._last_log_messages, ) + @property + def _yaml_spec(self) -> str: + """Get the spec as a yaml string. + + For now, the primary use case is for writing and debugging a valid config for a source. + + This is private for now because we probably want better polish before exposing this + as a stable interface. This will also get easier when we have docs links with this info + for each connector. + """ + spec_obj: ConnectorSpecification = self._get_spec() + spec_dict = spec_obj.dict(exclude_unset=True) + # convert to a yaml string + return yaml.dump(spec_dict) + @property def discovered_catalog(self) -> AirbyteCatalog: """Get the raw catalog for the given streams. @@ -248,17 +288,23 @@ def check(self) -> None: * Make sure the subprocess is killed when the function returns. """ with as_temp_files([self._config]) as [config_file]: - for msg in self._execute(["check", "--config", config_file]): - if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: - if msg.connectionStatus.status != Status.FAILED: - return # Success! - - raise exc.AirbyteConnectorCheckFailedError( - context={ - "message": msg.connectionStatus.message, - } - ) - raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) + try: + for msg in self._execute(["check", "--config", config_file]): + if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: + if msg.connectionStatus.status != Status.FAILED: + return # Success! + + raise exc.AirbyteConnectorCheckFailedError( + context={ + "message": msg.connectionStatus.message, + } + ) + raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) + except exc.AirbyteConnectorReadError as ex: + raise exc.AirbyteConnectorCheckFailedError( + message="The connector failed to check the connection.", + log_text=ex.log_text, + ) from ex def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -338,7 +384,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: * Read the output line by line of the subprocess and serialize them AirbyteMessage objects. Drop if not valid. """ - self.executor.ensure_installation() + # Fail early if the connector is not installed. + self.executor.ensure_installation(auto_fix=False) try: self._last_log_messages = [] diff --git a/airbyte-lib/airbyte_lib/validate.py b/airbyte-lib/airbyte_lib/validate.py index 75eab7e3fd394..551960ed5250d 100644 --- a/airbyte-lib/airbyte_lib/validate.py +++ b/airbyte-lib/airbyte_lib/validate.py @@ -42,11 +42,16 @@ def _parse_args() -> argparse.Namespace: def _run_subprocess_and_raise_on_failure(args: list[str]) -> None: - result = subprocess.run(args, check=False) + result = subprocess.run( + args, + check=False, + stderr=subprocess.PIPE, + ) if result.returncode != 0: raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=result.returncode, + log_text=result.stderr.decode("utf-8"), ) @@ -55,7 +60,8 @@ def full_tests(connector_name: str, sample_config: str) -> None: source = ab.get_connector( # TODO: FIXME: noqa: SIM115, PTH123 connector_name, - config=json.load(open(sample_config)), # noqa: SIM115, PTH123 + config=json.load(open(sample_config)), # noqa: SIM115, PTH123, + install_if_missing=False, ) print("Running check...") diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index c8d9f47128ea3..ba7c11e54e3d9 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -254,7 +254,7 @@
Inherited Members
def - get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, use_local_install: bool = False, install_if_missing: bool = True) -> Source: + get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, local_executable: pathlib.Path | str | None = None, install_if_missing: bool = True) -> Source:
@@ -271,11 +271,11 @@
Inherited Members
connector name. config: connector config - if not provided, you need to set it later via the set_config method. - use_local_install: whether to use a virtual environment to run the connector. If True, the - connector is expected to be available on the path (e.g. installed via pip). If False, - the connector will be installed automatically in a virtual environment. - install_if_missing: whether to install the connector if it is not available locally. This - parameter is ignored if use_local_install is True.

+ local_executable: If set, the connector will be assumed to already be installed and will be + executed using this path or executable name. Otherwise, the connector will be installed + automatically in a virtual environment. + install_if_missing: Whether to install the connector if it is not available locally. This + parameter is ignored when local_executable is set.

@@ -409,13 +409,17 @@
Inherited Members
- Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None) + Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None, *, validate: bool = False)
- +

Initialize the source.

+ +

If config is provided, it will be validated against the spec if validate is True.

+
+
@@ -465,13 +469,37 @@
Inherited Members
def - set_config(self, config: dict[str, typing.Any]) -> None: + set_config(self, config: dict[str, typing.Any], *, validate: bool = False) -> None:
+

Set the config for the connector.

+ +

If validate is True, raise an exception if the config fails validation.

+ +

If validate is False, validation will be deferred until check() or validate_config() +is called.

+
+ + +
+
+
+ + def + validate_config(self, config: dict[str, typing.Any] | None = None) -> None: + + +
+ +

Validate the config against the spec.

+ +

If config is not provided, the already-set config will be validated.

+
+
diff --git a/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py b/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-lib/tests/integration_tests/test_install.py b/airbyte-lib/tests/integration_tests/test_install.py new file mode 100644 index 0000000000000..c93801489f376 --- /dev/null +++ b/airbyte-lib/tests/integration_tests/test_install.py @@ -0,0 +1,23 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +from gettext import install +import pytest + +from airbyte_lib._factories.connector_factories import get_connector +from airbyte_lib import exceptions as exc + + +def test_install_failure_log_pypi(): + """Test that the install log is created and contains the expected content.""" + with pytest.raises(exc.AirbyteConnectorNotRegisteredError): + source = get_connector("source-not-found") + + with pytest.raises(exc.AirbyteConnectorInstallationError) as exc_info: + source = get_connector( + "source-not-found", + pip_url="https://pypi.org/project/airbyte-not-found", + install_if_missing=True, + ) + + # Check that the stderr log contains the expected content from a failed pip install + assert 'Could not install requirement' in str(exc_info.value.__cause__.log_text) diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 5dcd1f54e649b..5d55861975776 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -3,6 +3,7 @@ from collections.abc import Mapping import os import shutil +import subprocess from typing import Any from unittest.mock import Mock, call, patch import tempfile @@ -17,7 +18,7 @@ import pytest from airbyte_lib.caches import PostgresCache, PostgresCacheConfig -from airbyte_lib.registry import _update_cache +from airbyte_lib import registry from airbyte_lib.version import get_version from airbyte_lib.results import ReadResult from airbyte_lib.datasets import CachedDataset, LazyDataset, SQLDataset @@ -28,24 +29,32 @@ import ulid -@pytest.fixture(scope="module", autouse=True) +LOCAL_TEST_REGISTRY_URL = "./tests/integration_tests/fixtures/registry.json" + + +@pytest.fixture(scope="package", autouse=True) def prepare_test_env(): """ Prepare test environment. This will pre-install the test source from the fixtures array and set the environment variable to use the local json file as registry. """ - if os.path.exists(".venv-source-test"): - shutil.rmtree(".venv-source-test") + venv_dir = ".venv-source-test" + if os.path.exists(venv_dir): + shutil.rmtree(venv_dir) - os.system("python -m venv .venv-source-test") - os.system(".venv-source-test/bin/pip install -e ./tests/integration_tests/fixtures/source-test") + subprocess.run(["python", "-m", "venv", venv_dir], check=True) + subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) - os.environ["AIRBYTE_LOCAL_REGISTRY"] = "./tests/integration_tests/fixtures/registry.json" + os.environ["AIRBYTE_LOCAL_REGISTRY"] = LOCAL_TEST_REGISTRY_URL os.environ["DO_NOT_TRACK"] = "true" + # Force-refresh the registry cache + _ = registry._get_registry_cache(force_refresh=True) + yield shutil.rmtree(".venv-source-test") + @pytest.fixture def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: return { @@ -58,15 +67,50 @@ def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: ], } -def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]): - source = ab.get_connector("source-test", config={"apiKey": "test"}) +def test_registry_get(): + assert registry._get_registry_url() == LOCAL_TEST_REGISTRY_URL + metadata = registry.get_connector_metadata("source-test") + assert metadata.name == "source-test" + assert metadata.latest_available_version == "0.0.1" + + +def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]): + source = ab.get_connector( + "source-test", config={"apiKey": "test"}, install_if_missing=False + ) assert source.get_available_streams() == list(expected_test_stream_data.keys()) def test_invalid_config(): - with pytest.raises(Exception): - ab.get_connector("source-test", config={"apiKey": 1234}) + source = ab.get_connector( + "source-test", config={"apiKey": 1234}, install_if_missing=False + ) + with pytest.raises(exc.AirbyteConnectorCheckFailedError): + source.check() + + +def test_ensure_installation_detection(): + """Assert that install isn't called, since the connector is already installed by the fixture.""" + with patch("airbyte_lib._executor.VenvExecutor.install") as mock_venv_install, \ + patch("airbyte_lib.source.Source.install") as mock_source_install, \ + patch("airbyte_lib._executor.VenvExecutor.ensure_installation") as mock_ensure_installed: + source = ab.get_connector( + "source-test", + config={"apiKey": 1234}, + pip_url="https://pypi.org/project/airbyte-not-found", + install_if_missing=True, + ) + assert mock_ensure_installed.call_count == 1 + assert not mock_venv_install.called + assert not mock_source_install.called + + +def test_source_yaml_spec(): + source = ab.get_connector( + "source-test", config={"apiKey": 1234}, install_if_missing=False + ) + assert source._yaml_spec.startswith("connectionSpecification:\n $schema:") def test_non_existing_connector(): @@ -93,22 +137,35 @@ def test_version_enforcement(raises, latest_available_version, requested_version In this test, the actually installed version is 0.0.1 """ - _update_cache() - from airbyte_lib.registry import _cache - _cache["source-test"].latest_available_version = latest_available_version - if raises: - with pytest.raises(Exception): - ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"}) - else: - ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"}) - - # reset - _cache["source-test"].latest_available_version = "0.0.1" + patched_entry = registry.ConnectorMetadata( + name="source-test", latest_available_version=latest_available_version + ) + with patch.dict("airbyte_lib.registry.__cache", {"source-test": patched_entry}, clear=False): + if raises: + with pytest.raises(Exception): + source = ab.get_connector( + "source-test", + version=requested_version, + config={"apiKey": "abc"}, + install_if_missing=False, + ) + source.executor.ensure_installation(auto_fix=False) + else: + source = ab.get_connector( + "source-test", + version=requested_version, + config={"apiKey": "abc"}, + install_if_missing=False, + ) + source.executor.ensure_installation(auto_fix=False) def test_check(): - source = ab.get_connector("source-test", config={"apiKey": "test"}) - + source = ab.get_connector( + "source-test", + config={"apiKey": "test"}, + install_if_missing=False, + ) source.check() @@ -141,7 +198,7 @@ def assert_cache_data(expected_test_stream_data: dict[str, list[dict[str, str | pd.DataFrame(expected_test_stream_data[stream_name]), check_dtype=False, ) - + # validate that the cache doesn't contain any other streams if streams: assert len(list(cache.__iter__())) == len(streams) @@ -459,6 +516,15 @@ def test_sync_with_merge_to_postgres(new_pg_cache_config: PostgresCacheConfig, e check_dtype=False, ) + +def test_airbyte_lib_version() -> None: + assert get_version() + assert isinstance(get_version(), str) + + # Ensure the version is a valid semantic version (x.y.z or x.y.z.alpha0) + assert 3 <= len(get_version().split(".")) <= 4 + + @patch.dict('os.environ', {'DO_NOT_TRACK': ''}) @patch('airbyte_lib.telemetry.requests') @patch('airbyte_lib.telemetry.datetime') @@ -601,27 +667,48 @@ def test_failing_path_connector(): ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True) def test_succeeding_path_connector(): - old_path = os.environ["PATH"] + new_path = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}" + + # Patch the PATH env var to include the test venv bin folder + with patch.dict(os.environ, {"PATH": new_path}): + source = ab.get_connector( + "source-test", + config={"apiKey": "test"}, + local_executable="source-test", + ) + source.check() - # set path to include the test venv bin folder - os.environ["PATH"] = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}" - source = ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True) - source.check() +def test_install_uninstall(): + with tempfile.TemporaryDirectory() as temp_dir: + source = ab.get_connector( + "source-test", + pip_url="./tests/integration_tests/fixtures/source-test", + config={"apiKey": "test"}, + install_if_missing=False, + ) - os.environ["PATH"] = old_path + # Override the install root to avoid conflicts with the test fixture + install_root = Path(temp_dir) + source.executor.install_root = install_root -def test_install_uninstall(): - source = ab.get_connector("source-test", pip_url="./tests/integration_tests/fixtures/source-test", config={"apiKey": "test"}, install_if_missing=False) + # assert that the venv is gone + assert not os.path.exists(install_root / ".venv-source-test") - source.uninstall() + # use which to check if the executable is available + assert shutil.which("source-test") is None - # assert that the venv is gone - assert not os.path.exists(".venv-source-test") + # assert that the connector is not available + with pytest.raises(Exception): + source.check() + + source.install() + + assert os.path.exists(install_root / ".venv-source-test") + assert os.path.exists(install_root / ".venv-source-test/bin/source-test") - # assert that the connector is not available - with pytest.raises(Exception): source.check() - source.install() + source.uninstall() - source.check() + assert not os.path.exists(install_root / ".venv-source-test") + assert not os.path.exists(install_root / ".venv-source-test/bin/source-test")