Skip to content

Commit

Permalink
[BACKLOG-23645] 2 commits (#5458)
Browse files Browse the repository at this point in the history
* [BACKLOG-23645] MQTT Prod - support for fields in topic

Preparatory refactoring, plus added the new TOPIC_IN_FIELD boolean meta element.

https://jira.pentaho.com/browse/BACKLOG-23645

* [BACKLOG-23645] MQTT Prod - support for fields in topic

Dialog changes to match Ux spec:
http://ux.pentaho.com/pdi-mqtt-producer/#g=1&p=concept-topic-stream-fields

https://jira.pentaho.com/browse/BACKLOG-23645
  • Loading branch information
mkambol authored and Kurtis Walker committed Jun 8, 2018
1 parent 4d021f2 commit 037e609
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 465 deletions.
Expand Up @@ -26,6 +26,7 @@ class MQTTConstants {
static final String MQTT_SERVER = "MQTT_SERVER"; static final String MQTT_SERVER = "MQTT_SERVER";
static final String TOPICS = "TOPICS"; static final String TOPICS = "TOPICS";
static final String TOPIC = "TOPIC"; static final String TOPIC = "TOPIC";
static final String TOPIC_IN_FIELD = "TOPIC_IN_FIELD";
static final String MSG_OUTPUT_NAME = "MSG_OUTPUT_NAME"; static final String MSG_OUTPUT_NAME = "MSG_OUTPUT_NAME";
static final String TOPIC_OUTPUT_NAME = "TOPIC_OUTPUT_NAME"; static final String TOPIC_OUTPUT_NAME = "TOPIC_OUTPUT_NAME";
static final String QOS = "QOS"; static final String QOS = "QOS";
Expand Down
Expand Up @@ -22,13 +22,15 @@


package org.pentaho.di.trans.step.mqtt; package org.pentaho.di.trans.step.mqtt;


import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.pentaho.di.core.CheckResultInterface; import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException; import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.util.serialization.BaseSerializingMeta; import org.pentaho.di.core.util.serialization.BaseSerializingMeta;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep; import org.pentaho.di.trans.step.BaseStep;
Expand All @@ -38,16 +40,19 @@
import org.pentaho.di.trans.step.StepMetaInterface; import org.pentaho.di.trans.step.StepMetaInterface;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;


import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.Charset.defaultCharset; import static java.nio.charset.Charset.defaultCharset;
import static java.util.Optional.ofNullable;
import static org.pentaho.di.i18n.BaseMessages.getString;


public class MQTTProducer extends BaseStep implements StepInterface { public class MQTTProducer extends BaseStep implements StepInterface {
private static Class<?> PKG = MQTTProducer.class; private static Class<?> PKG = MQTTProducer.class;


private MQTTProducerMeta meta; private MQTTProducerMeta meta;
private MQTTProducerData data;
Supplier<MqttClient> client = Suppliers.memoize( this::connectToClient );


/** /**
* This is the base step that forms that basis for all steps. You can derive from this class to implement your own * This is the base step that forms that basis for all steps. You can derive from this class to implement your own
Expand All @@ -71,7 +76,6 @@ public boolean init( StepMetaInterface stepMetaInterface, StepDataInterface step
boolean isInitalized = super.init( stepMetaInterface, stepDataInterface ); boolean isInitalized = super.init( stepMetaInterface, stepDataInterface );
BaseSerializingMeta serializingMeta = (BaseSerializingMeta) stepMetaInterface; BaseSerializingMeta serializingMeta = (BaseSerializingMeta) stepMetaInterface;
meta = (MQTTProducerMeta) serializingMeta.withVariables( this ); // handle variable substitution up-front meta = (MQTTProducerMeta) serializingMeta.withVariables( this ); // handle variable substitution up-front
data = ( (MQTTProducerData) stepDataInterface );


List<CheckResultInterface> remarks = new ArrayList<>(); List<CheckResultInterface> remarks = new ArrayList<>();
meta.check( meta.check(
Expand All @@ -82,11 +86,7 @@ remarks, getTransMeta(), meta.getParentStepMeta(),
remarks.stream().filter( result -> result.getType() == CheckResultInterface.TYPE_RESULT_ERROR ) remarks.stream().filter( result -> result.getType() == CheckResultInterface.TYPE_RESULT_ERROR )
.peek( result -> logError( result.getText() ) ) .peek( result -> logError( result.getText() ) )
.count() > 0; .count() > 0;
if ( errorsPresent ) { return !errorsPresent && isInitalized;
return false;
}

return isInitalized;
} }


@Override @Override
Expand All @@ -98,69 +98,91 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
stopMqttClient(); stopMqttClient();
return false; return false;
} }

if ( first ) {
logDebug( "Publishing using a quality of service level of " + environmentSubstitute( meta.getQOS() ) );
data.messageFieldIndex = getInputRowMeta().indexOfValue( environmentSubstitute( meta.getMessageField() ) );
try {
data.mqttClient = MQTTClientBuilder.builder()
.withBroker( meta.getMqttServer() )
.withTopics( Collections.singletonList( meta.getTopic() ) )
.withClientId( meta.getClientId() )
.withQos( meta.getQOS() )
.withStep( this )
.withUsername( meta.getUsername() )
.withPassword( meta.getPassword() )
.withSslConfig( meta.getSslConfig() )
.withIsSecure( meta.isUseSsl() )
.withKeepAliveInterval( meta.getKeepAliveInterval() )
.withMaxInflight( meta.getMaxInflight() )
.withConnectionTimeout( meta.getConnectionTimeout() )
.withCleanSession( meta.getCleanSession() )
.withStorageLevel( meta.getStorageLevel() )
.withServerUris( meta.getServerUris() )
.withMqttVersion( meta.getMqttVersion() )
.withAutomaticReconnect( meta.getAutomaticReconnect() )
.buildAndConnect();
} catch ( Exception e ) {
stopAll();
logError( e.toString() );
return false;
}

first = false;
}

MqttMessage mqttMessage = new MqttMessage();
try { try {
mqttMessage.setQos( Integer.parseInt( environmentSubstitute( meta.getQOS() ) ) ); client.get() // client is memoized, loaded on first use
} catch ( NumberFormatException e ) { .publish( getTopic( row ), getMessage( row ) );
throw new KettleStepException(
BaseMessages.getString( PKG, "MQTTProducer.Error.QOS", environmentSubstitute( meta.getQOS() ) ) );
}
mqttMessage.setPayload( ( row[ data.messageFieldIndex ] ).toString().getBytes( defaultCharset() ) );

try {
data.mqttClient.publish( environmentSubstitute( meta.getTopic() ), mqttMessage );


incrementLinesOutput(); incrementLinesOutput();
putRow( getInputRowMeta(), row ); // copy row to possible alternate rowset(s). putRow( getInputRowMeta(), row ); // copy row to possible alternate rowset(s).


if ( checkFeedback( getLinesRead() ) ) { if ( checkFeedback( getLinesRead() ) ) {
if ( log.isBasic() ) { if ( log.isBasic() ) {
logBasic( BaseMessages.getString( PKG, "MQTTProducer.Log.LineNumber" ) + getLinesRead() ); logBasic( getString( PKG, "MQTTProducer.Log.LineNumber" ) + getLinesRead() );
} }
} }
} catch ( MqttException e ) { } catch ( MqttException e ) {
logError( BaseMessages.getString( PKG, "MQTTProducer.Error.QOSNotSupported", meta.getQOS() ) ); logError( getString( PKG, "MQTTProducer.Error.QOSNotSupported", meta.qos ) );
logError( e.getMessage(), e ); logError( e.getMessage(), e );
setErrors( 1 ); setErrors( 1 );
stopAll(); stopAll();
return false;
} catch ( RuntimeException re ) {
stopAll();
logError( re.getMessage(), re );
return false;
} }

return true; return true;
} }


private MqttClient connectToClient() {
logDebug( "Publishing using a quality of service level of " + environmentSubstitute( meta.qos ) );
try {
return
MQTTClientBuilder.builder()
.withBroker( this.meta.mqttServer )
.withClientId( meta.clientId )
.withQos( meta.qos )
.withStep( this )
.withUsername( meta.username )
.withPassword( meta.password )
.withSslConfig( meta.getSslConfig() )
.withIsSecure( meta.useSsl )
.withKeepAliveInterval( meta.keepAliveInterval )
.withMaxInflight( meta.maxInflight )
.withConnectionTimeout( meta.connectionTimeout )
.withCleanSession( meta.cleanSession )
.withStorageLevel( meta.storageLevel )
.withServerUris( meta.serverUris )
.withMqttVersion( meta.mqttVersion )
.withAutomaticReconnect( meta.automaticReconnect )
.buildAndConnect();
} catch ( MqttException e ) {
throw new RuntimeException( e );
}
}

private MqttMessage getMessage( Object[] row ) throws KettleStepException {
MqttMessage mqttMessage = new MqttMessage();
try {
mqttMessage.setQos( Integer.parseInt( meta.qos ) );
} catch ( NumberFormatException e ) {
throw new KettleStepException(
getString( PKG, "MQTTProducer.Error.QOS", environmentSubstitute( meta.qos ) ) );
}
String fieldAsString = getFieldAsString( row, meta.messageField );
mqttMessage.setPayload( fieldAsString.getBytes( defaultCharset() ) );
return mqttMessage;
}

/**
* Retrieves the topic, either a raw string, or a field value if meta.topicInField==true
*/
private String getTopic( Object[] row ) {
String topic;
if ( meta.topicInField ) {
topic = getFieldAsString( row, meta.topic );
} else {
topic = meta.topic;
}
return topic;
}

private String getFieldAsString( Object[] row, String field ) {
int messageFieldIndex = getInputRowMeta().indexOfValue( field );
checkArgument( messageFieldIndex > -1, getString( PKG, "MQTTProducer.Error.FieldNotFound", field ) );
return ofNullable( ( row[ messageFieldIndex ] ).toString() ).orElse( "" );
}

@Override public void stopRunning( StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface ) @Override public void stopRunning( StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface )
throws KettleException { throws KettleException {
stopMqttClient(); stopMqttClient();
Expand All @@ -170,9 +192,9 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
private void stopMqttClient() { private void stopMqttClient() {
try { try {
// Check if connected so subsequent calls does not produce an already stopped exception // Check if connected so subsequent calls does not produce an already stopped exception
if ( null != data.mqttClient && data.mqttClient.isConnected() ) { if ( null != client.get() && client.get().isConnected() ) {
data.mqttClient.disconnect(); client.get().disconnect();
data.mqttClient.close(); client.get().close();
} }
} catch ( MqttException e ) { } catch ( MqttException e ) {
logError( e.getMessage() ); logError( e.getMessage() );
Expand Down

This file was deleted.

0 comments on commit 037e609

Please sign in to comment.