From 59f9f0989aa576fc28e0efac463cc93095f360a9 Mon Sep 17 00:00:00 2001 From: Jeremy Date: Mon, 1 Nov 2021 20:52:01 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20=20New=20Destination:=20Elastics?= =?UTF-8?q?earch=20(#7005)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: adds destination-elasticsearch * feat: adds destination-elasticsearch es server container * refactor: header configuration * update: only call createIndex when preparing the writes * update: reuse container * fix: make index names valid and use namespace * refactor: use bulk process and buffered consumer * refactor: fix bulk process and buffered consumer * chore: update documentation * update: remove ssl reference * fix: bulk indexing adds test logging config to inspect http wire begins work for overrwriting existing records * docs: update for authentication * refactor: simplify config * refactor: cleanup indices, implement auth * update: cleanup equals/toString in Elasticsearch ConnectionConfiguration * chore: use conventions and remove unused code * update: close underlying rest connection * update: enable `supportsNormalization` * refactor: better encapsulate index naming * update: allow upserting * update: use oneOf for auth method * refactor: use encapsulated auth object * chore: pretty * update: simplify auth header creation * chore: remove unused class * update: use boolean as field type * adds: elasticsearch example server * fix: api secret test --- .gitignore | 5 +- .vscode/settings.json | 3 + .../68f351a7-2745-4bef-ad7f-996b8e51bb8c.json | 7 + .../seed/destination_definitions.yaml | 5 + airbyte-integrations/builds.md | 1 + .../destination-elasticsearch/.dockerignore | 3 + .../destination-elasticsearch/Dockerfile | 11 + .../destination-elasticsearch/README.md | 68 ++++ .../destination-elasticsearch/bootstrap.md | 33 ++ .../destination-elasticsearch/build.gradle | 49 +++ .../docker-compose.yaml | 11 + 
.../elasticsearch/ConnectorConfiguration.java | 159 +++++++++ ...icsearchAirbyteMessageConsumerFactory.java | 114 ++++++ .../ElasticsearchAuthenticationMethod.java | 11 + .../ElasticsearchConnection.java | 328 ++++++++++++++++++ .../ElasticsearchDestination.java | 98 ++++++ .../ElasticsearchWriteConfig.java | 102 ++++++ .../main/resources/log4j2-test.example.yml | 33 ++ .../src/main/resources/spec.json | 93 +++++ ...lasticsearchDestinationAcceptanceTest.java | 105 ++++++ .../ConnectorConfigurationTest.java | 94 +++++ .../ElasticsearchConnectionTest.java | 59 ++++ .../ElasticsearchDestinationTest.java | 185 ++++++++++ .../elasticsearch/elasticsearch.yml | 4 + docs/SUMMARY.md | 1 + docs/integrations/README.md | 1 + .../destinations/elasticsearch.md | 69 ++++ 27 files changed, 1649 insertions(+), 3 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/68f351a7-2745-4bef-ad7f-996b8e51bb8c.json create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/.dockerignore create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/Dockerfile create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/README.md create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/bootstrap.md create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/build.gradle create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/docker-compose.yaml create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java create mode 100644 
airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAuthenticationMethod.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/log4j2-test.example.yml create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfigurationTest.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnectionTest.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationTest.java create mode 100644 airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/elasticsearch.yml create mode 100644 docs/integrations/destinations/elasticsearch.md diff --git a/.gitignore b/.gitignore index 45316ca315b54..c20c705ffe50e 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,5 @@ 
resources/examples/airflow/logs/* # Cloud Demo !airbyte-webapp/src/packages/cloud/data - -# Sphinx Docs -_build +/bin/ +/**/bin/ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000000..4f81299a37cfc --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "automatic" +} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/68f351a7-2745-4bef-ad7f-996b8e51bb8c.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/68f351a7-2745-4bef-ad7f-996b8e51bb8c.json new file mode 100644 index 0000000000000..a1c5bb5e1ac91 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/68f351a7-2745-4bef-ad7f-996b8e51bb8c.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "68f351a7-2745-4bef-ad7f-996b8e51bb8c", + "name": "Elasticsearch", + "dockerRepository": "airbyte/destination-elasticsearch", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 7e67a08b0c565..fc8c6e05d48b5 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -23,6 +23,11 @@ dockerRepository: airbyte/destination-dynamodb dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb +- destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c + name: Elasticsearch + dockerRepository: airbyte/destination-elasticsearch + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/elasticsearch - name: Google Cloud Storage (GCS) destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a 
dockerRepository: airbyte/destination-gcs diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 87ae0adce6343..70ce39f56dc4e 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -99,6 +99,7 @@ | Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) | | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) | | Databricks | (Temporarily Not Available) | +| Elasticsearch | (Temporarily Not Available) | | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) | | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) | | Kafka | [![destination-kafka](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-kafka%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-kafka) | diff --git a/airbyte-integrations/connectors/destination-elasticsearch/.dockerignore b/airbyte-integrations/connectors/destination-elasticsearch/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git 
a/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile new file mode 100644 index 0000000000000..ae3509fcadb7d --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-elasticsearch + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-elasticsearch diff --git a/airbyte-integrations/connectors/destination-elasticsearch/README.md b/airbyte-integrations/connectors/destination-elasticsearch/README.md new file mode 100644 index 0000000000000..0d7468cc682b4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/README.md @@ -0,0 +1,68 @@ +# Destination Elasticsearch + +This is the repository for the Elasticsearch destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/elasticsearch). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-elasticsearch:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. 
+ +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-elasticsearch:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-elasticsearch:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-elasticsearch:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-elasticsearch:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-elasticsearch:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/java/io/airbyte/integrations/destination/elasticsearch`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from the Airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-elasticsearch:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-elasticsearch:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. 
Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-elasticsearch/bootstrap.md b/airbyte-integrations/connectors/destination-elasticsearch/bootstrap.md new file mode 100644 index 0000000000000..b7f614cba5d21 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/bootstrap.md @@ -0,0 +1,33 @@ +# Elasticsearch Destination + +Elasticsearch is a Lucene-based search engine that's a type of NoSQL storage. +Documents are created in an `index`, similar to a `table` in a relational database. + +The documents are structured with fields that may contain nested complex structures. +[Read more about Elastic](https://elasticsearch.org/) + +This connector maps an incoming `stream` to an Elastic `index`. +When using destination sync mode `append` and `append_dedup`, an `upsert` operation is performed against the Elasticsearch index. +When using `overwrite`, the records/docs are placed in a temp index, then cloned to the target index. +The target index is deleted first, if it exists before the sync. + +The [ElasticsearchConnection.java](./src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java) +handles the communication with the Elastic server. +This uses the `elasticsearch-java` rest client from the Elasticsearch team - +[https://github.com/elastic/elasticsearch-java/](https://github.com/elastic/elasticsearch-java/) + +The [ElasticsearchAirbyteMessageConsumerFactory.java](./src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java) +contains the logic for organizing a batch of records and reporting progress. 
+ +The `namespace` and stream `name` are used to generate an index name. +The index is created if it doesn't exist, but no other index configuration is done at this time. + +Elastic will determine the type of data by detection. +You can create an index ahead of time for field type customization. + +Basic authentication and API key authentication are supported. + +## Development +See the Elasticsearch client tests for examples on how to use the library. + +[https://github.com/elastic/elasticsearch-java/blob/main/java-client/src/test/java/co/elastic/clients/elasticsearch/end_to_end/RequestTest.java](https://github.com/elastic/elasticsearch-java/blob/main/java-client/src/test/java/co/elastic/clients/elasticsearch/end_to_end/RequestTest.java) \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-elasticsearch/build.gradle b/airbyte-integrations/connectors/destination-elasticsearch/build.gradle new file mode 100644 index 0000000000000..680898b591690 --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/build.gradle @@ -0,0 +1,49 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.elasticsearch.ElasticsearchDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation 'co.elastic.clients:elasticsearch-java:7.15.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3' + implementation 'org.projectlombok:lombok:1.18.20' + + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://eclipse-ee4j.github.io/jsonp/ + implementation 'jakarta.json:jakarta.json-api:2.0.1' + + // Needed even if using Jackson to have an implementation of the Jsonp 
object model + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://github.com/eclipse-ee4j/jsonp + implementation 'org.glassfish:jakarta.json:2.0.1' + + // MIT + // https://www.testcontainers.org/ + //implementation "org.testcontainers:testcontainers:1.16.0" + testImplementation "org.testcontainers:elasticsearch:1.15.3" + integrationTestJavaImplementation "org.testcontainers:elasticsearch:1.15.3" + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch') +} + +repositories { + maven { + name = "ESSnapshots" + url = "https://snapshots.elastic.co/maven/" + } + maven { + name = "ESJavaGithubPackages" + url = "https://maven.pkg.github.com/elastic/elasticsearch-java" + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-elasticsearch/docker-compose.yaml b/airbyte-integrations/connectors/destination-elasticsearch/docker-compose.yaml new file mode 100644 index 0000000000000..85c445885ef8f --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/docker-compose.yaml @@ -0,0 +1,11 @@ +version: "3.7" + +services: + elastic: + image: "docker.elastic.co/elasticsearch/elasticsearch:7.15.1" + ports: + - "9200:9200" + environment: + ES_JAVA_OPTS: "-Xms256m -Xmx256m" + discovery.type: "single-node" + network.host: "0.0.0.0" \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java new file mode 100644 index 0000000000000..4ec0dfd3d01a5 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.elasticsearch; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ConnectorConfiguration { + + private String endpoint; + private boolean upsert; + private AuthenticationMethod authenticationMethod = new AuthenticationMethod(); + + public ConnectorConfiguration() {} + + public static ConnectorConfiguration fromJsonNode(JsonNode config) { + return new ObjectMapper().convertValue(config, ConnectorConfiguration.class); + } + + public String getEndpoint() { + return this.endpoint; + } + + public boolean isUpsert() { + return this.upsert; + } + + public AuthenticationMethod getAuthenticationMethod() { + return this.authenticationMethod; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public void setUpsert(boolean upsert) { + this.upsert = upsert; + } + + public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) { + this.authenticationMethod = authenticationMethod; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ConnectorConfiguration that = (ConnectorConfiguration) o; + return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod); + } + + @Override + public int hashCode() { + return Objects.hash(endpoint, upsert, authenticationMethod); + } + + @Override + public String toString() { + return "ConnectorConfiguration{" + + "endpoint='" + endpoint + '\'' + + 
", upsert=" + upsert + + ", authenticationMethod=" + authenticationMethod + + '}'; + } + + static class AuthenticationMethod { + + private ElasticsearchAuthenticationMethod method = ElasticsearchAuthenticationMethod.none; + private String username; + private String password; + private String apiKeyId; + private String apiKeySecret; + + public ElasticsearchAuthenticationMethod getMethod() { + return this.method; + } + + public String getUsername() { + return this.username; + } + + public String getPassword() { + return this.password; + } + + public String getApiKeyId() { + return this.apiKeyId; + } + + public String getApiKeySecret() { + return this.apiKeySecret; + } + + public void setMethod(ElasticsearchAuthenticationMethod method) { + this.method = method; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setApiKeyId(String apiKeyId) { + this.apiKeyId = apiKeyId; + } + + public void setApiKeySecret(String apiKeySecret) { + this.apiKeySecret = apiKeySecret; + } + + public boolean isValid() { + return switch (this.method) { + case none -> true; + case basic -> Objects.nonNull(this.username) && Objects.nonNull(this.password); + case secret -> Objects.nonNull(this.apiKeyId) && Objects.nonNull(this.apiKeySecret); + }; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + AuthenticationMethod that = (AuthenticationMethod) o; + return method == that.method && + Objects.equals(username, that.username) && + Objects.equals(password, that.password) && + Objects.equals(apiKeyId, that.apiKeyId) && + Objects.equals(apiKeySecret, that.apiKeySecret); + } + + @Override + public int hashCode() { + return Objects.hash(method, username, password, apiKeyId, apiKeySecret); + } + + @Override + public String toString() { + return "AuthenticationMethod{" + + 
"method=" + method + + ", username='" + username + '\'' + + ", apiKeyId='" + apiKeyId + '\'' + + '}'; + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java new file mode 100644 index 0000000000000..9d73cc529c6ef --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.elasticsearch; + +import co.elastic.clients.elasticsearch._core.BulkResponse; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.concurrency.VoidCallable; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.CheckedFunction; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; +import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchAirbyteMessageConsumerFactory { + + private static final Logger log = LoggerFactory.getLogger(ElasticsearchAirbyteMessageConsumerFactory.class); + private static final int 
MAX_BATCH_SIZE = 10000; + private static final ObjectMapper mapper = new ObjectMapper(); + + private static AtomicLong recordsWritten = new AtomicLong(0); + + /** + * Holds a mapping of temp to target indices. After closing a sync job, the target index is removed + * if it already exists, and the temp index is copied to replace it. + */ + private static final Map tempIndices = new HashMap<>(); + + public static AirbyteMessageConsumer create(Consumer outputRecordCollector, + ElasticsearchConnection connection, + List writeConfigs, + ConfiguredAirbyteCatalog catalog) { + + return new BufferedStreamConsumer( + outputRecordCollector, + onStartFunction(connection, writeConfigs), + recordWriterFunction(connection, writeConfigs), + onCloseFunction(connection), + catalog, + isValidFunction(connection), + MAX_BATCH_SIZE); + } + + // is there any json node that wont fit in the index? + private static CheckedFunction isValidFunction(ElasticsearchConnection connection) { + return jsonNode -> true; + } + + private static CheckedConsumer onCloseFunction(ElasticsearchConnection connection) { + + return (hasFailed) -> { + if (!tempIndices.isEmpty() && !hasFailed) { + tempIndices.forEach(connection::replaceIndex); + } + connection.close(); + }; + } + + private static RecordWriter recordWriterFunction( + ElasticsearchConnection connection, + List writeConfigs) { + + return (pair, records) -> { + log.info("writing {} records in bulk operation", records.size()); + var optConfig = writeConfigs.stream() + .filter(c -> Objects.equals(c.getStreamName(), pair.getName()) && + Objects.equals(c.getNamespace(), pair.getNamespace())) + .findFirst(); + if (optConfig.isEmpty()) { + throw new Exception(String.format("missing write config: %s", pair)); + } + final var config = optConfig.get(); + BulkResponse response; + if (config.useTempIndex()) { + response = connection.indexDocuments(config.getTempIndexName(), records, config); + } else { + response = 
connection.indexDocuments(config.getIndexName(), records, config); + } + if (Objects.nonNull(response) && response.errors()) { + String msg = String.format("failed to write bulk records: %s", mapper.valueToTree(response)); + throw new Exception(msg); + } else { + log.info("bulk write took: {}ms", response.took()); + } + }; + } + + private static VoidCallable onStartFunction(ElasticsearchConnection connection, List writeConfigs) { + return () -> { + for (var config : writeConfigs) { + if (config.useTempIndex()) { + tempIndices.put(config.getTempIndexName(), config.getIndexName()); + connection.deleteIndexIfPresent(config.getTempIndexName()); + connection.createIndexIfMissing(config.getTempIndexName()); + } else { + connection.createIndexIfMissing(config.getIndexName()); + } + } + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAuthenticationMethod.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAuthenticationMethod.java new file mode 100644 index 0000000000000..0ca9a3789ffda --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAuthenticationMethod.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+public enum ElasticsearchAuthenticationMethod {
+  none, // no Authorization header is sent
+  secret, // Authorization: ApiKey base64(apiKeyId:apiKeySecret)
+  basic // Authorization: Basic base64(username:password)
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java
new file mode 100644
index 0000000000000..6c71404f6011d
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import co.elastic.clients.base.*;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._core.BulkRequest;
+import co.elastic.clients.elasticsearch._core.BulkResponse;
+import co.elastic.clients.elasticsearch._core.CreateResponse;
+import co.elastic.clients.elasticsearch._core.SearchResponse;
+import co.elastic.clients.elasticsearch._core.search.Hit;
+import co.elastic.clients.elasticsearch._core.search.HitsMetadata;
+import co.elastic.clients.elasticsearch.cat.indices.IndicesRecord;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import com.fasterxml.jackson.core.JsonPointer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
+import jakarta.json.JsonValue;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * All communication with Elasticsearch should be done through this class.
+ */
+public class ElasticsearchConnection {
+
+  // this is the max number of hits we can query without paging
+  private static final int MAX_HITS = 10000;
+  private static final Logger log = LoggerFactory.getLogger(ElasticsearchConnection.class);
+
+  private final ElasticsearchClient client;
+  private final RestClient restClient;
+  private final HttpHost httpHost;
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Creates a new ElasticsearchConnection that can be used to read/write records to indices
+   *
+   * @param config Configuration parameters for connecting to the Elasticsearch host
+   */
+  public ElasticsearchConnection(ConnectorConfiguration config) {
+    log.info(String.format(
+        "creating ElasticsearchConnection: %s", config.getEndpoint()));
+
+    // Create the low-level client
+    httpHost = HttpHost.create(config.getEndpoint());
+    restClient = RestClient.builder(httpHost)
+        .setDefaultHeaders(configureHeaders(config))
+        .setFailureListener(new FailureListener())
+        .build();
+    // Create the transport that provides JSON and http services to API clients
+    Transport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+    // And create our API client
+    client = new ElasticsearchClient(transport);
+  }
+
+  /**
+   * Logs every node failure reported by the low-level rest client.
+   */
+  static class FailureListener extends RestClient.FailureListener {
+
+    @Override
+    public void onFailure(Node node) {
+      log.error("RestClient failure: {}", node);
+    }
+
+  }
+
+  /**
+   * Configures the default headers for requests to the Elasticsearch server
+   *
+   * @param config connection information
+   * @return the default headers
+   */
+  protected Header[] configureHeaders(ConnectorConfiguration config) {
+    final var headerList = new ArrayList<Header>();
+    // add Authorization header if credentials are present
+    final var auth = config.getAuthenticationMethod();
+    switch (auth.getMethod()) {
+      case secret -> {
+        var bytes = (auth.getApiKeyId() + ":" + auth.getApiKeySecret()).getBytes(StandardCharsets.UTF_8);
+        var header = "ApiKey " + Base64.getEncoder().encodeToString(bytes);
+        headerList.add(new BasicHeader("Authorization", header));
+      }
+      case basic -> {
+        var basicBytes = (auth.getUsername() + ":" + auth.getPassword()).getBytes(StandardCharsets.UTF_8);
+        var basicHeader = "Basic " + Base64.getEncoder().encodeToString(basicBytes);
+        headerList.add(new BasicHeader("Authorization", basicHeader));
+      }
+    }
+    return headerList.toArray(new Header[0]);
+  }
+
+  /**
+   * Pings the Elasticsearch server for "up" check, and configuration validation
+   *
+   * @return true if connection was successful
+   */
+  public boolean checkConnection() {
+    log.info("checking elasticsearch connection");
+    try {
+      final var info = client.info();
+      log.info("checked elasticsearch connection: {}, version: {}", info.clusterName(), info.version());
+      return true;
+    } catch (ApiException e) {
+      log.error("failed to ping elasticsearch", unwrappedApiException("failed connection check", e));
+      return false;
+    } catch (Exception e) {
+      log.error("unknown exception while pinging elasticsearch server", e);
+      return false;
+    }
+  }
+
+  /**
+   * Writes a single record to the Elasticsearch server
+   *
+   * @param index The index to write the record to
+   * @param id The ID to give the new document
+   * @param data The body of the document (source record)
+   * @return results of the create operation
+   * @throws Exception if an error is encountered
+   */
+  public CreateResponse createDocument(String index, String id, JsonNode data) throws Exception {
+    CreateResponse createResponse = client.create(builder -> builder.id(id).document(data).index(index));
+    log.debug("wrote record: {}", createResponse.result());
+    return createResponse;
+  }
+
+  /**
+   * Bulk operation to append multiple documents to an Elasticsearch server
+   *
+   * @param index The index to add the documents to
+   * @param records The collection of records to create documents from
+   * @param config The write configuration; supplies the primary key used to derive each document id
+   * @return the response of the bulk operation
+   * @throws IOException if there is server connection problem, or a non-successful operation on the
+   *         server
+   */
+  public BulkResponse indexDocuments(String index, List<AirbyteRecordMessage> records, ElasticsearchWriteConfig config) throws IOException {
+    var bulkRequest = new BulkRequest.Builder<>();
+    for (var doc : records) {
+      log.debug("adding record to bulk create: {}", doc.getData());
+      bulkRequest.addOperation(
+          b -> b.index(
+              c -> c.index(index).id(extractPrimaryKey(doc, config))))
+          .addDocument(doc.getData()).refresh(JsonValue.TRUE);
+    }
+
+    try {
+      return client.bulk(b -> bulkRequest);
+    } catch (ApiException e) {
+      throw unwrappedApiException("failed write operation", e);
+    }
+  }
+
+  /**
+   * Derives the Elasticsearch document id for a record.
+   * <p>
+   * Only the first configured primary key path is used (composite keys are not supported). Its
+   * segments are joined into a JSON Pointer that is resolved against the record data. A random UUID
+   * is returned when no primary key is configured or the pointer does not resolve to a non-null scalar.
+   *
+   * @param doc the record whose data holds the key value
+   * @param config write configuration holding the primary key path(s)
+   * @return the key value as text, or a random UUID
+   */
+  private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteConfig config) {
+    if (!config.hasPrimaryKey()) {
+      return UUID.randomUUID().toString();
+    }
+    var optFirst = config.getPrimaryKey().stream().findFirst();
+    StringBuilder sb = new StringBuilder();
+    if (optFirst.isPresent()) {
+      log.debug("trying to extract primary key using {}", optFirst.get());
+      // '~' and '/' are escaped per RFC 6901 so field names containing them still resolve
+      optFirst.get().forEach(s -> sb.append('/').append(s.replace("~", "~0").replace("/", "~1")));
+    }
+    if (sb.length() > 0) {
+      JsonPointer ptr = JsonPointer.valueOf(sb.toString());
+      var pkNode = doc.getData().at(ptr);
+      if (!pkNode.isMissingNode() && !pkNode.isNull() && pkNode.isValueNode()) {
+        return pkNode.asText();
+      }
+    }
+    log.warn("unable to extract primary key");
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * returns the first 10k documents of a given index
+   *
+   * @param index the index to search
+   * @return a list of matching documents
+   * @throws IOException if there is server communication error, or invalid index
+   */
+  public List<JsonNode> getRecords(String index) throws IOException {
+    log.info("getting records for index: {}", index);
+    SearchResponse<JsonNode> search = client.search(s -> s.index(index).size(MAX_HITS), JsonNode.class);
+    HitsMetadata<JsonNode> hitMeta = search.hits();
+    return hitMeta.hits().stream().map(Hit::source).collect(Collectors.toList());
+  }
+
+  /**
+   * Shutdown the connection to the Elasticsearch server
+   */
+  public void close() throws IOException {
+    this.restClient.close();
+    this.client.shutdown();
+  }
+
+  /**
+   * Lists the names of all indices reported by the cat indices API.
+   *
+   * @return the index names, or an empty list if the request fails (the failure is logged)
+   */
+  public List<String> allIndices() {
+    try {
+      var response = client.cat().indices(r -> r);
+      return response.valueBody().stream().map(IndicesRecord::index).collect(Collectors.toList());
+    } catch (ApiException e) {
+      log.error("", unwrappedApiException("failed to get indices", e));
+    } catch (IOException e) {
+      log.error("unknown exception while getting indices", e);
+    }
+    return new ArrayList<>();
+  }
+
+  /**
+   * Creates an index on Elasticsearch if it's missing
+   *
+   * @param index the index name to create
+   */
+  public void createIndexIfMissing(String index) {
+    try {
+      BooleanResponse existsResponse = client.indices().exists(b -> b.index(index));
+      if (existsResponse.value()) {
+        log.info("index exists: {}", index);
+        return;
+      }
+      log.info("creating index: {}, info: {}", index, client.info());
+      final co.elastic.clients.elasticsearch.indices.CreateResponse createResponse = client.indices().create(b -> b.index(index));
+      if (createResponse.acknowledged() && createResponse.shardsAcknowledged()) {
+        log.info("created index: {}", index);
+      } else {
+        log.info("did not create index: {}, {}", index, mapper.writeValueAsString(createResponse));
+      }
+    } catch (ApiException e) {
+      log.warn("", unwrappedApiException("failed to create index", e));
+    } catch (IOException e) {
+      log.warn("unknown exception while creating index", e);
+    }
+  }
+
+  /**
+   * Deletes an index if present, suppressing any exceptions
+   *
+   * @param indexName The index to delete
+   */
+  public void deleteIndexIfPresent(String indexName) {
+    try {
+      var exists = client.indices().exists(b -> b.index(indexName));
+      if (exists.value()) {
+        client.indices().delete(b -> b.index(indexName));
+      }
+    } catch (ApiException e) {
+      log.warn("", unwrappedApiException("failed to delete index", e));
+    } catch (IOException e) {
+      log.warn("unknown exception while deleting index", e);
+    }
+  }
+
+  /**
+   * Clones a source index to a destination index. If the destination index already exists, it deletes
+   * it before cloning
+   *
+   * @param sourceIndexName The index to clone
+   * @param destinationIndexName The destination index name to clone to.
+   */
+  public void replaceIndex(String sourceIndexName, String destinationIndexName) {
+    log.info("replacing index: {}, with index: {}", destinationIndexName, sourceIndexName);
+    try {
+      var sourceExists = client.indices().exists(i -> i.index(sourceIndexName));
+      if (!sourceExists.value()) {
+        throw new RuntimeException(
+            String.format("the source index does not exist. unable to replace the destination index. source: %s, destination: %s", sourceIndexName,
+                destinationIndexName));
+      }
+
+      // delete the destination if it exists
+      var destinationExists = client.indices().exists(i -> i.index(destinationIndexName));
+      if (destinationExists.value()) {
+        log.warn("deleting existing index: {}", destinationIndexName);
+        deleteIndexIfPresent(destinationIndexName);
+      }
+
+      // make the source index read-only before cloning
+      // I think there's a bug in the client
+      // https://github.com/elastic/elasticsearch-java/issues/37
+      // client.indices().addBlock(b -> b.index(sourceIndexName).block(IndicesBlockOptions.Write));
+      // so we need to do it a different way
+      client.indices().putSettings(b -> b.index(sourceIndexName).settings(s -> s.blocks(w -> w.write(true))));
+
+      // clone to the destination
+      client.indices().clone(c -> c.index(sourceIndexName).target(destinationIndexName));
+
+      // enable writing on new index
+      client.indices().putSettings(b -> b.index(destinationIndexName).settings(s -> s.blocks(w -> w.write(false))));
+    } catch (ApiException e) {
+      throw unwrappedApiException("failed to replace index", e);
+    } catch (IOException e) {
+      throw new RuntimeException("unknown exception while replacing index", e);
+    }
+  }
+
+  /**
+   * Unwraps a rest client ApiException, so we can log the details
+   *
+   * @param message message to add to the log entry
+   * @param e source ApiException
+   * @return a new RuntimeException that always keeps the ApiException as its cause
+   */
+  private RuntimeException unwrappedApiException(String message, ApiException e) {
+    log.error(message);
+    if (Objects.isNull(e) || Objects.isNull(e.error())) {
+      log.error("unknown ApiException");
+      return new RuntimeException(e);
+    }
+    if (e.error() instanceof ElasticsearchError) {
+      ElasticsearchError esException = ((ElasticsearchError) e.error());
+      String errorMessage = String.format("ElasticsearchError: status:%s, error:%s", esException.status(), esException.error().toString());
+      return new RuntimeException(errorMessage, e);
+    }
+    return new RuntimeException(e);
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java
new file mode 100644
index 0000000000000..01ac17eefa7f7
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airbyte.integrations.BaseConnector;
+import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.protocol.models.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchDestination extends BaseConnector implements Destination {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDestination.class);
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  public static void main(String[] args) throws Exception {
+    final var destination = new ElasticsearchDestination();
+    LOGGER.info("starting destination: {}", ElasticsearchDestination.class);
+    new IntegrationRunner(destination).run(args);
+    LOGGER.info("completed destination: {}", ElasticsearchDestination.class);
+  }
+
+  /** Validates the endpoint and authentication options, then pings the server and closes the connection again. */
+  @Override
+  public AirbyteConnectionStatus check(JsonNode config) {
+    final ConnectorConfiguration configObject = convertConfig(config);
+    if (Objects.isNull(configObject.getEndpoint()) || configObject.getEndpoint().isBlank()) {
+      return new AirbyteConnectionStatus()
+          .withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("endpoint must not be empty");
+    }
+    if (!configObject.getAuthenticationMethod().isValid()) {
+      return new AirbyteConnectionStatus()
+          .withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("authentication options are invalid");
+    }
+
+    final ElasticsearchConnection connection = new ElasticsearchConnection(configObject);
+    final var result = connection.checkConnection();
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOGGER.warn("failed while closing connection", e);
+    }
+    if (result) {
+      return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
+    } else {
+      return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("failed to ping elasticsearch");
+    }
+  }
+
+  /** Builds one write config per stream; the connection is opened last so a rejected catalog cannot leak it. */
+  @Override
+  public AirbyteMessageConsumer getConsumer(JsonNode config,
+      ConfiguredAirbyteCatalog configuredCatalog,
+      Consumer<AirbyteMessage> outputRecordCollector) {
+    final ConnectorConfiguration configObject = convertConfig(config);
+
+    final List<ElasticsearchWriteConfig> writeConfigs = new ArrayList<>();
+    for (final ConfiguredAirbyteStream stream : configuredCatalog.getStreams()) {
+      final String namespace = stream.getStream().getNamespace();
+      final String streamName = stream.getStream().getName();
+      final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
+      if (syncMode == null) {
+        throw new IllegalStateException("Undefined destination sync mode");
+      }
+      List<List<String>> primaryKey = null; // stays null for plain appends, which get generated document ids
+      if (syncMode != DestinationSyncMode.APPEND) {
+        LOGGER.info("not using DestinationSyncMode.APPEND, so using primary key");
+        primaryKey = stream.getPrimaryKey();
+      }
+      LOGGER.info("adding write config. namespace: {}, stream: {}, syncMode: {}", namespace, streamName, syncMode);
+      writeConfigs.add(new ElasticsearchWriteConfig()
+          .setSyncMode(syncMode)
+          .setNamespace(namespace)
+          .setStreamName(streamName)
+          .setPrimaryKey(primaryKey)
+          .setUpsert(configObject.isUpsert()));
+    }
+
+    final ElasticsearchConnection connection = new ElasticsearchConnection(configObject);
+    return ElasticsearchAirbyteMessageConsumerFactory.create(outputRecordCollector, connection, writeConfigs, configuredCatalog);
+  }
+
+  private ConnectorConfiguration convertConfig(JsonNode config) {
+    return mapper.convertValue(config, ConnectorConfiguration.class);
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java
new file mode 100644
index 0000000000000..bcc561f284005
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchWriteConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.protocol.models.DestinationSyncMode;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+
+public class ElasticsearchWriteConfig {
+
+  private static final StandardNameTransformer namingResolver = new StandardNameTransformer();
+
+  private String namespace;
+  private String streamName;
+  private DestinationSyncMode syncMode;
+  private List<List<String>> primaryKey; // each inner list is the field path of one key
+  private boolean upsert;
+
+  ElasticsearchWriteConfig() {}
+
+  ElasticsearchWriteConfig(
+      String namespace,
+      String streamName,
+      DestinationSyncMode destinationSyncMode,
+      List<List<String>> primaryKey,
+      boolean upsert) {
+    this.namespace = namespace;
+    this.streamName = streamName;
+    this.syncMode = destinationSyncMode;
+    this.primaryKey = primaryKey;
+    this.upsert = upsert;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public ElasticsearchWriteConfig setNamespace(String namespace) {
+    this.namespace = namespace;
+    return this;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public ElasticsearchWriteConfig setStreamName(String streamName) {
+    this.streamName = streamName;
+    return this;
+  }
+
+  public DestinationSyncMode getSyncMode() {
+    return syncMode;
+  }
+
+  public ElasticsearchWriteConfig setSyncMode(DestinationSyncMode syncMode) {
+    this.syncMode = syncMode;
+    return this;
+  }
+
+  public List<List<String>> getPrimaryKey() {
+    return this.primaryKey;
+  }
+
+  public ElasticsearchWriteConfig setPrimaryKey(List<List<String>> primaryKey) {
+    this.primaryKey = primaryKey;
+    return this;
+  }
+
+  public boolean hasPrimaryKey() {
+    return Objects.nonNull(this.primaryKey) && this.primaryKey.size() > 0;
+  }
+
+  public boolean isUpsert() {
+    return upsert;
+  }
+
+  public ElasticsearchWriteConfig setUpsert(boolean upsert) {
+    this.upsert = upsert;
+    return this;
+  }
+
+  public boolean useTempIndex() {
+    return (this.syncMode == DestinationSyncMode.OVERWRITE) && !(this.upsert);
+  }
+
+  public String getIndexName() {
+    // "<namespace>_<stream>" in lower case; Locale.ROOT keeps the name independent of the JVM default locale
+    final boolean hasNamespace = Objects.nonNull(namespace) && !namespace.isEmpty();
+    final String prefix = hasNamespace ? String.format("%s_", namespace).toLowerCase(Locale.ROOT) : "";
+    return String.format("%s%s", prefix, namingResolver.getIdentifier(streamName).toLowerCase(Locale.ROOT));
+  }
+
+  public String getTempIndexName() {
+    return String.format("tmp_%s", getIndexName());
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/log4j2-test.example.yml b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/log4j2-test.example.yml
new file mode 100644
index 0000000000000..b09ce061fc866
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/log4j2-test.example.yml
@@ -0,0 +1,33 @@
+Configuration:
+  status: warn
+  name: DefaultLog4j2Config
+  thresholdFilter:
+    level: debug
+  appenders:
+    Console:
+      name: STDOUT
+      target: SYSTEM_OUT
+      PatternLayout:
+        Pattern: "[%-6p] %c{3}.%M(%F:%L) – %m%n"
+
+  Loggers:
+    logger:
+      - name: org.apache.http.wire
+        level: trace
+        additivity: false
+        AppenderRef:
+          ref: STDOUT
+      - name: co.elastic
+        level: debug
+        additivity: false
+        AppenderRef:
+          ref: STDOUT
+      - name: io.airbyte.integrations
+        level: debug
+        additivity: false
+        AppenderRef:
+          ref: STDOUT
+    Root:
+      level: info
+      AppenderRef:
+        ref: STDOUT
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json
new file mode 100644
index 0000000000000..53e2eba24d9ad
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json
@@ -0,0 +1,93 @@
+{
+  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch",
+  "supportsIncremental": true,
+  "supportsNormalization": false,
+  "supportsNamespaces": true,
+  "supportsDBT": false,
+  
"supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Elasticsearch Connection Configuration", + "type": "object", + "required": ["endpoint"], + "additionalProperties": false, + "properties": { + "endpoint": { + "title": "Server Endpoint", + "type": "string", + "description": "The full url of the Elasticsearch server" + }, + "upsert": { + "type": "boolean", + "title": "Upsert Records", + "description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.", + "default": true + }, + "authenticationMethod": { + "title": "Authentication Method", + "type": "object", + "description": "The type of authentication to be used", + "oneOf": [ + { + "title": "None", + "additionalProperties": false, + "description": "No authentication will be used", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "none" + } + } + }, + { + "title": "Api Key/Secret", + "additionalProperties": false, + "description": "Use an API key and secret combination to authenticate", + "required": ["method", "apiKeyId", "apiKeySecret"], + "properties": { + "method": { + "type": "string", + "const": "secret" + }, + "apiKeyId": { + "title": "API Key ID", + "description": "The Key ID to use when accessing an enterprise Elasticsearch instance.", + "type": "string" + }, + "apiKeySecret": { + "title": "API Key Secret", + "description": "The secret associated with the API Key ID.", + "type": "string", + "airbyte_secret": true + } + } + }, + { + "title": "Username/Password", + "additionalProperties": false, + "description": "Basic auth header with a username and password", + "required": ["method", "username", "password"], + "properties": { + "method": { + "type": "string", + "const": "basic" + }, + "username": { + "title": "Username", + 
"description": "Basic auth username to access a secure Elasticsearch server", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Basic auth password to access a secure Elasticsearch server", + "type": "string", + "airbyte_secret": true + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..58fb20b91e3db --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+/** Runs the standard destination acceptance suite against a throwaway single-node Elasticsearch container. */
+public class ElasticsearchDestinationAcceptanceTest extends DestinationAcceptanceTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDestinationAcceptanceTest.class);
+
+  private final ObjectMapper mapper = new ObjectMapper();
+  private static ElasticsearchContainer container;
+
+  @BeforeAll
+  public static void beforeAll() {
+    container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
+        .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
+        .withEnv("discovery.type", "single-node")
+        .withEnv("network.host", "0.0.0.0")
+        .withEnv("logger.org.elasticsearch", "INFO")
+        .withEnv("ingest.geoip.downloader.enabled", "false")
+        .withEnv("xpack.security.enabled", "false")
+        .withExposedPorts(9200)
+        .withStartupTimeout(Duration.ofSeconds(60));
+    container.start();
+  }
+
+  @AfterAll
+  public static void afterAll() {
+    container.stop();
+    container.close();
+  }
+
+  @Override
+  protected String getImageName() {
+    return "airbyte/destination-elasticsearch:dev";
+  }
+
+  @Override
+  protected int getMaxRecordValueLimit() {
+    return 2000000;
+  }
+
+  @Override
+  protected boolean implementsNamespaces() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsNormalization() {
+    return false;
+  }
+
+  @Override
+  protected JsonNode getConfig() {
+    var configJson = mapper.createObjectNode();
+    configJson.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)));
+    return configJson;
+  }
+
+  @Override
+  protected JsonNode getFailCheckConfig() {
+    // should result in a failed connection check
+    return mapper.createObjectNode();
+  }
+
+  @Override
+  protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema)
+      throws IOException {
+    // Records returned from this method will be compared against records provided to the connector
+    // to verify they were written correctly
+    final String indexName = new ElasticsearchWriteConfig().setNamespace(namespace).setStreamName(streamName).getIndexName();
+
+    final ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
+    try {
+      return connection.getRecords(indexName);
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Override
+  protected void setup(TestDestinationEnv testEnv) {}
+
+  @Override
+  protected void tearDown(TestDestinationEnv testEnv) throws IOException {
+    final ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class));
+    connection.allIndices().forEach(connection::deleteIndexIfPresent);
+    connection.close();
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfigurationTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfigurationTest.java
new file mode 100644
index 0000000000000..a91e7d0ba87ef
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfigurationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ConnectorConfigurationTest {
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  public void testAuthenticationSecret() {
+    // an api key id/secret pair must deserialize into a valid "secret" authentication method
+    ObjectNode node = mapper.createObjectNode();
+    ObjectNode authNode = mapper.createObjectNode();
+
+    String endpoint = "http://localhost:123";
+    String authMethod = ElasticsearchAuthenticationMethod.secret.toString();
+    String apiKeyId = "foo";
+    String apiKeySecret = "bar";
+
+    node
+        .put("endpoint", endpoint)
+        .set("authenticationMethod", authNode);
+
+    authNode
+        .put("method", authMethod)
+        .put("apiKeyId", apiKeyId)
+        .put("apiKeySecret", apiKeySecret);
+
+    ConnectorConfiguration config = mapper.convertValue(node, ConnectorConfiguration.class);
+    Assertions.assertTrue(config.getAuthenticationMethod().isValid());
+    Assertions.assertEquals(endpoint, config.getEndpoint());
+    Assertions.assertEquals(authMethod, config.getAuthenticationMethod().getMethod().toString());
+    Assertions.assertEquals(apiKeyId, config.getAuthenticationMethod().getApiKeyId());
+    Assertions.assertEquals(apiKeySecret, config.getAuthenticationMethod().getApiKeySecret());
+  }
+
+  @Test
+  public void testAuthenticationBasic() {
+    // a username/password pair must deserialize into a valid "basic" authentication method
+    ObjectNode node = mapper.createObjectNode();
+    ObjectNode authNode = mapper.createObjectNode();
+
+    String endpoint = "http://localhost:123";
+    String authMethod = ElasticsearchAuthenticationMethod.basic.toString();
+    String username = "foo";
+    String password = "bar";
+
+    node
+        .put("endpoint", endpoint)
+        .set("authenticationMethod", authNode);
+
+    authNode
+        .put("method", authMethod)
+        .put("username", username)
+        .put("password", password);
+
+    ConnectorConfiguration config = mapper.convertValue(node, ConnectorConfiguration.class);
+    Assertions.assertTrue(config.getAuthenticationMethod().isValid());
+    Assertions.assertEquals(endpoint, config.getEndpoint());
+    Assertions.assertEquals(authMethod, config.getAuthenticationMethod().getMethod().toString());
+    Assertions.assertEquals(username, config.getAuthenticationMethod().getUsername());
+    Assertions.assertEquals(password, config.getAuthenticationMethod().getPassword());
+  }
+
+  @Test
+  public void testAuthenticationNone() {
+    // a bare "none" method, without any credentials, must deserialize into a valid authentication method
+    ObjectNode node = mapper.createObjectNode();
+    ObjectNode authNode = mapper.createObjectNode();
+
+    String endpoint = "http://localhost:123";
+    String authMethod = ElasticsearchAuthenticationMethod.none.toString();
+
+    node
+        .put("endpoint", endpoint)
+        .set("authenticationMethod", authNode);
+
+    authNode
+        .put("method", authMethod);
+
+    ConnectorConfiguration config = mapper.convertValue(node, ConnectorConfiguration.class);
+    Assertions.assertTrue(config.getAuthenticationMethod().isValid());
+    Assertions.assertEquals(endpoint, config.getEndpoint());
+    Assertions.assertEquals(authMethod, config.getAuthenticationMethod().getMethod().toString());
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnectionTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnectionTest.java
new file mode 100644
index 0000000000000..994f73bccb9aa
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnectionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ElasticsearchConnectionTest {
+
+  String endpoint = "https:qwerty:123";
+
+  @Test
+  public void testDefaultHeadersAuthNone() {
+    var config = new ConnectorConfiguration();
+    config.setEndpoint(endpoint);
+    config.getAuthenticationMethod().setMethod(ElasticsearchAuthenticationMethod.none);
+    var connection = new ElasticsearchConnection(config);
+    var headers = connection.configureHeaders(config);
+    Assertions.assertEquals(0, headers.length);
+  }
+
+  @Test
+  public void testDefaultHeadersAuthBasic() {
+    var config = new ConnectorConfiguration();
+    config.setEndpoint(endpoint);
+    config.getAuthenticationMethod().setUsername("user");
+    config.getAuthenticationMethod().setPassword("password");
+    config.getAuthenticationMethod().setMethod(ElasticsearchAuthenticationMethod.basic);
+    var connection = new ElasticsearchConnection(config);
+    var headers = connection.configureHeaders(config);
+    Assertions.assertEquals(1, headers.length);
+
+    var headerValues = headers[0].getValue().split(" ");
+    Assertions.assertEquals("Basic", headerValues[0]);
+    // decode as UTF-8, the charset the header is encoded with, rather than the platform default
+    Assertions.assertEquals("user:password", new String(Base64.getDecoder().decode(headerValues[1]), StandardCharsets.UTF_8));
+  }
+
+  @Test
+  public void testDefaultHeadersAuthSecret() {
+    var config = new ConnectorConfiguration();
+    config.setEndpoint(endpoint);
+    config.getAuthenticationMethod().setApiKeyId("id");
+    config.getAuthenticationMethod().setApiKeySecret("secret");
+    config.getAuthenticationMethod().setMethod(ElasticsearchAuthenticationMethod.secret);
+    var connection = new ElasticsearchConnection(config);
+    var headers = connection.configureHeaders(config);
+    Assertions.assertEquals(1, headers.length);
+
+    var headerValues = headers[0].getValue().split(" ");
+    Assertions.assertEquals("ApiKey", headerValues[0]);
+    Assertions.assertEquals("id:secret", new String(Base64.getDecoder().decode(headerValues[1]), StandardCharsets.UTF_8));
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationTest.java
new file mode 100644
index 0000000000000..176a212f489c7
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.elasticsearch;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.protocol.models.*;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticsearchDestinationTest {
+
+  private static final Logger log = LoggerFactory.getLogger(ElasticsearchDestinationTest.class);
+
+  private static ElasticsearchContainer container;
+  private static JsonNode config;
+
+  @BeforeAll
+  public static void beforeAll() {
+    container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
+        
.withEnv("ES_JAVA_OPTS", "-Xms256m -Xmx256m") + .withEnv("discovery.type", "single-node") + .withEnv("network.host", "0.0.0.0") + .withExposedPorts(9200) + .withStartupTimeout(Duration.ofSeconds(60)); + container.start(); + config = Jsons.jsonNode(ImmutableMap.builder() + .put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200))) + .build()); + } + + @AfterAll + public static void afterAll() { + container.stop(); + container.close(); + } + + @Test + public void withAppend() throws Exception { + + var primaryKey = new ArrayList>(); + primaryKey.add(List.of("id")); + + final var namespace = "public"; + final var streamName = "appended_records"; + final var testConfig = new TestConfig(namespace, streamName, DestinationSyncMode.APPEND, primaryKey); + final var testMessages = generateTestMessages(namespace, streamName, 0, 10); + final var firstRecordSet = e2e(testConfig, testMessages); + + assertEquals( + testMessages.stream().map(AirbyteMessage::getRecord).map(AirbyteRecordMessage::getData).collect(Collectors.toList()), + firstRecordSet); + + final var secondRecordSet = e2e(testConfig, testMessages); + assertEquals(testMessages.size() * 2, secondRecordSet.size(), "it should have appended the test messages twice"); + } + + @Test + public void withOverwrite() throws Exception { + var primaryKey = new ArrayList>(); + primaryKey.add(List.of("id")); + + final var namespace = "public"; + final var streamName = "overwritten_records"; + final var testConfig = new TestConfig(namespace, streamName, DestinationSyncMode.OVERWRITE, primaryKey); + final var testMessages = generateTestMessages(namespace, streamName, 0, 10); + final var firstRecordSet = e2e(testConfig, testMessages); + + assertEquals( + testMessages.stream().map(AirbyteMessage::getRecord).map(AirbyteRecordMessage::getData).collect(Collectors.toList()), + firstRecordSet); + + final var secondRecordSet = e2e(testConfig, testMessages); + assertEquals(testMessages.size(), 
secondRecordSet.size(), "it should only have 1 set of test messages"); + } + + @Test + public void withAppendDedup() throws Exception { + var primaryKey = new ArrayList>(); + primaryKey.add(List.of("id")); + + final var namespace = "public"; + final var streamName = "appended_and_deduped_records"; + final var testConfig = new TestConfig(namespace, streamName, DestinationSyncMode.APPEND_DEDUP, primaryKey); + final var firstTestMessages = generateTestMessages(namespace, streamName, 0, 10); + final var firstRecordSet = e2e(testConfig, firstTestMessages); + + assertEquals( + firstTestMessages.stream().map(AirbyteMessage::getRecord).map(AirbyteRecordMessage::getData).collect(Collectors.toList()), + firstRecordSet); + + final var secondTestMessages = generateTestMessages(namespace, streamName, 5, 15); + + final var secondRecordSet = e2e(testConfig, secondTestMessages); + assertEquals(15, secondRecordSet.size(), "it should upsert records with matching primary keys"); + } + + private List e2e(final TestConfig testConfig, final List testMessages) throws Exception { + final var catalog = testConfig.getCatalog(); + final var namespace = testConfig.getNamespace(); + final var streamName = testConfig.getStreamName(); + final var indexName = testConfig.getIndexName(); + final var destination = new ElasticsearchDestination(); + + final var check = destination.check(config); + log.info("check status: {}", check); + + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.start(); + testMessages.forEach(m -> { + try { + consumer.accept(m); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + consumer.accept(new AirbyteMessage() + .withType(AirbyteMessage.Type.STATE) + .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of(namespace + "." 
+ streamName, testMessages.size()))))); + consumer.close(); + + final var connection = new ElasticsearchConnection(ConnectorConfiguration.fromJsonNode(config)); + + final List actualRecords = + connection.getRecords(indexName); + + for (var record : actualRecords) { + log.info("actual record: {}", record); + } + + return actualRecords; + } + + // generate some messages. Taken from the postgres destination test + private List generateTestMessages(final String namespace, final String streamName, final int start, final int end) { + return IntStream.range(start, end) + .boxed() + .map(i -> new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(namespace) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "human " + i))))) + .collect(Collectors.toList()); + } + + private static class TestConfig extends ElasticsearchWriteConfig { + + public TestConfig(String namespace, String streamName, DestinationSyncMode destinationSyncMode, ArrayList> primaryKey) { + super(namespace, streamName, destinationSyncMode, primaryKey, false); + } + + ConfiguredAirbyteCatalog getCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(List.of( + CatalogHelpers.createConfiguredAirbyteStream( + this.getStreamName(), + this.getNamespace(), + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withDestinationSyncMode(this.getSyncMode()) + .withPrimaryKey(this.getPrimaryKey()))); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/elasticsearch.yml b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/elasticsearch.yml new file mode 100644 index 0000000000000..327cff8092919 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-elasticsearch/src/test/java/io/airbyte/integrations/destination/elasticsearch/elasticsearch.yml @@ -0,0 +1,4 @@ +node.data : true +network.host : 0.0.0.0 +discovery.seed_hosts : [] +cluster.initial_master_nodes : [] \ No newline at end of file diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 8260d359a21a7..3e34dd3ab5fa5 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -144,6 +144,7 @@ * [BigQuery](integrations/destinations/bigquery.md) * [Databricks](integrations/destinations/databricks.md) * [DynamoDB](integrations/destinations/dynamodb.md) + * [Elasticsearch](integrations/destinations/elasticsearch.md) * [Chargify](integrations/destinations/chargify.md) * [Google Cloud Storage \(GCS\)](integrations/destinations/gcs.md) * [Google PubSub](integrations/destinations/pubsub.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 8ab800592153e..4b6086e77f16d 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -130,6 +130,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex | [BigQuery](destinations/bigquery.md) | Certified | | [Chargify \(Keen\)](destinations/chargify.md) | Alpha | | [Databricks](destinations/databricks.md) | Beta | +| [Elasticsearch](destinations/elasticsearch.md) | Alpha | | [Google Cloud Storage \(GCS\)](destinations/gcs.md) | Alpha | | [Google Pubsub](destinations/pubsub.md) | Alpha | | [Kafka](destinations/kafka.md) | Alpha | diff --git a/docs/integrations/destinations/elasticsearch.md b/docs/integrations/destinations/elasticsearch.md new file mode 100644 index 0000000000000..87544ae158a4e --- /dev/null +++ b/docs/integrations/destinations/elasticsearch.md @@ -0,0 +1,69 @@ +# Elasticsearch + +## Sync overview + +### Output schema + + +Elasticsearch is a Lucene-based search engine that's a type of NoSQL storage. +Documents are created in an `index`, similar to a `table` in a relational database. 
+ +The output schema matches the input schema of a source. +Each source `stream` becomes a destination `index`. +For example, with a relational database source: +The DB table name is mapped to the destination index. +The DB table columns become fields in the destination document. +Each row becomes a document in the destination index. + +### Data type mapping + +[See Elastic documentation for detailed information about the field types](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html) +The table below maps each of the connector's data types to Airbyte types. At the moment, Airbyte uses the same types used by [JSONSchema](https://json-schema.org/understanding-json-schema/reference/index.html). `string`, `date-time`, `object`, `array`, `boolean`, `integer`, and `number` are the most commonly used data types. + +| Integration Type | Airbyte Type | Notes | +| :--- | :--- | :--- | +| text | string | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/text.html) | +| date | date-time | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html) | +| object | object | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html) | +| array | array | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html) | +| boolean | boolean | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/boolean.html) | +| numeric | integer | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html) | +| numeric | number | [more info](https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html) | + + +### Features + +The table below summarizes the features this connector supports: + +| Feature | Supported?(Yes/No) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | yes | | +| Incremental Sync | yes | | +| Replicate Incremental Deletes | no | | +| SSL connection | yes | | 
+| SSH Tunnel Support | ?? | | + +### Performance considerations + +Batch/bulk writes are performed. Large records may impact performance. +The connector should be enhanced to support variable batch sizes. + +## Getting started + +### Requirements + +* Elasticsearch >= 7.x +* Configuration + * Endpoint URL [ex. https://elasticsearch.savantly.net:9423] + * Port number [defaults to 9200] + * Username [optional] (basic auth) + * Password [optional] (basic auth) + * API key ID [optional] + * API key secret [optional] +* If authentication is used, the user should have permission to create an index if it doesn't exist, and/or be able to `create` documents. + + +### Setup guide + +Enter the hostname and/or other configuration information ... +#### TODO: more info, screenshots?, etc... \ No newline at end of file