Skip to content

Commit

Permalink
Allow Procedures to create transaction in new thread
Browse files Browse the repository at this point in the history
  • Loading branch information
OliviaYtterbrink authored and eebus committed Feb 15, 2017
1 parent b797487 commit e8cf5d1
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 113 deletions.
Expand Up @@ -1542,6 +1542,7 @@ private RawIterator<Object[],ProcedureException> callProcedure(
BasicContext ctx = new BasicContext();
ctx.put( Context.KERNEL_TRANSACTION, tx );
ctx.put( Context.THREAD, Thread.currentThread() );
ctx.put( Context.SECURITY_CONTEXT, procedureSecurityContext );
procedureCall = procedures.callProcedure( ctx, name, input );
}
return new RawIterator<Object[],ProcedureException>()
Expand Down
Expand Up @@ -235,6 +235,10 @@ public DataSourceModule( final PlatformModule platformModule, EditionModule edit

this.storeId = neoStoreDataSource::getStoreId;
this.kernelAPI = neoStoreDataSource::getKernel;

ProcedureGDSFactory gdsFactory = new ProcedureGDSFactory( platformModule, this, deps,
editionModule.coreAPIAvailabilityGuard );
procedures.registerComponent( GraphDatabaseService.class, gdsFactory::apply, true );
}

protected RelationshipProxy.RelationshipActions createRelationshipActions(
Expand Down Expand Up @@ -368,10 +372,6 @@ private Procedures setupProcedures( PlatformModule platform, EditionModule editi
procedures.registerComponent( TerminationGuard.class, new TerminationGuardProvider( guard ), true );

// Register injected private API components: useful to have available in procedures to access the kernel etc.
ProcedureGDSFactory gdsFactory = new ProcedureGDSFactory( platform.config, platform.storeDir,
platform.dependencies, storeId, this.queryExecutor, editionModule.coreAPIAvailabilityGuard,
platform.urlAccessRule );
procedures.registerComponent( GraphDatabaseService.class, gdsFactory::apply, true );

// Below components are not public API, but are made available for internal
// procedures to call, and to provide temporary workarounds for the following
Expand Down
Expand Up @@ -22,60 +22,47 @@
import java.io.File;
import java.net.URL;
import java.util.Map;
import java.util.function.Supplier;

import org.neo4j.function.ThrowingFunction;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.event.KernelEventHandler;
import org.neo4j.graphdb.event.TransactionEventHandler;
import org.neo4j.graphdb.security.URLAccessValidationError;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.legacyindex.AutoIndexing;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard;
import org.neo4j.kernel.impl.factory.DataSourceModule;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
import org.neo4j.kernel.impl.query.TransactionalContext;
import org.neo4j.kernel.impl.store.StoreId;

class ProcedureGDBFacadeSPI implements GraphDatabaseFacade.SPI
{
private final Thread transactionThread;
private final KernelTransaction transaction;

private final Supplier<QueryExecutionEngine> queryExecutor;

private final File storeDir;
private final DataSourceModule sourceModule;
private final DependencyResolver resolver;
private final AutoIndexing autoIndexing;
private final Supplier<StoreId> storeId;
private final CoreAPIAvailabilityGuard availability;
private final ThrowingFunction<URL,URL,URLAccessValidationError> urlValidator;
private final SecurityContext securityContext;

public ProcedureGDBFacadeSPI(
Thread transactionThread,
KernelTransaction transaction,
Supplier<QueryExecutionEngine> queryExecutor,
File storeDir,
DependencyResolver resolver,
AutoIndexing autoIndexing,
Supplier<StoreId> storeId,
CoreAPIAvailabilityGuard availability,
ThrowingFunction<URL,URL,URLAccessValidationError> urlValidator )
{
this.transactionThread = transactionThread;
this.transaction = transaction;
this.queryExecutor = queryExecutor;
this.storeDir = storeDir;
public ProcedureGDBFacadeSPI( PlatformModule platform, DataSourceModule sourceModule, DependencyResolver resolver,
CoreAPIAvailabilityGuard availability, ThrowingFunction<URL,URL,URLAccessValidationError> urlValidator,
SecurityContext securityContext )
{
this.storeDir = platform.storeDir;
this.sourceModule = sourceModule;
this.resolver = resolver;
this.autoIndexing = autoIndexing;
this.storeId = storeId;
this.availability = availability;
this.urlValidator = urlValidator;
this.securityContext = securityContext;
}

@Override
Expand All @@ -93,7 +80,7 @@ public DependencyResolver resolver()
@Override
public StoreId storeId()
{
return storeId.get();
return sourceModule.storeId.get();
}

@Override
Expand All @@ -108,35 +95,28 @@ public String name()
return "ProcedureGraphDatabaseService";
}

private void assertSameThread()
{
if ( transactionThread != Thread.currentThread() )
{
throw new UnsupportedOperationException( "Creating new transactions and/or spawning threads are " +
"not supported operations in store procedures." );
}
}

@Override
public KernelTransaction currentTransaction()
{
availability.assertDatabaseAvailable();
assertSameThread();
return transaction;
KernelTransaction tx = sourceModule.threadToTransactionBridge.getKernelTransactionBoundToThisThread( false );
if( tx == null )
{
throw new NotInTransactionException();
}
return tx;
}

@Override
public boolean isInOpenTransaction()
{
assertSameThread();
return transaction.isOpen();
return sourceModule.threadToTransactionBridge.hasTransaction();
}

@Override
public Statement currentStatement()
{
assertSameThread();
return transaction.acquireStatement();
return sourceModule.threadToTransactionBridge.get();
}

@Override
Expand All @@ -145,8 +125,7 @@ public Result executeQuery( String query, Map<String,Object> parameters, Transac
try
{
availability.assertDatabaseAvailable();
assertSameThread();
return queryExecutor.get().executeQuery( query, parameters, tc );
return sourceModule.queryExecutor.get().executeQuery( query, parameters, tc );
}
catch ( QueryExecutionKernelException e )
{
Expand All @@ -157,7 +136,7 @@ public Result executeQuery( String query, Map<String,Object> parameters, Transac
@Override
public AutoIndexing autoIndexing()
{
return autoIndexing;
return sourceModule.autoIndexing;
}

@Override
Expand Down Expand Up @@ -205,6 +184,18 @@ public void shutdown()
@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, SecurityContext securityContext, long timeout )
{
throw new UnsupportedOperationException();
try
{
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout );
kernelTx.registerCloseListener(
( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
sourceModule.threadToTransactionBridge.bindTransactionToCurrentThread( kernelTx );
return kernelTx;
}
catch ( TransactionFailureException e )
{
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
}
}
}
Expand Up @@ -19,79 +19,55 @@
*/
package org.neo4j.kernel.impl.proc;

import java.io.File;
import java.net.URL;
import java.util.function.Supplier;

import org.neo4j.function.ThrowingFunction;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.security.URLAccessRule;
import org.neo4j.graphdb.security.URLAccessValidationError;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.legacyindex.AutoIndexing;
import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.guard.Guard;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard;
import org.neo4j.kernel.impl.factory.DataSourceModule;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.factory.PlatformModule;

public class ProcedureGDSFactory implements ThrowingFunction<Context,GraphDatabaseService,ProcedureException>
{
private Config config;
private final File storeDir;
private final PlatformModule platform;
private final DataSourceModule dataSource;
private final DependencyResolver resolver;
private final Supplier<StoreId> storeId;
private final Supplier<QueryExecutionEngine> queryExecutor;
private final CoreAPIAvailabilityGuard availability;
private final ThrowingFunction<URL, URL, URLAccessValidationError> urlValidator;
private final Guard guard;
private final ThreadToStatementContextBridge txBridge;

public ProcedureGDSFactory( Config config,
File storeDir,
DependencyResolver resolver,
Supplier<StoreId> storeId,
Supplier<QueryExecutionEngine> queryExecutor,
CoreAPIAvailabilityGuard availability,
URLAccessRule urlAccessRule )
public ProcedureGDSFactory( PlatformModule platform, DataSourceModule dataSource, DependencyResolver resolver,
CoreAPIAvailabilityGuard coreAPIAvailabilityGuard )
{
this.config = config;
this.storeDir = storeDir;
this.platform = platform;
this.dataSource = dataSource;
this.resolver = resolver;
this.storeId = storeId;
this.queryExecutor = queryExecutor;
this.availability = availability;
this.urlValidator = url -> urlAccessRule.validate( config, url );
this.guard = resolver.resolveDependency( Guard.class );
this.txBridge = resolver.resolveDependency( ThreadToStatementContextBridge.class );
this.availability = coreAPIAvailabilityGuard;
this.urlValidator = url -> platform.urlAccessRule.validate( platform.config, url );
}

@Override
public GraphDatabaseService apply( Context context ) throws ProcedureException
{
KernelTransaction transaction = context.get( Context.KERNEL_TRANSACTION );
Thread owningThread = context.get( Context.THREAD );
SecurityContext securityContext = context.getOrElse( Context.SECURITY_CONTEXT, SecurityContext.AUTH_DISABLED );
GraphDatabaseFacade facade = new GraphDatabaseFacade();
facade.init(
new ProcedureGDBFacadeSPI(
owningThread,
transaction,
queryExecutor,
storeDir,
platform,
dataSource,
resolver,
AutoIndexing.UNSUPPORTED,
storeId,
availability,
urlValidator
urlValidator,
securityContext
),
guard,
txBridge,
config
dataSource.guard,
dataSource.threadToTransactionBridge,
platform.config
);
return facade;
}
Expand Down
Expand Up @@ -26,10 +26,12 @@

import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.kernel.api.exceptions.InvalidArgumentsException;
import org.neo4j.test.DoubleLatch;

import static java.lang.String.format;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
Expand All @@ -41,6 +43,7 @@
import static org.neo4j.helpers.collection.MapUtil.map;
import static org.neo4j.kernel.api.security.AuthenticationResult.PASSWORD_CHANGE_REQUIRED;
import static org.neo4j.server.security.enterprise.auth.InternalFlatFileRealm.IS_SUSPENDED;
import static org.neo4j.server.security.enterprise.auth.ProcedureInteractionTestBase.ClassWithProcedures.exceptionsInProcedure;
import static org.neo4j.server.security.enterprise.auth.plugin.api.PredefinedRoles.ADMIN;
import static org.neo4j.server.security.enterprise.auth.plugin.api.PredefinedRoles.ARCHITECT;
import static org.neo4j.server.security.enterprise.auth.plugin.api.PredefinedRoles.PUBLISHER;
Expand Down Expand Up @@ -885,6 +888,35 @@ public void shouldPrintUserAndRolesWhenPermissionDenied() throws Throwable
"Read operations are not allowed for user 'mats' with roles [failer]." );
}

@Test
public void shouldAllowProcedureStartingTransactionInNewThread() throws Throwable
{
exceptionsInProcedure.clear();
DoubleLatch latch = new DoubleLatch( 2 );
ClassWithProcedures.doubleLatch = latch;
latch.start();
assertEmpty( writeSubject, "CALL test.threadTransaction" );
latch.finishAndWaitForAllToFinish();
assertThat( exceptionsInProcedure.size(), equalTo( 0 ) );
assertSuccess( adminSubject, "MATCH (:VeryUniqueLabel) RETURN toString(count(*)) as n",
r -> assertKeyIs( r, "n", "1" ) );
}

@Test
public void shouldInheritSecurityContextWhenProcedureStartingTransactionInNewThread() throws Throwable
{
exceptionsInProcedure.clear();
DoubleLatch latch = new DoubleLatch( 2 );
ClassWithProcedures.doubleLatch = latch;
latch.start();
assertEmpty( readSubject, "CALL test.threadReadDoingWriteTransaction" );
latch.finishAndWaitForAllToFinish();
assertThat( exceptionsInProcedure.size(), equalTo( 1 ) );
assertThat( exceptionsInProcedure.get( 0 ).getMessage(), containsString( WRITE_OPS_NOT_ALLOWED ) );
assertSuccess( adminSubject, "MATCH (:VeryUniqueLabel) RETURN toString(count(*)) as n",
r -> assertKeyIs( r, "n", "0" ) );
}

@Test
public void shouldSetCorrectUnAuthenticatedPermissions() throws Throwable
{
Expand Down

0 comments on commit e8cf5d1

Please sign in to comment.