Skip to content

Commit

Permalink
[BISERVER-12904] Adding new IRepositoryFactory interface and implemen…
Browse files Browse the repository at this point in the history
…tations for obtaining PDI Repository instances. Implementation utilizing ICacheManager put in place to prevent new Repository instances from being created for every call. Converted PDIImportUtil to use these new classes.
  • Loading branch information
pentaho-nbaker committed Nov 6, 2015
1 parent 7f7a0cf commit 1f167c1
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 74 deletions.
4 changes: 3 additions & 1 deletion plugins/pdi-pur-plugin/ivy.xml
Expand Up @@ -56,7 +56,7 @@
<!-- test dependencies -->
<dependency org="org.springframework.security" name="spring-security-core" rev="2.0.5.RELEASE"
transitive="false" />
<dependency org="junit" name="junit" rev="4.5" conf="test->default" />
<dependency org="junit" name="junit" rev="4.11" conf="test->default" />
<dependency org="commons-codec" name="commons-codec" rev="1.3" conf="test->default" />
<dependency org="commons-logging" name="commons-logging" rev="1.1" conf="test->default" />
<dependency org="commons-digester" name="commons-digester" rev="1.8" conf="test->default" />
Expand Down Expand Up @@ -117,5 +117,7 @@
<dependency org="org.jvnet.ws.wadl" name="wadl-core" rev="1.1.6" conf="wadl2java->default" />
<dependency org="org.jvnet.ws.wadl" name="wadl-ant" rev="1.1.6" conf="wadl2java->default" />

<dependency org="org.hamcrest" name="hamcrest-all" rev="1.3" conf="test->default"/>

</dependencies>
</ivy-module>
Expand Up @@ -40,97 +40,32 @@
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleSecurityException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.RepositoryPluginType;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryMeta;
import org.pentaho.di.repository.RepositoryObjectType;
import org.pentaho.di.repository.utils.IRepositoryFactory;
import org.pentaho.platform.api.engine.ActionExecutionException;
import org.pentaho.platform.engine.core.system.PentahoSessionHolder;
import org.pentaho.platform.engine.core.system.PentahoSystem;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

public class PDIImportUtil {

private static final String SINGLE_DI_SERVER_INSTANCE = "singleDiServerInstance"; //$NON-NLS-1$
private static IRepositoryFactory repositoryFactory = new IRepositoryFactory.CachingRepositoryFactory();

/**
* Connects to the PDI repository
*
* @param logWriter
*
* @param repositoryName
* @return
* @throws KettleException
* @throws KettleSecurityException
* @throws ActionExecutionException
*/
public static Repository connectToRepository( String repositoryName ) throws KettleException {
return repositoryFactory.connect( repositoryName );
}

RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
boolean singleDiServerInstance =
"true".equals( PentahoSystem.getSystemSetting( SINGLE_DI_SERVER_INSTANCE, "true" ) ); //$NON-NLS-1$ //$NON-NLS-2$

try {
if ( singleDiServerInstance ) {

// only load a default enterprise repository. If this option is set, then you cannot load
// transformations or jobs from anywhere but the local server.

String repositoriesXml =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?><repositories>" //$NON-NLS-1$
+ "<repository><id>PentahoEnterpriseRepository</id>" //$NON-NLS-1$
+ "<name>" + SINGLE_DI_SERVER_INSTANCE + "</name>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<description>" + SINGLE_DI_SERVER_INSTANCE + "</description>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<repository_location_url>" + PentahoSystem.getApplicationContext().getFullyQualifiedServerURL() + "</repository_location_url>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<version_comment_mandatory>N</version_comment_mandatory>" //$NON-NLS-1$
+ "</repository>" //$NON-NLS-1$
+ "</repositories>"; //$NON-NLS-1$

ByteArrayInputStream sbis = new ByteArrayInputStream( repositoriesXml.getBytes( "UTF8" ) );
repositoriesMeta.readDataFromInputStream( sbis );
} else {
// TODO: add support for specified repositories.xml files...
repositoriesMeta.readData(); // Read from the default $HOME/.kettle/repositories.xml file.
}
} catch ( Exception e ) {
throw new KettleException( "Meta repository not populated", e ); //$NON-NLS-1$
}

// Find the specified repository.
RepositoryMeta repositoryMeta = null;
try {
if ( singleDiServerInstance ) {
repositoryMeta = repositoriesMeta.findRepository( SINGLE_DI_SERVER_INSTANCE );
} else {
repositoryMeta = repositoriesMeta.findRepository( repositoryName );
}

} catch ( Exception e ) {
throw new KettleException( "Repository not found", e ); //$NON-NLS-1$
}

if ( repositoryMeta == null ) {
throw new KettleException( "RepositoryMeta is null" ); //$NON-NLS-1$
}

Repository repository = null;
try {
repository =
PluginRegistry.getInstance().loadClass( RepositoryPluginType.class,
repositoryMeta.getId(), Repository.class );
repository.init( repositoryMeta );

} catch ( Exception e ) {
throw new KettleException( "Could not get repository instance", e ); //$NON-NLS-1$
}

// Two scenarios here: internal to server or external to server. If internal, you are already authenticated. If
// external, you must provide a username and additionally specify that the IP address of the machine running this
// code is trusted.
repository.connect( PentahoSessionHolder.getSession().getName(), "password" );

return repository;
public static void setRepositoryFactory( IRepositoryFactory factory ){
repositoryFactory = factory;
}

public static Document loadXMLFrom( String xml ) throws SAXException, IOException {
Expand Down
@@ -0,0 +1,168 @@
package org.pentaho.di.repository.utils;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.RepositoryPluginType;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryMeta;
import org.pentaho.platform.api.engine.ICacheManager;
import org.pentaho.platform.api.engine.IPentahoSession;
import org.pentaho.platform.engine.core.system.PentahoSessionHolder;
import org.pentaho.platform.engine.core.system.PentahoSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;

/**
* Implementations can be used to obtain a PDI Repository instance in the platform. Created by nbaker on 11/5/15.
*/
public interface IRepositoryFactory {

String SINGLE_DI_SERVER_INSTANCE = "singleDiServerInstance";

Repository connect( String repositoryName ) throws KettleException;

IRepositoryFactory DEFAULT = new CachingRepositoryFactory();

/**
* Sets the "ID" of the Repository Plugin Type to use (filebased, db, enterprise)
*
* @param id
*/
void setRepositoryId( String id );


/**
* Decorating implementation which caches Repository instances by Principal name in the ICacheManager.
* DefaultRepositoryFactory used by default if a delegate factory isn't supplied.
*/
class CachingRepositoryFactory implements IRepositoryFactory {

public static final String REGION = "pdi-repository-cache";
private IRepositoryFactory delegate;
private Logger logger = LoggerFactory.getLogger( getClass() );

public CachingRepositoryFactory() {
this( new DefaultRepositoryFactory() );
}

public CachingRepositoryFactory( IRepositoryFactory delegate ) {
this.delegate = delegate;
// Make sure we're registered with PentahoSystem so we can be found.
PentahoSystem.registerObject( this );
}

@Override public void setRepositoryId( String id ) {
delegate.setRepositoryId( id );
}

@Override public Repository connect( String repositoryName ) throws KettleException {

IPentahoSession session = PentahoSessionHolder.getSession();
if ( session == null ) {
logger.debug( "No active Pentaho Session, attempting to load PDI repository unauthenticated." );
throw new KettleException( "Attempting to create PDI Repository with no Active PentahoSession. "
+ "This is not allowed." );
}
ICacheManager cacheManager = PentahoSystem.getCacheManager( session );


String sessionName = session.getName();
Repository repository = (Repository) cacheManager.getFromRegionCache( REGION, sessionName );
if ( repository == null ) {
logger.debug( "Repository not cached for user: " + sessionName + ". Creating new Repository." );
repository = delegate.connect( repositoryName );
if ( !cacheManager.cacheEnabled( REGION ) ) {
cacheManager.addCacheRegion( REGION );
}
cacheManager.putInRegionCache( REGION, sessionName, repository );
} else {
logger.debug( "Repository was cached for user: " + sessionName );
}
return repository;
}
}

/**
* Default implementation of the RepositoryFactory. Code moved from PDIImportUtil here pretty much as-is.
*/
class DefaultRepositoryFactory implements IRepositoryFactory {
private String repositoryId = "PentahoEnterpriseRepository";

@Override public void setRepositoryId( String id ) {
this.repositoryId = id;
}

@Override public Repository connect( String repositoryName ) throws KettleException {

RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
boolean singleDiServerInstance =
"true".equals(
PentahoSystem.getSystemSetting( SINGLE_DI_SERVER_INSTANCE, "true" ) ); //$NON-NLS-1$ //$NON-NLS-2$

try {
if ( singleDiServerInstance ) {

// only load a default enterprise repository. If this option is set, then you cannot load
// transformations or jobs from anywhere but the local server.

String repositoriesXml =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?><repositories>" //$NON-NLS-1$
+ "<repository><id>" + repositoryId + "</id>" //$NON-NLS-1$
+ "<name>" + SINGLE_DI_SERVER_INSTANCE + "</name>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<description>" + SINGLE_DI_SERVER_INSTANCE + "</description>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<repository_location_url>" + PentahoSystem.getApplicationContext().getFullyQualifiedServerURL()
+ "</repository_location_url>" //$NON-NLS-1$ //$NON-NLS-2$
+ "<version_comment_mandatory>N</version_comment_mandatory>" //$NON-NLS-1$
+ "</repository>" //$NON-NLS-1$
+ "</repositories>"; //$NON-NLS-1$

ByteArrayInputStream sbis = new ByteArrayInputStream( repositoriesXml.getBytes( "UTF8" ) );
repositoriesMeta.readDataFromInputStream( sbis );
} else {
// TODO: add support for specified repositories.xml files...
repositoriesMeta.readData(); // Read from the default $HOME/.kettle/repositories.xml file.
}
} catch ( Exception e ) {
throw new KettleException( "Meta repository not populated", e ); //$NON-NLS-1$
}

// Find the specified repository.
RepositoryMeta repositoryMeta = null;
try {
if ( singleDiServerInstance ) {
repositoryMeta = repositoriesMeta.findRepository( SINGLE_DI_SERVER_INSTANCE );
} else {
repositoryMeta = repositoriesMeta.findRepository( repositoryName );
}

} catch ( Exception e ) {
throw new KettleException( "Repository not found", e ); //$NON-NLS-1$
}

if ( repositoryMeta == null ) {
throw new KettleException( "RepositoryMeta is null" ); //$NON-NLS-1$
}

Repository repository = null;
try {
repository =
PluginRegistry.getInstance().loadClass( RepositoryPluginType.class,
repositoryMeta.getId(), Repository.class );
repository.init( repositoryMeta );

} catch ( Exception e ) {
throw new KettleException( "Could not get repository instance", e ); //$NON-NLS-1$
}

// Two scenarios here: internal to server or external to server. If internal, you are already authenticated. If
// external, you must provide a username and additionally specify that the IP address of the machine running this
// code is trusted.
repository.connect( PentahoSessionHolder.getSession().getName(), "password" );

return repository;
}
}
}
@@ -0,0 +1,25 @@
package com.pentaho.repository.importexport;

import org.junit.Test;
import org.pentaho.di.repository.utils.IRepositoryFactory;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
* Created by nbaker on 11/5/15.
*/
public class PDIImportUtilTest {

@Test
public void testConnectToRepository() throws Exception {
IRepositoryFactory mock = mock( IRepositoryFactory.class );
PDIImportUtil.setRepositoryFactory( mock );

PDIImportUtil.connectToRepository( "foo" );

verify( mock, times(1) ).connect( "foo" );
}
}

0 comments on commit 1f167c1

Please sign in to comment.