Proposal for enterprise separation
spacecowboy committed Feb 15, 2016
1 parent 26cd35d commit 7e165d1
Showing 6 changed files with 179 additions and 48 deletions.
@@ -31,8 +31,6 @@
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.Record;

import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.offsetForId;
import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.pageIdForRecord;
import static org.neo4j.kernel.impl.store.format.highlimit.Reference.PAGE_CURSOR_ADAPTER;

/**
@@ -81,59 +79,37 @@ abstract class BaseHighLimitRecordFormat<RECORD extends AbstractBaseRecord>
static final long NULL = Record.NULL_REFERENCE.intValue();
static final int HEADER_BIT_RECORD_UNIT = 0b0000_0010;
static final int HEADER_BIT_FIRST_RECORD_UNIT = 0b0000_0100;
// Would default to the community record format; for now it is injected via the constructor
private final RecordIO<RECORD> recordIO; // = new RecordIO.CommunityRecordIO<>();

protected BaseHighLimitRecordFormat( Function<StoreHeader,Integer> recordSize, int recordHeaderSize )
protected BaseHighLimitRecordFormat( Function<StoreHeader,Integer> recordSize, int recordHeaderSize,
RecordIO<RECORD> recordIO )
{
super( recordSize, recordHeaderSize, IN_USE_BIT );
this.recordIO = recordIO;
}

@Override
protected final void doRead( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
long headerByte, boolean inUse ) throws IOException
{
boolean recordUnit = has( headerByte, HEADER_BIT_RECORD_UNIT );
if ( recordUnit )
boolean doubleRecordUnit = has( headerByte, HEADER_BIT_RECORD_UNIT );
if ( doubleRecordUnit )
{
boolean firstRecordUnit = has( headerByte, HEADER_BIT_FIRST_RECORD_UNIT );
if ( !firstRecordUnit )
{
// This is a record unit and not even the first one, so you cannot go here directly and read it,
// it may only be read as part of reading the primary unit.
record.clear();
// Bail out with the cleared record
return;
} else {
recordIO.read( record, primaryCursor, recordSize, storeFile,
( readAdapter ) -> doReadInternal( record, primaryCursor, recordSize, headerByte, inUse,
readAdapter ) );
}
}

if ( recordUnit )
{
int primaryEndOffset = primaryCursor.getOffset() + recordSize - 1 /*we've already read the header byte*/;

// This is a record that is split into multiple record units. We need a bit more clever
// data structures here. For the time being this means instantiating one object,
// but the trade-off is a great reduction in complexity.
long secondaryId = Reference.decode( primaryCursor, PAGE_CURSOR_ADAPTER );
@SuppressWarnings( "resource" )
SecondaryPageCursorReadDataAdapter readAdapter = new SecondaryPageCursorReadDataAdapter(
primaryCursor, storeFile,
pageIdForRecord( secondaryId, storeFile.pageSize(), recordSize ),
offsetForId( secondaryId, storeFile.pageSize(), recordSize ),
primaryEndOffset, PagedFile.PF_SHARED_READ_LOCK );

try ( SecondaryPageCursorControl secondaryPageCursorControl = readAdapter )
{
do
{
// (re)sets offsets for both cursors
secondaryPageCursorControl.reposition();
doReadInternal( record, primaryCursor, recordSize, headerByte, inUse, readAdapter );
}
while ( secondaryPageCursorControl.shouldRetry() );

record.setSecondaryId( secondaryId );
}
}
else
{
} else {
doReadInternal( record, primaryCursor, recordSize, headerByte, inUse, PAGE_CURSOR_ADAPTER );
}
}
@@ -154,20 +130,15 @@ protected final void doWrite( RECORD record, PageCursor primaryCursor, int recor
headerByte = set( headerByte, HEADER_BIT_FIRST_RECORD_UNIT, true );
primaryCursor.putByte( headerByte );

DataAdapter<PageCursor> dataAdapter = PAGE_CURSOR_ADAPTER;
if ( record.requiresTwoUnits() )
{
int primaryEndOffset = primaryCursor.getOffset() + recordSize - 1 /*we've already written the header byte*/;

// Write using the normal adapter since the first reference we write cannot really overflow
// into the secondary record
Reference.encode( record.getSecondaryId(), primaryCursor, PAGE_CURSOR_ADAPTER );
dataAdapter = new SecondaryPageCursorWriteDataAdapter(
pageIdForRecord( record.getSecondaryId(), storeFile.pageSize(), recordSize ),
offsetForId( record.getSecondaryId(), storeFile.pageSize(), recordSize ), primaryEndOffset );
recordIO.write( record, primaryCursor, recordSize, storeFile,
( dataAdapter ) -> doWriteInternal( record, primaryCursor, dataAdapter ) );
}
else
{
doWriteInternal( record, primaryCursor, PAGE_CURSOR_ADAPTER );
}

doWriteInternal( record, primaryCursor, dataAdapter );
}

protected abstract void doWriteInternal( RECORD record, PageCursor cursor, DataAdapter<PageCursor> adapter )
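
Note: the RecordIO strategy that BaseHighLimitRecordFormat now receives is not among the six files in this diff. Judging from the call sites in doRead/doWrite above and from the signatures that EnterpriseRecordIO overrides further down, the interface presumably looks roughly like the sketch below; only the two method signatures and the type parameter are taken from the diff, while the visibility, javadoc and file placement are assumptions.

package org.neo4j.kernel.impl.store.format.highlimit;

import java.io.IOException;
import java.util.function.Consumer;

import org.neo4j.function.ThrowingConsumer;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.format.highlimit.Reference.DataAdapter;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

/**
 * Sketch of the proposed strategy interface: the record format hands over a callback that does the
 * actual field decoding/encoding, and the RecordIO implementation decides which DataAdapter
 * (plain page cursor, or one aware of a secondary record unit) that callback gets to use.
 */
public interface RecordIO<RECORD extends AbstractBaseRecord>
{
    void read( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
            Consumer<DataAdapter<PageCursor>> reader ) throws IOException;

    void write( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
            ThrowingConsumer<DataAdapter<PageCursor>,IOException> writer ) throws IOException;
}
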
@@ -35,6 +35,10 @@ class NodeRecordFormat extends BaseHighLimitRecordFormat<NodeRecord>
private static final int HAS_PROPERTY_BIT = 0b0010_0000;
private static final int HAS_LABELS_BIT = 0b0100_0000;

public NodeRecordFormat( RecordIO<NodeRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
}
public NodeRecordFormat()
{
super( fixedRecordSize( RECORD_SIZE ), 0 );
@@ -37,6 +37,11 @@ public RelationshipGroupRecordFormat()
super( fixedRecordSize( RECORD_SIZE ), 0 );
}

public RelationshipGroupRecordFormat( RecordIO<RelationshipGroupRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
}

@Override
public RelationshipGroupRecord newRecord()
{
@@ -37,6 +37,10 @@ public RelationshipRecordFormat()
{
super( fixedRecordSize( RECORD_SIZE ), 0 );
}
public RelationshipRecordFormat( RecordIO<RelationshipRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
}

@Override
public RelationshipRecord newRecord()
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store.format.highlimit;


import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;

/**
* Record format with very high limits, 50 bits per ID, as well as the ability to use two record units per record,
* while at the same time keeping store size small.
*
* @see HighLimit
*/
public class EnterpriseHighLimit extends HighLimit
{
@Override
public String storeVersion()
{
// Enterprise.HighLimit.Zero
return "vE.H.0";
}

@Override
public RecordFormat<NodeRecord> node()
{
return new NodeRecordFormat( new EnterpriseRecordIO<>() );
}

@Override
public RecordFormat<RelationshipRecord> relationship()
{
return new RelationshipRecordFormat( new EnterpriseRecordIO<>() );
}

@Override
public RecordFormat<RelationshipGroupRecord> relationshipGroup()
{
return new RelationshipGroupRecordFormat( new EnterpriseRecordIO<>() );
}
}
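
As a quick orientation, here is a hypothetical caller of the class above. It exercises only what is visible in this diff, namely the new store version string and the enterprise-wired formats; the EnterpriseHighLimitUsageSketch class, its main method and the package placement are purely illustrative.

package org.neo4j.kernel.impl.store.format.highlimit;

import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;

class EnterpriseHighLimitUsageSketch
{
    public static void main( String[] args )
    {
        EnterpriseHighLimit formats = new EnterpriseHighLimit();

        // "Enterprise.HighLimit.Zero", encoded by storeVersion() above.
        System.out.println( formats.storeVersion() );   // prints vE.H.0

        // Both formats come back wired with EnterpriseRecordIO, i.e. with double-record-unit support.
        RecordFormat<NodeRecord> nodeFormat = formats.node();
        RecordFormat<RelationshipRecord> relationshipFormat = formats.relationship();
        System.out.println( nodeFormat + " / " + relationshipFormat );
    }
}
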
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store.format.highlimit;

import java.io.IOException;
import java.util.function.Consumer;

import org.neo4j.function.ThrowingConsumer;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.format.highlimit.Reference.DataAdapter;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.offsetForId;
import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.pageIdForRecord;
import static org.neo4j.kernel.impl.store.format.highlimit.Reference.PAGE_CURSOR_ADAPTER;

/**
* Enterprise record IO: supports records that span two record units.
*/
public class EnterpriseRecordIO<RECORD extends AbstractBaseRecord> implements RecordIO<RECORD>
{

@Override
public void read( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
Consumer<DataAdapter<PageCursor>> reader ) throws IOException
{
int primaryEndOffset = primaryCursor.getOffset() + recordSize - 1 /*we've already read the header byte*/;

// This record is split into multiple record units. Handling that cleanly calls for slightly
// cleverer data structures; for the time being it means instantiating one extra object,
// but the trade-off is a great reduction in complexity.
long secondaryId = Reference.decode( primaryCursor, PAGE_CURSOR_ADAPTER );
@SuppressWarnings( "resource" ) SecondaryPageCursorReadDataAdapter readAdapter =
new SecondaryPageCursorReadDataAdapter( primaryCursor, storeFile,
pageIdForRecord( secondaryId, storeFile.pageSize(), recordSize ),
offsetForId( secondaryId, storeFile.pageSize(), recordSize ), primaryEndOffset,
PagedFile.PF_SHARED_READ_LOCK );

try ( SecondaryPageCursorControl secondaryPageCursorControl = readAdapter )
{
do
{
// (re)sets offsets for both cursors
secondaryPageCursorControl.reposition();
// Let the format-specific callback decode the record, now through the secondary-unit aware adapter
reader.accept( readAdapter );
}
while ( secondaryPageCursorControl.shouldRetry() );

record.setSecondaryId( secondaryId );
}
}

@Override
public void write( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
ThrowingConsumer<DataAdapter<PageCursor>, IOException> writer ) throws IOException
{
int primaryEndOffset = primaryCursor.getOffset() + recordSize - 1 /*we've already written the header byte*/;

// Write using the normal adapter since the first reference we write cannot really overflow
// into the secondary record
Reference.encode( record.getSecondaryId(), primaryCursor, PAGE_CURSOR_ADAPTER );
DataAdapter<PageCursor> dataAdapter = new SecondaryPageCursorWriteDataAdapter(
pageIdForRecord( record.getSecondaryId(), storeFile.pageSize(), recordSize ),
offsetForId( record.getSecondaryId(), storeFile.pageSize(), recordSize ), primaryEndOffset );

writer.accept( dataAdapter );
}
}
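
For contrast, the community counterpart hinted at by the commented-out "new RecordIO.CommunityRecordIO<>()" initializer in BaseHighLimitRecordFormat is also not part of this diff. A plausible shape is sketched below, under the assumption that the community variant never chases a secondary record unit and simply hands the plain page-cursor adapter to the format's callback; it is shown as a top-level class for brevity, although the commented-out reference suggests it would live nested inside RecordIO.

package org.neo4j.kernel.impl.store.format.highlimit;

import java.io.IOException;
import java.util.function.Consumer;

import org.neo4j.function.ThrowingConsumer;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.format.highlimit.Reference.DataAdapter;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

import static org.neo4j.kernel.impl.store.format.highlimit.Reference.PAGE_CURSOR_ADAPTER;

/**
 * Sketch of a community-side RecordIO: records never span two units, so both read and write simply
 * pass the plain page-cursor adapter on to the format's callback.
 */
class CommunityRecordIO<RECORD extends AbstractBaseRecord> implements RecordIO<RECORD>
{
    @Override
    public void read( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
            Consumer<DataAdapter<PageCursor>> reader ) throws IOException
    {
        // No secondary unit to reposition to; decode straight through the primary cursor.
        reader.accept( PAGE_CURSOR_ADAPTER );
    }

    @Override
    public void write( RECORD record, PageCursor primaryCursor, int recordSize, PagedFile storeFile,
            ThrowingConsumer<DataAdapter<PageCursor>,IOException> writer ) throws IOException
    {
        // Likewise, encode straight through the primary cursor.
        writer.accept( PAGE_CURSOR_ADAPTER );
    }
}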
