diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java new file mode 100644 index 0000000000000..c607101df2c59 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/TransactionAppenderStressTest.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.transaction.log.stresstest; + +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.neo4j.function.BooleanSupplier; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; +import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; +import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryByteCodes; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion; +import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit; +import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; +import org.neo4j.kernel.impl.transaction.log.stresstest.workload.Runner; +import org.neo4j.test.TargetDirectory; + +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; +import static org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE; + +public class TransactionAppenderStressTest +{ + @Rule + public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() ); + + @Test + public void concurrentTransactionAppendingTest() throws Exception + { + int threads = 10; + File workingDirectory = directory.directory( "work" ); + Callable runner = new Builder() + .with( Builder.untilTimeExpired( 10, SECONDS ) ) + .withWorkingDirectory( workingDirectory ) + .withNumThreads( threads ) + .build(); + + long appendedTxs = runner.call(); + + assertEquals( new TransactionIdChecker( workingDirectory ).parseAllTxLogs(), appendedTxs ); + } + + public static class Builder + { + private BooleanSupplier condition; + private File workingDirectory; + private int threads; + + public static BooleanSupplier untilTimeExpired( long duration, TimeUnit unit ) + { + final long endTimeInMilliseconds = currentTimeMillis() + unit.toMillis( duration ); + return new BooleanSupplier() + { + @Override + public boolean getAsBoolean() + { + return currentTimeMillis() <= endTimeInMilliseconds; + } + }; + } + + public Builder with( BooleanSupplier condition ) + { + this.condition = condition; + return this; + } + + public Builder withWorkingDirectory( File workingDirectory ) + { + this.workingDirectory = workingDirectory; + return this; + } + + public Builder withNumThreads( int threads ) + { + this.threads = threads; + return this; + } + + public Callable build() + { + return new Runner( workingDirectory, condition, threads ); + } + } + + public static class TransactionIdChecker + { + private File workingDirectory; + + public TransactionIdChecker( File workingDirectory ) + { + this.workingDirectory = workingDirectory; + } + + public long parseAllTxLogs() throws IOException + { + FileSystemAbstraction fs = new DefaultFileSystemAbstraction(); + long txId = -1; + try ( ReadableLogChannel channel = openLogFile( fs, 0 ) ) + { + LogEntryReader reader = + new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode() ); + LogEntry logEntry = reader.readLogEntry( channel ); + for (; logEntry != null; logEntry = reader.readLogEntry( channel ) ) + { + if ( logEntry.getType() == LogEntryByteCodes.TX_1P_COMMIT ) + { + txId = logEntry.as().getTxId(); + } + } + } + return txId; + } + + private ReadableLogChannel openLogFile( FileSystemAbstraction fs, int version ) throws IOException + { + PhysicalLogFiles logFiles = new PhysicalLogFiles( workingDirectory, fs ); + PhysicalLogVersionedStoreChannel channel = PhysicalLogFile.openForVersion( logFiles, fs, version ); + return new ReadAheadLogChannel( channel, new ReaderLogVersionBridge( fs, logFiles ), + DEFAULT_READ_AHEAD_SIZE ); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Runner.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Runner.java new file mode 100644 index 0000000000000..732dfb8f1c346 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Runner.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.transaction.log.stresstest.workload; + +import java.io.File; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.neo4j.function.BooleanSupplier; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.kernel.KernelEventHandlers; +import org.neo4j.kernel.KernelHealth; +import org.neo4j.kernel.configuration.Settings; +import org.neo4j.kernel.impl.core.KernelPanicEventGenerator; +import org.neo4j.kernel.impl.transaction.DeadSimpleLogVersionRepository; +import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender; +import org.neo4j.kernel.impl.transaction.log.LogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.kernel.impl.transaction.log.TransactionAppender; +import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl; +import org.neo4j.kernel.impl.util.IdOrderingQueue; +import org.neo4j.kernel.lifecycle.Lifespan; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLog; + +public class Runner implements Callable +{ + private final File workingDirectory; + private final BooleanSupplier condition; + private final int threads; + + public Runner( File workingDirectory, BooleanSupplier condition, int threads ) + { + this.workingDirectory = workingDirectory; + this.condition = condition; + this.threads = threads; + } + + @Override + public Long call() throws Exception + { + long lastCommittedTransactionId; + + try ( Lifespan life = new Lifespan() ) + { + TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore(); + TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 ); + + LogFile logFile = life.add( createPhysicalLogFile( transactionIdStore, transactionMetadataCache ) ); + + TransactionAppender transactionAppender = life.add( + createBatchingTransactionAppender( transactionIdStore, transactionMetadataCache, logFile ) ); + + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + try + { + Future[] handlers = new Future[threads]; + for ( int i = 0; i < threads; i++ ) + { + TransactionRepresentationFactory factory = new TransactionRepresentationFactory(); + Worker task = new Worker( transactionAppender, factory, condition ); + handlers[i] = executorService.submit( task ); + } + + // wait for all the workers to complete + for ( Future handle : handlers ) + { + handle.get(); + } + } + finally + { + executorService.shutdown(); + } + + lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId(); + } + + return lastCommittedTransactionId; + } + + private BatchingTransactionAppender createBatchingTransactionAppender( TransactionIdStore transactionIdStore, + TransactionMetadataCache transactionMetadataCache, LogFile logFile ) + { + Log log = NullLog.getInstance(); + KernelEventHandlers kernelEventHandlers = new KernelEventHandlers( log ); + KernelPanicEventGenerator kpe = new KernelPanicEventGenerator( kernelEventHandlers ); + KernelHealth kernelHealth = new KernelHealth( kpe, log ); + LogRotationImpl logRotation = new LogRotationImpl( NOOP_LOGROTATION_MONITOR, logFile, kernelHealth ); + return new BatchingTransactionAppender( logFile, logRotation, + transactionMetadataCache, transactionIdStore, IdOrderingQueue.BYPASS, kernelHealth ); + } + + private PhysicalLogFile createPhysicalLogFile( TransactionIdStore transactionIdStore, + TransactionMetadataCache transactionMetadataCache ) + { + DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction(); + PhysicalLogFiles logFiles = new PhysicalLogFiles( workingDirectory, fs ); + long rotateAtSize = Settings.BYTES.apply( + GraphDatabaseSettings.logical_log_rotation_threshold.getDefaultValue() ); + DeadSimpleLogVersionRepository logVersionRepository = new DeadSimpleLogVersionRepository( 0 ); + return new PhysicalLogFile( fs, logFiles, rotateAtSize, transactionIdStore, + logVersionRepository, NOOP_LOGFILE_MONITOR, transactionMetadataCache ); + } + + private static final PhysicalLogFile.Monitor NOOP_LOGFILE_MONITOR = new PhysicalLogFile.Monitor() + { + @Override + public void opened( File logFile, long logVersion, long lastTransactionId, boolean clean ) + { + + } + }; + + private static final LogRotation.Monitor NOOP_LOGROTATION_MONITOR = new LogRotation.Monitor() + { + @Override + public void startedRotating( long currentVersion ) + { + + } + + @Override + public void finishedRotating( long currentVersion ) + { + + } + }; + +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/TransactionRepresentationFactory.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/TransactionRepresentationFactory.java new file mode 100644 index 0000000000000..1082de746dba1 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/TransactionRepresentationFactory.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.transaction.log.stresstest.workload; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.neo4j.kernel.impl.api.TransactionHeaderInformation; +import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.command.Command; +import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; + +import static java.lang.System.currentTimeMillis; +import static org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory.DEFAULT; + +class TransactionRepresentationFactory +{ + private final CommandGenerator commandGenerator = new CommandGenerator(); + + public TransactionRepresentation nextTransaction( long txId ) + { + PhysicalTransactionRepresentation representation = + new PhysicalTransactionRepresentation( createRandomCommands() ); + TransactionHeaderInformation headerInfo = DEFAULT.create(); + representation.setHeader( headerInfo.getAdditionalHeader(), headerInfo.getMasterId(), + headerInfo.getAuthorId(), headerInfo.getAuthorId(), txId, currentTimeMillis(), 42 ); + return representation; + } + + private Collection createRandomCommands() + { + int commandNum = ThreadLocalRandom.current().nextInt( 1, 17 ); + List commands = new ArrayList<>( commandNum ); + for ( int i = 0; i < commandNum; i++ ) + { + commands.add( commandGenerator.nextCommand() ); + } + return commands; + } + + private static class CommandGenerator + { + private NodeRecordGenerator nodeRecordGenerator = new NodeRecordGenerator(); + + public Command nextCommand() + { + Command.NodeCommand nodeCommand = new Command.NodeCommand(); + nodeCommand.init( nodeRecordGenerator.nextRecord(), nodeRecordGenerator.nextRecord() ); + return nodeCommand; + } + } + + private static class NodeRecordGenerator + { + + public NodeRecord nextRecord() + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + return new NodeRecord( random.nextLong(), random.nextBoolean(), random.nextBoolean(), + random.nextLong(), random.nextLong(), random.nextLong() ); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Worker.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Worker.java new file mode 100644 index 0000000000000..311478d60e053 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/stresstest/workload/Worker.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.transaction.log.stresstest.workload; + +import java.io.IOException; + +import org.neo4j.function.BooleanSupplier; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.log.Commitment; +import org.neo4j.kernel.impl.transaction.log.TransactionAppender; +import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent; + +class Worker implements Runnable +{ + private final TransactionAppender transactionAppender; + private final TransactionRepresentationFactory factory; + private final BooleanSupplier condition; + + public Worker( TransactionAppender transactionAppender, + TransactionRepresentationFactory factory, BooleanSupplier condition ) + { + this.transactionAppender = transactionAppender; + this.factory = factory; + this.condition = condition; + } + + @Override + public void run() + { + long latestTxId = TransactionIdStore.BASE_TX_ID; + while ( condition.getAsBoolean() ) + { + TransactionRepresentation representation = factory.nextTransaction( latestTxId ); + try + { + Commitment commitment = transactionAppender.append( representation, LogAppendEvent.NULL ); + commitment.publishAsCommitted(); + latestTxId = commitment.transactionId(); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + } + } +} diff --git a/stresstests/pom.xml b/stresstests/pom.xml index d0777f184a777..0e657fad1cf0b 100644 --- a/stresstests/pom.xml +++ b/stresstests/pom.xml @@ -75,6 +75,19 @@ test test-jar + + org.neo4j + neo4j-kernel + ${project.version} + test + + + org.neo4j + neo4j-kernel + ${project.version} + test + test-jar + org.neo4j neo4j-backup diff --git a/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java new file mode 100644 index 0000000000000..fc3e1ae6d4a6a --- /dev/null +++ b/stresstests/src/test/java/org/neo4j/kernel/stresstests/transaction/log/TransactionAppenderStressTesting.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.stresstests.transaction.log; + +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.Callable; + +import org.neo4j.io.fs.FileUtils; +import org.neo4j.kernel.impl.transaction.log.stresstest.TransactionAppenderStressTest.Builder; +import org.neo4j.kernel.impl.transaction.log.stresstest.TransactionAppenderStressTest.TransactionIdChecker; + +import static java.lang.Integer.parseInt; +import static java.lang.System.getProperty; +import static java.lang.System.getenv; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Notice the class name: this is _not_ going to be run as part of the main build. + */ +public class TransactionAppenderStressTesting +{ + private static final String DEFAULT_DURATION_IN_MINUTES = "5"; + private static final String DEFAULT_WORKING_DIR = new File( getProperty( "java.io.tmpdir" ), "working" ).getPath(); + private static final String DEFAULT_NUM_THREADS = "10"; + + @Test + public void shouldBehaveCorrectlyUnderStress() throws Throwable + { + int durationInMinutes = parseInt( fromEnv( "TX_APPENDER_STRESS_DURATION", DEFAULT_DURATION_IN_MINUTES ) ); + File workingDirectory = ensureExistsAndEmpty( fromEnv( "TX_APPENDER_WORKING_DIRECTORY", DEFAULT_WORKING_DIR ) ); + int threads = parseInt( fromEnv( "TX_APPENDER_NUM_THREADS", DEFAULT_NUM_THREADS ) ); + + Callable runner = new Builder() + .with( Builder.untilTimeExpired( durationInMinutes, MINUTES ) ) + .withWorkingDirectory( workingDirectory ) + .withNumThreads( threads ) + .build(); + + long appendedTxs = runner.call(); + + assertEquals( new TransactionIdChecker( workingDirectory ).parseAllTxLogs(), appendedTxs ); + + // let's cleanup disk space when everything went well + FileUtils.deleteRecursively( workingDirectory ); + } + + private File ensureExistsAndEmpty( String directory ) + { + File dir = new File( directory ); + if ( !dir.mkdirs() ) + { + assertTrue( dir.exists() ); + assertTrue( dir.isDirectory() ); + assertEquals( 0, dir.list().length ); + } + return dir; + } + + private static String fromEnv( String environmentVariableName, String defaultValue ) + { + String environmentVariableValue = getenv( environmentVariableName ); + return environmentVariableValue == null ? defaultValue : environmentVariableValue; + } +}