Skip to content

Commit

Permalink
[BACKLOG-21800] JMS Consumer. Configurable jms destination
Browse files Browse the repository at this point in the history
Renamed JmsContextProvider to JmsProvider, since it supplies
both context and destination.
Moved type enums into JmsProvider.

https://jira.pentaho.com/browse/BACKLOG-21800
  • Loading branch information
mkambol committed Mar 16, 2018
1 parent ea398c2 commit 022d7ca
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 101 deletions.

This file was deleted.

@@ -0,0 +1,6 @@
package org.pentaho.di.trans.step.jms;

public interface JmsConstants {

Class PKG = JmsConstants.class;
}
Expand Up @@ -29,7 +29,7 @@
import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.jms.context.JmsContextProvider; import org.pentaho.di.trans.step.jms.context.JmsProvider;


import java.util.List; import java.util.List;


Expand All @@ -38,8 +38,8 @@
description = "", categoryDescription = "Streaming" ) description = "", categoryDescription = "Streaming" )
public class JmsConsumerMeta extends JmsMeta { public class JmsConsumerMeta extends JmsMeta {


public JmsConsumerMeta( List<JmsContextProvider> jmsContextProviders ) { public JmsConsumerMeta( List<JmsProvider> jmsProviders ) {
super( jmsContextProviders ); super( jmsProviders );
} }


@SuppressWarnings( "deprecated" ) @SuppressWarnings( "deprecated" )
Expand Down
Expand Up @@ -23,8 +23,6 @@
package org.pentaho.di.trans.step.jms; package org.pentaho.di.trans.step.jms;




import com.ibm.mq.jms.MQQueue;

import javax.jms.JMSContext; import javax.jms.JMSContext;


import org.pentaho.di.core.ObjectLocationSpecificationMethod; import org.pentaho.di.core.ObjectLocationSpecificationMethod;
Expand All @@ -37,22 +35,24 @@
import org.pentaho.di.core.util.serialization.Sensitive; import org.pentaho.di.core.util.serialization.Sensitive;
import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.jms.context.JmsContextProvider; import org.pentaho.di.trans.step.jms.context.JmsProvider;
import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta; import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta;


import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;


import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.WEBSPHERE;
import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.QUEUE;



@InjectionSupported ( localizationPrefix = "IBMMQConsumerMeta.Injection." ) @InjectionSupported ( localizationPrefix = "IBMMQConsumerMeta.Injection." )
public abstract class JmsMeta extends BaseStreamStepMeta { public abstract class JmsMeta extends BaseStreamStepMeta {


//TODO move these props to a container pojo that can be added to both //TODO move these props to a container pojo that can be added to both
// the consumer and producer metas, since BaseStreamStepMeta is just consumer. // the consumer and producer metas, since BaseStreamStepMeta is just consumer.


@Injection ( name = "DESTINATION" ) public String destinationName = null; @Injection ( name = "DESTINATION" ) public String destinationName = "DEV.QUEUE.1";


@Injection ( name = "URL" ) public String url = "mq://10.177.178.135:1414/QM1?channel=DEV.APP.SVRCONN"; @Injection ( name = "URL" ) public String url = "mq://10.177.178.135:1414/QM1?channel=DEV.APP.SVRCONN";


Expand All @@ -65,26 +65,19 @@ public abstract class JmsMeta extends BaseStreamStepMeta {


@Injection ( name = "USE_JNDI" ) public boolean useJndi = false; @Injection ( name = "USE_JNDI" ) public boolean useJndi = false;


@Injection ( name = "CONNECTION_TYPE" ) public String connectionType = "WEBSPHERE"; @Injection ( name = "CONNECTION_TYPE" ) public String connectionType = WEBSPHERE.name();

@Injection ( name = "DESTINATION_TYPE" ) public String destinationType = QUEUE.name();




protected final List<JmsContextProvider> jmsContextProviders; protected final List<JmsProvider> jmsProviders;


protected JmsMeta( List<JmsContextProvider> jmsContextProviders ) { protected JmsMeta( List<JmsProvider> jmsProviders ) {
super(); super();
this.jmsContextProviders = jmsContextProviders; this.jmsProviders = jmsProviders;
setSpecificationMethod( ObjectLocationSpecificationMethod.FILENAME ); setSpecificationMethod( ObjectLocationSpecificationMethod.FILENAME );
} }



public Destination getDestination() {
try {
return new MQQueue( "DEV.QUEUE.1" );
} catch ( JMSException e ) {
throw new RuntimeException( e );
}
}

@Override public RowMeta getRowMeta( String s, VariableSpace variableSpace ) throws KettleStepException { @Override public RowMeta getRowMeta( String s, VariableSpace variableSpace ) throws KettleStepException {
RowMeta rowMeta = new RowMeta(); RowMeta rowMeta = new RowMeta();
rowMeta.addValueMeta( new ValueMetaString( "message" ) ); rowMeta.addValueMeta( new ValueMetaString( "message" ) );
Expand All @@ -95,11 +88,19 @@ public Destination getDestination() {
return new GenericStepData(); return new GenericStepData();
} }


Destination getDestination() {
return getJmsProvider().getDestination( this );
}

JMSContext getJmsContext() { JMSContext getJmsContext() {
return jmsContextProviders.stream() return getJmsProvider().getContext( this );
.map( prov -> prov.get( this ) ) }

private JmsProvider getJmsProvider() {
return jmsProviders.stream()
.filter( prov -> prov.supports( JmsProvider.ConnectionType.valueOf( connectionType ) ) )
.filter( Objects::nonNull ) .filter( Objects::nonNull )
.findFirst() .findFirst()
.orElseThrow( () -> new RuntimeException( "FIXME" ) ); // TODO .orElseThrow( () -> new RuntimeException( "FIXME" ) );
} }
} }
Expand Up @@ -35,7 +35,6 @@


public class JmsStreamSource extends BlockingQueueStreamSource<List<Object>> { public class JmsStreamSource extends BlockingQueueStreamSource<List<Object>> {



private final JmsMeta meta; private final JmsMeta meta;
private JMSConsumer consumer; private JMSConsumer consumer;


Expand All @@ -45,9 +44,7 @@ protected JmsStreamSource( BaseStreamStep streamStep, JmsMeta meta ) {
} }


@Override public void open() { @Override public void open() {

consumer = meta.getJmsContext().createConsumer( meta.getDestination() );
consumer = meta.getJmsContext()
.createConsumer( meta.getDestination() );
consumer.setMessageListener( ( message ) -> { consumer.setMessageListener( ( message ) -> {
try { try {
acceptRows( singletonList( of( message.getBody( Object.class ) ) ) ); acceptRows( singletonList( of( message.getBody( Object.class ) ) ) );
Expand Down
Expand Up @@ -24,8 +24,29 @@


import org.pentaho.di.trans.step.jms.JmsMeta; import org.pentaho.di.trans.step.jms.JmsMeta;


import javax.jms.Destination;
import javax.jms.JMSContext; import javax.jms.JMSContext;


public interface JmsContextProvider {
JMSContext get( JmsMeta meta ); public interface JmsProvider {

boolean supports( ConnectionType type );

JMSContext getContext( JmsMeta meta );

Destination getDestination( JmsMeta meta );


enum ConnectionType {
WEBSPHERE,
ACTIVEMQ,
JNDI
}

enum DestinationType {
QUEUE,
TOPIC
}

} }

Expand Up @@ -24,47 +24,68 @@




import com.ibm.mq.jms.MQConnectionFactory; import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQTopic;
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants; import com.ibm.msg.client.wmq.WMQConstants;
import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.variables.Variables; import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.trans.step.jms.JmsConnectionType; import org.pentaho.di.trans.step.jms.JmsConstants;
import org.pentaho.di.trans.step.jms.JmsMeta; import org.pentaho.di.trans.step.jms.JmsMeta;


import javax.jms.Destination;
import javax.jms.JMSContext; import javax.jms.JMSContext;
import javax.jms.JMSException; import javax.jms.JMSException;

import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;


import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.regex.Pattern.compile; import static java.util.regex.Pattern.compile;
import static org.pentaho.di.trans.step.jms.JmsConnectionType.WEBSPHERE; import static org.pentaho.di.i18n.BaseMessages.getString;
import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.WEBSPHERE;
import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.QUEUE;


public class WebsphereMQContextProvider implements JmsContextProvider { public class WebsphereMQProvider implements JmsProvider {


@Override public boolean supports( ConnectionType type ) {
return type == WEBSPHERE;
}


@Override public JMSContext get( JmsMeta meta ) { @Override public JMSContext getContext( JmsMeta meta ) {
if ( JmsConnectionType.valueOf( meta.connectionType ).equals( WEBSPHERE ) ) {


MQUrlResolver resolver = new MQUrlResolver( meta, new Variables() ); MQUrlResolver resolver = new MQUrlResolver( meta, new Variables() );


MQConnectionFactory mqConnectionFactory = new MQConnectionFactory(); MQConnectionFactory connFactory = isQueue( meta )
mqConnectionFactory.setHostName( resolver.host ); ? new MQQueueConnectionFactory() : new MQTopicConnectionFactory();
try {
mqConnectionFactory.setPort( resolver.port ); connFactory.setHostName( resolver.host );
mqConnectionFactory.setBrokerQueueManager( resolver.queueManager ); try {
mqConnectionFactory.setQueueManager( "QM1" ); connFactory.setPort( resolver.port );
mqConnectionFactory.setChannel( resolver.channel ); connFactory.setQueueManager( resolver.queueManager );
mqConnectionFactory.setTransportType( WMQConstants.WMQ_CM_CLIENT ); connFactory.setChannel( resolver.channel );
} catch ( JMSException e ) { connFactory.setTransportType( WMQConstants.WMQ_CM_CLIENT );
e.printStackTrace(); } catch ( JMSException e ) {
} throw new RuntimeException( e );
return mqConnectionFactory.createContext( meta.username, meta.password );
} }
return null; return connFactory.createContext( meta.username, meta.password );
}

@Override public Destination getDestination( JmsMeta meta ) {
checkNotNull( meta.destinationName, getString( JmsConstants.PKG, "JmsWebsphereMQ.DestinationNameRequired" ) );
try {
return isQueue( meta ) ? new MQQueue( meta.destinationName ) : new MQTopic( meta.destinationName );
} catch ( JMSException e ) {
throw new RuntimeException( e );
}
}

private boolean isQueue( JmsMeta meta ) {
return DestinationType.valueOf( meta.destinationType ).equals( QUEUE );
} }




class MQUrlResolver { static class MQUrlResolver {
private final JmsMeta meta; private final JmsMeta meta;
private final Pattern pattern; private final Pattern pattern;


Expand Down
Expand Up @@ -44,10 +44,10 @@


import static com.google.common.collect.ImmutableList.of; import static com.google.common.collect.ImmutableList.of;
import static org.pentaho.di.i18n.BaseMessages.getString; import static org.pentaho.di.i18n.BaseMessages.getString;
import static org.pentaho.di.trans.step.jms.JmsConstants.PKG;
import static org.pentaho.di.ui.trans.step.BaseStreamingDialog.INPUT_WIDTH; import static org.pentaho.di.ui.trans.step.BaseStreamingDialog.INPUT_WIDTH;


public class ConnectionForm { public class ConnectionForm {
private static Class<?> PKG = ConnectionForm.class;


private final Composite parentComponent; private final Composite parentComponent;
JmsConsumerMeta jmsMeta; JmsConsumerMeta jmsMeta;
Expand Down
Expand Up @@ -34,9 +34,9 @@
import org.pentaho.di.ui.core.widget.TextVar; import org.pentaho.di.ui.core.widget.TextVar;


import static org.pentaho.di.i18n.BaseMessages.getString; import static org.pentaho.di.i18n.BaseMessages.getString;
import static org.pentaho.di.trans.step.jms.JmsConstants.PKG;


public class DestinationForm { public class DestinationForm {
private static Class<?> PKG = DestinationForm.class;


private final Composite parentComponent; private final Composite parentComponent;
private final PropsUI props; private final PropsUI props;
Expand Down
Expand Up @@ -33,8 +33,9 @@
import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta; import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta;
import org.pentaho.di.ui.trans.step.BaseStreamingDialog; import org.pentaho.di.ui.trans.step.BaseStreamingDialog;


import static org.pentaho.di.trans.step.jms.JmsConstants.PKG;

public class JmsConsumerDialog extends BaseStreamingDialog { public class JmsConsumerDialog extends BaseStreamingDialog {
private static Class<?> PKG = JmsConsumerDialog.class;
JmsConsumerMeta jmsMeta; JmsConsumerMeta jmsMeta;
private DestinationForm destinationForm; private DestinationForm destinationForm;
private ConnectionForm connectionForm; private ConnectionForm connectionForm;
Expand Down
Expand Up @@ -25,16 +25,16 @@




<bean id="JmsConsumer" class="org.pentaho.di.trans.step.jms.JmsConsumerMeta" scope="prototype"> <bean id="JmsConsumer" class="org.pentaho.di.trans.step.jms.JmsConsumerMeta" scope="prototype">
<argument ref="jmsContextProviders"/> <argument ref="jmsProviders"/>
<pen:di-plugin type="org.pentaho.di.core.plugins.StepPluginType"/> <pen:di-plugin type="org.pentaho.di.core.plugins.StepPluginType"/>
</bean> </bean>




<reference-list id="jmsContextProviders" interface="org.pentaho.di.trans.step.jms.context.JmsContextProvider" <reference-list id="jmsProviders" interface="org.pentaho.di.trans.step.jms.context.JmsProvider"
availability="optional"/> availability="optional"/>


<service interface="org.pentaho.di.trans.step.jms.context.JmsContextProvider"> <service interface="org.pentaho.di.trans.step.jms.context.JmsProvider">
<bean class="org.pentaho.di.trans.step.jms.context.WebsphereMQContextProvider" /> <bean class="org.pentaho.di.trans.step.jms.context.WebsphereMQProvider" />
</service> </service>


</blueprint> </blueprint>
Expand Up @@ -9,3 +9,5 @@ JmsDialog.DestinationType=Destination Type
JmsDialog.Dest.Topic=Topic JmsDialog.Dest.Topic=Topic
JmsDialog.Dest.Queue=Queue JmsDialog.Dest.Queue=Queue
JmsDialog.Dest.Name=Destination name: JmsDialog.Dest.Name=Destination name:

JmsWebsphereMQ.DestinationNameRequired=Destination name must be set.
Expand Up @@ -37,10 +37,8 @@
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -86,16 +84,4 @@ public void testReceiveMessage() {
} }




List<String> foo( String s ) {
return Arrays.asList( null, "foo" );
}

@Test public void test() {
System.out.println( foo( "fo" )
.stream()
.filter( Objects::nonNull )
.findFirst().get() );


}
} }

0 comments on commit 022d7ca

Please sign in to comment.