Skip to content

Commit

Permalink
NumberArray backed by PageCache
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint authored and chrisvest committed Jul 13, 2017
1 parent 31ec4c9 commit 2cb67a6
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 24 deletions.
Expand Up @@ -48,18 +48,6 @@ public void set( long index, int value )
at( index ).set( index, value ); at( index ).set( index, value );
} }


@Override
public void swap( long fromIndex, long toIndex, int numberOfEntries )
{
// Let's just do this the stupid way. There's room for optimization here
for ( int i = 0; i < numberOfEntries; i++ )
{
int intermediary = get( fromIndex + i );
set( fromIndex + i, get( toIndex + i ) );
set( toIndex + i, intermediary );
}
}

@Override @Override
protected IntArray addChunk( long chunkSize, long base ) protected IntArray addChunk( long chunkSize, long base )
{ {
Expand Down
Expand Up @@ -48,18 +48,6 @@ public void set( long index, long value )
at( index ).set( index, value ); at( index ).set( index, value );
} }


@Override
public void swap( long fromIndex, long toIndex, int numberOfEntries )
{
// Let's just do this the stupid way. There's room for optimization here
for ( int i = 0; i < numberOfEntries; i++ )
{
long intermediary = get( fromIndex + i );
set( fromIndex + i, get( toIndex + i ) );
set( toIndex + i, intermediary );
}
}

@Override @Override
protected LongArray addChunk( long chunkSize, long base ) protected LongArray addChunk( long chunkSize, long base )
{ {
Expand Down
Expand Up @@ -30,4 +30,16 @@ public interface IntArray extends NumberArray<IntArray>
int get( long index ); int get( long index );


void set( long index, int value ); void set( long index, int value );

@Override
default void swap( long fromIndex, long toIndex, int numberOfEntries )
{
// Let's just do this the stupid way. There's room for optimization here
for ( int i = 0; i < numberOfEntries; i++ )
{
int intermediary = get( fromIndex+i );
set( fromIndex+i, get( toIndex+i ) );
set( toIndex+i, intermediary );
}
}
} }
Expand Up @@ -30,4 +30,16 @@ public interface LongArray extends NumberArray<LongArray>
long get( long index ); long get( long index );


void set( long index, long value ); void set( long index, long value );

@Override
default void swap( long fromIndex, long toIndex, int numberOfEntries )
{
// Let's just do this the stupid way. There's room for optimization here
for ( int i = 0; i < numberOfEntries; i++ )
{
long intermediary = get( fromIndex+i );
set( fromIndex+i, get( toIndex+i ) );
set( toIndex+i, intermediary );
}
}
} }
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache;

import java.io.IOException;

import org.neo4j.io.pagecache.PagedFile;

import static java.lang.Math.toIntExact;

public class PageCacheIntArray extends PageCacheNumberArray<IntArray> implements IntArray
{
private static final int ENTRY_SIZE = Integer.BYTES;
private static final int ENTRIES_PER_PAGE = PAGE_SIZE / ENTRY_SIZE;

public PageCacheIntArray( PagedFile pagedFile ) throws IOException
{
super( pagedFile, ENTRIES_PER_PAGE, ENTRY_SIZE );
}

@Override
public int get( long index )
{
long pageId = index / ENTRIES_PER_PAGE;
int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE;
if ( writeCursor.getCurrentPageId() == pageId )
{
// We have to read from the write cursor, since the write cursor is on it
return writeCursor.getInt( offset );
}

// Go ahead and read from the read cursor
try
{
goTo( readCursor, pageId );
int result;
do
{
result = readCursor.getInt( offset );
}
while ( readCursor.shouldRetry() );
checkBounds( readCursor );
return result;
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public void set( long index, int value )
{
long pageId = index / ENTRIES_PER_PAGE;
int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE;
try
{
goTo( writeCursor, pageId );
writeCursor.putInt( offset, value );
checkBounds( writeCursor );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}
}
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache;

import java.io.IOException;

import org.neo4j.io.pagecache.PagedFile;

import static java.lang.Math.toIntExact;

public class PageCacheLongArray extends PageCacheNumberArray<LongArray> implements LongArray
{
private static final int ENTRY_SIZE = Long.BYTES;
private static final int ENTRIES_PER_PAGE = PAGE_SIZE / ENTRY_SIZE;

public PageCacheLongArray( PagedFile pagedFile ) throws IOException
{
super( pagedFile, ENTRIES_PER_PAGE, ENTRY_SIZE );
}

@Override
public long get( long index )
{
long pageId = index / ENTRIES_PER_PAGE;
int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE;
if ( writeCursor.getCurrentPageId() == pageId )
{
// We have to read from the write cursor, since the write cursor is on it
return writeCursor.getLong( offset );
}

// Go ahead and read from the read cursor
try
{
goTo( readCursor, pageId );
long result;
do
{
result = readCursor.getLong( offset );
}
while ( readCursor.shouldRetry() );
checkBounds( readCursor );
return result;
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public void set( long index, long value )
{
long pageId = index / ENTRIES_PER_PAGE;
int offset = toIntExact( index % ENTRIES_PER_PAGE ) * ENTRY_SIZE;
try
{
goTo( writeCursor, pageId );
writeCursor.putLong( offset, value );
checkBounds( writeCursor );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}
}
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache;

import java.io.IOException;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.io.ByteUnit.kibiBytes;

public abstract class PageCacheNumberArray<N extends NumberArray<N>> implements NumberArray<N>
{
static final int PAGE_SIZE = (int) kibiBytes( 8 );

protected final PagedFile pagedFile;
protected final PageCursor readCursor;
protected final PageCursor writeCursor;
private final int entriesPerPage;
private final int entrySize;

public PageCacheNumberArray( PagedFile pagedFile, int entriesPerPage, int entrySize ) throws IOException
{
this.pagedFile = pagedFile;
this.entriesPerPage = entriesPerPage;
this.entrySize = entrySize;
this.readCursor = pagedFile.io( 0, PagedFile.PF_SHARED_READ_LOCK );
this.writeCursor = pagedFile.io( 0, PagedFile.PF_SHARED_WRITE_LOCK );
goTo( writeCursor, 0 );
}

@Override
public long length()
{
try
{
return pagedFile.getLastPageId() * entriesPerPage;
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public void clear()
{
}

@Override
public void close()
{
readCursor.close();
writeCursor.close();
try
{
pagedFile.close();
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
}

@Override
public N at( long index )
{
return (N) this;
}

@Override
public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
{
visitor.offHeapUsage( length() * entrySize );
}

protected void checkBounds( PageCursor cursor )
{
if ( cursor.checkAndClearBoundsFlag() )
{
throw new IllegalStateException();
}
}

protected void goTo( PageCursor cursor, long pageId ) throws IOException
{
if ( !cursor.next( pageId ) )
{
throw new IllegalStateException();
}
}
}

0 comments on commit 2cb67a6

Please sign in to comment.