Skip to content

Commit

Permalink
Merge 3.1 into 3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Aug 13, 2017
2 parents 36970c3 + 7206285 commit 31c7877
Show file tree
Hide file tree
Showing 12 changed files with 526 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ public class GraphDatabaseSettings implements LoadableConfig
public static final Setting<Duration> lock_acquisition_timeout = setting( "dbms.lock.acquisition.timeout", DURATION,
String.valueOf( UNSPECIFIED_TIMEOUT ) );

@Description("Configures the time interval between transaction monitor checks. Determines how often " +
"monitor thread will check transaction for timeout.")
public static final Setting<Long> transaction_monitor_check_interval = setting( "dbms.transaction.monitor.check.interval", DURATION, "5s" );

@Description( "The maximum amount of time to wait for running transactions to complete before allowing "
+ "initiated database shutdown to continue" )
public static final Setting<Duration> shutdown_transaction_end_timeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.neo4j.kernel.impl.api.GuardingStatementOperations;
import org.neo4j.kernel.impl.api.Kernel;
import org.neo4j.kernel.impl.api.KernelSchemaStateStore;
import org.neo4j.kernel.impl.api.KernelTransactionMonitorScheduler;
import org.neo4j.kernel.impl.api.KernelTransactionTimeoutMonitor;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup;
Expand Down Expand Up @@ -734,6 +736,8 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,
transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor,
availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) );

buildTransactionMonitor( kernelTransactions, clock, config );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config );

Expand All @@ -745,6 +749,16 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,
return new NeoStoreKernelModule( transactionCommitProcess, kernel, kernelTransactions, fileListing );
}

private void buildTransactionMonitor( KernelTransactions kernelTransactions, Clock clock, Config config )
{
KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor =
new KernelTransactionTimeoutMonitor( kernelTransactions, clock, logService );
KernelTransactionMonitorScheduler transactionMonitorScheduler =
new KernelTransactionMonitorScheduler( kernelTransactionTimeoutMonitor, scheduler,
config.get( GraphDatabaseSettings.transaction_monitor_check_interval ) );
life.add( transactionMonitorScheduler );
}

@Override
public synchronized void stop()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public interface KernelTransactionHandle
*/
long startTime();

/**
* Underlying transaction specific timeout. In case if timeout is 0 - transaction does not have a timeout.
* @return transaction timeout in milliseconds, <b>0 in case if transaction does not have a timeout<b/>
*/
long timeoutMillis();

/**
* Check if the underlying transaction is open.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class KernelTransactionImplementationHandle implements KernelTransactionHandle
private final long lastTransactionIdWhenStarted;
private final long lastTransactionTimestampWhenStarted;
private final long startTime;
private final long timeoutMillis;
private final KernelTransactionImplementation tx;
private final SecurityContext securityContext;
private final Optional<Status> terminationReason;
Expand All @@ -52,6 +53,7 @@ class KernelTransactionImplementationHandle implements KernelTransactionHandle
this.lastTransactionIdWhenStarted = tx.lastTransactionIdWhenStarted();
this.lastTransactionTimestampWhenStarted = tx.lastTransactionTimestampWhenStarted();
this.startTime = tx.startTime();
this.timeoutMillis = tx.timeout();
this.securityContext = tx.securityContext();
this.terminationReason = tx.getReasonIfTerminated();
this.executingQueries = tx.executingQueries();
Expand All @@ -76,6 +78,12 @@ public long startTime()
return startTime;
}

@Override
public long timeoutMillis()
{
return timeoutMillis;
}

@Override
public boolean isOpen()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

import static org.neo4j.kernel.impl.util.JobScheduler.Groups.transactionTimeoutMonitor;

public class KernelTransactionMonitorScheduler extends LifecycleAdapter
{
private final KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor;
private final JobScheduler scheduler;
private final long checkIntervalMillis;
private JobScheduler.JobHandle monitorJobHandle;

public KernelTransactionMonitorScheduler( KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor,
JobScheduler scheduler, long checkIntervalMillis )
{
this.kernelTransactionTimeoutMonitor = kernelTransactionTimeoutMonitor;
this.scheduler = scheduler;
this.checkIntervalMillis = checkIntervalMillis;
}

@Override
public void start() throws Throwable
{
if (checkIntervalMillis > 0)
{
monitorJobHandle = scheduler.scheduleRecurring( transactionTimeoutMonitor, kernelTransactionTimeoutMonitor,
checkIntervalMillis, TimeUnit.MILLISECONDS );
}
}

@Override
public void stop() throws Throwable
{
if ( monitorJobHandle != null )
{
monitorJobHandle.cancel( true );
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import java.time.Clock;
import java.util.Set;

import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

/**
* Transaction monitor that check transactions with a configured timeout for expiration.
* In case if transaction timed out it will be terminated.
*/
public class KernelTransactionTimeoutMonitor implements Runnable
{
private final KernelTransactions kernelTransactions;
private final Clock clock;
private final Log log;

public KernelTransactionTimeoutMonitor( KernelTransactions kernelTransactions, Clock clock, LogService logService )
{
this.kernelTransactions = kernelTransactions;
this.clock = clock;
this.log = logService.getInternalLog( KernelTransactionTimeoutMonitor.class );
}

@Override
public void run()
{
Set<KernelTransactionHandle> activeTransactions = kernelTransactions.activeTransactions();
long now = clock.millis();
for ( KernelTransactionHandle activeTransaction : activeTransactions )
{
long transactionTimeoutMillis = activeTransaction.timeoutMillis();
if ( transactionTimeoutMillis > 0 )
{
if ( isTransactionExpired( activeTransaction, now, transactionTimeoutMillis ) )
{
if ( activeTransaction.markForTermination( Status.Transaction.TransactionTimedOut ) )
{
log.warn( "Transaction %s timeout.", activeTransaction );
}
}
}
}
}

private boolean isTransactionExpired( KernelTransactionHandle activeTransaction, long nowMillis,
long transactionTimeoutMillis )
{
return nowMillis > (activeTransaction.startTime() + transactionTimeoutMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ class Groups
* File watch service group
*/
public static Group fileWatch = new Group( "FileWatcher", NEW_THREAD );

/**
* Kernel transaction timeout monitor.
*/
public static Group transactionTimeoutMonitor = new Group( "TransactionTimeoutMonitor", POOLED );
}

interface JobHandle
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.util.JobScheduler;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

public class KernelTransactionMonitorSchedulerTest
{

private final JobScheduler scheduler = mock( JobScheduler.class );
private final KernelTransactionTimeoutMonitor transactionTimeoutMonitor = mock( KernelTransactionTimeoutMonitor.class );

@Test
public void scheduleRecurringMonitorJobIfConfigured() throws Throwable
{
KernelTransactionMonitorScheduler transactionMonitorScheduler = createMonitorScheduler(1);
transactionMonitorScheduler.start();

verify( scheduler).scheduleRecurring( JobScheduler.Groups.transactionTimeoutMonitor, transactionTimeoutMonitor, 1, TimeUnit
.MILLISECONDS );
}

@Test
public void doNotScheduleMonitorJobIfDisabled() throws Throwable
{
KernelTransactionMonitorScheduler transactionMonitorScheduler = createMonitorScheduler( 0 );
transactionMonitorScheduler.start();

verifyZeroInteractions( scheduler);
}

private KernelTransactionMonitorScheduler createMonitorScheduler( long checkInterval )
{
return new KernelTransactionMonitorScheduler( transactionTimeoutMonitor, scheduler, checkInterval );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule;

public class KernelTransactionTimeoutMonitorIT
{
@Rule
public DatabaseRule database = new EmbeddedDatabaseRule()
.withSetting( GraphDatabaseSettings.transaction_monitor_check_interval, "100ms" );
@Rule
public ExpectedException expectedException = ExpectedException.none();

private static final int NODE_ID = 0;
private ExecutorService executor;

@Before
public void setUp() throws Exception
{
executor = Executors.newSingleThreadExecutor();
}

@After
public void tearDown() throws Exception
{
executor.shutdown();
}

@Test( timeout = 30_000 )
public void terminateExpiredTransaction() throws Exception
{
try ( Transaction transaction = database.beginTx() )
{
database.createNode();
transaction.success();
}

expectedException.expectMessage( "The transaction has been terminated." );

try ( Transaction transaction = database.beginTx() )
{
Node nodeById = database.getNodeById( NODE_ID );
nodeById.setProperty( "a", "b" );
executor.submit( startAnotherTransaction() ).get();
}
}

private Runnable startAnotherTransaction()
{
return () -> {
try ( InternalTransaction transaction = database
.beginTransaction( KernelTransaction.Type.implicit, SecurityContext.AUTH_DISABLED, 1,
TimeUnit.SECONDS ) )
{
Node node = database.getNodeById( NODE_ID );
node.setProperty( "c", "d" );
}
};
}
}

0 comments on commit 31c7877

Please sign in to comment.