Skip to content

Commit

Permalink
[BACKLOG-18435] Use RemoteSource for receiving Daemon Messages and se…
Browse files Browse the repository at this point in the history
…tting PDI for Error Messages
  • Loading branch information
Francisco Luís Brinó Câmara committed Aug 23, 2017
1 parent 8620206 commit fecfc89
Show file tree
Hide file tree
Showing 21 changed files with 573 additions and 810 deletions.
Expand Up @@ -26,6 +26,7 @@




import org.pentaho.di.engine.api.model.LogicalModelElement; import org.pentaho.di.engine.api.model.LogicalModelElement;
import org.pentaho.di.engine.api.remote.RemoteSource;


import java.io.Serializable; import java.io.Serializable;


Expand Down Expand Up @@ -56,16 +57,24 @@ public BaseEvent( S source, D data ) {
} }


BaseEvent<?, ?> baseEvent = (BaseEvent<?, ?>) o; BaseEvent<?, ?> baseEvent = (BaseEvent<?, ?>) o;

if ( !source.getId().equals( baseEvent.source.getId() ) ) { if ( !source.getId().equals( baseEvent.source.getId() ) ) {
return false; return false;
} }
return data.equals( baseEvent.data ); return data.equals( baseEvent.data );
} }


@Override public int hashCode() { @Override public int hashCode() {
int result = source.getId().hashCode(); int result = 0;
result = 31 * result + data.hashCode(); if ( source instanceof RemoteSource ) {
result = this.getClass().getName().hashCode();
result = 31 * result + ( ( (RemoteSource) source ).getModelType() != null
? ( (RemoteSource) source ).getModelType().toString().hashCode() : 0 );
result =
31 * result + ( ( (RemoteSource) source ).getId() != null ? ( (RemoteSource) source ).getId().hashCode() : 0 );
} else {
result = source.getId().hashCode();
result = 31 * result + data.hashCode();
}
return result; return result;
} }
} }
Expand Down
@@ -0,0 +1,43 @@
/*
* *****************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
* *******************************************************************************
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* *****************************************************************************
*
*/

package org.pentaho.di.engine.api.events;

import org.pentaho.di.engine.api.model.LogicalModelElement;
import org.pentaho.di.engine.api.reporting.LogEntry;

/**
* LogEvents encapsulate a LogEntry and contain a reference to the origin Element from the Logical Model.
* <p>
* Created by fcamara on 8/23/2017.
*/
public class ErrorEvent<S extends LogicalModelElement> extends BaseEvent<S, LogEntry> {

private static final long serialVersionUID = 6308895090845470781L;

public ErrorEvent( S source, LogEntry log ) {
super( source, log );
}

}
Expand Up @@ -21,19 +21,12 @@
* *
* ***************************************************************************** * *****************************************************************************
*/ */
package org.pentaho.di.trans.ael.websocket.event;


/** package org.pentaho.di.engine.api.model;
* Created by fcamara on 8/17/17.
*/ public enum ModelType {
public enum MessageEventType { HOP,
TRANSFORMATION_STATUS, OPERATION,
TRANSFORMATION_LOG, ROW,
OPERATION_STATUS, TRANSFORMATION
OPERATION_LOG,
METRICS,
COMPLETE,
ERROR,
STOP,
ROWS
} }
Expand Up @@ -25,6 +25,7 @@
package org.pentaho.di.engine.api.remote; package org.pentaho.di.engine.api.remote;


import org.pentaho.di.engine.api.model.LogicalModelElement; import org.pentaho.di.engine.api.model.LogicalModelElement;
import org.pentaho.di.engine.api.model.ModelType;


import java.io.Serializable; import java.io.Serializable;


Expand All @@ -40,12 +41,28 @@
public final class RemoteSource implements LogicalModelElement, Serializable { public final class RemoteSource implements LogicalModelElement, Serializable {
private static final long serialVersionUID = -8344589338390125137L; private static final long serialVersionUID = -8344589338390125137L;
private final String id; private final String id;
private final ModelType modelType;


public RemoteSource( String id ) { public RemoteSource( String id ) {
this.id = id; this.id = id;
this.modelType = null;
}

public RemoteSource( ModelType modelType ) {
this.modelType = modelType;
this.id = null;
}

public RemoteSource( ModelType modelType, String id ) {
this.modelType = modelType;
this.id = id;
} }


@Override public String getId() { @Override public String getId() {
return id; return id;
} }

public ModelType getModelType() {
return modelType;
}
} }
Expand Up @@ -24,23 +24,30 @@
package org.pentaho.di.engine.api.remote; package org.pentaho.di.engine.api.remote;


import org.junit.Test; import org.junit.Test;
import org.pentaho.di.engine.api.events.ErrorEvent;
import org.pentaho.di.engine.api.events.MetricsEvent; import org.pentaho.di.engine.api.events.MetricsEvent;
import org.pentaho.di.engine.api.events.StatusEvent;
import org.pentaho.di.engine.api.model.ModelType;
import org.pentaho.di.engine.api.model.Operation; import org.pentaho.di.engine.api.model.Operation;
import org.pentaho.di.engine.api.reporting.Status;
import org.pentaho.di.engine.model.Transformation; import org.pentaho.di.engine.model.Transformation;
import org.pentaho.di.engine.api.reporting.LogEntry; import org.pentaho.di.engine.api.reporting.LogEntry;
import org.pentaho.di.engine.api.reporting.LogLevel; import org.pentaho.di.engine.api.reporting.LogLevel;
import org.pentaho.di.engine.api.reporting.Metrics; import org.pentaho.di.engine.api.reporting.Metrics;


import java.io.Serializable; import java.io.Serializable;
import java.security.Principal; import java.security.Principal;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;


/** /**
* Tests MessageEncoder & MessageDecoder classes * Tests MessageEncoder & MessageDecoder classes
Expand Down Expand Up @@ -94,6 +101,74 @@ public void testStopMessage() throws Exception {
assertThat( oExpected.getReasonPhrase(), equalTo( oActual.getReasonPhrase() ) ); assertThat( oExpected.getReasonPhrase(), equalTo( oActual.getReasonPhrase() ) );
} }


@Test
public void testOperationRemoteSource() throws Exception {
RemoteSource step = new RemoteSource( ModelType.OPERATION, "step" );
Metrics metrics = new Metrics( 1, 2, 3, 4 );
Message metricEvent = new MetricsEvent<>( step, metrics );
String sMessage = encoder.encode( metricEvent );
Message decodeMessage = decoder.decode( sMessage );


assertTrue(
( (RemoteSource) step ).getModelType() == ( (RemoteSource) ( (MetricsEvent) decodeMessage ).getSource() )
.getModelType() );
assertTrue(
( (RemoteSource) step ).getId()
.equals( ( (RemoteSource) ( (MetricsEvent) decodeMessage ).getSource() ).getId() ) );
}

@Test
public void testTransformationRemoteSource() throws Exception {
RemoteSource step = new RemoteSource( ModelType.TRANSFORMATION );
Metrics metrics = new Metrics( 1, 2, 3, 4 );
Message statusEvent = new StatusEvent<>( step, Status.FAILED );
String sMessage = encoder.encode( statusEvent );
Message decodeMessage = decoder.decode( sMessage );


assertTrue(
( (RemoteSource) step ).getModelType() == ( (RemoteSource) ( (StatusEvent) decodeMessage ).getSource() )
.getModelType() );
assertNull( ( (RemoteSource) ( (StatusEvent) decodeMessage ).getSource() ).getId() );
}

@Test
public void testRemoteSource() throws Exception {
RemoteSource remoteSource = new RemoteSource( "remoteId" );

assertNull( remoteSource.getModelType() );
assertTrue( "remoteId".equals( remoteSource.getId() ) );
}

@Test
public void testErrorEvent() throws Exception {
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put( "key", "value" );
LogEntry logEntry = new LogEntry.LogEntryBuilder().withMessage( "log message" )
.withLogLevel( LogLevel.DEBUG )
.withTimestamp( new Date() )
.withExtras( hashMap )
.build();

ErrorEvent errorEvent = new ErrorEvent( new RemoteSource( ModelType.TRANSFORMATION ), logEntry );
String sMessage = encoder.encode( errorEvent );
Message decodeMessage = decoder.decode( sMessage );


assertTrue(
( (RemoteSource) errorEvent.getSource() ).getModelType() == ( (RemoteSource) ( (ErrorEvent) decodeMessage )
.getSource() )
.getModelType() );

LogEntry decodeLogEntry = (LogEntry) ( (ErrorEvent) decodeMessage ).getData();

assertTrue( logEntry.getMessage().equals( decodeLogEntry.getMessage() ) );
assertTrue( logEntry.getLogLogLevel().equals( decodeLogEntry.getLogLogLevel() ) );
assertTrue( logEntry.getTimestamp().getTime() == decodeLogEntry.getTimestamp().getTime() );
assertTrue( logEntry.getExtras().hashCode() == decodeLogEntry.getExtras().hashCode() );
}

private MetricsEvent metricsEvent() { private MetricsEvent metricsEvent() {
Operation step1 = new TestOperation( "step2" ); Operation step1 = new TestOperation( "step2" );
Operation step2 = new TestOperation( "step2" ); Operation step2 = new TestOperation( "step2" );
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.pentaho.di.engine.api.remote.MessageDecoder; import org.pentaho.di.engine.api.remote.MessageDecoder;
import org.pentaho.di.engine.api.remote.MessageEncoder; import org.pentaho.di.engine.api.remote.MessageEncoder;
import org.pentaho.di.engine.api.remote.StopMessage; import org.pentaho.di.engine.api.remote.StopMessage;
import org.pentaho.di.trans.ael.websocket.event.MessageEventService;


import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider; import javax.websocket.ContainerProvider;
Expand All @@ -40,6 +39,7 @@
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;


import java.io.IOException;
import java.net.URI; import java.net.URI;


/** /**
Expand All @@ -53,15 +53,15 @@ public class DaemonMessagesClientEndpoint {
private Session userSession = null; private Session userSession = null;


public DaemonMessagesClientEndpoint( String host, String port, boolean ssl, public DaemonMessagesClientEndpoint( String host, String port, boolean ssl,
MessageEventService messageEventService ) { MessageEventService messageEventService ) throws KettleException {
try { try {
String url = ( ssl ? PRFX_WS_SSL : PRFX_WS ) + host + ":" + port + "/execution"; String url = ( ssl ? PRFX_WS_SSL : PRFX_WS ) + host + ":" + port + "/execution";
this.messageEventService = messageEventService; this.messageEventService = messageEventService;


WebSocketContainer container = ContainerProvider.getWebSocketContainer(); WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer( this, new URI( url ) ); container.connectToServer( this, new URI( url ) );
} catch ( Exception e ) { } catch ( Exception e ) {
throw new RuntimeException( e ); throw new KettleException( e );
} }
} }


Expand All @@ -73,6 +73,8 @@ public DaemonMessagesClientEndpoint( String host, String port, boolean ssl,
@OnOpen @OnOpen
public void onOpen( Session userSession ) { public void onOpen( Session userSession ) {
this.userSession = userSession; this.userSession = userSession;
this.userSession.setMaxTextMessageBufferSize( 500000 );
this.userSession.setMaxBinaryMessageBufferSize( 500000 );
} }


/** /**
Expand Down Expand Up @@ -101,19 +103,29 @@ public void onMessage( Message message, Session session ) throws KettleException
* *
* @param request * @param request
*/ */
public void sendMessage( ExecutionRequest request ) { public void sendMessage( ExecutionRequest request ) throws KettleException {
try { try {
this.userSession.getBasicRemote().sendObject( request ); this.userSession.getBasicRemote().sendObject( request );
} catch ( Exception e ) { } catch ( Exception e ) {
e.printStackTrace(); throw new KettleException( e );
} }
} }


public void sendMessage( StopMessage stopMessage ) { public void sendMessage( StopMessage stopMessage ) throws KettleException {
try { try {
this.userSession.getBasicRemote().sendObject( stopMessage ); this.userSession.getBasicRemote().sendObject( stopMessage );
} catch ( Exception e ) { } catch ( Exception e ) {
e.printStackTrace(); throw new KettleException( e );
}
}

public void close() throws KettleException {
try {
if ( this.userSession != null && this.userSession.isOpen() ) {
this.userSession.close();
}
} catch ( IOException e ) {
throw new KettleException( e );
} }
} }
} }

0 comments on commit fecfc89

Please sign in to comment.