From 022d7ca9b4a8b5bf366ed62cfd3c9695f9bb0f07 Mon Sep 17 00:00:00 2001 From: Matt Campbell Date: Thu, 15 Mar 2018 09:04:06 -0400 Subject: [PATCH] [BACKLOG-21800] JMS Consumer. Configurable jms destination Renamed JmsContextProvider to JmsProvider, since it supplies both context and destination. Moved type enums into JmsProvider. https://jira.pentaho.com/browse/BACKLOG-21800 --- .../di/trans/step/jms/JmsConnectionType.java | 29 ------ .../di/trans/step/jms/JmsConstants.java | 6 ++ .../di/trans/step/jms/JmsConsumerMeta.java | 6 +- .../pentaho/di/trans/step/jms/JmsMeta.java | 43 ++++----- .../di/trans/step/jms/JmsStreamSource.java | 5 +- ...sContextProvider.java => JmsProvider.java} | 25 +++++- ...Provider.java => WebsphereMQProvider.java} | 63 ++++++++----- .../di/trans/step/jms/ui/ConnectionForm.java | 2 +- .../di/trans/step/jms/ui/DestinationForm.java | 2 +- .../trans/step/jms/ui/JmsConsumerDialog.java | 3 +- .../OSGI-INF/blueprint/blueprint.xml | 8 +- .../messages/messages_en_US.properties | 2 + .../trans/step/jms/JmsStreamSourceTest.java | 14 --- .../jms/context/WebsphereMQProviderTest.java | 88 +++++++++++++++++++ 14 files changed, 195 insertions(+), 101 deletions(-) delete mode 100644 plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConnectionType.java create mode 100644 plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConstants.java rename plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/{JmsContextProvider.java => JmsProvider.java} (76%) rename plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/{WebsphereMQContextProvider.java => WebsphereMQProvider.java} (56%) rename plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/{ui => }/messages/messages_en_US.properties (83%) create mode 100644 plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProviderTest.java diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConnectionType.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConnectionType.java deleted file mode 100644 index a9d5e9d72485..000000000000 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConnectionType.java +++ /dev/null @@ -1,29 +0,0 @@ -/*! ****************************************************************************** - * - * Pentaho Data Integration - * - * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com - * - ******************************************************************************* - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - ******************************************************************************/ - -package org.pentaho.di.trans.step.jms; - -public enum JmsConnectionType { - WEBSPHERE, - ACTIVEMQ, - JNDI -} diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConstants.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConstants.java new file mode 100644 index 000000000000..fdc3eef1b0fe --- /dev/null +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConstants.java @@ -0,0 +1,6 @@ +package org.pentaho.di.trans.step.jms; + +public interface JmsConstants { + + Class PKG = JmsConstants.class; +} diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConsumerMeta.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConsumerMeta.java index 2eaebbfa0055..a91e5e0ed3cb 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConsumerMeta.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsConsumerMeta.java @@ -29,7 +29,7 @@ import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta; -import org.pentaho.di.trans.step.jms.context.JmsContextProvider; +import org.pentaho.di.trans.step.jms.context.JmsProvider; import java.util.List; @@ -38,8 +38,8 @@ description = "", categoryDescription = "Streaming" ) public class JmsConsumerMeta extends JmsMeta { - public JmsConsumerMeta( List jmsContextProviders ) { - super( jmsContextProviders ); + public JmsConsumerMeta( List jmsProviders ) { + super( jmsProviders ); } @SuppressWarnings( "deprecated" ) diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsMeta.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsMeta.java index 413f38613753..d01166e3aefb 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsMeta.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsMeta.java @@ -23,8 +23,6 @@ package org.pentaho.di.trans.step.jms; -import com.ibm.mq.jms.MQQueue; - import javax.jms.JMSContext; import org.pentaho.di.core.ObjectLocationSpecificationMethod; @@ -37,14 +35,16 @@ import org.pentaho.di.core.util.serialization.Sensitive; import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.trans.step.StepDataInterface; -import org.pentaho.di.trans.step.jms.context.JmsContextProvider; +import org.pentaho.di.trans.step.jms.context.JmsProvider; import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta; import javax.jms.Destination; -import javax.jms.JMSException; import java.util.List; import java.util.Objects; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.WEBSPHERE; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.QUEUE; + @InjectionSupported ( localizationPrefix = "IBMMQConsumerMeta.Injection." ) public abstract class JmsMeta extends BaseStreamStepMeta { @@ -52,7 +52,7 @@ public abstract class JmsMeta extends BaseStreamStepMeta { //TODO move these props to a container pojo that can be added to both // the consumer and producer metas, since BaseStreamStepMeta is just consumer. - @Injection ( name = "DESTINATION" ) public String destinationName = null; + @Injection ( name = "DESTINATION" ) public String destinationName = "DEV.QUEUE.1"; @Injection ( name = "URL" ) public String url = "mq://10.177.178.135:1414/QM1?channel=DEV.APP.SVRCONN"; @@ -65,26 +65,19 @@ public abstract class JmsMeta extends BaseStreamStepMeta { @Injection ( name = "USE_JNDI" ) public boolean useJndi = false; - @Injection ( name = "CONNECTION_TYPE" ) public String connectionType = "WEBSPHERE"; + @Injection ( name = "CONNECTION_TYPE" ) public String connectionType = WEBSPHERE.name(); + + @Injection ( name = "DESTINATION_TYPE" ) public String destinationType = QUEUE.name(); - protected final List jmsContextProviders; + protected final List jmsProviders; - protected JmsMeta( List jmsContextProviders ) { + protected JmsMeta( List jmsProviders ) { super(); - this.jmsContextProviders = jmsContextProviders; + this.jmsProviders = jmsProviders; setSpecificationMethod( ObjectLocationSpecificationMethod.FILENAME ); } - - public Destination getDestination() { - try { - return new MQQueue( "DEV.QUEUE.1" ); - } catch ( JMSException e ) { - throw new RuntimeException( e ); - } - } - @Override public RowMeta getRowMeta( String s, VariableSpace variableSpace ) throws KettleStepException { RowMeta rowMeta = new RowMeta(); rowMeta.addValueMeta( new ValueMetaString( "message" ) ); @@ -95,11 +88,19 @@ public Destination getDestination() { return new GenericStepData(); } + Destination getDestination() { + return getJmsProvider().getDestination( this ); + } + JMSContext getJmsContext() { - return jmsContextProviders.stream() - .map( prov -> prov.get( this ) ) + return getJmsProvider().getContext( this ); + } + + private JmsProvider getJmsProvider() { + return jmsProviders.stream() + .filter( prov -> prov.supports( JmsProvider.ConnectionType.valueOf( connectionType ) ) ) .filter( Objects::nonNull ) .findFirst() - .orElseThrow( () -> new RuntimeException( "FIXME" ) ); // TODO + .orElseThrow( () -> new RuntimeException( "FIXME" ) ); } } diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsStreamSource.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsStreamSource.java index 4f43aad3c85b..1f2a1b6166a8 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsStreamSource.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/JmsStreamSource.java @@ -35,7 +35,6 @@ public class JmsStreamSource extends BlockingQueueStreamSource> { - private final JmsMeta meta; private JMSConsumer consumer; @@ -45,9 +44,7 @@ protected JmsStreamSource( BaseStreamStep streamStep, JmsMeta meta ) { } @Override public void open() { - - consumer = meta.getJmsContext() - .createConsumer( meta.getDestination() ); + consumer = meta.getJmsContext().createConsumer( meta.getDestination() ); consumer.setMessageListener( ( message ) -> { try { acceptRows( singletonList( of( message.getBody( Object.class ) ) ) ); diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsContextProvider.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsProvider.java similarity index 76% rename from plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsContextProvider.java rename to plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsProvider.java index 88a9f8291633..9fe503e9cfa7 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsContextProvider.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/JmsProvider.java @@ -24,8 +24,29 @@ import org.pentaho.di.trans.step.jms.JmsMeta; +import javax.jms.Destination; import javax.jms.JMSContext; -public interface JmsContextProvider { - JMSContext get( JmsMeta meta ); + +public interface JmsProvider { + + boolean supports( ConnectionType type ); + + JMSContext getContext( JmsMeta meta ); + + Destination getDestination( JmsMeta meta ); + + + enum ConnectionType { + WEBSPHERE, + ACTIVEMQ, + JNDI + } + + enum DestinationType { + QUEUE, + TOPIC + } + } + diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQContextProvider.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProvider.java similarity index 56% rename from plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQContextProvider.java rename to plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProvider.java index 238e07049bc5..2c3875486831 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQContextProvider.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProvider.java @@ -24,47 +24,68 @@ import com.ibm.mq.jms.MQConnectionFactory; +import com.ibm.mq.jms.MQQueue; +import com.ibm.mq.jms.MQQueueConnectionFactory; +import com.ibm.mq.jms.MQTopic; +import com.ibm.mq.jms.MQTopicConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.Variables; -import org.pentaho.di.trans.step.jms.JmsConnectionType; +import org.pentaho.di.trans.step.jms.JmsConstants; import org.pentaho.di.trans.step.jms.JmsMeta; +import javax.jms.Destination; import javax.jms.JMSContext; import javax.jms.JMSException; - import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.google.common.base.Preconditions.checkNotNull; import static java.util.regex.Pattern.compile; -import static org.pentaho.di.trans.step.jms.JmsConnectionType.WEBSPHERE; +import static org.pentaho.di.i18n.BaseMessages.getString; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.WEBSPHERE; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.QUEUE; -public class WebsphereMQContextProvider implements JmsContextProvider { +public class WebsphereMQProvider implements JmsProvider { + @Override public boolean supports( ConnectionType type ) { + return type == WEBSPHERE; + } - @Override public JMSContext get( JmsMeta meta ) { - if ( JmsConnectionType.valueOf( meta.connectionType ).equals( WEBSPHERE ) ) { + @Override public JMSContext getContext( JmsMeta meta ) { - MQUrlResolver resolver = new MQUrlResolver( meta, new Variables() ); + MQUrlResolver resolver = new MQUrlResolver( meta, new Variables() ); - MQConnectionFactory mqConnectionFactory = new MQConnectionFactory(); - mqConnectionFactory.setHostName( resolver.host ); - try { - mqConnectionFactory.setPort( resolver.port ); - mqConnectionFactory.setBrokerQueueManager( resolver.queueManager ); - mqConnectionFactory.setQueueManager( "QM1" ); - mqConnectionFactory.setChannel( resolver.channel ); - mqConnectionFactory.setTransportType( WMQConstants.WMQ_CM_CLIENT ); - } catch ( JMSException e ) { - e.printStackTrace(); - } - return mqConnectionFactory.createContext( meta.username, meta.password ); + MQConnectionFactory connFactory = isQueue( meta ) + ? new MQQueueConnectionFactory() : new MQTopicConnectionFactory(); + + connFactory.setHostName( resolver.host ); + try { + connFactory.setPort( resolver.port ); + connFactory.setQueueManager( resolver.queueManager ); + connFactory.setChannel( resolver.channel ); + connFactory.setTransportType( WMQConstants.WMQ_CM_CLIENT ); + } catch ( JMSException e ) { + throw new RuntimeException( e ); } - return null; + return connFactory.createContext( meta.username, meta.password ); + } + + @Override public Destination getDestination( JmsMeta meta ) { + checkNotNull( meta.destinationName, getString( JmsConstants.PKG, "JmsWebsphereMQ.DestinationNameRequired" ) ); + try { + return isQueue( meta ) ? new MQQueue( meta.destinationName ) : new MQTopic( meta.destinationName ); + } catch ( JMSException e ) { + throw new RuntimeException( e ); + } + } + + private boolean isQueue( JmsMeta meta ) { + return DestinationType.valueOf( meta.destinationType ).equals( QUEUE ); } - class MQUrlResolver { + static class MQUrlResolver { private final JmsMeta meta; private final Pattern pattern; diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/ConnectionForm.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/ConnectionForm.java index 4f44f0a1e917..d4bae300dca8 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/ConnectionForm.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/ConnectionForm.java @@ -44,10 +44,10 @@ import static com.google.common.collect.ImmutableList.of; import static org.pentaho.di.i18n.BaseMessages.getString; +import static org.pentaho.di.trans.step.jms.JmsConstants.PKG; import static org.pentaho.di.ui.trans.step.BaseStreamingDialog.INPUT_WIDTH; public class ConnectionForm { - private static Class PKG = ConnectionForm.class; private final Composite parentComponent; JmsConsumerMeta jmsMeta; diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/DestinationForm.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/DestinationForm.java index ae09a9d4a217..d0775aa1fc1c 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/DestinationForm.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/DestinationForm.java @@ -34,9 +34,9 @@ import org.pentaho.di.ui.core.widget.TextVar; import static org.pentaho.di.i18n.BaseMessages.getString; +import static org.pentaho.di.trans.step.jms.JmsConstants.PKG; public class DestinationForm { - private static Class PKG = DestinationForm.class; private final Composite parentComponent; private final PropsUI props; diff --git a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/JmsConsumerDialog.java b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/JmsConsumerDialog.java index dcd35ff3250e..d3057ed5fc15 100644 --- a/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/JmsConsumerDialog.java +++ b/plugins/streaming/impls/jms/src/main/java/org/pentaho/di/trans/step/jms/ui/JmsConsumerDialog.java @@ -33,8 +33,9 @@ import org.pentaho.di.trans.streaming.common.BaseStreamStepMeta; import org.pentaho.di.ui.trans.step.BaseStreamingDialog; +import static org.pentaho.di.trans.step.jms.JmsConstants.PKG; + public class JmsConsumerDialog extends BaseStreamingDialog { - private static Class PKG = JmsConsumerDialog.class; JmsConsumerMeta jmsMeta; private DestinationForm destinationForm; private ConnectionForm connectionForm; diff --git a/plugins/streaming/impls/jms/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/streaming/impls/jms/src/main/resources/OSGI-INF/blueprint/blueprint.xml index c7400d4e8423..347726b5efc2 100644 --- a/plugins/streaming/impls/jms/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugins/streaming/impls/jms/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -25,16 +25,16 @@ - + - - - + + \ No newline at end of file diff --git a/plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/ui/messages/messages_en_US.properties b/plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/messages/messages_en_US.properties similarity index 83% rename from plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/ui/messages/messages_en_US.properties rename to plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/messages/messages_en_US.properties index 9e3cd0106c33..164b062a74b7 100644 --- a/plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/ui/messages/messages_en_US.properties +++ b/plugins/streaming/impls/jms/src/main/resources/org/pentaho/di/trans/step/jms/messages/messages_en_US.properties @@ -9,3 +9,5 @@ JmsDialog.DestinationType=Destination Type JmsDialog.Dest.Topic=Topic JmsDialog.Dest.Queue=Queue JmsDialog.Dest.Name=Destination name: + +JmsWebsphereMQ.DestinationNameRequired=Destination name must be set. diff --git a/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/JmsStreamSourceTest.java b/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/JmsStreamSourceTest.java index 2b4ef28cf722..cc32225f7bca 100644 --- a/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/JmsStreamSourceTest.java +++ b/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/JmsStreamSourceTest.java @@ -37,10 +37,8 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; @@ -86,16 +84,4 @@ public void testReceiveMessage() { } - List foo( String s ) { - return Arrays.asList( null, "foo" ); - } - - @Test public void test() { - System.out.println( foo( "fo" ) - .stream() - .filter( Objects::nonNull ) - .findFirst().get() ); - - - } } diff --git a/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProviderTest.java b/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProviderTest.java new file mode 100644 index 000000000000..a99dff37dc1e --- /dev/null +++ b/plugins/streaming/impls/jms/src/test/java/org/pentaho/di/trans/step/jms/context/WebsphereMQProviderTest.java @@ -0,0 +1,88 @@ +/*! ****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.di.trans.step.jms.context; + + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.pentaho.di.trans.step.jms.JmsConsumerMeta; + +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.fail; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.ACTIVEMQ; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.JNDI; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.ConnectionType.WEBSPHERE; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.QUEUE; +import static org.pentaho.di.trans.step.jms.context.JmsProvider.DestinationType.TOPIC; + +@RunWith ( MockitoJUnitRunner.class ) +public class WebsphereMQProviderTest { + + JmsProvider jmsProvider = new WebsphereMQProvider(); + @Mock JmsConsumerMeta meta; + + + @Test + public void onlySupportsWebsphere() { + assertTrue( jmsProvider.supports( WEBSPHERE ) ); + assertFalse( jmsProvider.supports( ACTIVEMQ ) ); + assertFalse( jmsProvider.supports( JNDI ) ); + } + + @Test + public void getQueueDestination() { + meta.destinationType = QUEUE.name(); + meta.destinationName = "somename"; + Destination dest = jmsProvider.getDestination( meta ); + assertTrue( dest instanceof Queue ); + } + + @Test + public void getTopicDestination() { + meta.destinationType = TOPIC.name(); + meta.destinationName = "somename"; + Destination dest = jmsProvider.getDestination( meta ); + assertTrue( dest instanceof Topic ); + } + + @Test + public void noDestinationNameSetCausesError() { + meta.destinationType = QUEUE.name(); + meta.destinationName = null; + + try { + jmsProvider.getDestination( meta ); + fail(); + } catch ( Exception e ) { + assertTrue( e.getMessage().contains( "Destination name must be set." ) ); + } + + } +}