Skip to content

Commit

Permalink
feat(es8): es8 支持证书认证
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuSilence committed Apr 14, 2024
1 parent caa46f3 commit d5dba78
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
@@ -1,18 +1,14 @@
package com.alibaba.otter.canal.client.adapter.es8x.support;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -36,7 +32,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Arrays;
import java.util.Map;

/**
* ES 连接器, 只支持 Rest 方式
Expand All @@ -50,20 +59,59 @@ public class ESConnection {

private RestHighLevelClient restHighLevelClient;

public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException{
public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
String caPath = properties.get("security.ca.path");
if (StringUtils.isNotEmpty(caPath)) {
connectEsWithCa(hosts, properties, caPath);
} else {
connectEsWithoutCa(hosts, properties);
}
}
private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
Path caCertificatePath = Paths.get(caPath);
try (InputStream is = Files.newInputStream(caCertificatePath)) {
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa = factory.generateCertificate(is);
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();

HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
String nameAndPwd = properties.get("security.auth");
if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
String[] nameAndPwdArr = nameAndPwd.split(":");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder.setSSLContext(sslContext);
});
}
restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
String nameAndPwd = properties.get("security.auth");
if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
String[] nameAndPwdArr = nameAndPwd.split(":");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
.build();
.build();
}

public void close() {
Expand Down Expand Up @@ -99,9 +147,9 @@ public class ES8xIndexRequest implements ESBulkRequest.ESIndexRequest {

private IndexRequestBuilder indexRequestBuilder;

private IndexRequest indexRequest;
private IndexRequest indexRequest;

public ES8xIndexRequest(String index, String id){
public ES8xIndexRequest(String index, String id) {
indexRequest = new IndexRequest(index);
indexRequest.id(id);

Expand Down Expand Up @@ -142,9 +190,9 @@ public class ES8xUpdateRequest implements ESBulkRequest.ESUpdateRequest {

private UpdateRequestBuilder updateRequestBuilder;

private UpdateRequest updateRequest;
private UpdateRequest updateRequest;

public ES8xUpdateRequest(String index, String id){
public ES8xUpdateRequest(String index, String id) {

updateRequest = new UpdateRequest(index, id);
}
Expand Down Expand Up @@ -191,9 +239,9 @@ public class ES8xDeleteRequest implements ESBulkRequest.ESDeleteRequest {

private DeleteRequestBuilder deleteRequestBuilder;

private DeleteRequest deleteRequest;
private DeleteRequest deleteRequest;

public ES8xDeleteRequest(String index, String id){
public ES8xDeleteRequest(String index, String id) {

deleteRequest = new DeleteRequest(index, id);

Expand All @@ -220,11 +268,11 @@ public class ESSearchRequest {

private SearchRequestBuilder searchRequestBuilder;

private SearchRequest searchRequest;
private SearchRequest searchRequest;

private SearchSourceBuilder sourceBuilder;
private SearchSourceBuilder sourceBuilder;

public ESSearchRequest(String index){
public ESSearchRequest(String index) {

searchRequest = new SearchRequest(index);
sourceBuilder = new SearchSourceBuilder();
Expand Down Expand Up @@ -277,9 +325,9 @@ public class ES8xBulkRequest implements ESBulkRequest {

private BulkRequestBuilder bulkRequestBuilder;

private BulkRequest bulkRequest;
private BulkRequest bulkRequest;

public ES8xBulkRequest(){
public ES8xBulkRequest() {

bulkRequest = new BulkRequest();

Expand Down Expand Up @@ -350,7 +398,7 @@ public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {

private BulkResponse bulkResponse;

public ES8xBulkResponse(BulkResponse bulkResponse){
public ES8xBulkResponse(BulkResponse bulkResponse) {
this.bulkResponse = bulkResponse;
}

Expand Down Expand Up @@ -390,7 +438,7 @@ private HttpHost createHttpHost(String uriStr) {
}
try {
return HttpHost.create(new URI(uri
.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
.toString());
} catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
Expand Down
3 changes: 2 additions & 1 deletion client-adapter/launcher/src/main/resources/application.yml
Expand Up @@ -91,6 +91,7 @@ canal.conf:
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.ca.path: /etc/es8/ca.crt
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
Expand All @@ -113,4 +114,4 @@ canal.conf:
# jdbc.password: 123456
# batchSize: 3000
# scheduleTime: 600 # second unit
# threads: 3 # parallel threads
# threads: 3 # parallel threads

0 comments on commit d5dba78

Please sign in to comment.