Skip to content

Commit

Permalink
Implements a record format with very high id limits
Browse files Browse the repository at this point in the history
although at the same time keeping store size increase to a minimum.
Most record sizes are aligned to powers of two to be more page cache and
cache line friendly. To support 50-bit IDs these pointers are compressed so
that small IDs take less space than big ones in the records, whereas
"null" references take only one bit. One "logical" record may span two
physical records when most of its IDs are very big,
such that one physical record isn't enough to hold all data.

The new format is implemented as a separate format, not plugged in by default,
but at least tested by RecordFormatTest. This commit also introduces
InternalRecordFormatSelector which has a default (the current one) and ability
to be configured to use any format by specifying RecordFormats (fully qualified)
name pointing to a class implementing RecordFormats. With this a build can be
set up to run with a custom format.
  • Loading branch information
tinwelint committed Feb 4, 2016
1 parent 941eb1d commit 4de5f1c
Show file tree
Hide file tree
Showing 51 changed files with 2,830 additions and 494 deletions.
Expand Up @@ -256,19 +256,24 @@ public static <E extends Throwable> E combine( E first, E second )
}
}

public static <T extends Throwable> T withMessage( T cause, String message )
public static void setMessage( Throwable cause, String message )
{
try
{
THROWABLE_MESSAGE_FIELD.set( cause, message );
return cause;
}
catch ( IllegalArgumentException | IllegalAccessException e )
{
throw new RuntimeException( e );
}
}

/**
 * Fluent variant of {@link #setMessage(Throwable, String)}: replaces the message of {@code cause}
 * and hands the very same instance back, so that the call can be used inline, e.g. in a
 * {@code throw} statement.
 *
 * @param cause throwable to change the message of.
 * @param message new message to set on {@code cause}.
 * @param <T> type of throwable.
 * @return the same {@code cause} instance that was passed in.
 */
public static <T extends Throwable> T withMessage( T cause, String message )
{
setMessage( cause, message );
return cause;
}

@Deprecated
public static boolean containsStackTraceElement( Throwable cause,
final Predicate<StackTraceElement> predicate )
Expand Down
Expand Up @@ -84,7 +84,7 @@
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.format.lowlimit.LowLimit;
import org.neo4j.kernel.impl.store.format.InternalRecordFormatSelector;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.storemigration.DatabaseMigrator;
import org.neo4j.kernel.impl.storemigration.monitoring.VisibleMigrationProgressMonitor;
Expand Down Expand Up @@ -558,7 +558,7 @@ private StorageEngine buildStorageEngine(
labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, scheduler,
tokenNameLookup, lockService, schemaIndexProvider, indexingServiceMonitor, databaseHealth,
labelScanStore, legacyIndexProviderLookup, indexConfigStore, legacyIndexTransactionOrdering,
LowLimit.RECORD_FORMATS ) );
InternalRecordFormatSelector.select() ) );
}

private TransactionLogModule buildTransactionLogs(
Expand Down
Expand Up @@ -53,7 +53,7 @@
/**
* Cursor that provides a view on property blocks of a particular property record.
* This cursor is reusable and can be re-initialized with
* {@link #init(PageCursor)} method and cleaned up using {@link #clear()} method.
* {@link #init(long[], int)} method and cleaned up using {@link #clear()} method.
* <p/>
* During initialization {@link #MAX_NUMBER_OF_PAYLOAD_LONG_ARRAY} number of longs is read from
* the given {@linkplain PageCursor}. This is done eagerly to avoid reading property blocks from different versions
Expand All @@ -66,7 +66,6 @@ class StorePropertyPayloadCursor
{
static final int MAX_NUMBER_OF_PAYLOAD_LONG_ARRAY = PropertyRecordFormat.DEFAULT_PAYLOAD_SIZE / 8;

private static final long PROPERTY_KEY_ID_BITMASK = 0xFFFFFFL;
private static final int MAX_BYTES_IN_SHORT_STRING_OR_SHORT_ARRAY = 32;
private static final int INTERNAL_BYTE_ARRAY_SIZE = 4096;
private static final int INITIAL_POSITION = -1;
Expand Down Expand Up @@ -130,7 +129,7 @@ PropertyType type()

int propertyKeyId()
{
return (int) (currentHeader() & PROPERTY_KEY_ID_BITMASK);
return PropertyBlock.keyIndexId( currentHeader() );
}

boolean booleanValue()
Expand Down
Expand Up @@ -261,12 +261,12 @@ private void extractHeaderRecord() throws IOException

protected long pageIdForRecord( long id )
{
return id * getRecordSize() / storeFile.pageSize();
return RecordPageLocationCalculator.pageIdForRecord( id, storeFile.pageSize(), getRecordSize() );
}

protected int offsetForId( long id )
{
return (int) (id * getRecordSize() % storeFile.pageSize());
return RecordPageLocationCalculator.offsetForId( id, storeFile.pageSize(), getRecordSize() );
}

@Override
Expand Down Expand Up @@ -940,11 +940,13 @@ protected RECORD getRecord( long id, RECORD record, RecordLoad mode, PageCursor
protected void readRecordWithRetry( PageCursor cursor, long id, RECORD record, RecordLoad mode, int offset )
throws IOException
{
// Mark the record with this id regardless of whether or not we load the contents of it.
// This is done in this method since there are multiple call sites and they all want the id
// on that record, so it's to ensure it isn't forgotten.
record.setId( id );

do
{
// Mark the record with this id regardless of whether or not we load the contents of it.
record.setId( id );

// Mark this record as unused. This to simplify implementations of readRecord.
// readRecord can behave differently depending on RecordLoad argument and so it may be that
// contents of a record may be loaded even if that record is unused, where the contents
Expand All @@ -963,8 +965,9 @@ protected void readRecordWithRetry( PageCursor cursor, long id, RECORD record, R

/**
* Reads data from {@link PageCursor} into the record.
* @throws IOException on error reading.
*/
protected abstract void readRecord( PageCursor cursor, RECORD record, RecordLoad mode );
protected abstract void readRecord( PageCursor cursor, RECORD record, RecordLoad mode ) throws IOException;

@Override
public void updateRecord( RECORD record )
Expand Down Expand Up @@ -994,13 +997,15 @@ public void updateRecord( RECORD record )
}
}

protected abstract void writeRecord( PageCursor cursor, RECORD record );
protected abstract void writeRecord( PageCursor cursor, RECORD record ) throws IOException;

/**
* Scan the given range of records both inclusive, and pass all the in-use ones to the given processor, one by one.
*
* The record passed to the NodeRecordScanner is reused instead of reallocated for every record, so it must be
* cloned if you want to save it for later.
* @param visitor {@link Visitor} notified about all records.
* @throws IOException on error reading from store.
*/
public void scanAllRecords( Visitor<RECORD,IOException> visitor ) throws IOException
{
Expand Down
Expand Up @@ -70,15 +70,15 @@ public int getRecordDataSize()
}

@Override
protected void readRecord( PageCursor cursor, RECORD record, RecordLoad mode )
protected void readRecord( PageCursor cursor, RECORD record, RecordLoad mode ) throws IOException
{
recordFormat.read( record, cursor, mode, recordSize );
recordFormat.read( record, cursor, mode, recordSize, storeFile );
}

@Override
protected void writeRecord( PageCursor cursor, RECORD record )
protected void writeRecord( PageCursor cursor, RECORD record ) throws IOException
{
recordFormat.write( record, cursor );
recordFormat.write( record, cursor, recordSize, storeFile );
}

@Override
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store;

/**
* Calculates page ids and offset based on record ids.
*/
public class RecordPageLocationCalculator
{
    /**
     * Finds the page that holds the record with the given {@code id}, assuming records are laid out
     * back to back from the start of the file.
     *
     * @param id record id
     * @param pageSize size of each page, in bytes
     * @param recordSize size of each record, in bytes
     * @return id of the page in which the record with the given {@code id} starts.
     */
    public static long pageIdForRecord( long id, int pageSize, int recordSize )
    {
        // Absolute position of the record in the file; computed as long since id is a long.
        long absolutePosition = id * recordSize;
        return absolutePosition / pageSize;
    }

    /**
     * Finds where, inside the page given by {@link #pageIdForRecord(long, int, int)}, the record
     * with the given {@code id} starts.
     *
     * @param id record id
     * @param pageSize size of each page, in bytes
     * @param recordSize size of each record, in bytes
     * @return offset from the start of the page at which the record with the given {@code id} starts.
     */
    public static int offsetForId( long id, int pageSize, int recordSize )
    {
        long absolutePosition = id * recordSize;
        long positionInPage = absolutePosition % pageSize;
        return (int) positionInPage;
    }
}
Expand Up @@ -29,8 +29,8 @@
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.format.InternalRecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.lowlimit.LowLimit;
import org.neo4j.logging.LogProvider;

/**
Expand Down Expand Up @@ -84,7 +84,7 @@ public StoreFactory( File storeDir, Config config,
FileSystemAbstraction fileSystemAbstraction, LogProvider logProvider )
{
this( storeDir, config, idGeneratorFactory, pageCache, fileSystemAbstraction, logProvider,
LowLimit.RECORD_FORMATS );
InternalRecordFormatSelector.select() );
}

public StoreFactory( File storeDir, Config config,
Expand Down
Expand Up @@ -19,9 +19,11 @@
*/
package org.neo4j.kernel.impl.store.format;

import java.io.IOException;
import java.util.function.Function;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
Expand All @@ -30,7 +32,7 @@
/**
* Implementation of a very common type of format where the first byte, at least one bit in it,
* say whether or not the record is in use. That can be used to let sub classes have simpler
* read/write implementations.
* read/write implementations. The rest of the 7 bits in that header byte are free to use by subclasses.
*
* @param <RECORD> type of record.
*/
Expand All @@ -43,38 +45,43 @@ protected BaseOneByteHeaderRecordFormat( Function<StoreHeader,Integer> recordSiz
}

@Override
public final void read( RECORD record, PageCursor cursor, RecordLoad mode, int recordSize )
public final void read( RECORD record, PageCursor cursor, RecordLoad mode, int recordSize, PagedFile storeFile )
throws IOException
{
byte inUseByte = cursor.getByte();
boolean inUse = isInUse( inUseByte );
byte headerByte = cursor.getByte();
boolean inUse = isInUse( headerByte );
if ( mode.shouldLoad( inUse ) )
{
doRead( record, cursor, recordSize, inUseByte, inUse );
doRead( record, cursor, recordSize, storeFile, headerByte, inUse );
}
}

/**
* Reads contents at {@code cursor} into the given record. This method is only called if the {@link RecordLoad}
* mode in {@link #read(AbstractBaseRecord, PageCursor, RecordLoad, int)} thinks it's OK to load the record,
* mode in {@link #read(AbstractBaseRecord, PageCursor, RecordLoad, int, PagedFile)} thinks it's OK to load the record,
* given its inUse status.
*
* @param record to put read data into, replacing any existing data in that record object.
* @param cursor {@link PageCursor} to read data from.
* See {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)} for more information.
* @param recordSize size of records of this format. This is passed in like this since not all formats
* know the record size in advance, but may be read from store header when opening the store.
* @param inUseByte the first byte read, in order to determine inUse status.
* @param storeFile {@link PagedFile} to get additional {@link PageCursor} from, if need be.
* @param headerByte the first byte read, in order to determine inUse status.
* @param inUse whether or not the record is in use. Keep in mind that this method may be called
* even on an unused record, depending on {@link RecordLoad} mode.
* @throws IOException on error reading.
*/
protected abstract void doRead( RECORD record, PageCursor cursor, int recordSize, long inUseByte, boolean inUse );
protected abstract void doRead( RECORD record, PageCursor cursor, int recordSize, PagedFile storeFile,
long headerByte, boolean inUse ) throws IOException;

@Override
public final void write( RECORD record, PageCursor cursor )
public final void write( RECORD record, PageCursor cursor, int recordSize, PagedFile storeFile )
throws IOException
{
if ( record.inUse() )
{
doWrite( record, cursor );
doWrite( record, cursor, recordSize, storeFile );
}
else
{
Expand All @@ -91,6 +98,20 @@ public final void write( RECORD record, PageCursor cursor )
*
* @param record containing data to write.
* @param cursor {@link PageCursor} to write the record data into.
* @param recordSize size of records of this format. This is passed in like this since not all formats
* know the record size in advance, but may be read from store header when opening the store.
* @throws IOException on error writing.
*/
protected abstract void doWrite( RECORD record, PageCursor cursor );
protected abstract void doWrite( RECORD record, PageCursor cursor, int recordSize, PagedFile storeFile )
throws IOException;

/**
 * Checks whether any of the bits in {@code bitMask} are set in {@code headerByte}.
 *
 * @param headerByte header value to inspect.
 * @param bitMask mask of the bit(s) to look for.
 * @return {@code true} if at least one bit of {@code bitMask} is set in {@code headerByte}.
 */
protected static boolean has( long headerByte, int bitMask )
{
    long maskedBits = headerByte & bitMask;
    return 0 != maskedBits;
}

/**
 * Conditionally turns on the bit(s) of {@code bitMask} in {@code header}.
 * Note that a {@code false} value leaves {@code header} untouched; it does not clear
 * bits that are already set.
 *
 * @param header header byte to start from.
 * @param bitMask mask of the bit(s) to turn on.
 * @param value whether or not to turn the bit(s) on.
 * @return {@code header} with the bit(s) of {@code bitMask} turned on if {@code value} is
 * {@code true}, otherwise {@code header} unchanged.
 */
protected static byte set( byte header, int bitMask, boolean value )
{
    if ( !value )
    {
        return header;
    }
    return (byte) (header | bitMask);
}
}
Expand Up @@ -22,22 +22,24 @@
import java.util.function.Function;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.IntStoreHeader;
import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.Record;

/**
* Basic abstract implementation of a {@link RecordFormat} implementing most functionality except
* {@link #read(AbstractBaseRecord, PageCursor, org.neo4j.kernel.impl.store.record.RecordLoad, int)} and
* {@link #write(AbstractBaseRecord, PageCursor)}.
* {@link #read(AbstractBaseRecord, PageCursor, org.neo4j.kernel.impl.store.record.RecordLoad, int, PagedFile)} and
* {@link #write(AbstractBaseRecord, PageCursor, int, PagedFile)}.
*
* @param <RECORD> type of record.
*/
public abstract class BaseRecordFormat<RECORD extends AbstractBaseRecord> implements RecordFormat<RECORD>
{
public static final int IN_USE_BIT = 0x1;
public static final int IN_USE_BIT = 0b0000_0001;
public static final Function<StoreHeader,Integer> INT_STORE_HEADER_READER =
(header) -> ((IntStoreHeader)header).value();

Expand Down Expand Up @@ -91,4 +93,9 @@ public static long longFromIntAndMod( long base, long modifier )
{
return modifier == 0 && base == IdGeneratorImpl.INTEGER_MINUS_ONE ? -1 : base | modifier;
}

/**
 * Default implementation which does nothing with the given record. Formats that need to
 * act on a record ahead of time can override this method.
 *
 * @param record record to prepare; left untouched by this implementation.
 * @param recordSize size of records of this format; unused by this implementation.
 * @param idSequence {@link IdSequence} made available to overriding implementations;
 * unused by this implementation.
 */
@Override
public void prepare( RECORD record, int recordSize, IdSequence idSequence )
{ // Do nothing by default
}
}

0 comments on commit 4de5f1c

Please sign in to comment.