Skip to content

Commit

Permalink
First draft of getting ReadableByteChannels from PagedFiles
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed May 16, 2016
1 parent d46e9ba commit f9a287c
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 22 deletions.
19 changes: 17 additions & 2 deletions community/io/src/main/java/org/neo4j/io/pagecache/PagedFile.java
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.io.pagecache;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

/**
* The representation of a file that has been mapped into the associated page cache.
Expand Down Expand Up @@ -105,8 +106,9 @@ public interface PagedFile extends AutoCloseable
* {@link org.neo4j.io.pagecache.PagedFile#PF_SHARED_READ_LOCK}.
* <p>
* The two locking modes cannot be combined, but other intents can be combined with them. For instance, if you want
* to write to a page, but also make sure that you don't write beyond the end of the file, then you can express your
* intent with {@code PF_SHARED_WRITE_LOCK | PF_NO_GROW} – note how the flags are combined with a bitwise-OR operator.
* to write to a page, but also make sure that you don't write beyond the end of the file, then you can express
* your intent with {@code PF_SHARED_WRITE_LOCK | PF_NO_GROW} – note how the flags are combined with a bitwise-OR
* operator.
* Arithmetic addition can also be used, but might not make it as clear that we are dealing with a bit-set.
*
* @param pageId The initial file-page-id, that the cursor will be bound to
Expand All @@ -133,6 +135,7 @@ public interface PagedFile extends AutoCloseable
/**
* Flush all dirty pages into the file channel, and force the file channel to disk, but limit the rate of IO as
* advised by the given IOPSLimiter.
*
* @param limiter The {@link IOLimiter} that determines if pauses or sleeps should be injected into the flushing
* process to keep the IO rate down.
*/
Expand All @@ -159,4 +162,16 @@ public interface PagedFile extends AutoCloseable
* @see AutoCloseable#close()
*/
void close() throws IOException;

/**
* Open a {@link ReadableByteChannel} view of this PagedFile.
* <p>
* The channel will read the file sequentially from the beginning till the end.
* Seeking is not supported.
* <p>
* The channel is not thread-safe.
*
* @return A channel for reading the paged file.
*/
ReadableByteChannel openReadableByteChannel() throws IOException;
}
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCursor;
Expand Down Expand Up @@ -181,6 +182,12 @@ public void close() throws IOException
pageCache.unmap( this );
}

@Override
public ReadableByteChannel openReadableByteChannel() throws IOException
{
return new PagedReadableByteChannel( this );
}

void closeSwapper() throws IOException
{
if ( !deleteOnClose )
Expand Down
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.pagecache.impl.muninn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

final class PagedReadableByteChannel implements ReadableByteChannel
{
private final PageCursor cursor;
private boolean open = true;
private int bytesLeftInCurrentPage;

public PagedReadableByteChannel( PagedFile pagedFile ) throws IOException
{
cursor = pagedFile.io( 0, PagedFile.PF_SHARED_READ_LOCK | PagedFile.PF_READ_AHEAD );
}

@Override
public int read( ByteBuffer dst ) throws IOException
{
if ( !open )
{
throw new ClosedChannelException();
}
if ( bytesLeftInCurrentPage == 0 )
{
if ( cursor.next() )
{
bytesLeftInCurrentPage = cursor.getCurrentPageSize();
}
else
{
return -1;
}
}
int position = dst.position();
int remaining = Math.min( dst.remaining(), bytesLeftInCurrentPage );
int offset = cursor.getOffset();
do
{
dst.position( position );
cursor.setOffset( offset );
for ( int i = 0; i < remaining; i++ )
{
dst.put( cursor.getByte() );
}
}
while ( cursor.shouldRetry() );
bytesLeftInCurrentPage -= remaining;
return remaining;
}

@Override
public boolean isOpen()
{
return open;
}

@Override
public void close() throws IOException
{
open = false;
}
}
Expand Up @@ -21,6 +21,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;

import org.neo4j.adversaries.Adversary;
Expand Down Expand Up @@ -92,4 +93,10 @@ public void close() throws IOException
adversary.injectFailure( FileNotFoundException.class, IOException.class, SecurityException.class );
delegate.close();
}

@Override
public ReadableByteChannel openReadableByteChannel()
{
throw new UnsupportedOperationException( "Not implemented for AdversarialPagedFile" );
}
}
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.io.pagecache;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

public class DelegatingPagedFile implements PagedFile
{
Expand Down Expand Up @@ -55,6 +56,12 @@ public void close() throws IOException
delegate.close();
}

@Override
public ReadableByteChannel openReadableByteChannel() throws IOException
{
return delegate.openReadableByteChannel();
}

public void flushAndForce( IOLimiter limiter ) throws IOException
{
delegate.flushAndForce( limiter );
Expand Down
Expand Up @@ -32,6 +32,8 @@
import java.io.PrintStream;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
Expand All @@ -42,7 +44,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -55,19 +56,14 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.impl.SingleFilePageSwapperFactory;
import org.neo4j.io.pagecache.randomharness.PageCountRecordFormat;
import org.neo4j.io.pagecache.randomharness.Phase;
import org.neo4j.io.pagecache.randomharness.RandomPageCacheTestHarness;
import org.neo4j.io.pagecache.randomharness.Record;
import org.neo4j.io.pagecache.randomharness.RecordFormat;
import org.neo4j.io.pagecache.randomharness.StandardRecordFormat;
import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.PinEvent;
import org.neo4j.test.RepeatRule;

import static java.lang.Long.toHexString;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -84,14 +80,6 @@
import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;
import static org.neo4j.io.pagecache.randomharness.Command.FlushCache;
import static org.neo4j.io.pagecache.randomharness.Command.FlushFile;
import static org.neo4j.io.pagecache.randomharness.Command.MapFile;
import static org.neo4j.io.pagecache.randomharness.Command.ReadMulti;
import static org.neo4j.io.pagecache.randomharness.Command.ReadRecord;
import static org.neo4j.io.pagecache.randomharness.Command.UnmapFile;
import static org.neo4j.io.pagecache.randomharness.Command.WriteMulti;
import static org.neo4j.io.pagecache.randomharness.Command.WriteRecord;
import static org.neo4j.test.ByteArrayMatcher.byteArray;
import static org.neo4j.test.ThreadTestUtils.fork;

Expand Down Expand Up @@ -3955,7 +3943,7 @@ public void openingLinkedCursorMustCloseExistingLinkedCursor() throws Exception
assertTrue( linked.next() );
writeRecords( parent );
writeRecords( linked );
PageCursor secondLinked = parent.openLinkedCursor( 2 );
parent.openLinkedCursor( 2 );

// should cause out of bounds condition because it should be closed by our opening of another linked cursor
linked.putByte( 0, (byte) 1 );
Expand All @@ -3969,7 +3957,7 @@ public void openingLinkedCursorMustCloseExistingLinkedCursor() throws Exception
PageCursor linked = parent.openLinkedCursor( 1 );
assertTrue( parent.next() );
assertTrue( linked.next() );
PageCursor secondLinked = parent.openLinkedCursor( 2 );
parent.openLinkedCursor( 2 );

// should cause out of bounds condition because it should be closed by our opening of another linked cursor
linked.getByte( 0 );
Expand Down Expand Up @@ -4012,5 +4000,47 @@ public void checkAndClearBoundsFlagMustCheckAndClearLinkedCursor() throws Except
}
}

// TODO paged file must be able to present itself as a ReadableByteChannel
@Test
public void readableByteChannelMustBeOpenUntilClosed() throws Exception
{
getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL );
try ( PagedFile pf = pageCache.map( file( "a" ), filePageSize ) )
{
ReadableByteChannel channel;
try ( ReadableByteChannel ch = pf.openReadableByteChannel() )
{
assertTrue( ch.isOpen() );
channel = ch;
}
assertFalse( channel.isOpen() );
}
}

@Test
public void readableByteChannelMustReadAllBytesInFile() throws Exception
{
File file = file( "a" );
generateFileWithRecords( file, recordCount, recordSize );
getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL );
try ( PagedFile pf = pageCache.map( file, filePageSize );
ReadableByteChannel channel = pf.openReadableByteChannel() )
{
verifyRecordsInFile( channel, recordCount );
}
}

@Test( expected = ClosedChannelException.class )
public void readingFromClosedReadableByteChannelMustThrow() throws Exception
{
File file = file( "a" );
generateFileWithRecords( file, recordCount, recordSize );
getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL );
try ( PagedFile pf = pageCache.map( file, filePageSize ) )
{
ReadableByteChannel channel = pf.openReadableByteChannel();
channel.close();
channel.read( ByteBuffer.allocate( recordSize ) );
fail( "That read should have thrown" );
}
}
}
Expand Up @@ -24,11 +24,11 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.internal.AssumptionViolatedException;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
Expand Down Expand Up @@ -257,7 +257,14 @@ protected static void generateRecordForId( long id, ByteBuffer buf )

protected void verifyRecordsInFile( File file, int recordCount ) throws IOException
{
StoreChannel channel = fs.open( file, "r" );
try ( StoreChannel channel = fs.open( file, "r" ) )
{
verifyRecordsInFile( channel, recordCount );
}
}

protected void verifyRecordsInFile( ReadableByteChannel channel, int recordCount ) throws IOException
{
ByteBuffer buf = ByteBuffer.allocate( recordSize );
ByteBuffer observation = ByteBuffer.allocate( recordSize );
for ( int i = 0; i < recordCount; i++ )
Expand All @@ -267,7 +274,6 @@ protected void verifyRecordsInFile( File file, int recordCount ) throws IOExcept
channel.read( observation );
assertRecord( i, observation, buf );
}
channel.close();
}

protected Runnable $close( final PagedFile file )
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.io.pagecache;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

public class StubPagedFile implements PagedFile
{
Expand Down Expand Up @@ -71,4 +72,10 @@ public long getLastPageId() throws IOException
public void close() throws IOException
{
}

@Override
public ReadableByteChannel openReadableByteChannel()
{
throw new UnsupportedOperationException( "Not implemented for StubPagedFile" );
}
}
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCursor;
Expand Down Expand Up @@ -79,6 +80,12 @@ public void close() throws IOException
actual.close();
}

@Override
public ReadableByteChannel openReadableByteChannel() throws IOException
{
return actual.openReadableByteChannel();
}

public int ioCalls()
{
return ioCalls;
Expand Down

0 comments on commit f9a287c

Please sign in to comment.