Skip to content

Commit

Permalink
Merge pull request #9682 from MishaDemianenko/3.1-large-tx-logs-files…
Browse files Browse the repository at this point in the history
…-checkpoint-finder

Allow checkpoint finder to find transaction after checkpoint in large files
  • Loading branch information
MishaDemianenko committed Jul 24, 2017
2 parents 9afb987 + 30f83c8 commit e34463f
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ public int compareTo( LogPosition o )
{
if ( logVersion != o.logVersion )
{
return (int) (logVersion - o.logVersion);
return Long.compare( logVersion, o.logVersion );
}

return (int) (byteOffset - o.byteOffset);
return Long.compare( byteOffset, o.byteOffset );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException
return new LatestCheckPoint( null, commitsAfterCheckPoint, firstTxAfterPosition, oldestVersionFound );
}

private LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long version, LogEntryStart latestStartEntry,
protected LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long version, LogEntryStart
latestStartEntry,
long oldestVersionFound, CheckPoint latestCheckPoint ) throws IOException
{
// Is the latest start entry in this log file version later than what the latest check point targets?
Expand Down Expand Up @@ -166,7 +167,7 @@ private LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long versi
* if not found.
* @throws IOException on I/O error.
*/
private long extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion ) throws IOException
protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion ) throws IOException
{
LogPosition currentPosition = initialPosition;
while ( currentPosition.getLogVersion() <= maxLogVersion )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

@RunWith( Parameterized.class )
public class LogPositionTest
{
@Parameterized.Parameter()
public LogPosition logPositionA;

@Parameterized.Parameter( 1 )
public LogPosition logPositionB;

@Parameterized.Parameters
public static Collection<LogPosition[]> logPositions()
{
return Arrays.asList( new LogPosition[]{new LogPosition( 0, 1 ), new LogPosition( 0, 0 )},
new LogPosition[]{new LogPosition( 0, 11 ), new LogPosition( 0, 7 )},
new LogPosition[]{new LogPosition( 2, 1 ), new LogPosition( 2, 0 )},
new LogPosition[]{new LogPosition( 2, 17 ), new LogPosition( 2, 15 )},
new LogPosition[]{new LogPosition( 1, 1 ), new LogPosition( 0, 1 )},
new LogPosition[]{new LogPosition( 5, 1 ), new LogPosition( 3, 10 )},
new LogPosition[]{new LogPosition( Integer.MAX_VALUE, Integer.MAX_VALUE + 1L ),
new LogPosition( Integer.MAX_VALUE, Integer.MAX_VALUE )},
new LogPosition[]{new LogPosition( Long.MAX_VALUE, Long.MAX_VALUE ),
new LogPosition( Integer.MAX_VALUE + 1L, Long.MAX_VALUE )},
new LogPosition[]{new LogPosition( Long.MAX_VALUE, Long.MAX_VALUE ),
new LogPosition( Long.MAX_VALUE, Long.MAX_VALUE - 1 )} );
}

@SuppressWarnings( "EqualsWithItself" )
@Test
public void logPositionComparison() throws Exception
{
assertEquals( 1, logPositionA.compareTo( logPositionB ) );
assertEquals( -1, logPositionB.compareTo( logPositionA ) );
assertEquals( 0, logPositionA.compareTo( logPositionA ) );
assertEquals( 0, logPositionB.compareTo( logPositionB ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.DeadSimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
Expand All @@ -43,7 +45,9 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -229,6 +233,22 @@ public void latestLogFileContainingACheckPointAndAStartBefore() throws Throwable
assertLatestCheckPoint( true, false, NO_TRANSACTION_ID, endLogVersion, latestCheckPoint );
}

@Test
public void bigFileLatestCheckpointFindsStartAfter() throws Throwable
{
long firstTxAfterCheckpoint = Integer.MAX_VALUE + 4L;

LatestCheckPointFinder checkPointFinder = new FirstTxIdConfigurableCheckpointFinder( firstTxAfterCheckpoint, logFiles, fsRule.get(), reader);
LogEntryStart startEntry = new LogEntryStart( 1, 2, 3L, 4L, new byte[]{5, 6},
new LogPosition( endLogVersion, Integer.MAX_VALUE + 17L ) );
CheckPoint checkPoint = new CheckPoint( new LogPosition( endLogVersion, 16L ) );
LatestCheckPoint latestCheckPoint = checkPointFinder.latestCheckPoint( endLogVersion, endLogVersion, startEntry,
endLogVersion, checkPoint );

assertLatestCheckPoint( true, true, firstTxAfterCheckpoint, endLogVersion,
latestCheckPoint );
}

@Test
public void latestLogFileContainingACheckPointAndAStartAfter() throws Throwable
{
Expand Down Expand Up @@ -522,4 +542,33 @@ private void assertLatestCheckPoint( boolean hasCheckPointEntry, boolean commits
}
assertEquals( logVersion, latestCheckPoint.oldestLogVersionFound );
}

private static class FirstTxIdConfigurableCheckpointFinder extends LatestCheckPointFinder
{

private final long txId;

FirstTxIdConfigurableCheckpointFinder( long txId, PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
{
super( logFiles, fileSystem, logEntryReader );
this.txId = txId;
}

@Override
public LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long version,
LogEntryStart latestStartEntry, long oldestVersionFound, CheckPoint latestCheckPoint )
throws IOException
{
return super.latestCheckPoint( fromVersionBackwards, version, latestStartEntry, oldestVersionFound,
latestCheckPoint );
}

@Override
protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion )
throws IOException
{
return txId;
}
}
}

0 comments on commit e34463f

Please sign in to comment.