Skip to content

Commit

Permalink
[BACKLOG-14180] shared object synchronization with repository and nam…
Browse files Browse the repository at this point in the history
…e changes (#3567)
  • Loading branch information
Tiago Ferreira authored and Ben Morrise committed Feb 20, 2017
1 parent ae078d4 commit fafca6e
Show file tree
Hide file tree
Showing 26 changed files with 1,104 additions and 451 deletions.
87 changes: 86 additions & 1 deletion engine/src/org/pentaho/di/base/AbstractMeta.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -66,6 +66,7 @@
import org.pentaho.di.repository.Repository; import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectory; import org.pentaho.di.repository.RepositoryDirectory;
import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.shared.SharedObjectInterface;
import org.pentaho.di.shared.SharedObjects; import org.pentaho.di.shared.SharedObjects;
import org.pentaho.di.trans.HasDatabasesInterface; import org.pentaho.di.trans.HasDatabasesInterface;
import org.pentaho.di.trans.HasSlaveServersInterface; import org.pentaho.di.trans.HasSlaveServersInterface;
Expand All @@ -79,6 +80,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Hashtable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
Expand Down Expand Up @@ -1064,6 +1066,7 @@ protected void addDatabase( DatabaseMeta databaseMeta, boolean replace ) {
} else if ( replace ) { } else if ( replace ) {
DatabaseMeta previous = getDatabase( index ); DatabaseMeta previous = getDatabase( index );
previous.replaceMeta( databaseMeta ); previous.replaceMeta( databaseMeta );
previous.setShared( databaseMeta.isShared() );
} }
changedDatabases = true; changedDatabases = true;
} }
Expand Down Expand Up @@ -1526,6 +1529,46 @@ public void setSharedObjects( SharedObjects sharedObjects ) {
this.sharedObjects = sharedObjects; this.sharedObjects = sharedObjects;
} }


/**
* Read shared objects.
*
* @return the shared objects
* @throws KettleException the kettle exception
*/
public SharedObjects readSharedObjects() throws KettleException {
// Extract the shared steps, connections, etc. using the SharedObjects
// class
//
String soFile = environmentSubstitute( sharedObjectsFile );
SharedObjects sharedObjects = new SharedObjects( soFile );
Map<?, SharedObjectInterface> objectsMap = sharedObjects.getObjectsMap();

// First read the databases...
// We read databases & slaves first because there might be dependencies
// that need to be resolved.
//
for ( SharedObjectInterface object : objectsMap.values() ) {
loadSharedObject( object );
}

return sharedObjects;
}

protected boolean loadSharedObject( SharedObjectInterface object ) {
if ( object instanceof DatabaseMeta ) {
DatabaseMeta databaseMeta = (DatabaseMeta) object;
databaseMeta.shareVariablesWith( this );
addOrReplaceDatabase( databaseMeta );
} else if ( object instanceof SlaveServer ) {
SlaveServer slaveServer = (SlaveServer) object;
slaveServer.shareVariablesWith( this );
addOrReplaceSlaveServer( slaveServer );
} else {
return false;
}
return true;
}

/** /**
* Sets the internal name kettle variable. * Sets the internal name kettle variable.
* *
Expand Down Expand Up @@ -1819,4 +1862,46 @@ public Set<String> getPrivateDatabases() {
public void setPrivateDatabases( Set<String> privateDatabases ) { public void setPrivateDatabases( Set<String> privateDatabases ) {
this.privateDatabases = privateDatabases; this.privateDatabases = privateDatabases;
} }

public void saveSharedObjects() throws KettleException {
try {
// Load all the shared objects...
String soFile = environmentSubstitute( sharedObjectsFile );
SharedObjects sharedObjects = new SharedObjects( soFile );
// in-memory shared objects are supposed to be in sync, discard those on file to allow edit/delete
sharedObjects.setObjectsMap( new Hashtable<>() );

for ( SharedObjectInterface sharedObject : getAllSharedObjects() ) {
if ( sharedObject.isShared() ) {
sharedObjects.storeObject( sharedObject );
}
}

sharedObjects.saveToFile();
} catch ( Exception e ) {
throw new KettleException( "Unable to save shared ojects", e );
}
}

protected List<SharedObjectInterface> getAllSharedObjects() {
List<SharedObjectInterface> shared = new ArrayList<>();
shared.addAll( databases );
shared.addAll( slaveServers );
return shared;
}

/**
* This method needs to be called to store those objects which are used and referenced in the transformation metadata
* but not saved in the XML serialization. For example, the Kettle data service definition is referenced by name but
* not stored when getXML() is called.<br>
* @deprecated This method is empty since 2013.
*
* @param metaStore
* The store to save to
* @throws MetaStoreException
* in case there is an error.
*/
public void saveMetaStoreObjects( Repository repository, IMetaStore metaStore ) throws MetaStoreException {

}
} }
4 changes: 2 additions & 2 deletions engine/src/org/pentaho/di/cluster/SlaveServer.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -352,7 +352,7 @@ public boolean equals( Object obj ) {
} }


public int hashCode() { public int hashCode() {
return name.hashCode(); return name.toLowerCase().hashCode();
} }


public String getHostname() { public String getHostname() {
Expand Down
77 changes: 0 additions & 77 deletions engine/src/org/pentaho/di/job/JobMeta.java
Expand Up @@ -88,10 +88,7 @@
import org.pentaho.di.resource.ResourceExportInterface; import org.pentaho.di.resource.ResourceExportInterface;
import org.pentaho.di.resource.ResourceNamingInterface; import org.pentaho.di.resource.ResourceNamingInterface;
import org.pentaho.di.resource.ResourceReference; import org.pentaho.di.resource.ResourceReference;
import org.pentaho.di.shared.SharedObjectInterface;
import org.pentaho.di.shared.SharedObjects;
import org.pentaho.metastore.api.IMetaStore; import org.pentaho.metastore.api.IMetaStore;
import org.pentaho.metastore.api.exceptions.MetaStoreException;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Node; import org.w3c.dom.Node;


Expand Down Expand Up @@ -1154,70 +1151,6 @@ public void loadXML( Node jobnode, String fname, Repository rep, IMetaStore meta
} }
} }


/**
* Read shared objects.
*
* @return the shared objects
* @throws KettleException the kettle exception
*/
public SharedObjects readSharedObjects() throws KettleException {
// Extract the shared steps, connections, etc. using the SharedObjects
// class
//
String soFile = environmentSubstitute( sharedObjectsFile );
SharedObjects sharedObjects = new SharedObjects( soFile );
Map<?, SharedObjectInterface> objectsMap = sharedObjects.getObjectsMap();

// First read the databases...
// We read databases & slaves first because there might be dependencies
// that need to be resolved.
//
for ( SharedObjectInterface object : objectsMap.values() ) {
if ( object instanceof DatabaseMeta ) {
DatabaseMeta databaseMeta = (DatabaseMeta) object;
databaseMeta.shareVariablesWith( this );
addOrReplaceDatabase( databaseMeta );
} else if ( object instanceof SlaveServer ) {
SlaveServer slaveServer = (SlaveServer) object;
slaveServer.shareVariablesWith( this );
addOrReplaceSlaveServer( slaveServer );
}
}

return sharedObjects;
}

/*
* (non-Javadoc)
*
* @see org.pentaho.di.core.EngineMetaInterface#saveSharedObjects()
*/
public void saveSharedObjects() throws KettleException {
try {
// First load all the shared objects...
String soFile = environmentSubstitute( sharedObjectsFile );
SharedObjects sharedObjects = new SharedObjects( soFile );

// Now overwrite the objects in there
List<Object> shared = new ArrayList<Object>();
shared.addAll( databases );
shared.addAll( slaveServers );

// The databases connections...
for ( int i = 0; i < shared.size(); i++ ) {
SharedObjectInterface sharedObject = (SharedObjectInterface) shared.get( i );
if ( sharedObject.isShared() ) {
sharedObjects.storeObject( sharedObject );
}
}

// Save the objects
sharedObjects.saveToFile();
} catch ( Exception e ) {
throw new KettleException( "Unable to save shared ojects", e );
}
}

/** /**
* Gets the job entry copy. * Gets the job entry copy.
* *
Expand Down Expand Up @@ -2816,16 +2749,6 @@ public boolean isForcingSeparateLogging() {
public void setForcingSeparateLogging( boolean forcingSeparateLogging ) { public void setForcingSeparateLogging( boolean forcingSeparateLogging ) {
} }


/**
* This method needs to be called to store those objects which are used and referenced in the job metadata but not
* saved in the serialization.
*
* @param metaStore The store to save to
* @throws MetaStoreException in case there is an error.
*/
public void saveMetaStoreObjects( Repository repository, IMetaStore metaStore ) throws MetaStoreException {
}

public List<LogTableInterface> getExtraLogTables() { public List<LogTableInterface> getExtraLogTables() {
return extraLogTables; return extraLogTables;
} }
Expand Down
46 changes: 14 additions & 32 deletions engine/src/org/pentaho/di/shared/SharedObjects.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand All @@ -22,12 +22,10 @@


package org.pentaho.di.shared; package org.pentaho.di.shared;


import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Hashtable; import java.util.Hashtable;
Expand All @@ -36,6 +34,7 @@


import org.apache.commons.vfs2.FileObject; import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException; import org.apache.commons.vfs2.FileSystemException;
import org.apache.poi.util.IOUtils;
import org.pentaho.di.cluster.ClusterSchema; import org.pentaho.di.cluster.ClusterSchema;
import org.pentaho.di.cluster.SlaveServer; import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Const; import org.pentaho.di.core.Const;
Expand Down Expand Up @@ -355,7 +354,7 @@ protected OutputStream initOutputStreamUsingKettleVFS( FileObject fileObject ) t
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
protected void restoreFileFromBackup( String backupFileName ) throws IOException { protected void restoreFileFromBackup( String backupFileName ) throws IOException, KettleFileException {
copyFile( backupFileName, filename ); copyFile( backupFileName, filename );
} }


Expand All @@ -378,42 +377,25 @@ private String createOrGetFileBackup( FileObject fileObject ) throws IOException


private boolean getBackupFileFromFileSystem( String backupFileName ) throws KettleException { private boolean getBackupFileFromFileSystem( String backupFileName ) throws KettleException {
FileObject fileObject = getFileObjectFromKettleVFS( backupFileName ); FileObject fileObject = getFileObjectFromKettleVFS( backupFileName );
boolean isFileCreated = false;
try { try {
fileObject.exists(); return fileObject.exists();
isFileCreated = true;
} catch ( FileSystemException e ) { } catch ( FileSystemException e ) {
// NOP return false;
} }
return isFileCreated;
} }


private boolean createFileBackup( String backupFileName ) throws IOException { private boolean createFileBackup( String backupFileName ) throws IOException, KettleFileException {
return copyFile( filename, backupFileName ); return copyFile( filename, backupFileName );
} }


/** private boolean copyFile( String src, String dest ) throws KettleFileException, IOException {
* Copy src file to dest file FileObject srcFile = getFileObjectFromKettleVFS( src );
* FileObject destFile = getFileObjectFromKettleVFS( dest );
* @param src try ( InputStream in = KettleVFS.getInputStream( srcFile );
* @param dist OutputStream out = KettleVFS.getOutputStream( destFile, false ) ) {
* @throws IOException IOUtils.copy( in, out );
*/
private boolean copyFile( String src, String dist ) throws IOException {
boolean isFileCopied = false;
FileChannel sourceChannel = null;
FileChannel destChannel = null;
try {
sourceChannel = new FileInputStream( src ).getChannel();
destChannel = new FileOutputStream( dist ).getChannel();
destChannel.transferFrom( sourceChannel, 0, sourceChannel.size() );
isFileCopied = true;
} finally {
sourceChannel.close();
destChannel.close();
} }

return true;
return isFileCopied;
} }


@Override @Override
Expand Down
6 changes: 5 additions & 1 deletion engine/src/org/pentaho/di/shared/SharedObjectsMetaStore.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -39,6 +39,10 @@
import org.pentaho.metastore.stores.memory.MemoryMetaStore; import org.pentaho.metastore.stores.memory.MemoryMetaStore;
import org.pentaho.metastore.util.PentahoDefaults; import org.pentaho.metastore.util.PentahoDefaults;


/*
* unused?
*/
@Deprecated
public class SharedObjectsMetaStore extends MemoryMetaStore implements IMetaStore { public class SharedObjectsMetaStore extends MemoryMetaStore implements IMetaStore {


protected IMetaStoreElementType databaseElementType; protected IMetaStoreElementType databaseElementType;
Expand Down

0 comments on commit fafca6e

Please sign in to comment.