Skip to content

Commit

Permalink
Add a simple transaction appender stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Jan 11, 2016
1 parent 417cb13 commit 37e1f2b
Show file tree
Hide file tree
Showing 6 changed files with 555 additions and 0 deletions.
@@ -0,0 +1,154 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.stresstest;

import org.junit.Rule;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.neo4j.function.BooleanSupplier;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.stresstest.workload.Runner;
import org.neo4j.test.TargetDirectory;

import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE;

public class TransactionAppenderStressTest
{
@Rule
public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );

@Test
public void concurrentTransactionAppendingTest() throws Exception
{
int threads = 10;
File workingDirectory = directory.directory( "work" );
Callable<Long> runner = new Builder()
.with( Builder.untilTimeExpired( 10, SECONDS ) )
.withWorkingDirectory( workingDirectory )
.withNumThreads( threads )
.build();

long appendedTxs = runner.call();

assertEquals( new TransactionIdChecker( workingDirectory ).parseAllTxLogs(), appendedTxs );
}

public static class Builder
{
private BooleanSupplier condition;
private File workingDirectory;
private int threads;

public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit )
{
final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration );
return new BooleanSupplier()
{
@Override
public boolean getAsBoolean()
{
return currentTimeMillis() <= endTimeInMilliseconds;
}
};
}

public Builder with( BooleanSupplier condition )
{
this.condition = condition;
return this;
}

public Builder withWorkingDirectory( File workingDirectory )
{
this.workingDirectory = workingDirectory;
return this;
}

public Builder withNumThreads( int threads )
{
this.threads = threads;
return this;
}

public Callable<Long> build()
{
return new Runner( workingDirectory, condition, threads );
}
}

public static class TransactionIdChecker
{
private File workingDirectory;

public TransactionIdChecker( File workingDirectory )
{
this.workingDirectory = workingDirectory;
}

public long parseAllTxLogs() throws IOException
{
FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
long txId = -1;
try ( ReadableLogChannel channel = openLogFile( fs, 0 ) )
{
LogEntryReader<ReadableLogChannel> reader =
new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode() );
LogEntry logEntry = reader.readLogEntry( channel );
for (; logEntry != null; logEntry = reader.readLogEntry( channel ) )
{
if ( logEntry.getType() == LogEntryByteCodes.TX_1P_COMMIT )
{
txId = logEntry.<OnePhaseCommit>as().getTxId();
}
}
}
return txId;
}

private ReadableLogChannel openLogFile( FileSystemAbstraction fs, int version ) throws IOException
{
PhysicalLogFiles logFiles = new PhysicalLogFiles( workingDirectory, fs );
PhysicalLogVersionedStoreChannel channel = PhysicalLogFile.openForVersion( logFiles, fs, version );
return new ReadAheadLogChannel( channel, new ReaderLogVersionBridge( fs, logFiles ),
DEFAULT_READ_AHEAD_SIZE );
}
}
}
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.stresstest.workload;

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.neo4j.function.BooleanSupplier;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.KernelEventHandlers;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.core.KernelPanicEventGenerator;
import org.neo4j.kernel.impl.transaction.DeadSimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog;

public class Runner implements Callable<Long>
{
private final File workingDirectory;
private final BooleanSupplier condition;
private final int threads;

public Runner( File workingDirectory, BooleanSupplier condition, int threads )
{
this.workingDirectory = workingDirectory;
this.condition = condition;
this.threads = threads;
}

@Override
public Long call() throws Exception
{
long lastCommittedTransactionId;

try ( Lifespan life = new Lifespan() )
{
TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore();
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 );

LogFile logFile = life.add( createPhysicalLogFile( transactionIdStore, transactionMetadataCache ) );

TransactionAppender transactionAppender = life.add(
createBatchingTransactionAppender( transactionIdStore, transactionMetadataCache, logFile ) );

ExecutorService executorService = Executors.newFixedThreadPool( threads );
try
{
Future<?>[] handlers = new Future[threads];
for ( int i = 0; i < threads; i++ )
{
TransactionRepresentationFactory factory = new TransactionRepresentationFactory();
Worker task = new Worker( transactionAppender, factory, condition );
handlers[i] = executorService.submit( task );
}

// wait for all the workers to complete
for ( Future<?> handle : handlers )
{
handle.get();
}
}
finally
{
executorService.shutdown();
}

lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();
}

return lastCommittedTransactionId;
}

private BatchingTransactionAppender createBatchingTransactionAppender( TransactionIdStore transactionIdStore,
TransactionMetadataCache transactionMetadataCache, LogFile logFile )
{
Log log = NullLog.getInstance();
KernelEventHandlers kernelEventHandlers = new KernelEventHandlers( log );
KernelPanicEventGenerator kpe = new KernelPanicEventGenerator( kernelEventHandlers );
KernelHealth kernelHealth = new KernelHealth( kpe, log );
LogRotationImpl logRotation = new LogRotationImpl( NOOP_LOGROTATION_MONITOR, logFile, kernelHealth );
return new BatchingTransactionAppender( logFile, logRotation,
transactionMetadataCache, transactionIdStore, IdOrderingQueue.BYPASS, kernelHealth );
}

private PhysicalLogFile createPhysicalLogFile( TransactionIdStore transactionIdStore,
TransactionMetadataCache transactionMetadataCache )
{
DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction();
PhysicalLogFiles logFiles = new PhysicalLogFiles( workingDirectory, fs );
long rotateAtSize = Settings.BYTES.apply(
GraphDatabaseSettings.logical_log_rotation_threshold.getDefaultValue() );
DeadSimpleLogVersionRepository logVersionRepository = new DeadSimpleLogVersionRepository( 0 );
return new PhysicalLogFile( fs, logFiles, rotateAtSize, transactionIdStore,
logVersionRepository, NOOP_LOGFILE_MONITOR, transactionMetadataCache );
}

private static final PhysicalLogFile.Monitor NOOP_LOGFILE_MONITOR = new PhysicalLogFile.Monitor()
{
@Override
public void opened( File logFile, long logVersion, long lastTransactionId, boolean clean )
{

}
};

private static final LogRotation.Monitor NOOP_LOGROTATION_MONITOR = new LogRotation.Monitor()
{
@Override
public void startedRotating( long currentVersion )
{

}

@Override
public void finishedRotating( long currentVersion )
{

}
};

}
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.stresstest.workload;


import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;

import static java.lang.System.currentTimeMillis;
import static org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory.DEFAULT;

class TransactionRepresentationFactory
{
private final CommandGenerator commandGenerator = new CommandGenerator();

public TransactionRepresentation nextTransaction( long txId )
{
PhysicalTransactionRepresentation representation =
new PhysicalTransactionRepresentation( createRandomCommands() );
TransactionHeaderInformation headerInfo = DEFAULT.create();
representation.setHeader( headerInfo.getAdditionalHeader(), headerInfo.getMasterId(),
headerInfo.getAuthorId(), headerInfo.getAuthorId(), txId, currentTimeMillis(), 42 );
return representation;
}

private Collection<Command> createRandomCommands()
{
int commandNum = ThreadLocalRandom.current().nextInt( 1, 17 );
List<Command> commands = new ArrayList<>( commandNum );
for ( int i = 0; i < commandNum; i++ )
{
commands.add( commandGenerator.nextCommand() );
}
return commands;
}

private static class CommandGenerator
{
private NodeRecordGenerator nodeRecordGenerator = new NodeRecordGenerator();

public Command nextCommand()
{
Command.NodeCommand nodeCommand = new Command.NodeCommand();
nodeCommand.init( nodeRecordGenerator.nextRecord(), nodeRecordGenerator.nextRecord() );
return nodeCommand;
}
}

private static class NodeRecordGenerator
{

public NodeRecord nextRecord()
{
ThreadLocalRandom random = ThreadLocalRandom.current();
return new NodeRecord( random.nextLong(), random.nextBoolean(), random.nextBoolean(),
random.nextLong(), random.nextLong(), random.nextLong() );
}
}
}

0 comments on commit 37e1f2b

Please sign in to comment.