Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Various code improvements and bug fixes for issue #1
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Casters committed Jul 20, 2018
1 parent 4687c5b commit 64afd96
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 94 deletions.
22 changes: 22 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,5 @@


<groupId>kettle-azure-event-hubs</groupId>
<version>0.0.1-SNAPSHOT</version>
<version>0.0.2-SNAPSHOT</version>
</project>
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
package com.neo4j.kettle.azure.steps.listen;


import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.neo4j.kettle.azure.ErrorNotificationHandler;
import com.neo4j.kettle.azure.EventProcessor;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.SingleThreadedTransExecutor;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
Expand All @@ -30,10 +22,9 @@
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;

public class AzureListener extends BaseStep implements StepInterface {
Expand Down Expand Up @@ -105,6 +96,7 @@ public AzureListener( StepMeta stepMeta, StepDataInterface stepDataInterface, in
if (StringUtils.isNotEmpty( batchTransformationFile ) && StringUtils.isNotEmpty( batchInputStep )) {
logBasic( "Passing rows to a batching transformation running single threaded : " +batchTransformationFile);
data.stt = true;
data.sttMaxWaitTime = Const.toLong( environmentSubstitute( meta.getBatchMaxWaitTime() ), -1L);
data.sttTransMeta = meta.loadBatchTransMeta( meta, repository, metaStore, this );
data.sttTransMeta.setTransformationType( TransMeta.TransformationType.SingleThreaded );
data.sttTrans = new Trans( data.sttTransMeta, this );
Expand Down Expand Up @@ -186,13 +178,60 @@ public AzureListener( StepMeta stepMeta, StepDataInterface stepDataInterface, in
throw new KettleStepException( "Unable to create event hub client", e );
}

/*
try {
host.registerEventProcessor(AzureListenerEventProcessor.class, options)
*/
// Create our event processor which is going to actually send rows to the batch transformation (or not)
// and get rows from an optional output step.
//
final AzureListenerEventProcessor eventProcessor = new AzureListenerEventProcessor( AzureListener.this, data, data.batchSize );

// In case we have a while since an iteration was done sending rows to the batch transformation, keep an eye out for the
// maximum wait time. If we go over that time, and we have records in the input of the batch, call oneIteration.
// We need to make sure to halt the rest though.
//
if (data.stt && data.sttMaxWaitTime>0) {
// Add a timer to check every max wait time to see whether or not we have to do an iteration...
//
logBasic( "Checking for stalled rows every 100ms to see if we exceed the maximum wait time: " +data.sttMaxWaitTime );
try {
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override public void run() {
// Do nothing if we haven't started yet.
//
if ( eventProcessor.getLastIterationTime() > 0 ) {
if ( eventProcessor.getPassedRowsCount() > 0 ) {
long now = System.currentTimeMillis();

long diff = now - eventProcessor.getLastIterationTime();
if ( diff > data.sttMaxWaitTime ) {
logDetailed( "Stalled rows detected with wait time of "+((double)diff/1000) );

// Call one iteration but halt anything else first.
//
try {
eventProcessor.startWait();
eventProcessor.doOneIteration();
} catch(Exception e) {
throw new RuntimeException( "Error in batch iteration when max wait time was exceeded", e);
} finally {
eventProcessor.endWait();
}
logDetailed( "Done processing after max wait time.");

}
}
}
}
};
// Check ten times per second
//
timer.schedule( timerTask, 100, 100);
} catch(RuntimeException e) {
throw new KettleStepException( "Error in batch iteration when max wait time was exceeded", e);
}
}

try{
host.registerEventProcessorFactory( partitionContext -> new AzureListenerEventProcessor( AzureListener.this, data, data.batchSize ) )
host.registerEventProcessorFactory( partitionContext -> eventProcessor )
.whenComplete((unused, e) -> {
// whenComplete passes the result of the previous stage through unchanged,
// which makes it useful for logging a result without side effects.
Expand Down Expand Up @@ -241,6 +280,7 @@ public AzureListener( StepMeta stepMeta, StepDataInterface stepDataInterface, in
return null;
})
.get(); // Wait for everything to finish before exiting main!

} catch ( Exception e ) {
throw new KettleException( "Error in event processor", e );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public class AzureListenerData extends BaseStepData implements StepDataInterface
public SingleThreadedTransExecutor sttExecutor;
public boolean stt = false;
public RowProducer sttRowProducer;
public long sttMaxWaitTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class AzureListenerDialog extends BaseStepDialog implements StepDialogInt
private TextVar wBatchTransformation;
private TextVar wBatchInput;
private TextVar wBatchOutput;

private TextVar wMaxWaitTime;

private AzureListenerMeta input;

public AzureListenerDialog( Shell parent, Object inputMetadata, TransMeta transMeta, String stepname ) {
Expand Down Expand Up @@ -462,6 +463,24 @@ public AzureListenerDialog( Shell parent, Object inputMetadata, TransMeta transM
wBatchOutput.setLayoutData( fdBatchOutput );
lastControl = wBatchOutput;

Label wlMaxWaitTime = new Label( shell, SWT.RIGHT );
wlMaxWaitTime.setText( "Maximum wait time (ms)" );
props.setLook( wlMaxWaitTime );
FormData fdlMaxWaitTime = new FormData();
fdlMaxWaitTime.left = new FormAttachment( 0, 0 );
fdlMaxWaitTime.right = new FormAttachment( middle, -margin );
fdlMaxWaitTime.top = new FormAttachment( lastControl, 2*margin );
wlMaxWaitTime.setLayoutData( fdlMaxWaitTime );
wMaxWaitTime = new TextVar( transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
props.setLook( wMaxWaitTime );
wMaxWaitTime.addModifyListener( lsMod );
FormData fdMaxWaitTime = new FormData();
fdMaxWaitTime.left = new FormAttachment( middle, 0 );
fdMaxWaitTime.right = new FormAttachment( 100, 0 );
fdMaxWaitTime.top = new FormAttachment( wlMaxWaitTime, 0, SWT.CENTER );
wMaxWaitTime.setLayoutData( fdMaxWaitTime );
lastControl = wMaxWaitTime;

// Some buttons
wOK = new Button( shell, SWT.PUSH );
wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );
Expand Down Expand Up @@ -507,6 +526,7 @@ public void widgetDefaultSelected( SelectionEvent e ) {
wBatchTransformation.addSelectionListener( lsDef );
wBatchInput.addSelectionListener( lsDef );
wBatchOutput.addSelectionListener( lsDef );
wMaxWaitTime.addSelectionListener( lsDef );

// Detect X or ALT-F4 or something that kills this window...
shell.addShellListener( new ShellAdapter() {
Expand Down Expand Up @@ -559,6 +579,7 @@ public void getData() {
wBatchTransformation.setText(Const.NVL(input.getBatchTransformation(), ""));
wBatchInput.setText(Const.NVL(input.getBatchInputStep(), ""));
wBatchOutput.setText(Const.NVL(input.getBatchOutputStep(), ""));
wMaxWaitTime.setText(Const.NVL(input.getBatchMaxWaitTime(), ""));
}

private void ok() {
Expand Down Expand Up @@ -586,6 +607,7 @@ private void ok() {
input.setBatchTransformation( wBatchTransformation.getText() );
input.setBatchInputStep( wBatchInput.getText() );
input.setBatchOutputStep( wBatchOutput.getText() );
input.setBatchMaxWaitTime( wMaxWaitTime.getText() );

dispose();
}
Expand Down
Loading

0 comments on commit 64afd96

Please sign in to comment.