Skip to content

Commit

Permalink
AirbyteLib: Installation improvements and improved error handling (ai…
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored and jatinyadav-cc committed Feb 21, 2024
1 parent 18d4cde commit ea51661
Show file tree
Hide file tree
Showing 10 changed files with 609 additions and 189 deletions.
310 changes: 235 additions & 75 deletions airbyte-lib/airbyte_lib/_executor.py

Large diffs are not rendered by default.

76 changes: 51 additions & 25 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Any

from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.exceptions import AirbyteLibInputError
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib import exceptions as exc
from airbyte_lib._executor import PathExecutor, VenvExecutor
from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata
from airbyte_lib.source import Source


Expand All @@ -15,7 +17,7 @@ def get_connector(
pip_url: str | None = None,
config: dict[str, Any] | None = None,
*,
use_local_install: bool = False,
local_executable: Path | str | None = None,
install_if_missing: bool = True,
) -> Source:
"""Get a connector by name and version.
Expand All @@ -29,34 +31,58 @@ def get_connector(
connector name.
config: connector config - if not provided, you need to set it later via the set_config
method.
use_local_install: whether to use a virtual environment to run the connector. If True, the
connector is expected to be available on the path (e.g. installed via pip). If False,
the connector will be installed automatically in a virtual environment.
install_if_missing: whether to install the connector if it is not available locally. This
parameter is ignored if use_local_install is True.
local_executable: If set, the connector will be assumed to already be installed and will be
executed using this path or executable name. Otherwise, the connector will be installed
automatically in a virtual environment.
install_if_missing: Whether to install the connector if it is not available locally. This
parameter is ignored when local_executable is set.
"""
metadata = get_connector_metadata(name)
if use_local_install:
if local_executable:
if pip_url:
raise AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'use_local_install' is True."
raise exc.AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'local_executable' is set."
)
if version:
raise AirbyteLibInputError(
message="Param 'version' is not supported when 'use_local_install' is True."
raise exc.AirbyteLibInputError(
message="Param 'version' is not supported when 'local_executable' is set."
)
executor: Executor = PathExecutor(
metadata=metadata,
target_version=version,
)

else:
executor = VenvExecutor(
metadata=metadata,
target_version=version,
install_if_missing=install_if_missing,
pip_url=pip_url,
if isinstance(local_executable, str):
if "/" in local_executable or "\\" in local_executable:
# Assume this is a path
local_executable = Path(local_executable).absolute()
else:
which_executable = shutil.which(local_executable)
if which_executable is None:
raise FileNotFoundError(local_executable)
local_executable = Path(which_executable).absolute()

return Source(
name=name,
config=config,
executor=PathExecutor(
name=name,
path=local_executable,
),
)

metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError:
if not pip_url:
# We don't have a pip url or registry entry, so we can't install the connector
raise

executor = VenvExecutor(
name=name,
metadata=metadata,
target_version=version,
pip_url=pip_url,
)
if install_if_missing:
executor.ensure_installation()

return Source(
executor=executor,
name=name,
Expand Down
12 changes: 10 additions & 2 deletions airbyte-lib/airbyte_lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ class AirbyteConnectorRegistryError(AirbyteError):
"""Error when accessing the connector registry."""


@dataclass
class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
"""Connector not found in registry."""

connector_name: str | None = None
guidance = "Please double check the connector name."


# Connector Errors


Expand All @@ -184,8 +192,8 @@ class AirbyteConnectorError(AirbyteError):
connector_name: str | None = None


class AirbyteConnectorNotFoundError(AirbyteConnectorError):
"""Connector not found."""
class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
"""Connector executable not found."""


class AirbyteConnectorInstallationError(AirbyteConnectorError):
Expand Down
73 changes: 54 additions & 19 deletions airbyte-lib/airbyte_lib/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import json
import os
from copy import copy
from dataclasses import dataclass
from pathlib import Path

Expand All @@ -12,47 +13,81 @@
from airbyte_lib.version import get_version


__cache: dict[str, ConnectorMetadata] | None = None


REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"


@dataclass
class ConnectorMetadata:
name: str
latest_available_version: str


_cache: dict[str, ConnectorMetadata] | None = None
def _get_registry_url() -> str:
if REGISTRY_ENV_VAR in os.environ:
return str(os.environ.get(REGISTRY_ENV_VAR))

REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
return REGISTRY_URL


def _update_cache() -> None:
global _cache
if os.environ.get("AIRBYTE_LOCAL_REGISTRY"):
with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f:
data = json.load(f)
else:
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
"""Return the registry cache."""
global __cache
if __cache and not force_refresh:
return __cache

registry_url = _get_registry_url()
if registry_url.startswith("http"):
response = requests.get(
REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
)
response.raise_for_status()
data = response.json()
_cache = {}
else:
# Assume local file
with Path(registry_url).open() as f:
data = json.load(f)

new_cache: dict[str, ConnectorMetadata] = {}

for connector in data["sources"]:
name = connector["dockerRepository"].replace("airbyte/", "")
_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])

if len(new_cache) == 0:
raise exc.AirbyteLibInternalError(
message="Connector registry is empty.",
context={
"registry_url": _get_registry_url(),
},
)

__cache = new_cache
return __cache


def get_connector_metadata(name: str) -> ConnectorMetadata:
"""Check the cache for the connector.
If the cache is empty, populate by calling update_cache.
"""
if not _cache:
_update_cache()
if not _cache or name not in _cache:
raise exc.AirbyteLibInputError(
message="Connector name not found in registry.",
guidance="Please double check the connector name.",
cache = copy(_get_registry_cache())
if not cache:
raise exc.AirbyteLibInternalError(
message="Connector registry could not be loaded.",
context={
"registry_url": _get_registry_url(),
},
)
if name not in cache:
raise exc.AirbyteConnectorNotRegisteredError(
connector_name=name,
context={
"connector_name": name,
"registry_url": _get_registry_url(),
"available_connectors": sorted(cache.keys()),
},
)
return _cache[name]
return cache[name]
81 changes: 64 additions & 17 deletions airbyte-lib/airbyte_lib/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any

import jsonschema
import yaml

from airbyte_protocol.models import (
AirbyteCatalog,
Expand Down Expand Up @@ -68,7 +69,13 @@ def __init__(
name: str,
config: dict[str, Any] | None = None,
streams: list[str] | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
"""
self._processed_records = 0
self.executor = executor
self.name = name
Expand All @@ -79,7 +86,7 @@ def __init__(
self._spec: ConnectorSpecification | None = None
self._selected_stream_names: list[str] | None = None
if config is not None:
self.set_config(config)
self.set_config(config, validate=validate)
if streams is not None:
self.set_streams(streams)

Expand All @@ -102,8 +109,22 @@ def set_streams(self, streams: list[str]) -> None:
)
self._selected_stream_names = streams

def set_config(self, config: dict[str, Any]) -> None:
self._validate_config(config)
def set_config(
self,
config: dict[str, Any],
*,
validate: bool = False,
) -> None:
"""Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config()
is called.
"""
if validate:
self.validate_config(config)

self._config_dict = config

@property
Expand Down Expand Up @@ -131,9 +152,13 @@ def _discover(self) -> AirbyteCatalog:
log_text=self._last_log_messages,
)

def _validate_config(self, config: dict[str, Any]) -> None:
"""Validate the config against the spec."""
def validate_config(self, config: dict[str, Any] | None = None) -> None:
"""Validate the config against the spec.
If config is not provided, the already-set config will be validated.
"""
spec = self._get_spec(force_refresh=False)
config = self._config if config is None else config
jsonschema.validate(config, spec.connectionSpecification)

def get_available_streams(self) -> list[str]:
Expand Down Expand Up @@ -161,6 +186,21 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
log_text=self._last_log_messages,
)

@property
def _yaml_spec(self) -> str:
"""Get the spec as a yaml string.
For now, the primary use case is for writing and debugging a valid config for a source.
This is private for now because we probably want better polish before exposing this
as a stable interface. This will also get easier when we have docs links with this info
for each connector.
"""
spec_obj: ConnectorSpecification = self._get_spec()
spec_dict = spec_obj.dict(exclude_unset=True)
# convert to a yaml string
return yaml.dump(spec_dict)

@property
def discovered_catalog(self) -> AirbyteCatalog:
"""Get the raw catalog for the given streams.
Expand Down Expand Up @@ -248,17 +288,23 @@ def check(self) -> None:
* Make sure the subprocess is killed when the function returns.
"""
with as_temp_files([self._config]) as [config_file]:
for msg in self._execute(["check", "--config", config_file]):
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
return # Success!

raise exc.AirbyteConnectorCheckFailedError(
context={
"message": msg.connectionStatus.message,
}
)
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
try:
for msg in self._execute(["check", "--config", config_file]):
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
return # Success!

raise exc.AirbyteConnectorCheckFailedError(
context={
"message": msg.connectionStatus.message,
}
)
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
except exc.AirbyteConnectorReadError as ex:
raise exc.AirbyteConnectorCheckFailedError(
message="The connector failed to check the connection.",
log_text=ex.log_text,
) from ex

def install(self) -> None:
"""Install the connector if it is not yet installed."""
Expand Down Expand Up @@ -338,7 +384,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
* Read the output line by line of the subprocess and serialize them AirbyteMessage objects.
Drop if not valid.
"""
self.executor.ensure_installation()
# Fail early if the connector is not installed.
self.executor.ensure_installation(auto_fix=False)

try:
self._last_log_messages = []
Expand Down
Loading

0 comments on commit ea51661

Please sign in to comment.