Skip to content

Commit

Permalink
Added record size to store copy protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen authored and chrisvest committed Sep 16, 2016
1 parent abfb3c0 commit 19030cf
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
14 changes: 8 additions & 6 deletions enterprise/com/src/main/java/org/neo4j/com/Protocol.java
Expand Up @@ -19,18 +19,18 @@
*/
package org.neo4j.com;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.queue.BlockingReadHandler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.NeoStoreDataSource;
Expand Down Expand Up @@ -293,7 +293,9 @@ public Void read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOEx
{
String path = readString( buffer, pathLength );
boolean hasData = buffer.readByte() == 1;
writer.write( path, hasData ? new BlockLogReader( buffer ) : null, temporaryBuffer, hasData );
int recordSize = buffer.readInt();
writer.write( path, hasData ? new BlockLogReader( buffer ) : null, temporaryBuffer, hasData,
recordSize );
}
writer.close();
return null;
Expand Down
Expand Up @@ -333,10 +333,10 @@ private StoreWriter decorateWithProgressIndicator( final StoreWriter actual )

@Override
public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer,
boolean hasData ) throws IOException
boolean hasData, int recordSize ) throws IOException
{
log.info( "Copying %s", path );
long written = actual.write( path, data, temporaryBuffer, hasData );
long written = actual.write( path, data, temporaryBuffer, hasData, recordSize );
log.info( "Copied %s %s", path, bytes( written ) );
totalFiles++;
return written;
Expand Down
Expand Up @@ -159,7 +159,9 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
{
while ( files.hasNext() )
{
File file = files.next().file();
StoreFileMetadata meta = files.next();
File file = meta.file();
int recordSize = meta.recordSize();

// Read from paged file if mapping exists. Otherwise read through file system.
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( file );
Expand All @@ -169,7 +171,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
{
monitor.startStreamingStoreFile( file );
writer.write( relativePath( storeDirectory, file ), fileChannel,
temporaryBuffer, file.length() > 0 );
temporaryBuffer, file.length() > 0, recordSize );
monitor.finishStreamingStoreFile( file );
}
finally
Expand Down
Expand Up @@ -29,7 +29,7 @@ public interface StoreWriter extends Closeable
// "hasData" is an effect of the block format not supporting a zero length block
// whereas a neostore file may actually be 0 bytes we'll have to keep track
// of that special case.
long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData )
long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData, int recordSize )
throws IOException;

@Override
Expand Down
Expand Up @@ -49,8 +49,8 @@ public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor,
}

@Override
public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer,
boolean hasData ) throws IOException
public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData,
int recordSize ) throws IOException
{
try
{
Expand All @@ -74,7 +74,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
}
if ( storeType.isPresent() && storeType.get().isRecordStore() )
{
try ( PagedFile pagedFile = pageCache.map( file, pageCache.pageSize(), CREATE, WRITE ) )
int filePageSize = pageCache.pageSize() - pageCache.pageSize() % recordSize;
try ( PagedFile pagedFile = pageCache.map( file, filePageSize, CREATE, WRITE ) )
{
return writeDataThroughPageCache( pagedFile, data, temporaryBuffer, hasData );
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ public ToNetworkStoreWriter( ChannelBuffer targetBuffer, Monitors monitors )

@Override
public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer,
boolean hasData ) throws IOException
boolean hasData, int recordSize ) throws IOException
{
char[] chars = path.toCharArray();
targetBuffer.writeShort( chars.length );
Expand All @@ -56,6 +56,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
long totalWritten = 2 + chars.length*2 + 1;
if ( hasData )
{
targetBuffer.writeInt( recordSize );
totalWritten += 4;
totalWritten += buffer.write( data );
buffer.close();

Expand Down

0 comments on commit 19030cf

Please sign in to comment.