Skip to content

Commit

Permalink
Add possibility to archive whole fusion index when
Browse files Browse the repository at this point in the history
GraphDatabaseSettings.archive_failed_index is configured
  • Loading branch information
MishaDemianenko committed Mar 27, 2018
1 parent a540008 commit f46cbe7
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 24 deletions.
Expand Up @@ -38,18 +38,20 @@ class FusionIndexPopulator extends FusionIndexBase<IndexPopulator> implements In
{
private final long indexId;
private final DropAction dropAction;
private final boolean archiveFailedIndex;

FusionIndexPopulator( IndexPopulator[] populators, Selector selector, long indexId, DropAction dropAction )
FusionIndexPopulator( IndexPopulator[] populators, Selector selector, long indexId, DropAction dropAction, boolean archiveFailedIndex )
{
super( populators, selector );
this.indexId = indexId;
this.dropAction = dropAction;
this.archiveFailedIndex = archiveFailedIndex;
}

@Override
public void create() throws IOException
{
dropAction.drop( indexId );
dropAction.drop( indexId, archiveFailedIndex );
forAll( IndexPopulator::create, instances );
}

Expand Down
Expand Up @@ -19,14 +19,26 @@
*/
package org.neo4j.kernel.impl.index.schema.fusion;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.internal.kernel.api.IndexCapability;
import org.neo4j.internal.kernel.api.IndexOrder;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.InternalIndexState;
import org.neo4j.io.fs.FileHandle;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexDirectoryStructure;
Expand All @@ -40,6 +52,7 @@
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueGroup;

import static java.util.stream.Collectors.toList;
import static org.neo4j.internal.kernel.api.InternalIndexState.FAILED;
import static org.neo4j.internal.kernel.api.InternalIndexState.POPULATING;
import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase.INSTANCE_COUNT;
Expand Down Expand Up @@ -74,6 +87,7 @@ default <T> T select( T[] instances, Value... values )
IndexReader select( IndexReader[] instances, IndexQuery... predicates );
}

private final boolean archiveFailedIndex;
private final IndexProvider[] providers = new IndexProvider[INSTANCE_COUNT];
private final Selector selector;
private final DropAction dropAction;
Expand All @@ -89,11 +103,13 @@ public FusionIndexProvider(
Descriptor descriptor,
int priority,
IndexDirectoryStructure.Factory directoryStructure,
FileSystemAbstraction fs )
FileSystemAbstraction fs,
boolean archiveFailedIndex )
{
super( descriptor, priority, directoryStructure );
fillProvidersArray( stringProvider, numberProvider, spatialProvider, temporalProvider, luceneProvider );
selector.validateSatisfied( providers );
this.archiveFailedIndex = archiveFailedIndex;
this.selector = selector;
this.dropAction = new FileSystemDropAction( fs, directoryStructure() );
}
Expand All @@ -112,7 +128,7 @@ private void fillProvidersArray( IndexProvider stringProvider, IndexProvider num
public IndexPopulator getPopulator( long indexId, SchemaIndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
{
return new FusionIndexPopulator( instancesAs( providers, IndexPopulator.class,
provider -> provider.getPopulator( indexId, descriptor, samplingConfig ) ), selector, indexId, dropAction );
provider -> provider.getPopulator( indexId, descriptor, samplingConfig ) ), selector, indexId, dropAction, archiveFailedIndex );
}

@Override
Expand Down Expand Up @@ -197,26 +213,33 @@ public IndexOrder[] orderCapability( ValueGroup... valueGroups )
@Override
public StoreMigrationParticipant storeMigrationParticipant( FileSystemAbstraction fs, PageCache pageCache )
{
// TODO implementation of this depends on decisions around defaults and migration. Coming soon.
return StoreMigrationParticipant.NOT_PARTICIPATING;
}

/**
* As an interface because this is actually dependent on whether or not an index lives on a {@link FileSystemAbstraction}
* or a page cache. At the time of writing this there's only the possibility to put these on the file system,
* but there will be a possibility to put these in the page cache file management instead and having this abstracted
* will help when making that switch/decision.
*/
@FunctionalInterface
interface DropAction
{
/**
* Deletes the index directory and everything in it, as last part of dropping an index.
* Can be configured to create archive with content of index directories for future analysis.
*
* @param indexId the index id, for which directory to drop.
* @param archiveExistentIndex create archive with content of dropped directories
* @throws IOException on I/O error.
* @see GraphDatabaseSettings#archive_failed_index
*/
void drop( long indexId ) throws IOException;
void drop( long indexId, boolean archiveExistentIndex ) throws IOException;

/**
* Deletes the index directory and everything in it, as last part of dropping an index.
*
* @param indexId the index id, for which directory to drop.
* @throws IOException on I/O error.
*/
default void drop( long indexId ) throws IOException
{
drop( indexId, false );
}
}

private static class FileSystemDropAction implements DropAction
Expand All @@ -231,9 +254,36 @@ private static class FileSystemDropAction implements DropAction
}

@Override
public void drop( long indexId ) throws IOException
public void drop( long indexId, boolean archiveExistentIndex ) throws IOException
{
File rootIndexDirectory = directoryStructure.directoryForIndex( indexId );
if ( archiveExistentIndex && !FileUtils.isEmptyDirectory( rootIndexDirectory ) )
{
Map<String,String> env = MapUtil.stringMap( "create", "true" );
Path rootPath = rootIndexDirectory.toPath();
URI archiveAbsoluteURI = URI.create( "jar:file:" + archiveFile( rootIndexDirectory ).toPath() );

try ( FileSystem zipFs = FileSystems.newFileSystem( archiveAbsoluteURI, env ) )
{
List<FileHandle> fileHandles = fs.streamFilesRecursive( rootIndexDirectory ).collect( toList() );
for ( FileHandle fileHandle : fileHandles )
{
Path sourcePath = fileHandle.getFile().toPath();
Path zipFsPath = zipFs.getPath( rootPath.relativize( sourcePath ).toString() );
if ( zipFsPath.getParent() != null )
{
Files.createDirectories( zipFsPath.getParent() );
}
Files.copy( sourcePath, zipFsPath );
}
}
}
fs.deleteRecursively( rootIndexDirectory );
}

private File archiveFile( File folder )
{
fs.deleteRecursively( directoryStructure.directoryForIndex( indexId ) );
return new File( folder.getParent(), "archive-" + folder.getName() + "-" + System.currentTimeMillis() + ".zip" );
}
}
}
Expand Up @@ -119,7 +119,7 @@ private void initiateMocks()
throw new RuntimeException();
}
}
fusionIndexPopulator = new FusionIndexPopulator( populators, fusionVersion.selector(), indexId, dropAction );
fusionIndexPopulator = new FusionIndexPopulator( populators, fusionVersion.selector(), indexId, dropAction, false );
}

private void resetMocks()
Expand Down Expand Up @@ -150,7 +150,7 @@ public void createRemoveAnyLeftOversThatWasThereInIndexDirectoryBeforePopulation
{
fusionIndexPopulator.create();

verify( dropAction ).drop( indexId );
verify( dropAction ).drop( indexId, false );
}

@Test
Expand Down
Expand Up @@ -318,7 +318,7 @@ private void setupMocks()
providers[SPATIAL],
providers[TEMPORAL],
providers[LUCENE],
fusionVersion.selector(), DESCRIPTOR, 10, NONE, mock( FileSystemAbstraction.class ) );
fusionVersion.selector(), DESCRIPTOR, 10, NONE, mock( FileSystemAbstraction.class ), true );
}

private IndexProvider mockProvider( Class<? extends IndexProvider> providerClass, String name )
Expand Down
Expand Up @@ -96,6 +96,7 @@ public static FusionIndexProvider newInstance( PageCache pageCache, File storeDi
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector )
{
boolean readOnly = IndexProviderFactoryUtil.isReadOnly( config, operationalMode );
boolean archiveFailedIndex = config.get( GraphDatabaseSettings.archive_failed_index );
IndexDirectoryStructure.Factory baseDirStructure = directoriesByProviderKey( storeDir );
IndexDirectoryStructure.Factory childDirectoryStructure = directoriesBySubProvider( baseDirStructure.forProvider( PROVIDER_DESCRIPTOR ) );

Expand All @@ -112,7 +113,7 @@ public static FusionIndexProvider newInstance( PageCache pageCache, File storeDi
priority = 100;
}
return new FusionIndexProvider( EMPTY, EMPTY, spatial, temporal, lucene, new FusionSelector00(),
PROVIDER_DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs );
PROVIDER_DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs, archiveFailedIndex );
}

}
Expand Up @@ -77,6 +77,7 @@ public static FusionIndexProvider create( PageCache pageCache, File storeDir, Fi
{
IndexDirectoryStructure.Factory childDirectoryStructure = subProviderDirectoryStructure( storeDir );
boolean readOnly = IndexProviderFactoryUtil.isReadOnly( config, operationalMode );
boolean archiveFailedIndex = config.get( GraphDatabaseSettings.archive_failed_index );

NumberIndexProvider number =
IndexProviderFactoryUtil.numberProvider( pageCache, fs, childDirectoryStructure, monitor, recoveryCleanupWorkCollector, readOnly );
Expand All @@ -93,7 +94,7 @@ public static FusionIndexProvider create( PageCache pageCache, File storeDir, Fi
priority = 100;
}
return new FusionIndexProvider( EMPTY, number, spatial, temporal, lucene, new FusionSelector10(),
DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs );
DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs, archiveFailedIndex );
}

private static IndexDirectoryStructure.Factory subProviderDirectoryStructure( File storeDir )
Expand Down
Expand Up @@ -77,6 +77,7 @@ public static FusionIndexProvider create( PageCache pageCache, File storeDir, Fi
{
IndexDirectoryStructure.Factory childDirectoryStructure = subProviderDirectoryStructure( storeDir );
boolean readOnly = IndexProviderFactoryUtil.isReadOnly( config, operationalMode );
boolean archiveFailedIndex = config.get( GraphDatabaseSettings.archive_failed_index );

StringIndexProvider string =
IndexProviderFactoryUtil.stringProvider( pageCache, fs, childDirectoryStructure, monitor, recoveryCleanupWorkCollector, readOnly );
Expand All @@ -95,7 +96,7 @@ public static FusionIndexProvider create( PageCache pageCache, File storeDir, Fi
priority = 100;
}
return new FusionIndexProvider( string, number, spatial, temporal, lucene, new FusionSelector20(),
DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs );
DESCRIPTOR, priority, directoriesByProvider( storeDir ), fs, archiveFailedIndex );
}

public static IndexDirectoryStructure.Factory subProviderDirectoryStructure( File storeDir )
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.api.index.IndexDirectoryStructure;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule;

Expand Down Expand Up @@ -127,9 +128,8 @@ private File archiveFile() throws IOException
{
try ( FileSystemAbstraction fs = new DefaultFileSystemAbstraction() )
{
File indexDir = soleIndexDir( db.getStoreDir() );
File[] files = indexDir.getParentFile()
.listFiles( pathname -> pathname.isFile() && pathname.getName().startsWith( "archive-" ) );
File indexDir = indexRootDirectory( db.getStoreDir() );
File[] files = indexDir.listFiles( pathname -> pathname.isFile() && pathname.getName().startsWith( "archive-" ) );
if ( files == null || files.length == 0 )
{
return null;
Expand Down Expand Up @@ -197,8 +197,18 @@ public void run( FileSystemAbstraction fs, File base )
}
}

private static File indexRootDirectory( File base )
{
return providerDirectoryStructure( base ).rootDirectory();
}

private static File soleIndexDir( File base )
{
return subProviderDirectoryStructure( base ).forProvider( PROVIDER_DESCRIPTOR ).directoryForIndex( 1 );
return providerDirectoryStructure( base ).directoryForIndex( 1 );
}

private static IndexDirectoryStructure providerDirectoryStructure( File base )
{
return subProviderDirectoryStructure( base ).forProvider( PROVIDER_DESCRIPTOR );
}
}

0 comments on commit f46cbe7

Please sign in to comment.