From 4331ef7b139cae4e81aa2578c13e197cd5bc09c3 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Thu, 20 Apr 2023 22:14:51 +0200 Subject: [PATCH] Source Amplitude: refactor Events stream (#25317) * Source Amplitude: Refactor Events Stream, based on Python CDK * Source Amplitude: bump docker version * Source Amplitude: Fix add get_updated_state * Revert "Increase memory_limit for Amplitude (#25282)" This reverts commit 06354ae4e4c26be0c25e2d9f0dd9ea02ddf6e7c0. * Source Amplitude: Fix backport compatibility with new config * Update source_definitions.yaml * Source Amplitude: update catalogs --------- Co-authored-by: artem1205 --- .../src/main/resources/seed/oss_catalog.json | 58 +++++- .../resources/seed/source_definitions.yaml | 7 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amplitude/Dockerfile | 2 +- .../source_amplitude/manifest.yaml | 33 ---- .../source_amplitude/source.py | 25 +++ .../source_amplitude/streams.py | 187 ++++++++++++++++++ connectors.md | 3 +- 8 files changed, 266 insertions(+), 51 deletions(-) create mode 100644 airbyte-integrations/connectors/source-amplitude/source_amplitude/streams.py diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json index 4411af037b11c5..50314e85a83848 100644 --- a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json +++ b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json @@ -1111,6 +1111,54 @@ "public": true, "custom": false, "releaseStage": "alpha" + }, { + "destinationDefinitionId": "e088acb6-9780-4568-880c-54c2dd7f431b", + "name": "Cumul.io", + "dockerRepository": "airbyte/destination-cumulio", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio", + "icon": "cumulio.svg", + "spec": { + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Destination Cumulio", + "type": "object", + "required": [ "api_host", "api_key", "api_token" ], + "additionalProperties": true, + "properties": { + "api_host": { + "title": "Cumul.io API Host URL", + "description": "URL of the Cumul.io API (e.g. 'https://api.cumul.io', 'https://api.us.cumul.io', or VPC-specific API url). Defaults to 'https://api.cumul.io'.", + "default": "https://api.cumul.io", + "type": "string", + "order": 0 + }, + "api_key": { + "title": "Cumul.io API Key", + "description": "An API key generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).", + "type": "string", + "airbyte_secret": true, + "order": 1 + }, + "api_token": { + "title": "Cumul.io API Token", + "description": "The corresponding API token generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).", + "type": "string", + "airbyte_secret": true, + "order": 2 + } + } + }, + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": [ "overwrite", "append" ] + }, + "tombstone": false, + "public": true, + "custom": false, + "releaseStage": "alpha" }, { "destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae", "name": "Chargify (Keen)", @@ -8987,7 +9035,7 @@ "sourceDefinitionId": "fa9f58c6-2d03-4237-aaa4-07d75e0c1396", "name": "Amplitude", "dockerRepository": "airbyte/source-amplitude", - "dockerImageTag": "0.2.2", + "dockerImageTag": "0.2.3", "documentationUrl": "https://docs.airbyte.com/integrations/sources/amplitude", "icon": "amplitude.svg", "sourceType": "api", @@ -9044,14 +9092,6 @@ "public": true, "custom": false, "releaseStage": "generally_available", - "resourceRequirements": { - "jobSpecific": [ { - "jobType": "sync", - "resourceRequirements": { - "memory_limit": "8Gi" - } - } ] - }, "allowedHosts": { "hosts": [ "amplitude.com", "analytics.eu.amplitude.com" ] } diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml index e7cf6a81c69b94..d3b75f3ca7dcc6 100644 --- a/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config-oss/init-oss/src/main/resources/seed/source_definitions.yaml @@ -103,16 +103,11 @@ - name: Amplitude sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396 dockerRepository: airbyte/source-amplitude - dockerImageTag: 0.2.2 + dockerImageTag: 0.2.3 documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude icon: amplitude.svg sourceType: api releaseStage: generally_available - resourceRequirements: - jobSpecific: - - jobType: sync - resourceRequirements: - memory_limit: "8Gi" allowedHosts: hosts: - amplitude.com diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml index f05c6ba1e2898c..804ba775723e0d 100644 --- a/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config-oss/init-oss/src/main/resources/seed/source_specs.yaml @@ -1344,7 +1344,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amplitude:0.2.2" +- dockerImage: "airbyte/source-amplitude:0.2.3" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/amplitude" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-amplitude/Dockerfile b/airbyte-integrations/connectors/source-amplitude/Dockerfile index e29ba7ee8b847c..f7e438199e3434 100644 --- a/airbyte-integrations/connectors/source-amplitude/Dockerfile +++ b/airbyte-integrations/connectors/source-amplitude/Dockerfile @@ -34,5 +34,5 @@ COPY source_amplitude ./source_amplitude ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.2 +LABEL io.airbyte.version=0.2.3 LABEL io.airbyte.name=airbyte/source-amplitude diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/manifest.yaml b/airbyte-integrations/connectors/source-amplitude/source_amplitude/manifest.yaml index a7deefaf743ec0..94c41951fadf8d 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/manifest.yaml +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/manifest.yaml @@ -119,45 +119,12 @@ definitions: primary_key: "date" path: "/2/users" - events_stream: - $ref: "#/definitions/base_incremental_stream" - retriever: - paginator: - type: NoPagination - requester: - $ref: "#/definitions/requester" - request_parameters: - start: "{{format_datetime(stream_slice.start_time, '%Y%m%dT%H') }}" - end: "{{format_datetime(stream_slice.end_time, '%Y%m%dT%H') }}" - record_selector: - type: RecordSelector - extractor: - type: CustomRecordExtractor - class_name: source_amplitude.components.EventsExtractor - record_filter: - condition: "{{ record[parameters['stream_cursor_field']] > stream_state.get(parameters['stream_cursor_field'],config['start_date']) }}" - incremental_sync: - $ref: "#/definitions/datetime_incremental_sync" - step: "PT{{config.get('request_time_range', 4)}}H" - cursor_field: "{{ parameters.get('stream_cursor_field') }}" - cursor_granularity: PT1H - start_datetime: - datetime: "{{ format_datetime(config['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z') }}" - end_datetime: - datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S.%f%z') }}" - datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" - $parameters: - name: "events" - primary_key: "uuid" - path: "/2/export" - stream_cursor_field: "server_upload_time" streams: - "#/definitions/annotations_stream" - "#/definitions/cohorts_stream" - "#/definitions/average_session_length_stream" - "#/definitions/active_users_stream" - - "#/definitions/events_stream" check: stream_names: diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py index aa2ac4e45a4866..247482de51633e 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py @@ -2,7 +2,13 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from base64 import b64encode +from typing import Any, List, Mapping + from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from source_amplitude.streams import Events """ This file provides the necessary constructs to interpret a provided declarative YAML configuration file into @@ -16,3 +22,22 @@ class SourceAmplitude(YamlDeclarativeSource): def __init__(self): super().__init__(**{"path_to_yaml": "manifest.yaml"}) + + def _convert_auth_to_token(self, username: str, password: str) -> str: + username = username.encode("latin1") + password = password.encode("latin1") + token = b64encode(b":".join((username, password))).strip().decode("ascii") + return token + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + streams = super().streams(config=config) + auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic") + streams.append( + Events( + authenticator=auth, + start_date=config["start_date"], + data_region=config["data_region"], + event_time_interval={"size_unit": "hours", "size": config.get("request_time_range", 24)}, + ) + ) + return streams diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/streams.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/streams.py new file mode 100644 index 00000000000000..bb0c263ac9b3b9 --- /dev/null +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/streams.py @@ -0,0 +1,187 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import gzip +import io +import json +import logging +import zipfile +from typing import IO, Any, Iterable, List, Mapping, MutableMapping, Optional + +import pendulum +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy +from airbyte_cdk.sources.streams.http import HttpStream + +LOGGER = logging.getLogger("airbyte") + +HTTP_ERROR_CODES = { + 400: { + "msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.", + "lvl": "ERROR", + }, + 404: { + "msg": "No data collected", + "lvl": "WARN", + }, + 504: { + "msg": "The amount of data is large causing a timeout. For large amounts of data, the Amazon S3 destination is recommended.", + "lvl": "ERROR", + }, +} + + +def error_msg_from_status(status: int = None): + if status: + level = HTTP_ERROR_CODES[status]["lvl"] + message = HTTP_ERROR_CODES[status]["msg"] + if level == "ERROR": + LOGGER.error(message) + elif level == "WARN": + LOGGER.warning(message) + else: + LOGGER.error(f"Unknown error occured: code {status}") + + +class Events(HttpStream): + api_version = 2 + base_params = {} + cursor_field = "server_upload_time" + date_template = "%Y%m%dT%H" + compare_date_template = "%Y-%m-%d %H:%M:%S.%f" + primary_key = "uuid" + state_checkpoint_interval = 1000 + + def __init__(self, data_region: str, start_date: str, event_time_interval: dict = None, **kwargs): + if event_time_interval is None: + event_time_interval = {"size_unit": "hours", "size": 24} + self.data_region = data_region + self.event_time_interval = event_time_interval + self._start_date = pendulum.parse(start_date) if isinstance(start_date, str) else start_date + self.date_time_fields = self._get_date_time_items_from_schema() + super().__init__(**kwargs) + + @property + def url_base(self) -> str: + subdomain = "analytics.eu." if self.data_region == "EU Residency Server" else "" + return f"https://{subdomain}amplitude.com/api/" + + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return None + + @property + def time_interval(self) -> dict: + return {self.event_time_interval.get("size_unit"): self.event_time_interval.get("size")} + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + # save state value in source native format + if self.compare_date_template: + latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template) + else: + latest_state = latest_record.get(self.cursor_field, "") + return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))} + + def _get_date_time_items_from_schema(self): + """ + Get all properties from schema with format: 'date-time' + """ + result = [] + schema = self.get_json_schema() + for key, value in schema["properties"].items(): + if value.get("format") == "date-time": + result.append(key) + return result + + def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + """ + Transform 'date-time' items to RFC3339 format + """ + for item in record: + if item in self.date_time_fields and record[item]: + record[item] = pendulum.parse(record[item]).to_rfc3339_string() + return record + + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]: + state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template) + try: + zip_file = zipfile.ZipFile(io.BytesIO(response.content)) + except zipfile.BadZipFile as e: + self.logger.exception(e) + self.logger.error( + f"Received an invalid zip file in response to URL: {response.request.url}." + f"The size of the response body is: {len(response.content)}" + ) + return [] + + for gzip_filename in zip_file.namelist(): + with zip_file.open(gzip_filename) as file: + for record in self._parse_zip_file(file): + if record[self.cursor_field] >= state_value: + yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339 + + def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]: + with gzip.open(zip_file) as file: + for record in file: + yield json.loads(record) + + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + slices = [] + start = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date + end = pendulum.now() + if start > end: + self.logger.info("The data cannot be requested in the future. Skipping stream.") + return [] + + while start <= end: + slices.append( + { + "start": start.strftime(self.date_template), + "end": start.add(**self.time_interval).subtract(hours=1).strftime(self.date_template), + } + ) + start = start.add(**self.time_interval) + + return slices + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + stream_state = stream_state or {} + start = pendulum.parse(stream_slice["start"]) + end = pendulum.parse(stream_slice["end"]) + if start > end: + yield from [] + # sometimes the API throws a 404 error for not obvious reasons, we have to handle it and log it. + # for example, if there is no data from the specified time period, a 404 exception is thrown + # https://developers.amplitude.com/docs/export-api#status-codes + try: + self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}") + records = super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + yield from records + except requests.exceptions.HTTPError as error: + status = error.response.status_code + if status in HTTP_ERROR_CODES.keys(): + error_msg_from_status(status) + yield from [] + else: + self.logger.error(f"Error during syncing {self.name} stream - {error}") + raise + + def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: + params = self.base_params + params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template) + params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template) + return params + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return f"{self.api_version}/export" diff --git a/connectors.md b/connectors.md index a2da3b3639fe94..3a2960b4878a3d 100644 --- a/connectors.md +++ b/connectors.md @@ -15,7 +15,7 @@ | **Amazon Ads** | Amazon Ads icon | Source | airbyte/source-amazon-ads:1.0.3 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/amazon-ads) | [connectors/source/amazon-ads](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/amazon-ads) | [source-amazon-ads](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-amazon-ads) | `c6b0a29e-1da9-4512-9002-7bfd0cba2246` | | **Amazon SQS** | Amazon SQS icon | Source | airbyte/source-amazon-sqs:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/sources/amazon-sqs) | [connectors/source/amazon-sqs](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/amazon-sqs) | [source-amazon-sqs](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-amazon-sqs) | `983fd355-6bf3-4709-91b5-37afa391eeb6` | | **Amazon Seller Partner** | Amazon Seller Partner icon | Source | airbyte/source-amazon-seller-partner:1.0.1 | alpha | [docs](https://docs.airbyte.com/integrations/sources/amazon-seller-partner) | [connectors/source/amazon-seller-partner](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/amazon-seller-partner) | [source-amazon-seller-partner](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-amazon-seller-partner) | `e55879a8-0ef8-4557-abcf-ab34c53ec460` | -| **Amplitude** | Amplitude icon | Source | airbyte/source-amplitude:0.2.2 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/amplitude) | [connectors/source/amplitude](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/amplitude) | [source-amplitude](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-amplitude) | `fa9f58c6-2d03-4237-aaa4-07d75e0c1396` | +| **Amplitude** | Amplitude icon | Source | airbyte/source-amplitude:0.2.3 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/amplitude) | [connectors/source/amplitude](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/amplitude) | [source-amplitude](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-amplitude) | `fa9f58c6-2d03-4237-aaa4-07d75e0c1396` | | **Apify Dataset** | Apify Dataset icon | Source | airbyte/source-apify-dataset:0.1.11 | alpha | [docs](https://docs.airbyte.com/integrations/sources/apify-dataset) | [connectors/source/apify-dataset](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/apify-dataset) | [source-apify-dataset](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-apify-dataset) | `47f17145-fe20-4ef5-a548-e29b048adf84` | | **Appfollow** | Appfollow icon | Source | airbyte/source-appfollow:0.1.1 | alpha | [docs](https://docs.airbyte.com/integrations/sources/appfollow) | [connectors/source/appfollow](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/appfollow) | [source-appfollow](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-appfollow) | `b4375641-e270-41d3-9c20-4f9cecad87a8` | | **Apple Search Ads** | Apple Search Ads icon | Source | airbyte/source-apple-search-ads:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/sources/apple-search-ads) | [connectors/source/apple-search-ads](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/apple-search-ads) | [source-apple-search-ads](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-apple-search-ads) | `e59c8416-c2fa-4bd3-9e95-52677ea281c1` | @@ -303,6 +303,7 @@ | **Clickhouse** | Clickhouse icon | Destination | airbyte/destination-clickhouse:0.2.3 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/clickhouse) | [connectors/destination/clickhouse](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/clickhouse) | [destination-clickhouse](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-clickhouse) | `ce0d828e-1dc4-496c-b122-2da42e637e48` | | **Cloudflare R2** | Cloudflare R2 icon | Destination | airbyte/destination-r2:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/r2) | [connectors/destination/r2](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/r2) | [destination-r2](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-r2) | `0fb07be9-7c3b-4336-850d-5efc006152ee` | | **Convex** | Convex icon | Destination | airbyte/destination-convex:0.1.0 | alpha | [docs](https://docs.airbyte.io/integrations/destinations/convex) | [connectors/destination/convex](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/convex) | [destination-convex](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-convex) | `3eb4d99c-11fa-4561-a259-fc88e0c2f8f4` | +| **Cumul.io** | Cumul.io icon | Destination | airbyte/destination-cumulio:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/cumulio) | [connectors/destination/cumulio](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/cumulio) | [destination-cumulio](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-cumulio) | `e088acb6-9780-4568-880c-54c2dd7f431b` | | **Databend** | Databend icon | Destination | airbyte/destination-databend:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/databend) | [connectors/destination/databend](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/databend) | [destination-databend](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-databend) | `302e4d8e-08d3-4098-acd4-ac67ca365b88` | | **Databricks Lakehouse** | Databricks Lakehouse icon | Destination | airbyte/destination-databricks:1.0.1 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/databricks) | [connectors/destination/databricks](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/databricks) | [destination-databricks](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-databricks) | `072d5540-f236-4294-ba7c-ade8fd918496` | | **DuckDB** | DuckDB icon | Destination | airbyte/destination-duckdb:0.1.0 | alpha | [docs](https://docs.airbyte.io/integrations/destinations/duckdb) | [connectors/destination/duckdb](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/duckdb) | [destination-duckdb](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-duckdb) | `94bd199c-2ff0-4aa2-b98e-17f0acb72610` |