Skip to content

Commit

Permalink
Consolidate the use of request handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW committed Apr 11, 2018
1 parent fb2e5ea commit 098a8f9
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 272 deletions.
Expand Up @@ -25,11 +25,10 @@
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import org.neo4j.causalclustering.catchup.storecopy.GetIndexSnapshotRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreFileRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.PrepareStoreCopyFilesProvider;
import org.neo4j.causalclustering.catchup.storecopy.PrepareStoreCopyRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.StoreFileStreamingProtocol;
import org.neo4j.causalclustering.catchup.tx.TxPullRequestHandler;
import org.neo4j.causalclustering.core.state.CoreSnapshotService;
Expand Down Expand Up @@ -104,15 +103,16 @@ public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupS
@Override
public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return new GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache, fs,
return new StoreCopyRequestHandler.GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier,
new StoreFileStreamingProtocol(), pageCache, fs,
logProvider );
}

@Override
public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return new GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache,
fs );
return new StoreCopyRequestHandler.GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier,
new StoreFileStreamingProtocol(), pageCache, fs, logProvider );
}

@Override
Expand Down
Expand Up @@ -30,15 +30,15 @@
import org.neo4j.causalclustering.catchup.RequestMessageType;
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

public class GetIndexFilesRequest implements CatchUpRequest
public class GetIndexFilesRequest implements StoreCopyRequest
{
private final StoreId expectedStoreId;
private final long indexId;
Expand All @@ -51,12 +51,14 @@ public GetIndexFilesRequest( StoreId expectedStoreId, long indexId, long require
this.requiredTransactionId = requiredTransactionId;
}

@Override
public StoreId expectedStoreId()
{
return expectedStoreId;
}

long requiredTransactionId()
@Override
public long requiredTransactionId()
{
return requiredTransactionId;
}
Expand Down

This file was deleted.

Expand Up @@ -31,16 +31,16 @@
import org.neo4j.causalclustering.catchup.RequestMessageType;
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;
import org.neo4j.string.UTF8;

public class GetStoreFileRequest implements CatchUpRequest
public class GetStoreFileRequest implements StoreCopyRequest
{
private final StoreId expectedStoreId;
private final File file;
Expand All @@ -53,12 +53,14 @@ public GetStoreFileRequest( StoreId expectedStoreId, File file, long requiredTra
this.requiredTransactionId = requiredTransactionId;
}

long requiredTransactionId()
@Override
public long requiredTransactionId()
{
return requiredTransactionId;
}

StoreId expectedStoreId()
@Override
public StoreId expectedStoreId()
{
return expectedStoreId;
}
Expand Down
Expand Up @@ -24,13 +24,16 @@

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
Expand All @@ -44,7 +47,7 @@
import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.isTransactionWithinReach;
import static org.neo4j.io.fs.FileUtils.relativePath;

public class GetStoreFileRequestHandler extends SimpleChannelInboundHandler<GetStoreFileRequest>
public abstract class StoreCopyRequestHandler<T extends StoreCopyRequest> extends SimpleChannelInboundHandler<T>
{
private final CatchupServerProtocol protocol;
private final Supplier<NeoStoreDataSource> dataSource;
Expand All @@ -54,7 +57,7 @@ public class GetStoreFileRequestHandler extends SimpleChannelInboundHandler<GetS
private final FileSystemAbstraction fs;
private final Log log;

public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier,
StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier,
StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider )
{
this.protocol = protocol;
Expand All @@ -63,32 +66,38 @@ public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoS
this.storeFileStreamingProtocol = storeFileStreamingProtocol;
this.pageCache = pageCache;
this.fs = fs;
this.log = logProvider.getLog( GetStoreFileRequestHandler.class );
this.log = logProvider.getLog( StoreCopyRequestHandler.class );
}

@Override
protected void channelRead0( ChannelHandlerContext ctx, GetStoreFileRequest fileRequest ) throws Exception
protected void channelRead0( ChannelHandlerContext ctx, T request ) throws Exception
{
log.debug( "Requesting file %s", fileRequest.file() );
log.debug( "Handling request %s", request );
StoreCopyFinishedResponse.Status responseStatus = StoreCopyFinishedResponse.Status.E_UNKNOWN;
try
{
NeoStoreDataSource neoStoreDataSource = dataSource.get();
if ( !hasSameStoreId( fileRequest.expectedStoreId(), neoStoreDataSource ) )
if ( !hasSameStoreId( request.expectedStoreId(), neoStoreDataSource ) )
{
responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;
}
else if ( !isTransactionWithinReach( fileRequest.requiredTransactionId(), checkpointerSupplier.get() ) )
else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpointerSupplier.get() ) )
{
responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND;
}
else
{
File storeDir = neoStoreDataSource.getStoreDir();
StoreFileMetadata storeFileMetadata = findFile( fileRequest.file().getName() );
storeFileStreamingProtocol.stream( ctx,
new StoreResource( storeFileMetadata.file(), relativePath( storeDir, storeFileMetadata.file() ), storeFileMetadata.recordSize(),
pageCache, fs ) );
try ( ResourceIterator<StoreFileMetadata> resourceIterator = files( request, neoStoreDataSource ) )
{
while ( resourceIterator.hasNext() )
{
StoreFileMetadata storeFileMetadata = resourceIterator.next();
storeFileStreamingProtocol.stream( ctx,
new StoreResource( storeFileMetadata.file(), relativePath( storeDir, storeFileMetadata.file() ), storeFileMetadata.recordSize(),
pageCache, fs ) );
}
}
responseStatus = StoreCopyFinishedResponse.Status.SUCCESS;
}
}
Expand All @@ -99,25 +108,55 @@ else if ( !isTransactionWithinReach( fileRequest.requiredTransactionId(), checkp
}
}

private StoreFileMetadata findFile( String fileName ) throws IOException
{
try ( ResourceIterator<StoreFileMetadata> resourceIterator = dataSource.get().listStoreFiles( false ) )
{
return onlyOne( resourceIterator.stream().filter( matchesRequested( fileName ) ).collect( Collectors.toList() ), fileName );
}
}
abstract ResourceIterator<StoreFileMetadata> files( T request, NeoStoreDataSource neoStoreDataSource ) throws IOException;

private StoreFileMetadata onlyOne( List<StoreFileMetadata> files, String description )
private static Iterator<StoreFileMetadata> onlyOne( List<StoreFileMetadata> files, String description )
{
if ( files.size() != 1 )
{
throw new IllegalStateException( format( "Expected exactly one file '%s'. Got %d", description, files.size() ) );
}
return files.get( 0 );
return files.iterator();
}

private static Predicate<StoreFileMetadata> matchesRequested( String fileName )
{
return f -> f.file().getName().equals( fileName );
}

public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler<GetStoreFileRequest>
{
public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier,
StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider )
{
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
}

@Override
ResourceIterator<StoreFileMetadata> files( GetStoreFileRequest request, NeoStoreDataSource neoStoreDataSource ) throws IOException
{
try ( ResourceIterator<StoreFileMetadata> resourceIterator = neoStoreDataSource.listStoreFiles( false ) )
{
String fileName = request.file().getName();
return Iterators.asResourceIterator(
onlyOne( resourceIterator.stream().filter( matchesRequested( fileName ) ).collect( Collectors.toList() ), fileName ) );
}
}
}

public static class GetIndexSnapshotRequestHandler extends StoreCopyRequestHandler<GetIndexFilesRequest>
{
public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource,
Supplier<CheckPointer> checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider )
{
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
}

@Override
ResourceIterator<StoreFileMetadata> files( GetIndexFilesRequest request, NeoStoreDataSource neoStoreDataSource ) throws IOException
{
return neoStoreDataSource.getNeoStoreFileListing().getNeoStoreFileIndexListing().getSnapshot( request.indexId() );
}
}
}
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging;

import org.neo4j.causalclustering.identity.StoreId;

public interface StoreCopyRequest extends CatchUpRequest
{
long requiredTransactionId();

StoreId expectedStoreId();
}

0 comments on commit 098a8f9

Please sign in to comment.