Skip to content

Commit

Permalink
Update dead client and idle channel reaper to cleanup clients locks s…
Browse files Browse the repository at this point in the history
…ession to prevent failure on next connection from the same client with same RequestContext.

Start Master and MasterImpl cleanup. Introduce conversation and conversation mamanger.
  • Loading branch information
MishaDemianenko authored and tinwelint committed Jun 25, 2015
1 parent 1970145 commit c5d7d33
Show file tree
Hide file tree
Showing 32 changed files with 755 additions and 181 deletions.
Expand Up @@ -23,7 +23,6 @@
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.function.LongPredicate;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.api.properties.Property;
import org.neo4j.kernel.impl.api.operations.EntityOperations;
import org.neo4j.kernel.impl.api.operations.EntityReadOperations;
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api.state;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down
Expand Up @@ -117,6 +117,16 @@ public void begin( KEY key ) throws ConcurrentAccessException
}
}

public void remove( KEY key )
{
Entry entry = repo.get( key );
if (entry == null)
{
return;
}
end0(key, entry.value);
}

/**
* End the life of a stored entry. If the entry is currently in use, it will be thrown out as soon as the other client
* is done with it.
Expand Down Expand Up @@ -166,7 +176,7 @@ public void end( KEY key )
public VALUE acquire( KEY key ) throws NoSuchEntryException, ConcurrentAccessException
{
Entry entry = repo.get( key );
if(entry == null)
if ( entry == null )
{
throw new NoSuchEntryException( String.format("Cannot access '%s', no such entry exists.", key) );
}
Expand All @@ -180,7 +190,7 @@ public VALUE acquire( KEY key ) throws NoSuchEntryException, ConcurrentAccessExc
public void release( KEY key )
{
Entry entry = repo.get( key );
if(!entry.release())
if(entry != null && !entry.release())
{
// This happens when another client has asked that this entry be ended while we were using it, leaving us
// a note to not release the object back to the public, and to end its life when we are done with it.
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.kernel.impl.util.diffsets;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;

import org.neo4j.collection.primitive.PrimitiveIntIterator;
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.api.properties;

import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.Callable;
Expand Down
Expand Up @@ -226,4 +226,19 @@ public void usingDuplicateKeysShouldDisposeOfPreemptiveAllocatedValue() throws E
}
assertThat(reapedValues, equalTo(asList(1l)));
}

@Test
public void shouldAllowBeginWithSameKeyAfterSessionRemoval() throws Exception
{
// Given
repo.begin( 1l );
repo.acquire( 1l );

// when
repo.remove( 1l );

//then
repo.begin( 1l );
assertThat( reapedValues, equalTo( asList( 0l ) ) );
}
}
Expand Up @@ -81,7 +81,7 @@ public MarshlandPool( Pool<T> delegatePool )
public T acquire()
{
// Try and get it from the thread local
LocalSlot<T> localSlot = puddle.get();;
LocalSlot<T> localSlot = puddle.get();

T object = localSlot.object;
if(object != null)
Expand Down
Expand Up @@ -89,7 +89,7 @@ protected RequestType<TheBackupInterface> getRequestContext( byte id )
}

@Override
protected void finishOffChannel( Channel channel, RequestContext context )
protected void cleanConversation( RequestContext context )
{
}
}
33 changes: 15 additions & 18 deletions enterprise/com/src/main/java/org/neo4j/com/Server.java
Expand Up @@ -19,17 +19,6 @@
*/
package org.neo4j.com;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
Expand All @@ -47,6 +36,17 @@
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.Exceptions;
Expand All @@ -55,14 +55,13 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;

import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

import static org.neo4j.com.DechunkingChannelBuffer.assertSameProtocolVersion;
import static org.neo4j.com.Protocol.addLengthFieldPipes;
import static org.neo4j.com.Protocol.assertChunkSizeIsWithinFrameSize;
Expand Down Expand Up @@ -116,7 +115,6 @@ public interface Configuration
private final int frameLength;
private final ByteCounterMonitor byteCounterMonitor;
private final RequestMonitor requestMonitor;
private final Clock clock;
private final byte applicationProtocolVersion;
private final TxChecksumVerifier txVerifier;
private ServerBootstrap bootstrap;
Expand All @@ -140,7 +138,6 @@ public Server( T requestTarget, Configuration config, LogProvider logProvider, i
this.logProvider = logProvider;
this.msgLog = this.logProvider.getLog( getClass() );
this.txVerifier = txVerifier;
this.clock = clock;
this.byteCounterMonitor = byteCounterMonitor;
this.requestMonitor = requestMonitor;

Expand Down Expand Up @@ -343,7 +340,7 @@ protected void tryToFinishOffChannel( Channel channel, RequestContext slave )
{
try
{
finishOffChannel( channel, slave );
cleanConversation( slave );
unmapSlave( channel );
}
catch ( Throwable failure ) // Unknown error trying to finish off the tx
Expand Down Expand Up @@ -378,7 +375,7 @@ public void run()
{
try
{
finishOffChannel( null, slave );
cleanConversation( slave );
}
catch ( Throwable e )
{
Expand Down Expand Up @@ -544,7 +541,7 @@ protected T getRequestTarget()
return requestTarget;
}

protected abstract void finishOffChannel( Channel channel, RequestContext context );
protected abstract void cleanConversation( RequestContext context );

private ChunkingChannelBuffer newChunkingBuffer( Channel channel )
{
Expand Down
Expand Up @@ -103,7 +103,7 @@ protected RequestType<MadeUpCommunicationInterface> getRequestContext( byte id )
}

@Override
protected void finishOffChannel( Channel channel, RequestContext context )
protected void cleanConversation( RequestContext context )
{
}

Expand Down
2 changes: 1 addition & 1 deletion enterprise/com/src/test/java/org/neo4j/com/ServerTest.java
Expand Up @@ -123,7 +123,7 @@ protected RequestType<Object> getRequestContext( byte id )
}

@Override
protected void finishOffChannel( Channel channel, RequestContext context )
protected void cleanConversation( RequestContext context )
{
}
};
Expand Down
Expand Up @@ -84,7 +84,7 @@ public class HaSettings
public static final Setting<TxPushStrategy> tx_push_strategy = setting( "ha.tx_push_strategy", options(
TxPushStrategy.class ), "fixed" );

public static enum TxPushStrategy
public enum TxPushStrategy
{
@Description("Round robin")
round_robin,
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha.cluster;

import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.util.JobScheduler;

/**
* Conversation part of HA master SPI.
* Allows to hide dependencies from conversation management.
*/
public interface ConversationSPI
{
Locks.Client acquireClient();

JobScheduler.JobHandle scheduleRecurringJob( JobScheduler.Group group, long interval, Runnable job );
}
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha.cluster;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.util.JobScheduler;

/**
* Default implementation of {@link ConversationSPI} used on master in HA setup.
*/
public class DefaultConversationSPI implements ConversationSPI
{
private Locks locks;
private JobScheduler jobScheduler;

public DefaultConversationSPI( Locks locks, JobScheduler jobScheduler )
{
this.locks = locks;
this.jobScheduler = jobScheduler;
}

@Override
public Locks.Client acquireClient()
{
return locks.newClient();
}

@Override
public JobScheduler.JobHandle scheduleRecurringJob( JobScheduler.Group group, long interval, Runnable job )
{
return jobScheduler.scheduleRecurring( group, job, interval, TimeUnit.MILLISECONDS);
}

}

0 comments on commit c5d7d33

Please sign in to comment.