From 985c9aa2ca538612153e95861b2b958040b685f3 Mon Sep 17 00:00:00 2001 From: Tobias Lindaaker Date: Fri, 13 Jan 2017 15:37:36 +0100 Subject: [PATCH] Make Locks.Client.activeLocks() return a Stream --- .../kernel/api/KernelTransactionHandle.java | 5 +- .../api/KernelTransactionImplementation.java | 14 ++- ...KernelTransactionImplementationHandle.java | 5 +- .../neo4j/kernel/impl/locking/ActiveLock.java | 116 ++++++++++++++++++ .../org/neo4j/kernel/impl/locking/Locks.java | 68 +--------- .../neo4j/kernel/impl/locking/NoOpClient.java | 7 +- .../impl/locking/SimpleStatementLocks.java | 4 +- .../kernel/impl/locking/StatementLocks.java | 6 +- .../community/CommunityLockClient.java | 32 +++-- .../storageengine/api/lock/ResourceType.java | 3 + .../neo4j/kernel/api/ExecutingQueryTest.java | 10 +- .../impl/api/TestKernelTransactionHandle.java | 5 +- .../ActiveLocksListingCompatibility.java | 24 ++-- .../machines/locks/LeaderOnlyLockManager.java | 7 +- .../impl/locking/DeferringLockClient.java | 10 +- .../impl/locking/DeferringStatementLocks.java | 28 +---- .../neo4j/kernel/impl/locking/LockUnit.java | 8 +- .../impl/locking/DeferringLockClientTest.java | 10 +- .../kernel/ha/lock/SlaveLocksClient.java | 23 +++- .../builtinprocs/ActiveLocksQueryResult.java | 8 +- .../EnterpriseBuiltInDbmsProcedures.java | 2 +- .../lock/forseti/ForsetiClient.java | 41 +++---- .../ListQueriesProcedureTest.java | 14 +-- 23 files changed, 257 insertions(+), 193 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/locking/ActiveLock.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransactionHandle.java b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransactionHandle.java index 5982bdcc5aa37..2dfc37d6df3de 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransactionHandle.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransactionHandle.java @@ -19,14 +19,13 @@ */ package org.neo4j.kernel.api; -import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.security.SecurityContext; import org.neo4j.kernel.impl.api.Kernel; -import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ActiveLock; /** * View of a {@link KernelTransaction} that provides a limited set of actions against the transaction. @@ -102,5 +101,5 @@ public interface KernelTransactionHandle /** * @return the lock requests granted for this transaction. */ - Collection activeLocks(); + Stream activeLocks(); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index c0ddef161252a..b99e95a60c14c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import java.util.stream.Stream; import org.neo4j.collection.pool.Pool; import org.neo4j.graphdb.TransactionTerminatedException; @@ -50,6 +51,7 @@ import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.factory.AccessCapability; +import org.neo4j.kernel.impl.locking.ActiveLock; import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.StatementLocks; @@ -793,10 +795,18 @@ public void dispose() storageStatement.close(); } - public Collection activeLocks() + /** + * This method will be invoked by concurrent threads for inspecting the locks held by this transaction. + *

+ * The fact that {@link #statementLocks} is a volatile fields, grants us enough of a read barrier to get a good + * enough snapshot of the lock state (as long as the underlying methods give us such guarantees). + * + * @return the locks held by this transaction. + */ + public Stream activeLocks() { StatementLocks locks = this.statementLocks; - return locks == null ? Collections.emptyList() : locks.activeLocks(); + return locks == null ? Stream.empty() : locks.activeLocks(); } /** diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationHandle.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationHandle.java index 556917581b9b7..5193793255c9c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationHandle.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationHandle.java @@ -19,7 +19,6 @@ */ package org.neo4j.kernel.impl.api; -import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; @@ -28,7 +27,7 @@ import org.neo4j.kernel.api.KernelTransactionHandle; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.security.SecurityContext; -import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ActiveLock; /** * A {@link KernelTransactionHandle} that wraps the given {@link KernelTransactionImplementation}. @@ -114,7 +113,7 @@ public Stream executingQueries() } @Override - public Collection activeLocks() + public Stream activeLocks() { return tx.activeLocks(); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/ActiveLock.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/ActiveLock.java new file mode 100644 index 0000000000000..7dd1d5cc81b7f --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/ActiveLock.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.locking; + +import java.util.Objects; + +import org.neo4j.storageengine.api.lock.ResourceType; + +public interface ActiveLock +{ + String SHARED_MODE = "SHARED", EXCLUSIVE_MODE = "EXCLUSIVE"; + + String mode(); + + ResourceType resourceType(); + + long resourceId(); + + static ActiveLock exclusiveLock( ResourceType resourceType, long resourceId ) + { + return new Implementation( resourceType, resourceId ) + { + @Override + public String mode() + { + return EXCLUSIVE_MODE; + } + }; + } + + static ActiveLock sharedLock( ResourceType resourceType, long resourceId ) + { + return new Implementation( resourceType, resourceId ) + { + @Override + public String mode() + { + return SHARED_MODE; + } + }; + } + + interface Factory + { + Factory SHARED_LOCK = ActiveLock::sharedLock, EXCLUSIVE_LOCK = ActiveLock::exclusiveLock; + + ActiveLock create( ResourceType resourceType, long resourceId ); + } + + abstract class Implementation implements ActiveLock + { + private final ResourceType resourceType; + private final long resourceId; + + private Implementation( ResourceType resourceType, long resourceId ) + { + this.resourceType = resourceType; + this.resourceId = resourceId; + } + + @Override + public abstract String mode(); + + @Override + public ResourceType resourceType() + { + return resourceType; + } + + @Override + public long resourceId() + { + return resourceId; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( !(o instanceof ActiveLock) ) + { + return false; + } + ActiveLock that = (ActiveLock) o; + return resourceId == that.resourceId() && + Objects.equals( mode(), that.mode() ) && + Objects.equals( resourceType, that.resourceType() ); + } + + @Override + public int hashCode() + { + return Objects.hash( resourceType, resourceId, mode() ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java index cabde98cf7082..a7d7c71d58bc5 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java @@ -20,8 +20,7 @@ package org.neo4j.kernel.impl.locking; import java.time.Clock; -import java.util.Collection; -import java.util.Objects; +import java.util.stream.Stream; import org.neo4j.helpers.Service; import org.neo4j.kernel.configuration.Config; @@ -115,7 +114,7 @@ interface Client extends ResourceLocker, AutoCloseable /** For slave transactions, this tracks an identifier for the lock session running on the master */ int getLockSessionId(); - Collection activeLocks(); + Stream activeLocks(); } /** @@ -130,67 +129,4 @@ interface Client extends ResourceLocker, AutoCloseable void accept(Visitor visitor); void close(); - - abstract class ActiveLock - { - public final ResourceType resourceType; - public final long resourceId; - - private ActiveLock( ResourceType resourceType, long resourceId ) - { - this.resourceType = resourceType; - this.resourceId = resourceId; - } - - @Override - public boolean equals( Object o ) - { - if ( this == o ) - { - return true; - } - if ( o.getClass() != this.getClass() ) - { - return false; - } - ActiveLock activeLock = (ActiveLock) o; - return resourceId == activeLock.resourceId && - Objects.equals( resourceType, activeLock.resourceType ); - } - - @Override - public int hashCode() - { - return Objects.hash( resourceType, resourceId ); - } - - public abstract String mode(); - } - - final class ActiveExclusiveLock extends ActiveLock - { - public ActiveExclusiveLock( ResourceType resourceType, long resourceId ) - { - super( resourceType, resourceId ); - } - - @Override - public String mode() - { - return "EXCLUSIVE"; - } - } - final class ActiveSharedLock extends ActiveLock - { - public ActiveSharedLock( ResourceType resourceType, long resourceId ) - { - super( resourceType, resourceId ); - } - - @Override - public String mode() - { - return "SHARED"; - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java index 066087ec717d9..6e1fd918b3816 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java @@ -19,8 +19,7 @@ */ package org.neo4j.kernel.impl.locking; -import java.util.Collection; -import java.util.Collections; +import java.util.stream.Stream; import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; import org.neo4j.storageengine.api.lock.ResourceType; @@ -76,8 +75,8 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - return Collections.emptyList(); + return Stream.empty(); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java index 940a59aa50fa9..a3f2a5e469a38 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java @@ -19,7 +19,7 @@ */ package org.neo4j.kernel.impl.locking; -import java.util.Collection; +import java.util.stream.Stream; /** * A {@link StatementLocks} implementation that uses given {@link Locks.Client} for both @@ -65,7 +65,7 @@ public void close() } @Override - public Collection activeLocks() + public Stream activeLocks() { return client.activeLocks(); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java index 7a00fe69de0da..dc786886ee5b7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java @@ -19,7 +19,7 @@ */ package org.neo4j.kernel.impl.locking; -import java.util.Collection; +import java.util.stream.Stream; import org.neo4j.kernel.impl.api.KernelStatement; @@ -64,7 +64,9 @@ public interface StatementLocks extends AutoCloseable /** * List the locks held by this transaction. * + * This method is invoked by concurrent threads in order to inspect the lock state in this transaction. + * * @return the locks held by this transaction. */ - Collection activeLocks(); + Stream activeLocks(); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java index 9f9632a88aae7..f16e67e55c137 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java @@ -20,16 +20,17 @@ package org.neo4j.kernel.impl.locking.community; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import java.util.stream.Stream; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntObjectMap; import org.neo4j.collection.primitive.PrimitiveIntObjectVisitor; import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor; +import org.neo4j.kernel.impl.locking.ActiveLock; import org.neo4j.kernel.impl.locking.LockClientStateHolder; import org.neo4j.kernel.impl.locking.LockClientStoppedException; import org.neo4j.kernel.impl.locking.LockTracer; @@ -319,30 +320,27 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - List locks = new ArrayList<>(); - exclusiveLocks.visitEntries( ( typeId, exclusive ) -> + List locks = new ArrayList<>(); + exclusiveLocks.visitEntries( collectActiveLocks( locks, ActiveLock.Factory.EXCLUSIVE_LOCK ) ); + sharedLocks.visitEntries( collectActiveLocks( locks, ActiveLock.Factory.SHARED_LOCK ) ); + return locks.stream(); + } + + private static PrimitiveIntObjectVisitor,RuntimeException> collectActiveLocks( + List locks, ActiveLock.Factory activeLock ) + { + return ( typeId, exclusive ) -> { ResourceType resourceType = ResourceTypes.fromId( typeId ); exclusive.visitEntries( ( resourceId, lock ) -> { - locks.add( new Locks.ActiveExclusiveLock( resourceType, resourceId ) ); - return false; - } ); - return false; - } ); - sharedLocks.visitEntries( ( typeId, shared ) -> - { - ResourceType resourceType = ResourceTypes.fromId( typeId ); - shared.visitEntries( ( resourceId, lock ) -> - { - locks.add( new Locks.ActiveSharedLock( resourceType, resourceId ) ); + locks.add( activeLock.create( resourceType, resourceId ) ); return false; } ); return false; - } ); - return locks; + }; } private PrimitiveLongObjectMap localShared( ResourceType resourceType ) diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/lock/ResourceType.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/lock/ResourceType.java index a77ae749ff86c..c27ced135a360 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/lock/ResourceType.java +++ b/community/kernel/src/main/java/org/neo4j/storageengine/api/lock/ResourceType.java @@ -27,4 +27,7 @@ public interface ResourceType /** What to do if the lock cannot immediately be acquired. */ WaitStrategy waitStrategy(); + + /** Must be unique among all existing resource types. */ + String name(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/ExecutingQueryTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/ExecutingQueryTest.java index e78557906846d..f98e0ce1f2bf9 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/ExecutingQueryTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/ExecutingQueryTest.java @@ -175,14 +175,14 @@ private LockWaitEvent lock( String resourceType, long resourceId ) return query.lockTracer().waitForLock( resourceType( resourceType ), resourceId ); } - static ResourceType resourceType( String string ) + static ResourceType resourceType( String name ) { return new ResourceType() { @Override public String toString() { - return string; + return name(); } @Override @@ -196,6 +196,12 @@ public WaitStrategy waitStrategy() { throw new UnsupportedOperationException( "not used" ); } + + @Override + public String name() + { + return name; + } }; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TestKernelTransactionHandle.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TestKernelTransactionHandle.java index b05989ebaf41f..eee0399224be4 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TestKernelTransactionHandle.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TestKernelTransactionHandle.java @@ -19,7 +19,6 @@ */ package org.neo4j.kernel.impl.api; -import java.util.Collection; import java.util.Objects; import java.util.Optional; import java.util.stream.Stream; @@ -29,7 +28,7 @@ import org.neo4j.kernel.api.KernelTransactionHandle; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.security.SecurityContext; -import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ActiveLock; /** * A test implementation of {@link KernelTransactionHandle} that simply wraps a given {@link KernelTransaction}. @@ -99,7 +98,7 @@ public Stream executingQueries() } @Override - public Collection activeLocks() + public Stream activeLocks() { throw new UnsupportedOperationException(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/ActiveLocksListingCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/ActiveLocksListingCompatibility.java index 8907089bd841b..3fc378ab3be41 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/ActiveLocksListingCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/ActiveLocksListingCompatibility.java @@ -19,13 +19,17 @@ */ package org.neo4j.kernel.impl.locking; -import java.util.Collection; +import java.util.HashSet; +import java.util.stream.Stream; import org.junit.Ignore; import org.junit.Test; import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; +import static org.neo4j.kernel.impl.locking.ActiveLock.exclusiveLock; +import static org.neo4j.kernel.impl.locking.ActiveLock.sharedLock; import static org.neo4j.kernel.impl.locking.ResourceTypes.NODE; @Ignore( "Not a test. This is a compatibility suite, run from LockingCompatibilityTestSuite." ) @@ -44,17 +48,17 @@ public void shouldListLocksHeldByTheCurrentClient() throws Exception clientA.acquireShared( LockTracer.NONE, NODE, 3, 4, 5 ); // when - Collection locks = clientA.activeLocks(); + Stream locks = clientA.activeLocks(); // then assertEquals( - asList( - new Locks.ActiveExclusiveLock( NODE, 1 ), - new Locks.ActiveExclusiveLock( NODE, 2 ), - new Locks.ActiveExclusiveLock( NODE, 3 ), - new Locks.ActiveSharedLock( NODE, 3 ), - new Locks.ActiveSharedLock( NODE, 4 ), - new Locks.ActiveSharedLock( NODE, 5 ) ), - locks ); + new HashSet<>( asList( + exclusiveLock( NODE, 1 ), + exclusiveLock( NODE, 2 ), + exclusiveLock( NODE, 3 ), + sharedLock( NODE, 3 ), + sharedLock( NODE, 4 ), + sharedLock( NODE, 5 ) ) ), + locks.collect( toSet() ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java index 06f41bfd4c646..49e8ac550e131 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java @@ -19,15 +19,16 @@ */ package org.neo4j.causalclustering.core.state.machines.locks; -import java.util.Collection; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Stream; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.replication.Replicator; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionStateMachine; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.kernel.impl.locking.ActiveLock; import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; @@ -260,9 +261,9 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - throw new UnsupportedOperationException( "not implemented" ); + return localClient.activeLocks(); } } } diff --git a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java index 7adbeb645b4b2..763346194b6bc 100644 --- a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java +++ b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java @@ -20,10 +20,9 @@ package org.neo4j.kernel.impl.locking; import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.mutable.MutableInt; @@ -161,12 +160,9 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - return locks.keySet().stream().map( ( unit ) -> unit.isExclusive() - ? new Locks.ActiveExclusiveLock( unit.resourceType(), unit.resourceId() ) - : new Locks.ActiveSharedLock( unit.resourceType(), unit.resourceId() ) ) - .collect( Collectors.toList() ); + return locks.keySet().stream(); } private void assertNotStopped() diff --git a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java index 5fcd59c6ec0ec..485f214e6c5f8 100644 --- a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java +++ b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java @@ -19,9 +19,7 @@ */ package org.neo4j.kernel.impl.locking; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.stream.Stream; /** * A {@link StatementLocks} implementation that defers {@link #optimistic() optimistic} @@ -69,28 +67,8 @@ public void close() } @Override - public Collection activeLocks() + public Stream activeLocks() { - Collection explicit = this.explicit.activeLocks(), implicit = this.implicit.activeLocks(); - // minimize (re-)allocation - if ( explicit instanceof ArrayList && explicit.size() > implicit.size() ) - { - List locks = (List) explicit; - locks.addAll( implicit ); - return locks; - } - else if ( implicit instanceof ArrayList && implicit.size() > explicit.size() ) - { - List locks = (List) implicit; - locks.addAll( explicit ); - return locks; - } - else - { - List locks = new ArrayList<>( explicit.size() + implicit.size() ); - locks.addAll( explicit ); - locks.addAll( implicit ); - return locks; - } + return Stream.concat( explicit.activeLocks(), implicit.activeLocks() ); } } diff --git a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/LockUnit.java b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/LockUnit.java index 44e2b5a18781f..abaffce2e6ab8 100644 --- a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/LockUnit.java +++ b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/LockUnit.java @@ -24,7 +24,7 @@ /** * Description of a lock that was deferred to commit time. */ -public class LockUnit implements Comparable +public class LockUnit implements Comparable, ActiveLock { private final ResourceType resourceType; private final long resourceId; @@ -37,6 +37,12 @@ public LockUnit( ResourceType resourceType, long resourceId, boolean exclusive ) this.exclusive = exclusive; } + @Override + public String mode() + { + return exclusive ? EXCLUSIVE_MODE : SHARED_MODE; + } + public ResourceType resourceType() { return resourceType; diff --git a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java index 374bedfa5dbd3..838ed023678f7 100644 --- a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java +++ b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java @@ -20,12 +20,11 @@ package org.neo4j.kernel.impl.locking; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Rule; import org.junit.Test; @@ -472,12 +471,9 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - return actualLockUnits.stream().map( lu -> lu.isExclusive() - ? new Locks.ActiveExclusiveLock( lu.resourceType(), lu.resourceId() ) - : new Locks.ActiveSharedLock( lu.resourceType(), lu.resourceId() ) ) - .collect( Collectors.toList() ); + return actualLockUnits.stream(); } } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java index 2c9e366866411..0e703bfe0355e 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java @@ -20,10 +20,11 @@ package org.neo4j.kernel.ha.lock; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Stream; import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.com.ComException; @@ -35,6 +36,7 @@ import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; +import org.neo4j.kernel.impl.locking.ActiveLock; import org.neo4j.kernel.impl.locking.LockClientStoppedException; import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.LockWaitEvent; @@ -58,6 +60,9 @@ */ class SlaveLocksClient implements Locks.Client { + private static final Function>,Stream> + EXCLUSIVE_ACTIVE_LOCKS = activeLocks( ActiveLock.Factory.EXCLUSIVE_LOCK ), + SHARED_ACTIVE_LOCKS = activeLocks( ActiveLock.Factory.SHARED_LOCK ); private final Master master; private final Locks.Client client; private final Locks localLockManager; @@ -242,9 +247,21 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - throw new UnsupportedOperationException( "not implemented" ); + return Stream.concat( // TODO: do we need to make these maps of locks ConcurrentHashMaps? + exclusiveLocks.entrySet().stream().flatMap( EXCLUSIVE_ACTIVE_LOCKS ), + sharedLocks.entrySet().stream().flatMap( SHARED_ACTIVE_LOCKS ) ); + } + + private static Function>,Stream> activeLocks( + ActiveLock.Factory activeLock ) + { + return entry -> entry.getValue().keySet().stream().map( resourceId -> + { + ResourceType resourceType = entry.getKey(); + return activeLock.create( resourceType, resourceId ); + } ); } private void stopLockSessionOnMaster() diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/ActiveLocksQueryResult.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/ActiveLocksQueryResult.java index 9c6f358007002..5a90ed4fed846 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/ActiveLocksQueryResult.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/ActiveLocksQueryResult.java @@ -19,7 +19,7 @@ */ package org.neo4j.kernel.enterprise.builtinprocs; -import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ActiveLock; public class ActiveLocksQueryResult { @@ -27,10 +27,10 @@ public class ActiveLocksQueryResult public final String resourceType; public final long resourceId; - public ActiveLocksQueryResult( Locks.ActiveLock lock ) + public ActiveLocksQueryResult( ActiveLock lock ) { this.mode = lock.mode(); - this.resourceType = lock.resourceType.toString(); - this.resourceId = lock.resourceId; + this.resourceType = lock.resourceType().name(); + this.resourceId = lock.resourceId(); } } diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/EnterpriseBuiltInDbmsProcedures.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/EnterpriseBuiltInDbmsProcedures.java index f1fc31a8a0418..73b2b37befefc 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/EnterpriseBuiltInDbmsProcedures.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/enterprise/builtinprocs/EnterpriseBuiltInDbmsProcedures.java @@ -254,7 +254,7 @@ public Stream listActiveLocks( @Name( "queryId" ) String { long id = fromExternalString( queryId ).kernelQueryId(); return getActiveTransactions( tx -> executingQueriesWithId( id, tx ) ) - .flatMap( pair -> pair.first().activeLocks().stream().map( ActiveLocksQueryResult::new ) ); + .flatMap( pair -> pair.first().activeLocks().map( ActiveLocksQueryResult::new ) ); } catch ( UncaughtCheckedException uncaught ) { diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java index 8a663aab83f2f..b6b1fd3f7c472 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java @@ -21,10 +21,10 @@ import java.time.Clock; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.function.IntFunction; +import java.util.stream.Stream; import org.neo4j.collection.pool.Pool; import org.neo4j.collection.primitive.Primitive; @@ -34,6 +34,7 @@ import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.impl.enterprise.lock.forseti.ForsetiLockManager.DeadlockResolutionStrategy; +import org.neo4j.kernel.impl.locking.ActiveLock; import org.neo4j.kernel.impl.locking.LockAcquisitionTimeoutException; import org.neo4j.kernel.impl.locking.LockClientStateHolder; import org.neo4j.kernel.impl.locking.LockClientStoppedException; @@ -640,34 +641,32 @@ public int getLockSessionId() } @Override - public Collection activeLocks() + public Stream activeLocks() { - List locks = new ArrayList<>(); - for ( int typeId = 0; typeId < exclusiveLockCounts.length; typeId++ ) - { - PrimitiveLongIntMap lockCounts = exclusiveLockCounts[typeId]; - if (lockCounts != null) - { - ResourceType resourceType = ResourceTypes.fromId( typeId ); - lockCounts.visitEntries( (resourceId, count) -> { - locks.add( new Locks.ActiveExclusiveLock( resourceType, resourceId ) ); - return false; - } ); - } - } - for ( int typeId = 0; typeId < sharedLockCounts.length; typeId++ ) + List locks = new ArrayList<>(); + collectActiveLocks( exclusiveLockCounts, locks, ActiveLock.Factory.EXCLUSIVE_LOCK ); + collectActiveLocks( sharedLockCounts, locks, ActiveLock.Factory.SHARED_LOCK ); + return locks.stream(); + } + + private static void collectActiveLocks( + PrimitiveLongIntMap[] counts, + List locks, + ActiveLock.Factory activeLock ) + { + for ( int typeId = 0; typeId < counts.length; typeId++ ) { - PrimitiveLongIntMap lockCounts = sharedLockCounts[typeId]; - if (lockCounts != null) + PrimitiveLongIntMap lockCounts = counts[typeId]; + if ( lockCounts != null ) { ResourceType resourceType = ResourceTypes.fromId( typeId ); - lockCounts.visitEntries( (resourceId, count) -> { - locks.add( new Locks.ActiveSharedLock( resourceType, resourceId ) ); + lockCounts.visitEntries( ( resourceId, count ) -> + { + locks.add( activeLock.create( resourceType, resourceId ) ); return false; } ); } } - return locks; } public int waitListSize() diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/enterprise/builtinprocs/ListQueriesProcedureTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/enterprise/builtinprocs/ListQueriesProcedureTest.java index 2d8aabd37dcc4..5bdab650e0067 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/enterprise/builtinprocs/ListQueriesProcedureTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/enterprise/builtinprocs/ListQueriesProcedureTest.java @@ -90,11 +90,11 @@ public void shouldContainTheQueryItself() throws Exception public void shouldProvideElapsedCpuTime() throws Exception { // given - String QUERY = "MATCH (n) SET n.v = n.v + 1"; - try ( Resource test = test( db::createNode, Transaction::acquireWriteLock, QUERY ) ) + String query = "MATCH (n) SET n.v = n.v + 1"; + try ( Resource test = test( db::createNode, Transaction::acquireWriteLock, query ) ) { // when - Map data = getQueryListing( QUERY ); + Map data = getQueryListing( query ); // then assertThat( data, hasKey( "elapsedTimeMillis" ) ); @@ -116,7 +116,7 @@ public void shouldProvideElapsedCpuTime() throws Exception assertThat( waitTime1, instanceOf( Long.class ) ); // when - data = getQueryListing( QUERY ); + data = getQueryListing( query ); // then Long cpuTime2 = (Long) data.get( "cpuTimeMillis" ); @@ -148,7 +148,7 @@ public void shouldListPlannerAndRuntimeUsed() throws Exception public void shouldListActiveLocks() throws Exception { // given - String QUERY = "MATCH (x:X) SET x.v = 5 WITH count(x) AS num MATCH (y:Y) SET y.c = num"; + String query = "MATCH (x:X) SET x.v = 5 WITH count(x) AS num MATCH (y:Y) SET y.c = num"; Set locked = new HashSet<>(); try ( Resource test = test( () -> { @@ -157,13 +157,13 @@ public void shouldListActiveLocks() throws Exception locked.add( db.createNode( label( "X" ) ).getId() ); } return db.createNode( label( "Y" ) ); - }, Transaction::acquireWriteLock, QUERY ) ) + }, Transaction::acquireWriteLock, query ) ) { // when try ( Result rows = db.execute( "CALL dbms.listQueries() YIELD query AS queryText, queryId " + "WHERE queryText = $queryText " + "CALL dbms.listActiveLocks(queryId) YIELD mode, resourceType, resourceId " - + "RETURN *", singletonMap( "queryText", QUERY ) ) ) + + "RETURN *", singletonMap( "queryText", query ) ) ) { // then Set ids = new HashSet<>();