Skip to content

Commit

Permalink
[TEST] - Tests for impl-shim-pig classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan Rosander committed Oct 2, 2015
1 parent a127926 commit 76c673e
Show file tree
Hide file tree
Showing 10 changed files with 746 additions and 105 deletions.
1 change: 1 addition & 0 deletions impl/shim/pig/.gitignore
@@ -0,0 +1 @@
pdi-testName
@@ -0,0 +1,76 @@
/*******************************************************************************
*
* Pentaho Big Data
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.big.data.impl.shim.pig;

import org.pentaho.di.core.logging.LogChannelInterface;

import java.io.PrintStream;
import java.io.PrintWriter;

/**
* An extended PrintWriter that sends output to Kettle's logging
*
* @author Mark Hall (mhall{[at]}pentaho{[dot]}com)
*/
public class KettleLoggingPrintWriter extends PrintWriter {
  // Kettle log channel that receives all redirected output.
  private final LogChannelInterface logChannelInterface;

  /**
   * Creates a writer backed by {@code System.out} that logs through the given channel.
   *
   * @param logChannelInterface Kettle log channel that receives the output
   */
  public KettleLoggingPrintWriter( LogChannelInterface logChannelInterface ) {
    this( logChannelInterface, System.out );
  }

  /**
   * Creates a writer backed by the given stream that logs through the given channel.
   * Note: only the String/Object overloads of print/println/write are redirected to the
   * log channel; other PrintWriter overloads (e.g. {@code println()}, {@code print(int)})
   * still write to the wrapped stream.
   *
   * @param logChannelInterface Kettle log channel that receives the output
   * @param printStream         underlying stream handed to PrintWriter; flushed but never
   *                            closed by this class (see {@link #close()})
   */
  public KettleLoggingPrintWriter( LogChannelInterface logChannelInterface, PrintStream printStream ) {
    super( printStream );
    this.logChannelInterface = logChannelInterface;
  }

  @Override
  public void println( String string ) {
    logChannelInterface.logBasic( string );
  }

  @Override
  public void println( Object obj ) {
    // String.valueOf instead of obj.toString() so a null argument logs "null"
    // (matching PrintWriter's println(Object) contract) instead of throwing NPE.
    println( String.valueOf( obj ) );
  }

  @Override
  public void write( String string ) {
    println( string );
  }

  @Override
  public void print( String string ) {
    println( string );
  }

  @Override
  public void print( Object obj ) {
    // Null-safe for the same reason as println( Object ).
    print( String.valueOf( obj ) );
  }

  @Override
  public void close() {
    // Only flush: the underlying stream (typically the shared System.out) must stay open.
    flush();
  }
}
Expand Up @@ -42,9 +42,15 @@ public class PigServiceFactoryLoader implements HadoopConfigurationListener {

// Production constructor: delegates to the injectable constructor using the
// process-wide HadoopConfigurationBootstrap singleton.
// @throws ConfigurationException propagated from listener registration.
public PigServiceFactoryLoader( BundleContext bundleContext, ShimBridgingServiceTracker shimBridgingServiceTracker )
throws ConfigurationException {
this( bundleContext, shimBridgingServiceTracker, HadoopConfigurationBootstrap.getInstance() );
}

/**
 * Visible-for-testing constructor that accepts the HadoopConfigurationBootstrap to
 * register with, instead of resolving the global singleton internally.
 *
 * @param bundleContext                OSGi bundle context for shim-bridged service creation
 * @param shimBridgingServiceTracker   tracker for the bridged service registrations
 * @param hadoopConfigurationBootstrap bootstrap this loader registers itself with as a listener
 * @throws ConfigurationException if listener registration fails
 */
public PigServiceFactoryLoader( BundleContext bundleContext, ShimBridgingServiceTracker shimBridgingServiceTracker,
                                HadoopConfigurationBootstrap hadoopConfigurationBootstrap )
  throws ConfigurationException {
  this.bundleContext = bundleContext;
  this.shimBridgingServiceTracker = shimBridgingServiceTracker;
  // Register through the injected bootstrap only. The additional singleton call
  // ( HadoopConfigurationBootstrap.getInstance().registerHadoopConfigurationListener( this ) )
  // rendered here was stale diff residue: keeping it would register this listener
  // twice and defeat the dependency injection this constructor exists for.
  hadoopConfigurationBootstrap.registerHadoopConfigurationListener( this );
}

@Override public void onConfigurationOpen( HadoopConfiguration hadoopConfiguration, boolean defaultConfiguration ) {
Expand Down
Expand Up @@ -22,28 +22,19 @@

package org.pentaho.big.data.impl.shim.pig;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.WriterAppender;
import org.apache.commons.vfs2.FileObject;
import org.pentaho.big.data.api.cluster.NamedCluster;
import org.pentaho.bigdata.api.pig.PigResult;
import org.pentaho.bigdata.api.pig.PigService;
import org.pentaho.bigdata.api.pig.impl.PigResultImpl;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.logging.KettleLogChannelAppender;
import org.pentaho.di.core.logging.Log4jFileAppender;
import org.pentaho.di.core.logging.Log4jKettleLayout;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LogWriter;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.hadoop.shim.api.Configuration;
import org.pentaho.hadoop.shim.spi.HadoopShim;
import org.pentaho.hadoop.shim.spi.PigShim;

import java.io.File;
import java.io.PrintWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -57,11 +48,18 @@ public class PigServiceImpl implements PigService {
private final NamedCluster namedCluster;
private final PigShim pigShim;
private final HadoopShim hadoopShim;
private final WriterAppenderManager.Factory writerAppenderManagerFactory;

// Production constructor: uses a default WriterAppenderManager.Factory for log-appender wiring.
public PigServiceImpl( NamedCluster namedCluster, PigShim pigShim, HadoopShim hadoopShim ) {
this( namedCluster, pigShim, hadoopShim, new WriterAppenderManager.Factory() );
}

// Visible-for-testing constructor: lets tests inject a WriterAppenderManager.Factory
// so the log-appender lifecycle can be stubbed/observed without touching global log4j state.
public PigServiceImpl( NamedCluster namedCluster, PigShim pigShim, HadoopShim hadoopShim,
WriterAppenderManager.Factory writerAppenderManagerFactory ) {
this.namedCluster = namedCluster;
this.pigShim = pigShim;
this.hadoopShim = hadoopShim;
this.writerAppenderManagerFactory = writerAppenderManagerFactory;
}

@Override public boolean isLocalExecutionSupported() {
Expand All @@ -72,31 +70,10 @@ public PigServiceImpl( NamedCluster namedCluster, PigShim pigShim, HadoopShim ha
public PigResult executeScript( String scriptPath, ExecutionMode executionMode, List<String> parameters, String name,
LogChannelInterface logChannelInterface, VariableSpace variableSpace,
LogLevel logLevel ) {
// Set up an appender that will send all pig log messages to Kettle's log
// via logBasic().
KettleLoggingPrintWriter klps = new KettleLoggingPrintWriter( logChannelInterface );
WriterAppender pigToKettleAppender = new WriterAppender( new Log4jKettleLayout( true ), klps );

Logger pigLogger = Logger.getLogger( "org.apache.pig" );
Level log4jLevel = getLog4jLevel( logLevel );
pigLogger.setLevel( log4jLevel );
Log4jFileAppender appender = null;
String logFileName = "pdi-" + name; //$NON-NLS-1$
LogWriter logWriter = LogWriter.getInstance();
try {
appender = LogWriter.createFileAppender( logFileName, true, false );
logWriter.addAppender( appender );
logChannelInterface.setLogLevel( logLevel );
if ( pigLogger != null ) {
pigLogger.addAppender( pigToKettleAppender );
}
} catch ( Exception e ) {
logChannelInterface.logError( BaseMessages
.getString( PKG, "JobEntryPigScriptExecutor.FailedToOpenLogFile", logFileName, e.toString() ) ); //$NON-NLS-1$
logChannelInterface.logError( Const.getStackTracker( e ) );
}

try {
FileObject appenderFile = null;
try ( WriterAppenderManager appenderManager = writerAppenderManagerFactory.create( logChannelInterface, logLevel,
name ) ) {
appenderFile = appenderManager.getFile();
Configuration configuration = hadoopShim.createConfiguration();
if ( executionMode != ExecutionMode.LOCAL ) {
List<String> configMessages = new ArrayList<String>();
Expand All @@ -123,78 +100,11 @@ public PigResult executeScript( String scriptPath, ExecutionMode executionMode,
String pigScript = pigShim.substituteParameters( scriptU, parameters );
Properties properties = new Properties();
pigShim.configure( properties, executionMode == ExecutionMode.LOCAL ? null : configuration );
return new PigResultImpl( appender == null ? null : appender.getFile(),
return new PigResultImpl( appenderFile,
pigShim.executeScript( pigScript, executionMode == ExecutionMode.LOCAL ? PigShim.ExecutionMode.LOCAL
: PigShim.ExecutionMode.MAPREDUCE, properties ), null );
} catch ( Exception e ) {
return new PigResultImpl( appender == null ? null : appender.getFile(), null, e );
} finally {
removeAppender( appender, pigToKettleAppender );
}
}

private Level getLog4jLevel( LogLevel level ) {
// KettleLogChannelAppender does not exists in Kette core, so we'll use it from kettle5-log4j-plugin.
Level log4jLevel = KettleLogChannelAppender.LOG_LEVEL_MAP.get( level );
return log4jLevel != null ? log4jLevel : Level.INFO;
}

protected void removeAppender( Log4jFileAppender appender, WriterAppender pigToKettleAppender ) {

// remove the file appender from kettle logging
if ( appender != null ) {
LogWriter.getInstance().removeAppender( appender );
appender.close();
}

Logger pigLogger = Logger.getLogger( "org.apache.pig" );
if ( pigLogger != null && pigToKettleAppender != null ) {
pigLogger.removeAppender( pigToKettleAppender );
pigToKettleAppender.close();
}
}

/**
* An extended PrintWriter that sends output to Kettle's logging
*
* @author Mark Hall (mhall{[at]}pentaho{[dot]}com)
*/
class KettleLoggingPrintWriter extends PrintWriter {
private final LogChannelInterface logChannelInterface;

public KettleLoggingPrintWriter( LogChannelInterface logChannelInterface ) {
super( System.out );
this.logChannelInterface = logChannelInterface;
}

@Override
public void println( String string ) {
logChannelInterface.logBasic( string );
}

@Override
public void println( Object obj ) {
println( obj.toString() );
}

@Override
public void write( String string ) {
println( string );
}

@Override
public void print( String string ) {
println( string );
}

@Override
public void print( Object obj ) {
print( obj.toString() );
}

@Override
public void close() {
flush();
return new PigResultImpl( appenderFile, null, e );
}
}
}
@@ -0,0 +1,111 @@
/*******************************************************************************
*
* Pentaho Big Data
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.big.data.impl.shim.pig;

import org.apache.commons.vfs2.FileObject;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.WriterAppender;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.logging.KettleLogChannelAppender;
import org.pentaho.di.core.logging.Log4jFileAppender;
import org.pentaho.di.core.logging.Log4jKettleLayout;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LogWriter;
import org.pentaho.di.i18n.BaseMessages;

import java.io.Closeable;
import java.io.IOException;

/**
 * Manages the pair of log4j appenders used while a Pig script executes: a file appender
 * registered with Kettle's LogWriter, and a WriterAppender that forwards output of the
 * "org.apache.pig" logger into the Kettle log channel. Closing this object detaches and
 * closes both appenders, undoing the constructor's global-state changes.
 * <p>
 * Created by bryan on 10/1/15.
 */
public class WriterAppenderManager implements Closeable {
private static final Class<?> PKG = WriterAppenderManager.class;
// File appender registered with Kettle logging; null if creation failed (see constructor).
private final Log4jFileAppender appender;
// Bridges org.apache.pig log4j output into the Kettle log channel.
private final WriterAppender pigToKettleAppender;
// LogWriter the file appender was added to; used again in close() to remove it.
private final LogWriter logWriter;

// Production constructor: uses the global LogWriter singleton.
public WriterAppenderManager( LogChannelInterface logChannelInterface, LogLevel logLevel, String name ) {
this( logChannelInterface, logLevel, name, LogWriter.getInstance() );
}

// Visible-for-testing constructor: the LogWriter is injected.
// Side effects: sets the level of the global "org.apache.pig" logger, attaches appenders
// to it and to the LogWriter, and raises the channel's log level. close() reverses the
// appender attachments (the logger level is not restored).
public WriterAppenderManager( LogChannelInterface logChannelInterface, LogLevel logLevel, String name,
LogWriter logWriter ) {
// Set up an appender that will send all pig log messages to Kettle's log
// via logBasic().
KettleLoggingPrintWriter klps = new KettleLoggingPrintWriter( logChannelInterface );
pigToKettleAppender = new WriterAppender( new Log4jKettleLayout( true ), klps );

Logger pigLogger = Logger.getLogger( "org.apache.pig" );
Level log4jLevel = getLog4jLevel( logLevel );
pigLogger.setLevel( log4jLevel );
String logFileName = "pdi-" + name; //$NON-NLS-1$
Log4jFileAppender appender = null;
this.logWriter = logWriter;
try {
// Note: createFileAppender is a static factory; only addAppender uses the injected instance.
appender = LogWriter.createFileAppender( logFileName, true, false );
logWriter.addAppender( appender );
logChannelInterface.setLogLevel( logLevel );
// NOTE(review): pigLogger was already dereferenced above (setLevel), so this null
// check is dead; Logger.getLogger never returns null — consider removing the guard.
if ( pigLogger != null ) {
pigLogger.addAppender( pigToKettleAppender );
}
} catch ( Exception e ) {
// Best-effort: a failed file appender is reported but does not abort construction;
// getFile() will return null in that case.
logChannelInterface.logError( BaseMessages
.getString( PKG, "JobEntryPigScriptExecutor.FailedToOpenLogFile", logFileName, e.toString() ) ); //$NON-NLS-1$
logChannelInterface.logError( Const.getStackTracker( e ) );
}
this.appender = appender;
}

// Maps a Kettle LogLevel to the log4j Level, defaulting to INFO when unmapped.
private Level getLog4jLevel( LogLevel level ) {
// KettleLogChannelAppender does not exist in Kettle core, so we'll use it from kettle5-log4j-plugin.
Level log4jLevel = KettleLogChannelAppender.LOG_LEVEL_MAP.get( level );
return log4jLevel != null ? log4jLevel : Level.INFO;
}

// Detaches and closes both appenders, undoing the constructor's registrations.
@Override public void close() throws IOException {
// remove the file appender from kettle logging
if ( appender != null ) {
logWriter.removeAppender( appender );
appender.close();
}

Logger pigLogger = Logger.getLogger( "org.apache.pig" );
if ( pigLogger != null && pigToKettleAppender != null ) {
pigLogger.removeAppender( pigToKettleAppender );
pigToKettleAppender.close();
}
}

// Returns the log file backing the file appender, or null if appender creation failed.
public FileObject getFile() {
return appender == null ? null : appender.getFile();
}

// Factory indirection so PigServiceImpl can be tested with a stubbed manager.
public static class Factory {
public WriterAppenderManager create( LogChannelInterface logChannelInterface, LogLevel logLevel, String name ) {
return new WriterAppenderManager( logChannelInterface, logLevel, name );
}
}
}

0 comments on commit 76c673e

Please sign in to comment.