Skip to content

Commit

Permalink
airbyte-lib base implementation (airbytehq#33409)
Browse files Browse the repository at this point in the history
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
  • Loading branch information
3 people authored and jatinyadav-cc committed Feb 26, 2024
1 parent a64dc89 commit 88aa6ab
Show file tree
Hide file tree
Showing 20 changed files with 1,685 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ jobs:
- 'airbyte-ci/connectors/metadata_service/lib/**'
- 'airbyte-ci/connectors/metadata_service/orchestrator/**'
- '!**/*.md'
airbyte_lib:
- 'airbyte_lib/**'
- '!**/*.md'
- name: Run airbyte-ci/connectors/connector_ops tests
if: steps.changes.outputs.ops_any_changed == 'true'
Expand Down Expand Up @@ -132,3 +135,18 @@ jobs:
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
airbyte_ci_binary_url: ${{ inputs.airbyte_ci_binary_url || 'https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci' }}
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}

- name: Run airbyte-lib tests
if: steps.changes.outputs.airbyte_lib_any_changed == 'true'
id: run-airbyte-lib-tests
uses: ./.github/actions/run-dagger-pipeline
with:
context: "pull_request"
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "test airbyte-lib"
airbyte_ci_binary_url: ${{ inputs.airbyte_ci_binary_url || 'https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci' }}
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}
1 change: 1 addition & 0 deletions airbyte-lib/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.venv*
10 changes: 10 additions & 0 deletions airbyte-lib/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# airbyte-lib

airbyte-lib is a library that allows to run Airbyte syncs embedded into any Python application, without the need to run Airbyte server.

## Development

* Make sure [Poetry is installed](https://python-poetry.org/docs/#).
* Run `poetry install`
* For examples, check out the `examples` folder. They can be run via `poetry run python examples/<example file>`
* Unit tests and type checks can be run via `poetry run pytest`
12 changes: 12 additions & 0 deletions airbyte-lib/airbyte_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

from .factories import (get_connector, get_in_memory_cache)
from .sync_result import (Dataset, SyncResult)
from .source import (Source)

__all__ = [
"get_connector",
"get_in_memory_cache",
"Dataset",
"SyncResult",
"Source",
]
54 changes: 54 additions & 0 deletions airbyte-lib/airbyte_lib/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List

from airbyte_protocol.models import AirbyteRecordMessage


class Cache(ABC):
@abstractmethod
def write(self, messages: Iterable[AirbyteRecordMessage]):
pass

@abstractmethod
def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
pass

@abstractmethod
def get_pandas(self, stream: str) -> Any:
pass

@abstractmethod
def get_sql_table(self, stream: str) -> Any:
pass

@abstractmethod
def get_sql_engine(self) -> Any:
pass


class InMemoryCache(Cache):
"""The in-memory cache is accepting airbyte messages and stores them in a dictionary for streams (one list of dicts per stream)."""

def __init__(self) -> None:
self.streams: Dict[str, List[Dict[str, Any]]] = {}

def write(self, messages: Iterable[AirbyteRecordMessage]) -> None:
for message in messages:
if message.stream not in self.streams:
self.streams[message.stream] = []
self.streams[message.stream].append(message.data)

def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
return iter(self.streams[stream])

def get_pandas(self, stream: str) -> Any:
raise NotImplementedError()

def get_sql_table(self, stream: str) -> Any:
raise NotImplementedError()

def get_sql_engine(self) -> Any:
raise NotImplementedError()
155 changes: 155 additions & 0 deletions airbyte-lib/airbyte_lib/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import os
import subprocess
import sys
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Generator, Iterable, List

from airbyte_lib.registry import ConnectorMetadata


class Executor(ABC):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest"):
self.metadata = metadata
self.target_version = target_version if target_version != "latest" else metadata.latest_available_version

@abstractmethod
def execute(self, args: List[str]) -> Iterable[str]:
pass

@abstractmethod
def ensure_installation(self):
pass

@abstractmethod
def install(self):
pass


@contextmanager
def _stream_from_subprocess(args: List[str]) -> Generator[Iterable[str], None, None]:
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)

def _stream_from_file(file: IO[str]):
while True:
line = file.readline()
if not line:
break
yield line

if process.stdout is None:
raise Exception("Failed to start subprocess")
try:
yield _stream_from_file(process.stdout)
finally:
# Close the stdout stream
if process.stdout:
process.stdout.close()

# Terminate the process if it is still running
if process.poll() is None: # Check if the process is still running
process.terminate()
try:
# Wait for a short period to allow process to terminate gracefully
process.wait(timeout=10)
except subprocess.TimeoutExpired:
# If the process does not terminate within the timeout, force kill it
process.kill()

# Now, the process is either terminated or killed. Check the exit code.
exit_code = process.wait()

# If the exit code is not 0 or -15 (SIGTERM), raise an exception
if exit_code != 0 and exit_code != -15:
raise Exception(f"Process exited with code {exit_code}")


class VenvExecutor(Executor):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest", install_if_missing: bool = False):
super().__init__(metadata, target_version)
self.install_if_missing = install_if_missing

def _get_venv_name(self):
return f".venv-{self.metadata.name}"

def _get_connector_path(self):
return Path(self._get_venv_name(), "bin", self.metadata.name)

def _run_subprocess_and_raise_on_failure(self, args: List[str]):
result = subprocess.run(args)
if result.returncode != 0:
raise Exception(f"Install process exited with code {result.returncode}")

def install(self):
venv_name = self._get_venv_name()
self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])

pip_path = os.path.join(venv_name, "bin", "pip")

# TODO this is a temporary install path that will be replaced with a proper package name once they are published. At this point we are also using the version
package_to_install = f"../airbyte-integrations/connectors/{self.metadata.name}"
self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", package_to_install])

def _get_installed_version(self):
"""
In the venv, run the following: python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""
venv_name = self._get_venv_name()
connector_name = self.metadata.name
return subprocess.check_output(
[os.path.join(venv_name, "bin", "python"), "-c", f"from importlib.metadata import version; print(version('{connector_name}'))"],
universal_newlines=True,
).strip()

def ensure_installation(self):
venv_name = f".venv-{self.metadata.name}"
venv_path = Path(venv_name)
if not venv_path.exists():
if not self.install_if_missing:
raise Exception(f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist")
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise Exception(f"Could not find connector {self.metadata.name} in venv {venv_name}")

installed_version = self._get_installed_version()
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()

# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
)

def execute(self, args: List[str]) -> Iterable[str]:
connector_path = self._get_connector_path()

with _stream_from_subprocess([str(connector_path)] + args) as stream:
yield from stream


class PathExecutor(Executor):
def ensure_installation(self):
try:
self.execute(["spec"])
except Exception as e:
raise Exception(f"Connector {self.metadata.name} is not available - executing it failed: {e}")

def install(self):
raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")

def execute(self, args: List[str]) -> Iterable[str]:
with _stream_from_subprocess([self.metadata.name] + args) as stream:
yield from stream
34 changes: 34 additions & 0 deletions airbyte-lib/airbyte_lib/factories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from typing import Any, Dict, Optional

from airbyte_lib.cache import InMemoryCache
from airbyte_lib.executor import PathExecutor, VenvExecutor
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.source import Source


def get_in_memory_cache():
return InMemoryCache()


def get_connector(
name: str,
version: str = "latest",
config: Optional[Dict[str, Any]] = None,
use_local_install: bool = False,
install_if_missing: bool = False,
):
"""
Get a connector by name and version.
:param name: connector name
:param version: connector version - if not provided, the most recent version will be used
:param config: connector config - if not provided, you need to set it later via the set_config method
:param use_local_install: whether to use a virtual environment to run the connector. If True, the connector is expected to be available on the path (e.g. installed via pip). If False, the connector will be installed automatically in a virtual environment.
:param install_if_missing: whether to install the connector if it is not available locally. This parameter is ignored if use_local_install is True.
"""
metadata = get_connector_metadata(name)
return Source(
PathExecutor(metadata, version) if use_local_install else VenvExecutor(metadata, version, install_if_missing), name, config
)
Empty file.
47 changes: 47 additions & 0 deletions airbyte-lib/airbyte_lib/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import importlib.metadata
import json
import os
from dataclasses import dataclass
from typing import Dict, Optional

import requests


@dataclass
class ConnectorMetadata:
name: str
latest_available_version: str


_cache: Optional[Dict[str, ConnectorMetadata]] = None
airbyte_lib_version = importlib.metadata.version("airbyte-lib")

REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"


def _update_cache() -> None:
global _cache
if os.environ.get("AIRBYTE_LOCAL_REGISTRY"):
with open(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY")), "r") as f:
data = json.load(f)
else:
response = requests.get(REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{airbyte_lib_version}"})
response.raise_for_status()
data = response.json()
_cache = {}
for connector in data["sources"]:
name = connector["dockerRepository"].replace("airbyte/", "")
_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])


def get_connector_metadata(name: str):
"""
check the cache for the connector. If the cache is empty, populate by calling update_cache
"""
if not _cache:
_update_cache()
if not _cache or name not in _cache:
raise Exception(f"Connector {name} not found")
return _cache[name]
Loading

0 comments on commit 88aa6ab

Please sign in to comment.