Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Commit

Permalink
Issue #125
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcasters committed Apr 18, 2019
1 parent f239cb3 commit 6202b84
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 5 deletions.
19 changes: 16 additions & 3 deletions src/main/java/bi/know/kettle/neo4j/steps/cypher/Cypher.java
Expand Up @@ -6,6 +6,7 @@
import org.apache.commons.lang.StringUtils;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.Notification;
Expand Down Expand Up @@ -260,7 +261,11 @@ private StatementResult runCypherStatement( Map<String, Object> parameters ) {
if ( data.outputCount == 0 ) {
data.transaction = data.session.beginTransaction();
}
result = data.transaction.run( data.cypher, parameters );
if (meta.isReadOnly()) {
result = data.session.readTransaction( tx-> tx.run(data.cypher, parameters) );
} else {
result = data.session.writeTransaction( tx-> tx.run(data.cypher, parameters) );
}
data.outputCount++;
incrementLinesOutput();

Expand All @@ -278,13 +283,21 @@ private StatementResult writeUnwindList() {
unwindMap.put(data.unwindMapName, data.unwindList);
StatementResult result;
try {
result = data.session.run( data.cypher, unwindMap );
if (meta.isReadOnly()) {
result = data.session.readTransaction( tx-> tx.run(data.cypher, unwindMap) );
} else {
result = data.session.writeTransaction( tx-> tx.run(data.cypher, unwindMap) );
}
} catch(ServiceUnavailableException e) {
// retry once after reconnecting.
// This can fix certain time-out issues
//
reconnect();
result = data.session.run( data.cypher, unwindMap );
if (meta.isReadOnly()) {
result = data.session.readTransaction( tx-> tx.run(data.cypher, unwindMap) );
} else {
result = data.session.writeTransaction( tx-> tx.run(data.cypher, unwindMap) );
}
}
setLinesOutput( getLinesOutput()+data.unwindList.size() );
data.unwindList.clear();
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/bi/know/kettle/neo4j/steps/cypher/CypherDialog.java
Expand Up @@ -69,9 +69,8 @@ public class CypherDialog extends BaseStepDialog implements StepDialogInterface
private Text wStepname;

private CCombo wConnection;

private TextVar wBatchSize;

private Button wReadOnly;
private Button wCypherFromField;
private CCombo wCypherField;
private Button wUnwind;
Expand Down Expand Up @@ -212,6 +211,23 @@ public CypherDialog( Shell parent, Object inputMetadata, TransMeta transMeta, St
wBatchSize.setLayoutData( fdBatchSize );
lastControl = wBatchSize;

Label wlReadOnly = new Label( wComposite, SWT.RIGHT );
wlReadOnly.setText( "Read only statement? " );
props.setLook( wlReadOnly );
FormData fdlReadOnly = new FormData();
fdlReadOnly.left = new FormAttachment( 0, 0 );
fdlReadOnly.right = new FormAttachment( middle, -margin );
fdlReadOnly.top = new FormAttachment( lastControl, 2 * margin );
wlReadOnly.setLayoutData( fdlReadOnly );
wReadOnly = new Button( wComposite, SWT.CHECK | SWT.BORDER );
props.setLook( wReadOnly );
FormData fdReadOnly = new FormData();
fdReadOnly.left = new FormAttachment( middle, 0 );
fdReadOnly.right = new FormAttachment( 100, 0 );
fdReadOnly.top = new FormAttachment( wlReadOnly, 0, SWT.CENTER );
wReadOnly.setLayoutData( fdReadOnly );
lastControl = wReadOnly;

Label wlCypherFromField = new Label( wComposite, SWT.RIGHT );
wlCypherFromField.setText( "Get Cypher from input field? " );
props.setLook( wlCypherFromField );
Expand Down Expand Up @@ -489,6 +505,7 @@ public void widgetDefaultSelected( SelectionEvent e ) {
wConnection.addSelectionListener( lsDef );
wStepname.addSelectionListener( lsDef );
wBatchSize.addSelectionListener( lsDef );
wReadOnly.addSelectionListener( lsDef );
wCypherFromField.addSelectionListener( lsDef );
wCypherField.addSelectionListener( lsDef );
wUnwind.addSelectionListener( lsDef );
Expand Down Expand Up @@ -557,6 +574,7 @@ public void getData() {
wStepname.setText( Const.NVL( stepname, "" ) );
wConnection.setText( Const.NVL( input.getConnectionName(), "" ) );

wReadOnly.setSelection( input.isReadOnly() );
wCypherFromField.setSelection( input.isCypherFromField() );
wCypherField.setText( Const.NVL( input.getCypherField(), "" ) );
try {
Expand Down Expand Up @@ -621,6 +639,7 @@ private void getInfo(CypherMeta meta) {
meta.setBatchSize( wBatchSize.getText() );
meta.setCypher( wCypher.getText() );

meta.setReadOnly( wReadOnly.getSelection() );
meta.setCypherFromField( wCypherFromField.getSelection() );
meta.setCypherField( wCypherField.getText() );

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/bi/know/kettle/neo4j/steps/cypher/CypherMeta.java
Expand Up @@ -47,6 +47,7 @@ public class CypherMeta extends BaseStepMeta implements StepMetaInterface {
public static final String CONNECTION = "connection";
public static final String CYPHER = "cypher";
public static final String BATCH_SIZE = "batch_size";
public static final String READ_ONLY = "read_only";
public static final String CYPHER_FROM_FIELD = "cypher_from_field";
public static final String CYPHER_FIELD = "cypher_field";
public static final String UNWIND = "unwind";
Expand Down Expand Up @@ -78,6 +79,9 @@ public class CypherMeta extends BaseStepMeta implements StepMetaInterface {
@Injection( name = BATCH_SIZE )
private String batchSize;

@Injection( name = READ_ONLY )
private boolean readOnly;

@Injection( name = CYPHER_FROM_FIELD )
private boolean cypherFromField;

Expand Down Expand Up @@ -160,6 +164,7 @@ public CypherMeta() {
xml.append( XMLHandler.addTagValue( CONNECTION, connectionName ) );
xml.append( XMLHandler.addTagValue( CYPHER, cypher ) );
xml.append( XMLHandler.addTagValue( BATCH_SIZE, batchSize ) );
xml.append( XMLHandler.addTagValue( READ_ONLY, readOnly ) );
xml.append( XMLHandler.addTagValue( CYPHER_FROM_FIELD, cypherFromField ) );
xml.append( XMLHandler.addTagValue( CYPHER_FIELD, cypherField ) );
xml.append( XMLHandler.addTagValue( UNWIND, usingUnwind ) );
Expand Down Expand Up @@ -194,6 +199,7 @@ public CypherMeta() {
connectionName = XMLHandler.getTagValue( stepnode, CONNECTION );
cypher = XMLHandler.getTagValue( stepnode, CYPHER );
batchSize = XMLHandler.getTagValue( stepnode, BATCH_SIZE );
readOnly = "Y".equalsIgnoreCase( XMLHandler.getTagValue( stepnode, READ_ONLY ) );
cypherFromField = "Y".equalsIgnoreCase( XMLHandler.getTagValue( stepnode, CYPHER_FROM_FIELD ) );
cypherField = XMLHandler.getTagValue( stepnode, CYPHER_FIELD );
usingUnwind = "Y".equalsIgnoreCase( XMLHandler.getTagValue( stepnode, UNWIND ) );
Expand Down Expand Up @@ -234,6 +240,7 @@ public CypherMeta() {
rep.saveStepAttribute( transformationId, stepId, CONNECTION, connectionName );
rep.saveStepAttribute( transformationId, stepId, CYPHER, cypher );
rep.saveStepAttribute( transformationId, stepId, BATCH_SIZE, batchSize );
rep.saveStepAttribute( transformationId, stepId, READ_ONLY, readOnly );
rep.saveStepAttribute( transformationId, stepId, CYPHER_FROM_FIELD, cypherFromField );
rep.saveStepAttribute( transformationId, stepId, CYPHER_FIELD, cypherField );
rep.saveStepAttribute( transformationId, stepId, UNWIND, usingUnwind );
Expand All @@ -258,6 +265,7 @@ public CypherMeta() {
connectionName = rep.getStepAttributeString( stepId, CONNECTION );
cypher = rep.getStepAttributeString( stepId, CYPHER );
batchSize = rep.getStepAttributeString( stepId, BATCH_SIZE );
readOnly = rep.getStepAttributeBoolean( stepId, READ_ONLY );
cypherFromField = rep.getStepAttributeBoolean( stepId, CYPHER_FROM_FIELD );
cypherField = rep.getStepAttributeString( stepId, CYPHER_FIELD );
usingUnwind = rep.getStepAttributeBoolean( stepId, UNWIND );
Expand Down Expand Up @@ -350,6 +358,22 @@ public void setCypherFromField( boolean cypherFromField ) {
this.cypherFromField = cypherFromField;
}

/**
* Gets readOnly
*
* @return value of readOnly
*/
public boolean isReadOnly() {
return readOnly;
}

/**
* @param readOnly The readOnly to set
*/
public void setReadOnly( boolean readOnly ) {
this.readOnly = readOnly;
}

/**
* Gets cypherField
*
Expand Down

0 comments on commit 6202b84

Please sign in to comment.