Skip to content

Commit

Permalink
[BACKLOG-25823] Upgrade to ElasticSearch 6.4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Jarvis authored and Benjamin Morrise committed Oct 4, 2018
1 parent efaa18b commit cabfd3b
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 297 deletions.
Expand Up @@ -26,40 +26,9 @@
<outputDirectory>elasticsearch-bulk-insert-plugin/lib</outputDirectory>
<scope>runtime</scope>
<useTransitiveDependencies>true</useTransitiveDependencies>
<includes>
<include>com.wcohen:com.wcohen.secondstring:jar</include>
<include>commons-cli:commons-cli:jar</include>
<include>com.ning:compress-lzf:jar</include>
<include>org.elasticsearch:elasticsearch:jar</include>
<include>com.google.guava:guava:jar</include>
<include>org.hdrhistogram:HdrHistogram:jar</include>
<include>com.carrotsearch:hppc:jar</include>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar</include>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar</include>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar</include>
<include>org.joda:joda-convert:jar</include>
<include>joda-time:joda-time:jar</include>
<include>com.twitter:jsr166e:jar</include>
<include>org.apache.lucene:lucene-analyzers-common:jar</include>
<include>org.apache.lucene:lucene-backward-codecs:jar</include>
<include>org.apache.lucene:lucene-core:jar</include>
<include>org.apache.lucene:lucene-grouping:jar</include>
<include>org.apache.lucene:lucene-highlighter:jar</include>
<include>org.apache.lucene:lucene-join:jar</include>
<include>org.apache.lucene:lucene-memory:jar</include>
<include>org.apache.lucene:lucene-misc:jar</include>
<include>org.apache.lucene:lucene-queries:jar</include>
<include>org.apache.lucene:lucene-queryparser:jar</include>
<include>org.apache.lucene:lucene-sandbox:jar</include>
<include>org.apache.lucene:lucene-spatial3d:jar</include>
<include>org.apache.lucene:lucene-spatial:jar</include>
<include>org.apache.lucene:lucene-suggest:jar</include>
<include>io.netty:netty:jar</include>
<include>org.elasticsearch:securesm:jar</include>
<include>org.yaml:snakeyaml:jar</include>
<include>com.spatial4j:spatial4j:jar</include>
<include>com.tdunning:t-digest:jar</include>
</includes>
<excludes>
<exclude>org.pentaho.di.plugins:elasticsearch-bulk-insert-core:jar</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
</assembly>
28 changes: 22 additions & 6 deletions plugins/elasticsearch-bulk-insert/core/pom.xml
Expand Up @@ -14,61 +14,77 @@

<name>PDI Elasticsearch Bulk Insert Plugin Core</name>
<description>Elasticsearch Bulk Insert Plugin</description>

<properties>
<pdi.version>8.2.0.0-SNAPSHOT</pdi.version>
<build.revision>${project.version}</build.revision>
<timestamp>${maven.build.timestamp}</timestamp>
<build.description>${project.description}</build.description>
<maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
<elasticsearch.version>6.4.2</elasticsearch.version>
</properties>

<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.2.0</version>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>${pdi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>${pdi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-ui-swt</artifactId>
<version>${pdi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.swt</groupId>
<artifactId>org.eclipse.swt.gtk.linux.x86_64</artifactId>
<version>4.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse</groupId>
<artifactId>jface</artifactId>
<version>3.3.0-I20070606-0010</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<classifier>tests</classifier>
<version>${pdi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
Expand All @@ -78,7 +94,7 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<resources>
<resource>
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,32 +22,25 @@

package org.pentaho.di.trans.steps.elasticsearchbulk;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.row.RowDataUtil;
Expand All @@ -61,6 +54,16 @@
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Does bulk insert of data into ElasticSearch
Expand All @@ -77,7 +80,6 @@ public class ElasticSearchBulk extends BaseStep implements StepInterface {

TransportClient tc;

private Node node;
private Client client;
private String index;
private String type;
Expand Down Expand Up @@ -106,10 +108,10 @@ public class ElasticSearchBulk extends BaseStep implements StepInterface {
private Map<String, String> columnsToJson;
private boolean hasFields;

private IndexRequest.OpType opType = OpType.CREATE;
private IndexRequest.OpType opType = org.elasticsearch.action.DocWriteRequest.OpType.CREATE;

public ElasticSearchBulk( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans ) {
Trans trans ) {
super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

Expand Down Expand Up @@ -196,10 +198,8 @@ private static Integer getFieldIdx( RowMetaInterface rowMeta, String fieldName )
}

/**
* @param rowMeta
* The metadata for the row to be indexed
* @param row
* The data for the row to be indexed
* @param rowMeta The metadata for the row to be indexed
* @param row The data for the row to be indexed
*/

private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {
Expand Down Expand Up @@ -233,7 +233,7 @@ private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws Kettle
throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );
} catch ( Exception e ) {
throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e
.getLocalizedMessage() ), e );
.getLocalizedMessage() ), e );
}
}

Expand All @@ -244,9 +244,9 @@ private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws Kettle
private void addSourceFromJsonString( Object[] row, IndexRequestBuilder requestBuilder ) throws KettleStepException {
Object jsonString = row[jsonFieldIdx];
if ( jsonString instanceof byte[] ) {
requestBuilder.setSource( (byte[]) jsonString );
requestBuilder.setSource( (byte[]) jsonString, XContentType.JSON );
} else if ( jsonString instanceof String ) {
requestBuilder.setSource( ( (String) jsonString ).getBytes() );
requestBuilder.setSource( (String) jsonString, XContentType.JSON );
} else {
throw new KettleStepException( BaseMessages.getString( "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );
}
Expand All @@ -259,7 +259,7 @@ private void addSourceFromJsonString( Object[] row, IndexRequestBuilder requestB
* @throws IOException
*/
private void addSourceFromRowFields( IndexRequestBuilder requestBuilder, RowMetaInterface rowMeta, Object[] row )
throws IOException {
throws IOException {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();

for ( int i = 0; i < rowMeta.size(); i++ ) {
Expand Down Expand Up @@ -301,7 +301,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {

} catch ( Exception e ) {
logError( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.ErrorOccurredDuringStepInitialize" )
+ e.getMessage() );
+ e.getMessage() );
}
return true;
}
Expand All @@ -326,13 +326,14 @@ private void initFromMeta() {
this.hasFields = columnsToJson.size() > 0;

this.opType =
StringUtils.isNotBlank( meta.getIdInField() ) && meta.isOverWriteIfSameId() ? OpType.INDEX : OpType.CREATE;
StringUtils.isNotBlank( meta.getIdInField() ) && meta.isOverWriteIfSameId() ? OpType.INDEX : OpType.CREATE;

}

private boolean processBatch( boolean makeNew ) throws KettleStepException {

ListenableActionFuture<BulkResponse> actionFuture = currentRequest.execute();

ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
boolean responseOk = false;

BulkResponse response = null;
Expand Down Expand Up @@ -420,7 +421,7 @@ private boolean handleResponse( BulkResponse response ) {
private void addIdToRow( String id, int rowIndex ) {

data.inputRowBuffer[rowIndex] =
RowDataUtil.resizeArray( data.inputRowBuffer[rowIndex], getInputRowMeta().size() + 1 );
RowDataUtil.resizeArray( data.inputRowBuffer[rowIndex], getInputRowMeta().size() + 1 );
data.inputRowBuffer[rowIndex][getInputRowMeta().size()] = id;

}
Expand Down Expand Up @@ -467,36 +468,37 @@ private void rejectAllRows( String errorMsg ) {
}

private void initClient() throws UnknownHostException {


Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS ); // keep default classloader
settingsBuilder.put( meta.getSettingsMap() );
// Settings settings = settingsBuilder.build();
TransportClient.Builder tClientBuilder = TransportClient.builder().settings( settingsBuilder );

if ( !meta.servers.isEmpty() ) {
node = null;
TransportClient tClient = tClientBuilder.build();
for ( ElasticSearchBulkMeta.Server s : meta.servers ) {
tClient.addTransportAddress( s.getAddr() );
}
client = tClient;
} else {
NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder();
nodeBuilder.settings( settingsBuilder );
node = nodeBuilder.client( true ).node(); // this node will not hold data
client = node.client();
node.start();
settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS );
meta.getSettingsMap().entrySet().stream().forEach( ( s ) -> settingsBuilder.put( s.getKey(),
environmentSubstitute( s.getValue() ) ) );

PreBuiltTransportClient tClient = new PreBuiltTransportClient( settingsBuilder.build() );

for ( Server server : meta.getServers() ) {
tClient.addTransportAddress( new TransportAddress(
InetAddress.getByName( environmentSubstitute( server.getAddress() ) ),
server.getPort() ) );
}

client = tClient;

/** With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
* which was removed from the elasticsearch 5.0 API, see:
* https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes
* .html#_nodebuilder_removed
*/

}

private void disposeClient() {

if ( client != null ) {
client.close();
}
if ( node != null ) {
node.close();
}


}

Expand Down

0 comments on commit cabfd3b

Please sign in to comment.