Skip to content
Permalink
Browse files

Merge pull request #135 from mattcasters/master

issues #131 and #134
  • Loading branch information...
mattcasters committed Aug 14, 2019
2 parents 6179bba + 31ba1bd commit f94250634055ac313f8338b2666b71cdcdfedaef
@@ -115,13 +115,13 @@
<dependency>
<groupId>kettle-neo4j</groupId>
<artifactId>kettle-neo4j-core</artifactId>
<version>1.0.0</version>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>1.7.0</version>
<version>1.7.5</version>
</dependency>

<dependency>
@@ -13,6 +13,7 @@
import org.neo4j.driver.v1.summary.Notification;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.kettle.core.data.GraphData;
import org.neo4j.kettle.core.data.GraphPropertyDataType;
import org.neo4j.kettle.model.GraphPropertyType;
import org.neo4j.kettle.shared.DriverSingleton;
import org.pentaho.di.core.Const;
@@ -217,6 +218,17 @@ private void reconnect() {
parameters.put( mapping.getParameter(), neoValue );
}

// Create a map between the return value and the source type so we can do the appropriate mapping later...
//
data.returnSourceTypeMap = new HashMap<>( );
for (ReturnValue returnValue : meta.getReturnValues()) {
if (StringUtils.isNotEmpty( returnValue.getSourceType() )) {
String name = returnValue.getName();
GraphPropertyDataType type = GraphPropertyDataType.parseCode( returnValue.getSourceType() );
data.returnSourceTypeMap.put(name, type);
}
}

if ( meta.isUsingUnwind() ) {
data.unwindList.add( parameters );
data.outputCount++;
@@ -250,59 +262,67 @@ private void reconnect() {
}

private void runCypherStatement( Object[] row, String cypher, Map<String, Object> parameters ) throws KettleException {

data.cypherStatements.add( new CypherStatement( row, cypher, parameters ) );
if ( data.cypherStatements.size() >= data.batchSize) {
runCypherStatementsBatch();
}
}

if ( data.cypherStatements.size() >= data.batchSize ) {
// Execute all the statements in there in one transaction...
//
TransactionWork<Integer> transactionWork = transaction -> {

for ( CypherStatement cypherStatement : data.cypherStatements ) {
StatementResult result = transaction.run( cypherStatement.getCypher(), cypherStatement.getParameters() );
try {
List<Object[]> resultRows = writeResultRows( result, cypherStatement.getRow(), false );
// Remember the results for when the whole batch is processed.
// Only then we'll forward the results.
//
cypherStatement.setResultRows( resultRows );
} catch(Exception e) {
throw new RuntimeException( "Error parsing result of cypher statement '"+cypherStatement.getCypher()+"'", e );
}
}
private void runCypherStatementsBatch() throws KettleException {

return data.cypherStatements.size();
};
if (data.cypherStatements.size()==0) {
// Nothing to see here, move along
return;
}

try {
int nrProcessed;
if ( meta.isReadOnly() ) {
nrProcessed = data.session.readTransaction( transactionWork );
setLinesInput( getLinesInput() + data.cypherStatements.size() );
} else {
nrProcessed = data.session.writeTransaction( transactionWork );
setLinesOutput( getLinesOutput() + data.cypherStatements.size() );
// Execute all the statements in there in one transaction...
//
TransactionWork<Integer> transactionWork = transaction -> {

for ( CypherStatement cypherStatement : data.cypherStatements ) {
StatementResult result = transaction.run( cypherStatement.getCypher(), cypherStatement.getParameters() );
try {
List<Object[]> resultRows = writeResultRows( result, cypherStatement.getRow(), false );
// Remember the results for when the whole batch is processed.
// Only then we'll forward the results.
//
cypherStatement.setResultRows( resultRows );
} catch(Exception e) {
throw new RuntimeException( "Error parsing result of cypher statement '"+cypherStatement.getCypher()+"'", e );
}
}

if (log.isDebug()) {
logDebug( "Processed "+nrProcessed+" statements" );
}
return data.cypherStatements.size();
};

// Forward all rows from the batch of records...
//
for (CypherStatement cypherStatement : data.cypherStatements) {
for (Object[] resultRow : cypherStatement.getResultRows()) {
putRow( data.outputRowMeta, resultRow );
}
}
try {
int nrProcessed;
if ( meta.isReadOnly() ) {
nrProcessed = data.session.readTransaction( transactionWork );
setLinesInput( getLinesInput() + data.cypherStatements.size() );
} else {
nrProcessed = data.session.writeTransaction( transactionWork );
setLinesOutput( getLinesOutput() + data.cypherStatements.size() );
}

// Clear out the batch of statements.
//
data.cypherStatements.clear();
if (log.isDebug()) {
logDebug( "Processed "+nrProcessed+" statements" );
}

} catch ( Exception e ) {
throw new KettleException( "Unable to execute batch of cypher statements ("+data.cypherStatements.size()+")", e );
// Forward all rows from the batch of records...
//
for (CypherStatement cypherStatement : data.cypherStatements) {
for (Object[] resultRow : cypherStatement.getResultRows()) {
putRow( data.outputRowMeta, resultRow );
}
}

// Clear out the batch of statements.
//
data.cypherStatements.clear();

} catch ( Exception e ) {
throw new KettleException( "Unable to execute batch of cypher statements ("+data.cypherStatements.size()+")", e );
}
}

@@ -392,7 +412,7 @@ private StatementResult writeUnwindList() throws KettleException {
Value recordValue = record.get( returnValue.getName() );
ValueMetaInterface targetValueMeta = data.outputRowMeta.getValueMeta( index );
Object value = null;
if ( recordValue != null ) {
if ( recordValue != null && !recordValue.isNull()) {
try {
switch ( targetValueMeta.getType() ) {
case ValueMetaInterface.TYPE_STRING:
@@ -411,8 +431,27 @@ private StatementResult writeUnwindList() throws KettleException {
value = new BigDecimal( recordValue.asString() );
break;
case ValueMetaInterface.TYPE_DATE:
LocalDate localDate = recordValue.asLocalDate();
value = java.sql.Date.valueOf( localDate );
GraphPropertyDataType type = data.returnSourceTypeMap.get( returnValue.getName() );
if (type!=null) {
// Standard...
switch(type) {
case LocalDateTime: {
LocalDateTime localDateTime = recordValue.asLocalDateTime();
value = java.sql.Date.valueOf( localDateTime.toLocalDate() );
break;
}
case Date: {
LocalDate localDate = recordValue.asLocalDate();
value = java.sql.Date.valueOf( localDate );
break;
}
default:
throw new KettleException( "Conversion from Neo4j daa type "+type.name()+" to a Kettle Date isn't supported yet" );
}
} else {
LocalDate localDate = recordValue.asLocalDate();
value = java.sql.Date.valueOf( localDate );
}
break;
case ValueMetaInterface.TYPE_TIMESTAMP:
LocalDateTime localDateTime = recordValue.asLocalDateTime();
@@ -468,11 +507,26 @@ private boolean processSummary( StatementResult result ) {

@Override public void batchComplete() {

wrapUpTransaction();
try {
wrapUpTransaction();
} catch(Exception e) {
setErrors( getErrors()+1 );
stopAll();
throw new RuntimeException( "Unable to complete batch of records", e );
}

}

private void wrapUpTransaction() {

try {
runCypherStatementsBatch();
} catch(Exception e) {
setErrors( getErrors()+1 );
stopAll();
throw new RuntimeException( "Unable to run batch of cypher statements", e );
}

// At the end of each batch, do a commit.
//
if ( data.outputCount > 0 ) {
@@ -3,6 +3,7 @@
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.kettle.core.data.GraphPropertyDataType;
import org.neo4j.kettle.shared.NeoConnection;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.step.BaseStepData;
@@ -32,4 +33,6 @@
public List<Map<String, Object>> unwindList;

public List<CypherStatement> cypherStatements;

public Map<String, GraphPropertyDataType> returnSourceTypeMap;
}
@@ -31,11 +31,13 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.types.Type;
import org.neo4j.driver.v1.util.Pair;
import org.neo4j.kettle.core.data.GraphPropertyDataType;
import org.neo4j.kettle.model.GraphPropertyType;
import org.neo4j.kettle.shared.NeoConnection;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.core.row.value.ValueMetaFactory;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
@@ -450,6 +452,7 @@ public CypherDialog( Shell parent, Object inputMetadata, TransMeta transMeta, St
new ColumnInfo[] {
new ColumnInfo( "Field name", ColumnInfo.COLUMN_TYPE_TEXT, false ),
new ColumnInfo( "Return type", ColumnInfo.COLUMN_TYPE_CCOMBO, ValueMetaFactory.getAllValueMetaNames(), false ),
new ColumnInfo( "Source type", ColumnInfo.COLUMN_TYPE_CCOMBO, GraphPropertyDataType.getNames(), false ),
};

Label wlReturns = new Label( wComposite, SWT.LEFT );
@@ -617,6 +620,7 @@ public void getData() {
TableItem item = wReturns.table.getItem( i );
item.setText( 1, Const.NVL( returnValue.getName(), "" ) );
item.setText( 2, Const.NVL( returnValue.getType(), "" ) );
item.setText( 3, Const.NVL( returnValue.getSourceType(), "" ) );
}
wReturns.removeEmptyRows();
wReturns.setRowNums();
@@ -659,7 +663,7 @@ private void getInfo(CypherMeta meta) {
List<ReturnValue> returnValues = new ArrayList<>();
for ( int i = 0; i < wReturns.nrNonEmpty(); i++ ) {
TableItem item = wReturns.getNonEmpty( i );
returnValues.add( new ReturnValue( item.getText( 1 ), item.getText( 2 ) ) );
returnValues.add( new ReturnValue( item.getText( 1 ), item.getText( 2 ), item.getText( 3 ) ) );
}
meta.setReturnValues( returnValues );
}
@@ -738,19 +742,14 @@ private void getReturnValues() {
Value returnValue = fieldPair.value();
Type valueType = returnValue.type();

GraphPropertyType type = null;
for (GraphPropertyType gpType : GraphPropertyType.values()) {
if (valueType.name().equalsIgnoreCase( gpType.name() )) {
type = gpType;
}
}
if (type==null) {
type = GraphPropertyType.String;
}
String typeName = valueType.name().replaceAll( "_", "" );
GraphPropertyDataType type = GraphPropertyDataType.parseCode( typeName );
int kettleType = type.getKettleType();

TableItem item = new TableItem(wReturns.table, SWT.NONE);
item.setText( 1, returnField );
item.setText( 2, type.name() );
item.setText( 2, ValueMetaFactory.getValueMetaName( kettleType ));
item.setText( 3, type.name() );
}
}
wReturns.removeEmptyRows();
@@ -60,6 +60,7 @@
public static final String PARAMETER = "parameter";
public static final String FIELD = "field";
public static final String TYPE = "type";
public static final String SOURCE_TYPE = "source_type";
public static final String RETURNS = "returns";
public static final String RETURN = "return";
public static final String NAME = "name";
@@ -69,6 +70,7 @@

public static final String RETURN_NAME = "return_name";
public static final String RETURN_TYPE = "return_type";
public static final String RETURN_SOURCE_TYPE = "return_source_type";

@Injection( name = CONNECTION )
private String connectionName;
@@ -187,6 +189,7 @@ public CypherMeta() {
xml.append( XMLHandler.openTag( RETURN ) );
xml.append( XMLHandler.addTagValue( NAME, returnValue.getName() ) );
xml.append( XMLHandler.addTagValue( TYPE, returnValue.getType() ) );
xml.append( XMLHandler.addTagValue( SOURCE_TYPE, returnValue.getSourceType() ) );
xml.append( XMLHandler.closeTag( RETURN ) );
}
xml.append( XMLHandler.closeTag( RETURNS ) );
@@ -230,7 +233,8 @@ public CypherMeta() {
for ( Node returnNode : returnNodes ) {
String name = XMLHandler.getTagValue( returnNode, NAME );
String type = XMLHandler.getTagValue( returnNode, TYPE );
returnValues.add( new ReturnValue( name, type ) );
String sourceType = XMLHandler.getTagValue( returnNode, SOURCE_TYPE );
returnValues.add( new ReturnValue( name, type, sourceType ) );
}

super.loadXML( stepnode, databases, metaStore );
@@ -258,6 +262,7 @@ public CypherMeta() {
ReturnValue returnValue = returnValues.get( i );
rep.saveStepAttribute( transformationId, stepId, i, RETURN_NAME, returnValue.getName() );
rep.saveStepAttribute( transformationId, stepId, i, RETURN_TYPE, returnValue.getType() );
rep.saveStepAttribute( transformationId, stepId, i, RETURN_SOURCE_TYPE, returnValue.getSourceType() );
}
}

@@ -289,7 +294,8 @@ public CypherMeta() {
for ( int i = 0; i < nrReturns; i++ ) {
String name = rep.getStepAttributeString( stepId, i, RETURN_NAME );
String type = rep.getStepAttributeString( stepId, i, RETURN_TYPE );
returnValues.add( new ReturnValue( name, type ) );
String sourceType = rep.getStepAttributeString( stepId, i, RETURN_SOURCE_TYPE );
returnValues.add( new ReturnValue( name, type, sourceType ) );
}

}

0 comments on commit f942506

Please sign in to comment.
You can’t perform that action at this time.