Skip to content

Commit

Permalink
Ephemeral file system file buffer allocation simplification
Browse files Browse the repository at this point in the history
Update ephemeral file system to use 1KB buffer for new file by default.
In case if file data size over exceed available buffer capacity it will be doubled till enough space will be available.
  • Loading branch information
MishaDemianenko committed Nov 3, 2016
1 parent cc45c98 commit 8ddee5a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 79 deletions.
Expand Up @@ -47,13 +47,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
Expand All @@ -62,6 +64,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.fs.StoreFileChannel;
Expand Down Expand Up @@ -136,9 +139,8 @@ public synchronized void shutdown()
{
for ( EphemeralFileData file : files.values() )
{
free( file );
file.free();
}

files.clear();

for ( ThirdPartyFileSystem thirdPartyFileSystem : thirdPartyFileSystems.values() )
Expand Down Expand Up @@ -240,14 +242,6 @@ private void addRecursively( ZipOutputStream output, File input ) throws IOExcep
}
}

private void free( EphemeralFileData file )
{
if ( file != null )
{
file.fileAsBuffer.free();
}
}

@Override
public synchronized StoreChannel open( File fileName, String mode ) throws IOException
{
Expand Down Expand Up @@ -295,7 +289,7 @@ public synchronized StoreChannel create( File fileName ) throws IOException
}

EphemeralFileData data = new EphemeralFileData( clock );
free( files.put( canonicalFile( fileName ), data ) );
Optional.ofNullable( files.put( canonicalFile( fileName ), data ) ).ifPresent( EphemeralFileData::free );
return new StoreFileChannel(
new EphemeralFileChannel( data, new FileStillOpenException( fileName.getPath() ) ) );
}
Expand Down Expand Up @@ -364,9 +358,9 @@ public boolean deleteFile( File fileName )
{
fileName = canonicalFile( fileName );
EphemeralFileData removed = files.remove( fileName );
free( removed );
if ( removed != null )
{
removed.free();
return true;
}
else
Expand Down Expand Up @@ -546,7 +540,7 @@ public long checksum()
List<File> names = new ArrayList<>( files.size() );
names.addAll( files.keySet() );

Collections.sort( names, ( o1, o2 ) -> o1.getAbsolutePath().compareTo( o2.getAbsolutePath() ) );
names.sort( Comparator.comparing( File::getAbsolutePath ) );

for ( File name : names )
{
Expand All @@ -565,7 +559,7 @@ public long checksum()

private ByteBuffer newCopyBuffer()
{
return ByteBuffer.allocate( 1024 * 1024 );
return ByteBuffer.allocate( (int) ByteUnit.mebiBytes( 1 ) );
}

private void copyRecursivelyFromOtherFs( File from, FileSystemAbstraction fromFs, File to, ByteBuffer buffer )
Expand Down Expand Up @@ -666,7 +660,7 @@ static class LocalPosition implements Positionable
{
private long position;

public LocalPosition( long position )
LocalPosition( long position )
{
this.position = position;
}
Expand Down Expand Up @@ -798,7 +792,7 @@ public long transferFrom( ReadableByteChannel src, long position, long count ) t
try
{
long transferred = 0;
ByteBuffer intermediary = ByteBuffer.allocateDirect( 8096 );
ByteBuffer intermediary = ByteBuffer.allocateDirect( (int) ByteUnit.mebiBytes( 8 ) );
while ( transferred < count )
{
intermediary.clear();
Expand Down Expand Up @@ -888,14 +882,8 @@ public void pos( long position )

private static class EphemeralFileData
{
private static final ThreadLocal<byte[]> SCRATCH_PAD = new ThreadLocal<byte[]>()
{
@Override
protected byte[] initialValue()
{
return new byte[1024];
}
};
private static final ThreadLocal<byte[]> SCRATCH_PAD =
ThreadLocal.withInitial( () -> new byte[(int) ByteUnit.kibiBytes( 1 )] );
private DynamicByteBuffer fileAsBuffer;
private DynamicByteBuffer forcedBuffer;
private final Collection<WeakReference<EphemeralFileChannel>> channels = new LinkedList<>();
Expand Down Expand Up @@ -978,6 +966,11 @@ synchronized EphemeralFileData copy()
return copy;
}

void free()
{
fileAsBuffer.free();
}

void open( EphemeralFileChannel channel )
{
synchronized ( channels )
Expand Down Expand Up @@ -1103,16 +1096,15 @@ public void release() throws IOException
* Dynamically expanding ByteBuffer substitute/wrapper. This will allocate ByteBuffers on the go
* so that we don't have to allocate too big of a buffer up-front.
*/
private static class DynamicByteBuffer
static class DynamicByteBuffer
{
private static final int[] SIZES;
private static final byte[] zeroBuffer = new byte[1024];
private static final byte[] zeroBuffer = new byte[(int) ByteUnit.kibiBytes( 1 )];
private ByteBuffer buf;
private Exception freeCall;

public DynamicByteBuffer()
{
buf = allocate( 0 );
buf = allocate( ByteUnit.kibiBytes( 1 ) );
}

public ByteBuffer buf()
Expand All @@ -1124,43 +1116,15 @@ public ByteBuffer buf()
/** This is a copying constructor, the input buffer is just read from, never stored in 'this'. */
private DynamicByteBuffer( ByteBuffer toClone )
{
int sizeIndex = sizeIndexFor( toClone.capacity() );
buf = allocate( sizeIndex );
buf = allocate( toClone.capacity() );
copyByteBufferContents( toClone, buf );
}

private static int sizeIndexFor( int capacity )
{
// Double size each time, but after 1M only increase by 1M at a time, until required amount is reached.
int sizeIndex = capacity / SIZES[SIZES.length - 1];
if ( sizeIndex == 0 )
{
for (; sizeIndex < SIZES.length; sizeIndex++ )
{
if ( capacity == SIZES[sizeIndex] )
{
break;
}
}
}
else
{
sizeIndex += SIZES.length - 1;
}
return sizeIndex;
}

synchronized DynamicByteBuffer copy()
{
return new DynamicByteBuffer( buf() ); // invoke "copy constructor"
}

static
{
int K = 1024;
SIZES = new int[]{64 * K, 128 * K, 256 * K, 512 * K, 1024 * K};
}

private void copyByteBufferContents( ByteBuffer from, ByteBuffer to )
{
int positionBefore = from.position();
Expand All @@ -1176,24 +1140,17 @@ private void copyByteBufferContents( ByteBuffer from, ByteBuffer to )
}
}

/**
* Tries to allocate a buffer of at least the specified size.
* If no free buffers are available of the available capacity, we
* check for buffers up to two sizes larger. If still no buffers
* are found we allocate a new buffer of the specified size.
*/
private ByteBuffer allocate( int sizeIndex )
private ByteBuffer allocate( long capacity )
{
int capacity = capacity( sizeIndex );
try
{
return ByteBuffer.allocateDirect( capacity );
return ByteBuffer.allocateDirect( Math.toIntExact( capacity ) );
}
catch ( OutOfMemoryError oom )
{
try
{
return ByteBuffer.allocate( capacity );
return ByteBuffer.allocate( Math.toIntExact( capacity ) );
}
catch ( OutOfMemoryError secondOom )
{
Expand All @@ -1203,12 +1160,6 @@ private ByteBuffer allocate( int sizeIndex )
}
}

private int capacity( int sizeIndex )
{
return (sizeIndex < SIZES.length) ?
SIZES[sizeIndex] : ((sizeIndex - SIZES.length + 1) * SIZES[SIZES.length - 1]);
}

void free()
{
assertNotFreed();
Expand Down Expand Up @@ -1273,16 +1224,14 @@ private void verifySize( int totalAmount )

// Double size each time, but after 1M only increase by 1M at a time, until required amount is reached.
int newSize = buf.capacity();
int sizeIndex = sizeIndexFor( newSize );
while ( capacity( sizeIndex ) < totalAmount )
while ( newSize < totalAmount )
{
newSize += Math.min( newSize, 1024 * 1024 );
sizeIndex++;
newSize = newSize << 1;
}
int oldPosition = buf.position();

// allocate new buffer
ByteBuffer newBuf = allocate( sizeIndex );
ByteBuffer newBuf = allocate( newSize );

// copy contents of current buffer into new buffer
buf.position( 0 );
Expand Down
Expand Up @@ -53,7 +53,7 @@ public void setUp()
}

@Test
public void allowStoreThatExceedPredefinedSizes() throws IOException
public void allowStoreThatExceedDefaultSize() throws IOException
{
File aFile = new File( "test" );
StoreChannel channel = fs.open( aFile, "rw" );
Expand All @@ -70,6 +70,27 @@ public void allowStoreThatExceedPredefinedSizes() throws IOException
channel.close();
}

@Test
public void growEphemeralFileBuffer()
{
EphemeralFileSystemAbstraction.DynamicByteBuffer byteBuffer =
new EphemeralFileSystemAbstraction.DynamicByteBuffer();

byte[] testBytes = {1, 2, 3, 4};
int length = testBytes.length;
byteBuffer.put( 0, testBytes, 0, length );
assertEquals( (int) ByteUnit.kibiBytes( 1 ), byteBuffer.buf().capacity() );

byteBuffer.put( (int) (ByteUnit.kibiBytes( 1 ) + 2), testBytes, 0, length );
assertEquals( (int) ByteUnit.kibiBytes( 2 ), byteBuffer.buf().capacity() );

byteBuffer.put( (int) (ByteUnit.kibiBytes( 5 ) + 2), testBytes, 0, length );
assertEquals( (int) ByteUnit.kibiBytes( 8 ), byteBuffer.buf().capacity() );

byteBuffer.put( (int) (ByteUnit.mebiBytes( 2 ) + 2), testBytes, 0, length );
assertEquals( (int) ByteUnit.mebiBytes( 4 ), byteBuffer.buf().capacity() );
}

@Test
public void shouldNotLoseDataForcedBeforeFileSystemCrashes() throws Exception
{
Expand Down

0 comments on commit 8ddee5a

Please sign in to comment.