Skip to content

Commit

Permalink
Resumable copy
Browse files Browse the repository at this point in the history
This PR introduces resumable store copy in the sense that failed store
file download will be retried. Previously all store files would be
streamed to the client in one response and if any issue would occur we
would have to redo the entire process. The new strategy is as follows:

CLIENT -> Prepare store request
SERVER -> Grabs checkpoint lock and streams all "atomic-files" (count
store, legacy index and additional providers). It then returns a
response that includes last committed transaction id, a list of store
file names, a list of lucene index descriptors.
CLIENT -> for each store file: send get store file request
SERVER -> stream requested file
CLIENT -> for each descriptor: send get lucene snapshot request
SERVER -> stream lucene snapshot files.

If prepare store request fails then store copy fails. Each get
file/snapshot request may be retried as long as the configured
termination condition allows.

Each file/snapshot request include the "last committed tx id" given by
the prepare store copy respons so we can try to download from any
instance in the cluster. In this PR all requests are to leader.

As of yet, nothing has been done to "pull tx" which is called after the
store files has been copied. This will be investigated in a different
PR.

KERNEL:
NeoStoreFileListing have been modified to provide builders for listing
files. IndexService has been modified to expose descriptors and to allow
getting specific snapshot for descriptor.

PrepareStoreCopyRequestHandler, GetStoreFileRequestHandler and
GetLuceneSnapshotRequestHandler all use NeoStoreFileListing and would be
good looking into.
  • Loading branch information
phughk committed Mar 5, 2018
1 parent 789832f commit ea851ac
Show file tree
Hide file tree
Showing 99 changed files with 4,738 additions and 519 deletions.
Expand Up @@ -19,6 +19,9 @@
*/ */
package org.neo4j.internal.kernel.api.schema; package org.neo4j.internal.kernel.api.schema;


/**
* Please note that this interface has dependencies in other modules. See {@see org.neo4j.causalclustering.catchup.storecopy.IndexDescriptorSerializer}
*/
public interface LabelSchemaDescriptor extends SchemaDescriptor, LabelSchemaSupplier public interface LabelSchemaDescriptor extends SchemaDescriptor, LabelSchemaSupplier
{ {
int getLabelId(); int getLabelId();
Expand Down
Expand Up @@ -127,6 +127,7 @@
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation; import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl; import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl;
import org.neo4j.kernel.impl.transaction.state.DefaultSchemaIndexProviderMap; import org.neo4j.kernel.impl.transaction.state.DefaultSchemaIndexProviderMap;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileIndexListing;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing; import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue; import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue;
Expand Down Expand Up @@ -828,7 +829,19 @@ public InwardKernel getKernel()


public ResourceIterator<StoreFileMetadata> listStoreFiles( boolean includeLogs ) throws IOException public ResourceIterator<StoreFileMetadata> listStoreFiles( boolean includeLogs ) throws IOException
{ {
return kernelModule.fileListing().listStoreFiles( includeLogs ); if ( includeLogs )
{
return getNeoStoreFileListing().builder().build();
}
else
{
return getNeoStoreFileListing().builder().excludeLogFiles().build();
}
}

public NeoStoreFileListing getNeoStoreFileListing()
{
return kernelModule.fileListing();
} }


public void registerDiagnosticsWith( DiagnosticsManager manager ) public void registerDiagnosticsWith( DiagnosticsManager manager )
Expand Down
Expand Up @@ -36,6 +36,8 @@
/** /**
* Internal representation of a graph index, including the schema unit it targets (eg. label-property combination) * Internal representation of a graph index, including the schema unit it targets (eg. label-property combination)
* and the type of index. UNIQUE indexes are used to back uniqueness constraints. * and the type of index. UNIQUE indexes are used to back uniqueness constraints.
*
* Please note that this class has dependencies in other modules. See {@see org.neo4j.causalclustering.catchup.storecopy.IndexDescriptorSerializer}
*/ */
public class IndexDescriptor implements LabelSchemaSupplier public class IndexDescriptor implements LabelSchemaSupplier
{ {
Expand Down Expand Up @@ -81,7 +83,7 @@ public interface Supplier
private final LabelSchemaDescriptor schema; private final LabelSchemaDescriptor schema;
private final IndexDescriptor.Type type; private final IndexDescriptor.Type type;


IndexDescriptor( LabelSchemaDescriptor schema, Type type ) public IndexDescriptor( LabelSchemaDescriptor schema, Type type )
{ {
this.schema = schema; this.schema = schema;
this.type = type; this.type = type;
Expand Down
Expand Up @@ -24,11 +24,14 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Predicate;


import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.collection.primitive.PrimitiveLongIterator;
Expand Down Expand Up @@ -635,16 +638,35 @@ private void closeAllIndexes()
} ); } );
} }


public ResourceIterator<File> snapshotStoreFiles() throws IOException public Set<IndexDescriptor> getIndexDescriptors()
{
Set<IndexDescriptor> indexDescriptors = new HashSet<>();
for ( IndexProxy indexProxy : indexMapRef.getAllIndexProxies() )
{
indexDescriptors.add( indexProxy.getDescriptor() );
}
return indexDescriptors;
}

public ResourceIterator<File> snapshotIndexFiles( Predicate<IndexDescriptor> filter ) throws IOException
{ {
Collection<ResourceIterator<File>> snapshots = new ArrayList<>(); Collection<ResourceIterator<File>> snapshots = new ArrayList<>();
for ( IndexProxy indexProxy : indexMapRef.getAllIndexProxies() ) for ( IndexProxy indexProxy : indexMapRef.getAllIndexProxies() )
{ {
snapshots.add( indexProxy.snapshotFiles() ); IndexDescriptor providerDescriptor = indexProxy.getDescriptor();
if ( filter.test( providerDescriptor ) )
{
snapshots.add( indexProxy.snapshotFiles() );
}
} }
return Iterators.concatResourceIterators( snapshots.iterator() ); return Iterators.concatResourceIterators( snapshots.iterator() );
} }


public ResourceIterator<File> snapshotIndexFiles() throws IOException
{
return snapshotIndexFiles( d -> true );
}

private IndexPopulationJob newIndexPopulationJob() private IndexPopulationJob newIndexPopulationJob()
{ {
MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider ); MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider );
Expand Down
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

import org.neo4j.graphdb.Resource;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.ExplicitIndexProviderLookup;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.util.MultiResource;
import org.neo4j.kernel.spi.explicitindex.IndexImplementation;
import org.neo4j.storageengine.api.StoreFileMetadata;

import static org.neo4j.helpers.collection.Iterators.resourceIterator;

public class NeoStoreFileIndexListing
{
private final LabelScanStore labelScanStore;
private final IndexingService indexingService;
private final ExplicitIndexProviderLookup explicitIndexProviders;

private static final Function<File,StoreFileMetadata> toStoreFileMetatadata = file -> new StoreFileMetadata( file, RecordFormat.NO_RECORD_SIZE );

NeoStoreFileIndexListing( LabelScanStore labelScanStore, IndexingService indexingService, ExplicitIndexProviderLookup explicitIndexProviders )
{
this.labelScanStore = labelScanStore;
this.indexingService = indexingService;
this.explicitIndexProviders = explicitIndexProviders;
}

public Collection<IndexDescriptor> listIndexDescriptors()
{
return indexingService.getIndexDescriptors();
}

Resource gatherSchemaIndexFiles( Collection<StoreFileMetadata> targetFiles ) throws IOException
{
ResourceIterator<File> snapshot = indexingService.snapshotIndexFiles();
getSnapshotFilesMetadata( snapshot, targetFiles );
// Intentionally don't close the snapshot here, return it for closing by the consumer of
// the targetFiles list.
return snapshot;
}

Resource gatherLabelScanStoreFiles( Collection<StoreFileMetadata> targetFiles )
{
ResourceIterator<File> snapshot = labelScanStore.snapshotStoreFiles();
getSnapshotFilesMetadata( snapshot, targetFiles );
// Intentionally don't close the snapshot here, return it for closing by the consumer of
// the targetFiles list.
return snapshot;
}

Resource gatherExplicitIndexFiles( Collection<StoreFileMetadata> files ) throws IOException
{
final Collection<ResourceIterator<File>> snapshots = new ArrayList<>();
for ( IndexImplementation indexProvider : explicitIndexProviders.all() )
{
ResourceIterator<File> snapshot = indexProvider.listStoreFiles();
snapshots.add( snapshot );
getSnapshotFilesMetadata( snapshot, files );
}
// Intentionally don't close the snapshot here, return it for closing by the consumer of
// the targetFiles list.
return new MultiResource( snapshots );
}

private void getSnapshotFilesMetadata( ResourceIterator<File> snapshot, Collection<StoreFileMetadata> targetFiles )
{
snapshot.stream().map( toStoreFileMetatadata ).forEach( targetFiles::add );
}

public ResourceIterator<StoreFileMetadata> getSnapshot( IndexDescriptor descriptor ) throws IOException
{
ResourceIterator<File> snapshot = indexingService.snapshotIndexFiles( descriptor::equals );
ArrayList<StoreFileMetadata> files = new ArrayList<>();
getSnapshotFilesMetadata( snapshot, files );
return resourceIterator( files.iterator(), snapshot );
}
}

0 comments on commit ea851ac

Please sign in to comment.