diff --git a/enterprise/com/src/main/java/org/neo4j/com/Protocol.java b/enterprise/com/src/main/java/org/neo4j/com/Protocol.java index e81236def0d36..05e914c828613 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/Protocol.java +++ b/enterprise/com/src/main/java/org/neo4j/com/Protocol.java @@ -19,11 +19,6 @@ */ package org.neo4j.com; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; - import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipeline; @@ -31,6 +26,11 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; import org.jboss.netty.handler.queue.BlockingReadHandler; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + import org.neo4j.com.storecopy.StoreWriter; import org.neo4j.helpers.collection.Visitor; import org.neo4j.kernel.NeoStoreDataSource; @@ -293,7 +293,9 @@ public Void read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOEx { String path = readString( buffer, pathLength ); boolean hasData = buffer.readByte() == 1; - writer.write( path, hasData ? new BlockLogReader( buffer ) : null, temporaryBuffer, hasData ); + int recordSize = buffer.readInt(); + writer.write( path, hasData ? new BlockLogReader( buffer ) : null, temporaryBuffer, hasData, + recordSize ); } writer.close(); return null; diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java index 9b4223a926d07..84a3933c227bd 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyClient.java @@ -333,10 +333,10 @@ private StoreWriter decorateWithProgressIndicator( final StoreWriter actual ) @Override public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, - boolean hasData ) throws IOException + boolean hasData, int recordSize ) throws IOException { log.info( "Copying %s", path ); - long written = actual.write( path, data, temporaryBuffer, hasData ); + long written = actual.write( path, data, temporaryBuffer, hasData, recordSize ); log.info( "Copied %s %s", path, bytes( written ) ); totalFiles++; return written; diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java index a9a18859cb6ba..428682178f01f 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreCopyServer.java @@ -159,7 +159,9 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW { while ( files.hasNext() ) { - File file = files.next().file(); + StoreFileMetadata meta = files.next(); + File file = meta.file(); + int recordSize = meta.recordSize(); // Read from paged file if mapping exists. Otherwise read through file system. final Optional optionalPagedFile = pageCache.getExistingMapping( file ); @@ -169,7 +171,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW { monitor.startStreamingStoreFile( file ); writer.write( relativePath( storeDirectory, file ), fileChannel, - temporaryBuffer, file.length() > 0 ); + temporaryBuffer, file.length() > 0, recordSize ); monitor.finishStreamingStoreFile( file ); } finally diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreWriter.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreWriter.java index 2b5cc286916ff..06388e1406b95 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreWriter.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/StoreWriter.java @@ -29,7 +29,7 @@ public interface StoreWriter extends Closeable // "hasData" is an effect of the block format not supporting a zero length block // whereas a neostore file may actually be 0 bytes we'll have to keep track // of that special case. - long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData ) + long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData, int recordSize ) throws IOException; @Override diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java index 21bd2a44b698f..8e278ba7dc907 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToFileStoreWriter.java @@ -49,8 +49,8 @@ public ToFileStoreWriter( File graphDbStoreDir, StoreCopyClient.Monitor monitor, } @Override - public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, - boolean hasData ) throws IOException + public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, boolean hasData, + int recordSize ) throws IOException { try { @@ -74,7 +74,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu } if ( storeType.isPresent() && storeType.get().isRecordStore() ) { - try ( PagedFile pagedFile = pageCache.map( file, pageCache.pageSize(), CREATE, WRITE ) ) + int filePageSize = pageCache.pageSize() - pageCache.pageSize() % recordSize; + try ( PagedFile pagedFile = pageCache.map( file, filePageSize, CREATE, WRITE ) ) { return writeDataThroughPageCache( pagedFile, data, temporaryBuffer, hasData ); } diff --git a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToNetworkStoreWriter.java b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToNetworkStoreWriter.java index b2f7bc59635ef..962e907d6ca25 100644 --- a/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToNetworkStoreWriter.java +++ b/enterprise/com/src/main/java/org/neo4j/com/storecopy/ToNetworkStoreWriter.java @@ -45,7 +45,7 @@ public ToNetworkStoreWriter( ChannelBuffer targetBuffer, Monitors monitors ) @Override public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBuffer, - boolean hasData ) throws IOException + boolean hasData, int recordSize ) throws IOException { char[] chars = path.toCharArray(); targetBuffer.writeShort( chars.length ); @@ -56,6 +56,8 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu long totalWritten = 2 + chars.length*2 + 1; if ( hasData ) { + targetBuffer.writeInt( recordSize ); + totalWritten += 4; totalWritten += buffer.write( data ); buffer.close();