Skip to content

Commit

Permalink
Merge branch '3.2' into 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Sep 4, 2018
2 parents df27eb9 + cb4b8f4 commit cd1db7b
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public void txIdStoreRefreshedAfterRestart() throws Throwable
GraphDatabaseAPI db = newDbMock();
DependencyResolver dependencyResolver = db.getDependencyResolver();
TransactionIdStore txIdStoreBeforeRestart = mock( TransactionIdStore.class );
when( txIdStoreBeforeRestart.getLastClosedTransactionId() ).thenReturn( 42L );
when( txIdStoreBeforeRestart.getLastCommittedTransactionId() ).thenReturn( 42L );
TransactionIdStore txIdStoreAfterRestart = mock( TransactionIdStore.class );
when( txIdStoreAfterRestart.getLastClosedTransactionId() ).thenReturn( 4242L );
when( txIdStoreAfterRestart.getLastCommittedTransactionId() ).thenReturn( 4242L );
when( dependencyResolver.provideDependency( TransactionIdStore.class ) )
.thenReturn( () -> txIdStoreBeforeRestart ).thenReturn( () -> txIdStoreAfterRestart );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws

try
{
// await for the last closed transaction id to to have at least the expected value
// it has to be "last closed" and not "last committed" becase all transactions before the expected one should also be committed
transactionIdStore().awaitClosedTransactionId( oldestAcceptableTxId, timeout.toMillis() );
}
catch ( InterruptedException | TimeoutException e )
Expand Down Expand Up @@ -113,6 +115,8 @@ private TransactionIdStore transactionIdStore()
*/
public long newestEncounteredTxId()
{
return transactionIdStore().getLastClosedTransactionId();
// return the "last committed" because it is the newest id
// "last closed" will return the last gap-free id, pottentially for some old transaction because there might be other committing transactions
return transactionIdStore().getLastCommittedTransactionId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private long appendToLog( TransactionToApply batch, CommitEvent commitEvent ) th
}
}

private void applyToStore( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode )
protected void applyToStore( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode )
throws TransactionFailureException
{
try ( StoreApplyEvent storeApplyEvent = commitEvent.beginStoreApply() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,13 @@ public void shouldNotWaitIfTheDatabaseIsUnavailable() throws Exception

verify( transactionIdStore, never() ).awaitClosedTransactionId( anyLong(), anyLong() );
}

@Test
public void shouldReturnNewestTransactionId()
{
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( 42L );
when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 4242L );

assertEquals( 4242L, transactionIdTracker.newestEncounteredTxId() );
}
}
225 changes: 225 additions & 0 deletions integrationtests/src/test/java/org/neo4j/bolt/BookmarkIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.bolt;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;

import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactoryState;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.factory.CommunityEditionModule;
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.factory.EditionModule;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.test.rule.TestDirectory;

import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.neo4j.kernel.configuration.Settings.TRUE;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class BookmarkIT
{
@Rule
public final TestDirectory directory = TestDirectory.testDirectory( getClass() );

private Driver driver;
private GraphDatabaseService db;

@After
public void tearDown() throws Exception
{
if ( driver != null )
{
driver.close();
}
if ( db != null )
{
db.shutdown();
}
}

@Test
public void shouldReturnUpToDateBookmarkWhenSomeTransactionIsCommitting() throws Exception
{
CommitBlocker commitBlocker = new CommitBlocker();
db = createDb( commitBlocker );
driver = GraphDatabase.driver( "bolt://localhost:7687" );

String firstBookmark = createNode( driver );

// make next transaction append to the log and then pause before applying to the store
// this makes it allocate a transaction ID but wait before acknowledging the commit operation
commitBlocker.blockNextTransaction();
CompletableFuture<String> secondBookmarkFuture = CompletableFuture.supplyAsync( () -> createNode( driver ) );
assertEventually( "Transaction did not block as expected", commitBlocker::hasBlockedTransaction, is( true ), 1, MINUTES );

Set<String> otherBookmarks = Stream.generate( () -> createNode( driver ) )
.limit( 10 )
.collect( toSet() );

commitBlocker.unblock();
String lastBookmark = secondBookmarkFuture.get();

// first and last bookmarks should not be null and should be different
assertNotNull( firstBookmark );
assertNotNull( lastBookmark );
assertNotEquals( firstBookmark, lastBookmark );

// all bookmarks received while a transaction was blocked committing should be unique
assertThat( otherBookmarks, hasSize( 10 ) );
}

private GraphDatabaseAPI createDb( CommitBlocker commitBlocker )
{
return createDb( platformModule -> new CustomCommunityEditionModule( platformModule, commitBlocker ) );
}

private GraphDatabaseAPI createDb( Function<PlatformModule,EditionModule> editionModuleFactory )
{
GraphDatabaseFactoryState state = new GraphDatabaseFactoryState();
GraphDatabaseFacadeFactory facadeFactory = new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, editionModuleFactory );
return facadeFactory.newFacade( directory.graphDbDir(), configWithBoltEnabled(), state.databaseDependencies() );
}

private static String createNode( Driver driver )
{
try ( Session session = driver.session() )
{
try ( Transaction tx = session.beginTransaction() )
{
tx.run( "CREATE ()" );
tx.success();
}
return session.lastBookmark();
}
}

private static Config configWithBoltEnabled()
{
return Config.defaults().augment( singletonMap( "dbms.connector.bolt.enabled", TRUE ) );
}

private static class CustomCommunityEditionModule extends CommunityEditionModule
{
CustomCommunityEditionModule( PlatformModule platformModule, CommitBlocker commitBlocker )
{
super( platformModule );
commitProcessFactory = new CustomCommitProcessFactory( commitBlocker );
}
}

private static class CustomCommitProcessFactory implements CommitProcessFactory
{
final CommitBlocker commitBlocker;

private CustomCommitProcessFactory( CommitBlocker commitBlocker )
{
this.commitBlocker = commitBlocker;
}

@Override
public TransactionCommitProcess create( TransactionAppender appender, StorageEngine storageEngine, Config config )
{
return new CustomCommitProcess( appender, storageEngine, commitBlocker );
}
}

private static class CustomCommitProcess extends TransactionRepresentationCommitProcess
{
final CommitBlocker commitBlocker;

CustomCommitProcess( TransactionAppender appender, StorageEngine storageEngine, CommitBlocker commitBlocker )
{
super( appender, storageEngine );
this.commitBlocker = commitBlocker;
}

@Override
protected void applyToStore( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode ) throws TransactionFailureException
{
commitBlocker.blockWhileWritingToStoreIfNeeded();
super.applyToStore( batch, commitEvent, mode );
}
}

private static class CommitBlocker
{
final ReentrantLock lock = new ReentrantLock();
volatile boolean shouldBlock;

void blockNextTransaction()
{
shouldBlock = true;
lock.lock();
}

void blockWhileWritingToStoreIfNeeded()
{
if ( shouldBlock )
{
shouldBlock = false;
lock.lock();
}
}

void unblock()
{
lock.unlock();
}

boolean hasBlockedTransaction()
{
return lock.getQueueLength() == 1;
}
}
}

0 comments on commit cd1db7b

Please sign in to comment.