Skip to content

Commit

Permalink
Transaction kill procedures.
Browse files Browse the repository at this point in the history
Introduce procedures that allow killing one or several transactions.
As a result of the kill procedure invocation transaction will be
marked as terminated and thread executing marked transaction is
responsible for transaction closing.
On a moment when the transaction is terminated, it will release all its
locks and should not block any other transaction after that.

In case if executing transaction thread will never come back the transaction
will hang in a list of transactions as terminated and we will not perform
any force close.
  • Loading branch information
MishaDemianenko committed Nov 27, 2018
1 parent 725478d commit 103f940
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 46 deletions.
Expand Up @@ -57,8 +57,6 @@
import org.neo4j.kernel.impl.api.ExplicitIndexTransactionStateProvider;
import org.neo4j.kernel.impl.api.KernelAuxTransactionStateManager;
import org.neo4j.kernel.impl.api.KernelImpl;
import org.neo4j.kernel.impl.api.KernelTransactionMonitorScheduler;
import org.neo4j.kernel.impl.api.KernelTransactionTimeoutMonitor;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
import org.neo4j.kernel.impl.api.SchemaState;
Expand All @@ -71,6 +69,8 @@
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.operations.QueryRegistrationOperations;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitor;
import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitorScheduler;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.core.TokenHolders;
import org.neo4j.kernel.impl.factory.AccessCapability;
Expand Down Expand Up @@ -668,8 +668,7 @@ private AtomicReference<HeapAllocation> setupHeapAllocationAtomicReference()

private void buildTransactionMonitor( KernelTransactions kernelTransactions, Clock clock, Config config )
{
KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor =
new KernelTransactionTimeoutMonitor( kernelTransactions, clock, logService );
KernelTransactionMonitor kernelTransactionTimeoutMonitor = new KernelTransactionMonitor( kernelTransactions, clock, logService );
dataSourceDependencies.satisfyDependency( kernelTransactionTimeoutMonitor );
KernelTransactionMonitorScheduler transactionMonitorScheduler =
new KernelTransactionMonitorScheduler( kernelTransactionTimeoutMonitor, scheduler,
Expand Down
Expand Up @@ -124,8 +124,7 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier<Ker
// This is the factory that actually builds brand-new instances.
private final Factory<KernelTransactionImplementation> factory = new KernelTransactionImplementationFactory( allTransactions );
// Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly.
private final LinkedQueuePool<KernelTransactionImplementation> globalTxPool =
new GlobalKernelTransactionPool( allTransactions, factory );
private final GlobalKernelTransactionPool globalTxPool = new GlobalKernelTransactionPool( allTransactions, factory );
// Pool of unused transactions.
private final MarshlandPool<KernelTransactionImplementation> localTxPool = new MarshlandPool<>( globalTxPool );
private final ConstraintSemantics constraintSemantics;
Expand Down Expand Up @@ -206,7 +205,7 @@ public KernelTransaction newInstance( KernelTransaction.Type type, LoginContext
}
catch ( InterruptedException ie )
{
Thread.interrupted();
Thread.currentThread().interrupt();
throw new TransactionFailureException( "Fail to start new transaction.", ie );
}
}
Expand Down
Expand Up @@ -17,38 +17,44 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;
package org.neo4j.kernel.impl.api.transaciton.monitor;

import java.time.Clock;
import java.util.Set;

import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;

/**
* Transaction monitor that check transactions with a configured timeout for expiration.
* In case if transaction timed out it will be terminated.
*/
public class KernelTransactionTimeoutMonitor implements Runnable
public class KernelTransactionMonitor implements Runnable
{
private final KernelTransactions kernelTransactions;
private final Clock clock;
private final Log log;

public KernelTransactionTimeoutMonitor( KernelTransactions kernelTransactions, Clock clock, LogService logService )
public KernelTransactionMonitor( KernelTransactions kernelTransactions, Clock clock, LogService logService )
{
this.kernelTransactions = kernelTransactions;
this.clock = clock;
this.log = logService.getInternalLog( KernelTransactionTimeoutMonitor.class );
this.log = logService.getInternalLog( KernelTransactionMonitor.class );
}

@Override
public synchronized void run()
{
Set<KernelTransactionHandle> activeTransactions = kernelTransactions.activeTransactions();
long now = clock.millis();
Set<KernelTransactionHandle> activeTransactions = kernelTransactions.activeTransactions();
checkExpiredTransactions( activeTransactions, now );
}

private void checkExpiredTransactions( Set<KernelTransactionHandle> activeTransactions, long now )
{
for ( KernelTransactionHandle activeTransaction : activeTransactions )
{
long transactionTimeoutMillis = activeTransaction.timeoutMillis();
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;
package org.neo4j.kernel.impl.api.transaciton.monitor;

import java.util.concurrent.TimeUnit;

Expand All @@ -28,15 +28,15 @@

public class KernelTransactionMonitorScheduler extends LifecycleAdapter
{
private final KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor;
private final KernelTransactionMonitor kernelTransactionMonitor;
private final JobScheduler scheduler;
private final long checkIntervalMillis;
private JobHandle monitorJobHandle;

public KernelTransactionMonitorScheduler( KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor,
public KernelTransactionMonitorScheduler( KernelTransactionMonitor kernelTransactionMonitor,
JobScheduler scheduler, long checkIntervalMillis )
{
this.kernelTransactionTimeoutMonitor = kernelTransactionTimeoutMonitor;
this.kernelTransactionMonitor = kernelTransactionMonitor;
this.scheduler = scheduler;
this.checkIntervalMillis = checkIntervalMillis;
}
Expand All @@ -46,7 +46,7 @@ public void start()
{
if ( checkIntervalMillis > 0 )
{
monitorJobHandle = scheduler.scheduleRecurring( Group.TRANSACTION_TIMEOUT_MONITOR, kernelTransactionTimeoutMonitor,
monitorJobHandle = scheduler.scheduleRecurring( Group.TRANSACTION_TIMEOUT_MONITOR, kernelTransactionMonitor,
checkIntervalMillis, TimeUnit.MILLISECONDS );
}
}
Expand Down
Expand Up @@ -101,8 +101,7 @@ private void assertInUnterminatedTransaction( KernelTransaction transaction )
}
if ( transaction.isTerminated() )
{
Status terminationReason = transaction.getReasonIfTerminated().orElse( Status.Transaction.Terminated );
throw new TransactionTerminatedException( terminationReason );
throw new TransactionTerminatedException( transaction.getReasonIfTerminated().orElse( Status.Transaction.Terminated ) );
}
}

Expand Down
Expand Up @@ -19,25 +19,26 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitor;
import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitorScheduler;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

public class KernelTransactionMonitorSchedulerTest
class KernelTransactionMonitorSchedulerTest
{

private final JobScheduler scheduler = mock( JobScheduler.class );
private final KernelTransactionTimeoutMonitor transactionTimeoutMonitor = mock( KernelTransactionTimeoutMonitor.class );
private final KernelTransactionMonitor transactionTimeoutMonitor = mock( KernelTransactionMonitor.class );

@Test
public void scheduleRecurringMonitorJobIfConfigured()
void scheduleRecurringMonitorJobIfConfigured()
{
KernelTransactionMonitorScheduler transactionMonitorScheduler = createMonitorScheduler(1);
transactionMonitorScheduler.start();
Expand All @@ -47,7 +48,7 @@ public void scheduleRecurringMonitorJobIfConfigured()
}

@Test
public void doNotScheduleMonitorJobIfDisabled()
void doNotScheduleMonitorJobIfDisabled()
{
KernelTransactionMonitorScheduler transactionMonitorScheduler = createMonitorScheduler( 0 );
transactionMonitorScheduler.start();
Expand Down
Expand Up @@ -19,11 +19,13 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitor;
import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitorScheduler;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobScheduler;
Expand All @@ -35,14 +37,14 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KernelTransactionTimeoutMonitorSchedulerTest
class KernelTransactionTimeoutMonitorSchedulerTest
{

private final KernelTransactionTimeoutMonitor transactionMonitor = mock( KernelTransactionTimeoutMonitor.class );
private final KernelTransactionMonitor transactionMonitor = mock( KernelTransactionMonitor.class );
private final JobScheduler jobScheduler = mock( JobScheduler.class );

@Test
public void startJobTransactionMonitor()
void startJobTransactionMonitor()
{
JobHandle jobHandle = Mockito.mock( JobHandle.class );
when( jobScheduler.scheduleRecurring( eq(Group.TRANSACTION_TIMEOUT_MONITOR ), eq( transactionMonitor), anyLong(),
Expand Down
Expand Up @@ -19,14 +19,15 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.api.transaciton.monitor.KernelTransactionMonitor;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
Expand All @@ -38,16 +39,16 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KernelTransactionTimeoutMonitorTest
class KernelTransactionTimeoutMonitorTest
{
private static final int EXPECTED_REUSE_COUNT = 2;
private KernelTransactions kernelTransactions;
private FakeClock fakeClock;
private AssertableLogProvider logProvider;
private LogService logService;

@Before
public void setUp()
@BeforeEach
void setUp()
{
kernelTransactions = mock( KernelTransactions.class );
fakeClock = Clocks.fakeClock();
Expand All @@ -56,19 +57,19 @@ public void setUp()
}

@Test
public void terminateExpiredTransactions()
void terminateExpiredTransactions()
{
HashSet<KernelTransactionHandle> transactions = new HashSet<>();
KernelTransactionImplementation tx1 = prepareTxMock( 1, 3 );
KernelTransactionImplementation tx2 = prepareTxMock( 1, 8 );
KernelTransactionImplementation tx1 = prepareTxMock( 3, 1, 3 );
KernelTransactionImplementation tx2 = prepareTxMock( 4, 1, 8 );
KernelTransactionImplementationHandle handle1 = new KernelTransactionImplementationHandle( tx1, fakeClock );
KernelTransactionImplementationHandle handle2 = new KernelTransactionImplementationHandle( tx2, fakeClock );
transactions.add( handle1 );
transactions.add( handle2 );

when( kernelTransactions.activeTransactions()).thenReturn( transactions );

KernelTransactionTimeoutMonitor transactionMonitor = buildTransactionMonitor();
KernelTransactionMonitor transactionMonitor = buildTransactionMonitor();

fakeClock.forward( 3, TimeUnit.MILLISECONDS );
transactionMonitor.run();
Expand All @@ -93,19 +94,19 @@ public void terminateExpiredTransactions()
}

@Test
public void skipTransactionWithoutTimeout()
void skipTransactionWithoutTimeout()
{
HashSet<KernelTransactionHandle> transactions = new HashSet<>();
KernelTransactionImplementation tx1 = prepareTxMock( 3, 0 );
KernelTransactionImplementation tx2 = prepareTxMock( 4, 0 );
KernelTransactionImplementation tx1 = prepareTxMock( 7, 3, 0 );
KernelTransactionImplementation tx2 = prepareTxMock( 8, 4, 0 );
KernelTransactionImplementationHandle handle1 = new KernelTransactionImplementationHandle( tx1, fakeClock );
KernelTransactionImplementationHandle handle2 = new KernelTransactionImplementationHandle( tx2, fakeClock );
transactions.add( handle1 );
transactions.add( handle2 );

when( kernelTransactions.activeTransactions()).thenReturn( transactions );

KernelTransactionTimeoutMonitor transactionMonitor = buildTransactionMonitor();
KernelTransactionMonitor transactionMonitor = buildTransactionMonitor();

fakeClock.forward( 300, TimeUnit.MILLISECONDS );
transactionMonitor.run();
Expand All @@ -115,15 +116,16 @@ public void skipTransactionWithoutTimeout()
logProvider.assertNoMessagesContaining( "timeout" );
}

private KernelTransactionTimeoutMonitor buildTransactionMonitor()
private KernelTransactionMonitor buildTransactionMonitor()
{
return new KernelTransactionTimeoutMonitor( kernelTransactions, fakeClock, logService );
return new KernelTransactionMonitor( kernelTransactions, fakeClock, logService );
}

private KernelTransactionImplementation prepareTxMock( long startMillis, long timeoutMillis )
private static KernelTransactionImplementation prepareTxMock( long userTxId, long startMillis, long timeoutMillis )
{
KernelTransactionImplementation transaction = mock( KernelTransactionImplementation.class );
when( transaction.startTime() ).thenReturn( startMillis );
when( transaction.userTransactionId() ).thenReturn( userTxId );
when( transaction.getReuseCount() ).thenReturn( EXPECTED_REUSE_COUNT );
when( transaction.timeout() ).thenReturn( timeoutMillis );
when( transaction.markForTermination( EXPECTED_REUSE_COUNT, Status.Transaction.TransactionTimedOut ) ).thenReturn( true );
Expand Down
Expand Up @@ -97,6 +97,7 @@
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.test.Barrier;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.rule.SuppressOutput;
import org.neo4j.values.storable.Values;

import static java.lang.String.format;
Expand Down Expand Up @@ -158,6 +159,8 @@ public class IndexingServiceTest
public final LifeRule life = new LifeRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public SuppressOutput suppressOutput = SuppressOutput.suppressAll();

private static final LogMatcherBuilder logMatch = inLog( IndexingService.class );
private static final IndexProviderDescriptor lucene10Descriptor = new IndexProviderDescriptor( LUCENE10.providerKey(), LUCENE10.providerVersion() );
Expand Down
Expand Up @@ -35,7 +35,7 @@ class TransitionalTxManagementKernelTransaction
private final GraphDatabaseFacade db;
private final KernelTransaction.Type type;
private final LoginContext loginContext;
private long customTransactionTimeout;
private final long customTransactionTimeout;
private final ThreadToStatementContextBridge bridge;

private InternalTransaction tx;
Expand Down
Expand Up @@ -188,6 +188,7 @@ public Map<String, String> createConfiguration( File temporaryFolder )
properties.put( GraphDatabaseSettings.logical_logs_location.name(),
new File( temporaryFolder, "transaction-logs" ).getAbsolutePath() );
properties.put( GraphDatabaseSettings.pagecache_memory.name(), "8m" );
properties.put( GraphDatabaseSettings.shutdown_transaction_end_timeout.name(), "0s" );

for ( Object key : arbitraryProperties.keySet() )
{
Expand Down
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.kernel.enterprise.builtinprocs;

class TransactionMarkForTerminationFailedResult extends TransactionMarkForTerminationResult
{
private static final String FAILURE_MESSAGE = "Transaction not found.";

TransactionMarkForTerminationFailedResult( String transactionId, String userName )
{
super( transactionId, userName, FAILURE_MESSAGE );
}
}

0 comments on commit 103f940

Please sign in to comment.