Skip to content

Commit

Permalink
Make Locks.Client.activeLocks() return a Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
thobe committed Jan 19, 2017
1 parent b38713a commit 985c9aa
Show file tree
Hide file tree
Showing 23 changed files with 257 additions and 193 deletions.
Expand Up @@ -19,14 +19,13 @@
*/
package org.neo4j.kernel.api;

import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.api.Kernel;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ActiveLock;

/**
* View of a {@link KernelTransaction} that provides a limited set of actions against the transaction.
Expand Down Expand Up @@ -102,5 +101,5 @@ public interface KernelTransactionHandle
/**
* @return the lock requests granted for this transaction.
*/
Collection<Locks.ActiveLock> activeLocks();
Stream<? extends ActiveLock> activeLocks();
}
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.neo4j.collection.pool.Pool;
import org.neo4j.graphdb.TransactionTerminatedException;
Expand All @@ -50,6 +51,7 @@
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.TxState;
import org.neo4j.kernel.impl.factory.AccessCapability;
import org.neo4j.kernel.impl.locking.ActiveLock;
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocks;
Expand Down Expand Up @@ -793,10 +795,18 @@ public void dispose()
storageStatement.close();
}

public Collection<Locks.ActiveLock> activeLocks()
/**
* This method will be invoked by concurrent threads for inspecting the locks held by this transaction.
* <p>
* The fact that {@link #statementLocks} is a volatile fields, grants us enough of a read barrier to get a good
* enough snapshot of the lock state (as long as the underlying methods give us such guarantees).
*
* @return the locks held by this transaction.
*/
public Stream<? extends ActiveLock> activeLocks()
{
StatementLocks locks = this.statementLocks;
return locks == null ? Collections.emptyList() : locks.activeLocks();
return locks == null ? Stream.empty() : locks.activeLocks();
}

/**
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api;

import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;

Expand All @@ -28,7 +27,7 @@
import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ActiveLock;

/**
* A {@link KernelTransactionHandle} that wraps the given {@link KernelTransactionImplementation}.
Expand Down Expand Up @@ -114,7 +113,7 @@ public Stream<ExecutingQuery> executingQueries()
}

@Override
public Collection<Locks.ActiveLock> activeLocks()
public Stream<? extends ActiveLock> activeLocks()
{
return tx.activeLocks();
}
Expand Down
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.locking;

import java.util.Objects;

import org.neo4j.storageengine.api.lock.ResourceType;

public interface ActiveLock
{
String SHARED_MODE = "SHARED", EXCLUSIVE_MODE = "EXCLUSIVE";

String mode();

ResourceType resourceType();

long resourceId();

static ActiveLock exclusiveLock( ResourceType resourceType, long resourceId )
{
return new Implementation( resourceType, resourceId )
{
@Override
public String mode()
{
return EXCLUSIVE_MODE;
}
};
}

static ActiveLock sharedLock( ResourceType resourceType, long resourceId )
{
return new Implementation( resourceType, resourceId )
{
@Override
public String mode()
{
return SHARED_MODE;
}
};
}

interface Factory
{
Factory SHARED_LOCK = ActiveLock::sharedLock, EXCLUSIVE_LOCK = ActiveLock::exclusiveLock;

ActiveLock create( ResourceType resourceType, long resourceId );
}

abstract class Implementation implements ActiveLock
{
private final ResourceType resourceType;
private final long resourceId;

private Implementation( ResourceType resourceType, long resourceId )
{
this.resourceType = resourceType;
this.resourceId = resourceId;
}

@Override
public abstract String mode();

@Override
public ResourceType resourceType()
{
return resourceType;
}

@Override
public long resourceId()
{
return resourceId;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( !(o instanceof ActiveLock) )
{
return false;
}
ActiveLock that = (ActiveLock) o;
return resourceId == that.resourceId() &&
Objects.equals( mode(), that.mode() ) &&
Objects.equals( resourceType, that.resourceType() );
}

@Override
public int hashCode()
{
return Objects.hash( resourceType, resourceId, mode() );
}
}
}
Expand Up @@ -20,8 +20,7 @@
package org.neo4j.kernel.impl.locking;

import java.time.Clock;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Stream;

import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -115,7 +114,7 @@ interface Client extends ResourceLocker, AutoCloseable
/** For slave transactions, this tracks an identifier for the lock session running on the master */
int getLockSessionId();

Collection<ActiveLock> activeLocks();
Stream<? extends ActiveLock> activeLocks();
}

/**
Expand All @@ -130,67 +129,4 @@ interface Client extends ResourceLocker, AutoCloseable
void accept(Visitor visitor);

void close();

abstract class ActiveLock
{
public final ResourceType resourceType;
public final long resourceId;

private ActiveLock( ResourceType resourceType, long resourceId )
{
this.resourceType = resourceType;
this.resourceId = resourceId;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o.getClass() != this.getClass() )
{
return false;
}
ActiveLock activeLock = (ActiveLock) o;
return resourceId == activeLock.resourceId &&
Objects.equals( resourceType, activeLock.resourceType );
}

@Override
public int hashCode()
{
return Objects.hash( resourceType, resourceId );
}

public abstract String mode();
}

final class ActiveExclusiveLock extends ActiveLock
{
public ActiveExclusiveLock( ResourceType resourceType, long resourceId )
{
super( resourceType, resourceId );
}

@Override
public String mode()
{
return "EXCLUSIVE";
}
}
final class ActiveSharedLock extends ActiveLock
{
public ActiveSharedLock( ResourceType resourceType, long resourceId )
{
super( resourceType, resourceId );
}

@Override
public String mode()
{
return "SHARED";
}
}
}
Expand Up @@ -19,8 +19,7 @@
*/
package org.neo4j.kernel.impl.locking;

import java.util.Collection;
import java.util.Collections;
import java.util.stream.Stream;

import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType;
Expand Down Expand Up @@ -76,8 +75,8 @@ public int getLockSessionId()
}

@Override
public Collection<Locks.ActiveLock> activeLocks()
public Stream<? extends ActiveLock> activeLocks()
{
return Collections.emptyList();
return Stream.empty();
}
}
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.kernel.impl.locking;

import java.util.Collection;
import java.util.stream.Stream;

/**
* A {@link StatementLocks} implementation that uses given {@link Locks.Client} for both
Expand Down Expand Up @@ -65,7 +65,7 @@ public void close()
}

@Override
public Collection<Locks.ActiveLock> activeLocks()
public Stream<? extends ActiveLock> activeLocks()
{
return client.activeLocks();
}
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.kernel.impl.locking;

import java.util.Collection;
import java.util.stream.Stream;

import org.neo4j.kernel.impl.api.KernelStatement;

Expand Down Expand Up @@ -64,7 +64,9 @@ public interface StatementLocks extends AutoCloseable
/**
* List the locks held by this transaction.
*
* This method is invoked by concurrent threads in order to inspect the lock state in this transaction.
*
* @return the locks held by this transaction.
*/
Collection<Locks.ActiveLock> activeLocks();
Stream<? extends ActiveLock> activeLocks();
}

0 comments on commit 985c9aa

Please sign in to comment.