Skip to content

Commit

Permalink
Merge pull request #1074 from jgminder/develop
Browse files Browse the repository at this point in the history
[BACKLOG-30681] Pentaho MapReduce HBaseRowDecoder errors out with an …
  • Loading branch information
peterrinehart committed Nov 25, 2019
2 parents bb26b79 + c77c91e commit c4400d0
Showing 1 changed file with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,8 @@ public class DistributedCacheUtilImpl implements org.pentaho.hadoop.shim.api.int
* Prefix for properties we want to omit when copying to the cluster
*/
private static final String AUTH_PREFIX = "pentaho.authentication";
public static final String SYSTEM_KARAF_SYSTEM_ORG_PENTAHO_HADOOP_SHIMS =
"system/karaf/system/org/pentaho/hadoop/shims";
public static final String SYSTEM_KARAF_SYSTEM_COM_PENTAHO_HADOOP_SHIMS =
"system/karaf/system/com/pentaho/hadoop/shims";

private static final String DEFAULT_DRIVERS_DIR = "./drivers";

/**
* Creates the path to a lock file within the provided directory
Expand Down Expand Up @@ -184,8 +181,11 @@ public void installKettleEnvironment( FileObject pmrArchive, FileSystem fs, Path
out.close();

stageForCache( extracted, fs, destination, true, false );
java.nio.file.Path kettleHomeDir = Paths.get( System.getProperty( "karaf.home" ) ).getParent().getParent();
stagePentahoHadoopShims( fs, destination, kettleHomeDir );

java.nio.file.Path shimDriverInstallationDirectory =
Paths.get( System.getProperty( Const.SHIM_DRIVER_DEPLOYMENT_LOCATION, DEFAULT_DRIVERS_DIR ) );

stagePentahoHadoopShims( fs, destination, shimDriverInstallationDirectory );
stageBigDataPlugin( fs, destination, bigDataPlugin, shimIdentifier );

if ( StringUtils.isNotEmpty( additionalPlugins ) ) {
Expand All @@ -197,12 +197,12 @@ public void installKettleEnvironment( FileObject pmrArchive, FileSystem fs, Path
fs.delete( lockFile, true );
}

private Map<String, String> getAbsoluteAndRelativePathOfFilesToCopy( java.nio.file.Path dir, String childDir )
private Map<String, String> getDrivers( java.nio.file.Path dir )
throws IOException {
Map<String, String> files = new HashMap<>();

if ( dir.resolve( childDir ).toFile().exists() ) {
try ( Stream<java.nio.file.Path> input = Files.walk( dir.resolve( childDir ) ) ) {
if ( dir.toFile().exists() ) {
try ( Stream<java.nio.file.Path> input = Files.walk( dir ) ) {
files = input.filter( x -> x.toFile().isFile() )
.collect( Collectors
.toMap( java.nio.file.Path::toString, x -> x.toString().replace( dir.toString() + File.separator, "" ) ) );
Expand All @@ -211,16 +211,15 @@ private Map<String, String> getAbsoluteAndRelativePathOfFilesToCopy( java.nio.fi
return files;
}

private void stagePentahoHadoopShims( FileSystem fs, Path dest, java.nio.file.Path kettleHomeDir )
private void stagePentahoHadoopShims( FileSystem fs, Path dest, java.nio.file.Path shimDriverInstallationDirectory )
throws IOException {
Map<String, String> files =
getAbsoluteAndRelativePathOfFilesToCopy( kettleHomeDir, SYSTEM_KARAF_SYSTEM_ORG_PENTAHO_HADOOP_SHIMS );
files
.putAll( getAbsoluteAndRelativePathOfFilesToCopy( kettleHomeDir, SYSTEM_KARAF_SYSTEM_COM_PENTAHO_HADOOP_SHIMS ) );
getDrivers( shimDriverInstallationDirectory );

files.forEach( ( localPath, relativePath ) -> {
try {
stageForCache( KettleVFS.getFileObject( localPath ), fs, new Path( dest, relativePath ), true, false );
stageForCache( KettleVFS.getFileObject( localPath ), fs,
new Path( dest, "drivers" + Path.SEPARATOR + relativePath ), true, false );
} catch ( IOException | KettleFileException e ) {
logger.info( "Failed to stage the shims to distributed cache." );
}
Expand Down

0 comments on commit c4400d0

Please sign in to comment.