Skip to content

Commit

Permalink
Reference decoding across two record units
Browse files Browse the repository at this point in the history
This commit makes Reference#decode able to decode references that are split
between two record units.
  • Loading branch information
lutovich committed Mar 24, 2016
1 parent 9805b31 commit f69d1c5
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 15 deletions.
Expand Up @@ -62,21 +62,29 @@ enum Reference

public interface DataAdapter<SOURCE>
{
byte get( SOURCE source );
byte getByte( SOURCE source );

void put( byte oneByte, SOURCE source ) throws IOException;
short getShort( SOURCE source );

void putByte( byte oneByte, SOURCE source ) throws IOException;
}

public static final DataAdapter<PageCursor> PAGE_CURSOR_ADAPTER = new DataAdapter<PageCursor>()
{
@Override
public byte get( PageCursor source )
public byte getByte( PageCursor source )
{
return source.getByte();
}

@Override
public void put( byte oneByte, PageCursor source )
public short getShort( PageCursor cursor )
{
return cursor.getShort();
}

@Override
public void putByte( byte oneByte, PageCursor source )
{
source.putByte( oneByte );
}
Expand Down Expand Up @@ -124,12 +132,12 @@ private <SOURCE> void encode( long absoluteReference, boolean positive, SOURCE s
byte signBit = (byte) ((positive ? 0 : 1) << (headerShift - 1));

// first (most significant) byte
adapter.put( (byte) (highHeader | signBit | (byte) (absoluteReference >>> shift)), source );
adapter.putByte( (byte) (highHeader | signBit | (byte) (absoluteReference >>> shift)), source );

do // rest of the bytes
{
shift -= 8;
adapter.put( (byte) (absoluteReference >>> shift), source );
adapter.putByte( (byte) (absoluteReference >>> shift), source );
}
while ( shift > 0 );
}
Expand Down Expand Up @@ -189,19 +197,17 @@ private static int maxBits()

public static <SOURCE> long decode( SOURCE source, DataAdapter<SOURCE> adapter )
{
PageCursor cursor = (PageCursor) source;

int header = cursor.getByte() & 0xFF;
int header = adapter.getByte( source ) & 0xFF;
int sizeMarks = Integer.numberOfLeadingZeros( (~(header & 0xF8)) & 0xFF ) - 24;
int signShift = 8 - sizeMarks - (sizeMarks == 5 ? 1 : 2);
long signBit = ~((header >>> signShift) & 1) + 1;
long register = (header & ((1 << signShift) - 1)) << 16;
register += cursor.getShort() & 0xFFFFL; // 3 bytes
register += adapter.getShort( source ) & 0xFFFFL; // 3 bytes

while ( sizeMarks > 0 )
{
register <<= 8;
register += cursor.getByte() & 0xFF;
register += adapter.getByte( source ) & 0xFF;
sizeMarks--;
}

Expand Down
Expand Up @@ -52,7 +52,7 @@ class SecondaryPageCursorReadDataAdapter implements DataAdapter<PageCursor>, Sec
}

@Override
public byte get( PageCursor primaryCursor /*same as the one we have*/ )
public byte getByte( PageCursor primaryCursor /*same as the one we have*/ )
{
if ( primaryCursor.getOffset() == primaryEndOffset )
{
Expand All @@ -72,7 +72,15 @@ public byte get( PageCursor primaryCursor /*same as the one we have*/ )
}

@Override
public void put( byte oneByte, PageCursor primaryCursor )
public short getShort( PageCursor cursor )
{
byte highByte = getByte( cursor );
byte lowByte = getByte( cursor );
return (short) (((highByte & 0xFF) << 8) | (lowByte & 0xFF));
}

@Override
public void putByte( byte oneByte, PageCursor primaryCursor )
{
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -48,13 +48,19 @@ class SecondaryPageCursorWriteDataAdapter implements DataAdapter<PageCursor>
}

@Override
public byte get( PageCursor source )
public byte getByte( PageCursor source )
{
throw new UnsupportedOperationException();
}

@Override
public void put( byte oneByte, PageCursor cursor ) throws IOException
public short getShort( PageCursor cursor )
{
throw new UnsupportedOperationException();
}

@Override
public void putByte( byte oneByte, PageCursor cursor ) throws IOException
{
if ( !switched && cursor.getOffset() == primaryEndOffset )
{
Expand Down
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store.format.highlimit;

import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.StubPageCursor;
import org.neo4j.test.RandomRule;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SecondaryPageCursorReadDataAdapterTest
{
@Rule
public final RandomRule random = new RandomRule();

@Test
public void getShortEntirelyFromPrimaryCursor() throws Exception
{
int secondaryPageId = 42;
int secondaryCursorOffset = 0;
int primaryCursorEndOffset = Short.BYTES + 1; // full short can fit into the primary cursor
short value = (short) random.nextInt();

PageCursor primaryCursor = newPageCursor();
PageCursor secondaryCursor = newPageCursor();

writeShortToPrimary( primaryCursor, value );

SecondaryPageCursorReadDataAdapter adapter = new SecondaryPageCursorReadDataAdapter( primaryCursor,
newPagedFile( secondaryCursor ), secondaryPageId, secondaryCursorOffset, primaryCursorEndOffset,
PagedFile.PF_SHARED_READ_LOCK );

short read = adapter.getShort( primaryCursor );

assertEquals( value, read );
}

@Test
public void getShortEntirelyFromSecondaryCursor() throws Exception
{
int secondaryPageId = 42;
int secondaryCursorOffset = 0;
int primaryCursorEndOffset = 0; // can't read anything else from the primary cursor
short value = (short) random.nextInt();

PageCursor primaryCursor = newPageCursor();
PageCursor secondaryCursor = newPageCursor();

writeShortToSecondary( secondaryCursor, value );

SecondaryPageCursorReadDataAdapter adapter = new SecondaryPageCursorReadDataAdapter( primaryCursor,
newPagedFile( secondaryCursor ), secondaryPageId, secondaryCursorOffset, primaryCursorEndOffset,
PagedFile.PF_SHARED_READ_LOCK );

short read = adapter.getShort( primaryCursor );

assertEquals( value, read );
}

@Test
public void getShortFromPrimaryAndSecondaryCursor() throws Exception
{
int secondaryPageId = 42;
int secondaryCursorOffset = 0;
int primaryCursorEndOffset = Short.BYTES / 2; // only one byte can be read from the primary cursor

short value = (short) random.nextInt();
ByteBuffer buffer = newByteBuffer( Short.BYTES );
buffer.putShort( value );
buffer.flip();

PageCursor primaryCursor = newPageCursor();
PageCursor secondaryCursor = newPageCursor();

writeByteToPrimary( primaryCursor, buffer.get() );
writeByteToSecondary( secondaryCursor, buffer.get() );

SecondaryPageCursorReadDataAdapter adapter = new SecondaryPageCursorReadDataAdapter( primaryCursor,
newPagedFile( secondaryCursor ), secondaryPageId, secondaryCursorOffset, primaryCursorEndOffset,
PagedFile.PF_SHARED_READ_LOCK );

short read = adapter.getShort( primaryCursor );

assertEquals( value, read );
}

private static void writeShortToPrimary( PageCursor cursor, short value )
{
int offset = cursor.getOffset();
cursor.putShort( value );
cursor.setOffset( offset );
}

private static void writeByteToPrimary( PageCursor cursor, byte value )
{
int offset = cursor.getOffset();
cursor.putByte( value );
cursor.setOffset( offset );
}

private static void writeShortToSecondary( PageCursor cursor, short value )
{
int offset = cursor.getOffset();
cursor.putByte( (byte) 0 ); // put dummy header byte for the secondary record unit
cursor.putShort( value );
cursor.setOffset( offset );
}

private static void writeByteToSecondary( PageCursor cursor, byte value )
{
int offset = cursor.getOffset();
cursor.putByte( (byte) 0 ); // put dummy header byte for the secondary record unit
cursor.putByte( value );
cursor.setOffset( offset );
}

private static PagedFile newPagedFile( PageCursor cursor ) throws IOException
{
PagedFile pagedFile = mock( PagedFile.class );
when( pagedFile.io( anyLong(), anyInt() ) ).thenReturn( cursor );
return pagedFile;
}

private static PageCursor newPageCursor()
{
return new StubPageCursor( 0, newByteBuffer( 100 ) );
}

private static ByteBuffer newByteBuffer( int capacity )
{
return ByteBuffer.allocateDirect( capacity );
}
}

0 comments on commit f69d1c5

Please sign in to comment.