Skip to content

Commit

Permalink
Add support for checking store id on edge server startup
Browse files Browse the repository at this point in the history
Now when the edge server startup of the local database is not empty it
will check that its local store id is the same as the store id of one
core server.  In such a case the edge server is allowed to join the
cluster otherwise it will not join by shutting down.
  • Loading branch information
davidegrohmann committed Jul 4, 2016
1 parent d8ab022 commit 93e697d
Show file tree
Hide file tree
Showing 26 changed files with 640 additions and 37 deletions.
17 changes: 17 additions & 0 deletions community/common/src/main/java/org/neo4j/function/Predicates.java
Expand Up @@ -155,6 +155,23 @@ public static void await( Supplier<Boolean> condition, long timeout, TimeUnit un
await( condition, timeout, unit, defaultPollInterval, TimeUnit.MILLISECONDS );
}

public static void awaitEx( ThrowingSupplier<Boolean, Exception> condition, long timeout, TimeUnit unit )
throws TimeoutException, InterruptedException
{
int defaultPollInterval = 20;
Supplier<Boolean> adapter = () -> {
try
{
return condition.get();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
};
await( adapter, timeout, unit, defaultPollInterval, TimeUnit.MILLISECONDS );
}

public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit )
throws TimeoutException, InterruptedException
{
Expand Down
Expand Up @@ -38,8 +38,10 @@
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotEncoder;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotRequestDecoder;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotRequestHandler;
import org.neo4j.coreedge.catchup.storecopy.core.GetStoreIdRequestHandler;
import org.neo4j.coreedge.catchup.storecopy.core.GetStoreRequestHandler;
import org.neo4j.coreedge.catchup.storecopy.core.StoreCopyFinishedResponseEncoder;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreIdRequestDecoder;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreRequestDecoder;
import org.neo4j.coreedge.catchup.tx.core.TxPullRequestDecoder;
import org.neo4j.coreedge.catchup.tx.core.TxPullRequestHandler;
Expand Down Expand Up @@ -143,6 +145,8 @@ protected void initChannel( SocketChannel ch ) throws Exception
pipeline.addLast( new GetStoreRequestDecoder( protocol ) );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier ) );
pipeline.addLast( new GetStoreIdRequestDecoder( protocol ) );
pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );

pipeline.addLast( new CoreSnapshotRequestDecoder( protocol ) );
pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) );
Expand Down
Expand Up @@ -35,6 +35,6 @@ public boolean isExpecting( NextMessage message )

public enum NextMessage
{
MESSAGE_TYPE, GET_STORE, GET_RAFT_STATE, TX_PULL
MESSAGE_TYPE, GET_STORE, GET_STORE_ID, GET_RAFT_STATE, TX_PULL
}
}
Expand Up @@ -28,6 +28,7 @@ public enum RequestMessageType implements Message
TX_PULL_REQUEST( (byte) 1 ),
STORE( (byte) 2 ),
RAFT_STATE( (byte) 3 ),
STORE_ID( (byte) 4 ),
UNKNOWN( (byte) 404 );

private byte messageType;
Expand Down
Expand Up @@ -57,6 +57,10 @@ else if ( requestMessageType.equals( RequestMessageType.STORE ) )
{
protocol.expect( NextMessage.GET_STORE );
}
else if ( requestMessageType.equals( RequestMessageType.STORE_ID ) )
{
protocol.expect( NextMessage.GET_STORE_ID );
}
else if ( requestMessageType.equals( RequestMessageType.RAFT_STATE ) )
{
protocol.expect( NextMessage.GET_RAFT_STATE );
Expand Down
Expand Up @@ -23,14 +23,17 @@
import io.netty.channel.socket.SocketChannel;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.neo4j.coreedge.catchup.RequestMessageType;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotListener;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileReceiver;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileStreamingCompleteListener;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileStreams;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreIdReceiver;
import org.neo4j.coreedge.catchup.tx.edge.PullRequestMonitor;
import org.neo4j.coreedge.catchup.tx.edge.TxPullRequest;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponse;
Expand All @@ -44,20 +47,23 @@
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.NonBlockingChannels;
import org.neo4j.coreedge.server.SenderService;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.helpers.Listeners;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

import static java.util.Arrays.asList;

public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver,
public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver,
StoreFileStreamingCompleteListener,
TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener
TxStreamCompleteListener, TxPullResponseListener,
CoreSnapshotListener
{
private final PullRequestMonitor pullRequestMonitor;
private final SenderService senderService;
private StoreFileStreams storeFileStreams = null;
private Consumer<StoreId> storeIdConsumer = null;
private final Listeners<StoreFileStreamingCompleteListener> storeFileStreamingCompleteListeners = new Listeners<>();
private final Listeners<TxStreamCompleteListener> txStreamCompleteListeners = new Listeners<>();
private final Listeners<TxPullResponseListener> txPullResponseListeners = new Listeners<>();
Expand All @@ -81,6 +87,12 @@ public void requestStore( CoreMember serverAddress )
send( serverAddress, RequestMessageType.STORE, getStoreRequest );
}

public void requestStoreId( CoreMember serverAddress )
{
GetStoreIdRequest getStoreIdRequest = new GetStoreIdRequest();
send( serverAddress, RequestMessageType.STORE_ID, getStoreIdRequest );
}

public CompletableFuture<CoreSnapshot> requestCoreSnapshot( CoreMember serverAddress )
{
coreSnapshotFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -144,6 +156,11 @@ public void setStoreFileStreams( StoreFileStreams storeFileStreams )
this.storeFileStreams = storeFileStreams;
}

public void setStoreIdConsumer( Consumer<StoreId> storeIdConsumer )
{
this.storeIdConsumer = storeIdConsumer;
}

@Override
public void onFileStreamingComplete( long lastCommittedTxBeforeStoreCopy )
{
Expand All @@ -163,6 +180,12 @@ public void onTxReceived( final TxPullResponse tx )
txPullResponseListeners.notify( listener -> listener.onTxReceived( tx ) );
}

@Override
public void onStoreIdReceived( final StoreId storeId )
{
storeIdConsumer.accept( storeId );
}

@Override
public void onSnapshotReceived( CoreSnapshot snapshot )
{
Expand Down
Expand Up @@ -25,9 +25,4 @@ public StoreCopyFailedException( Throwable cause )
{
super( cause );
}

public StoreCopyFailedException( String message )
{
super( message );
}
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy;

public class StoreIdDownloadFailedException extends Exception
{
public StoreIdDownloadFailedException( Throwable cause )
{
super( cause );
}
}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreIdRequest;
import org.neo4j.coreedge.raft.replication.storeid.StoreIdMarshal;
import org.neo4j.coreedge.server.StoreId;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;

public class GetStoreIdRequestHandler extends SimpleChannelInboundHandler<GetStoreIdRequest>
{
private final CatchupServerProtocol protocol;
private final Supplier<StoreId> storeIdSupplier;

public GetStoreIdRequestHandler( CatchupServerProtocol protocol, Supplier<StoreId> storeIdSupplier )
{
this.protocol = protocol;
this.storeIdSupplier = storeIdSupplier;
}

@Override
protected void channelRead0( ChannelHandlerContext ctx, GetStoreIdRequest msg ) throws Exception
{
StoreId storeId = storeIdSupplier.get();
ctx.writeAndFlush( ResponseMessageType.STORE_ID );
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( ctx.alloc().buffer() );
StoreIdMarshal.marshal( storeId, channel );
ctx.writeAndFlush( channel.buffer() );
protocol.expect( NextMessage.MESSAGE_TYPE );
}
}
Expand Up @@ -85,11 +85,15 @@ protected void initChannel( SocketChannel ch ) throws Exception

pipeline.addLast( new TxPullRequestEncoder() );
pipeline.addLast( new GetStoreRequestEncoder() );
pipeline.addLast( new GetStoreIdRequestEncoder() );
pipeline.addLast( new ResponseMessageTypeEncoder() );
pipeline.addLast( new RequestMessageTypeEncoder() );

pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) );

pipeline.addLast( new GetStoreIdResponseDecoder( protocol ) );
pipeline.addLast( new GetStoreIdResponseHandler( protocol, owner ) );

pipeline.addLast( new TxPullResponseDecoder( protocol ) );
pipeline.addLast( new TxPullResponseHandler( protocol, owner ) );

Expand Down
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy.edge;

import org.neo4j.coreedge.catchup.RequestMessageType;
import org.neo4j.coreedge.network.Message;

public class GetStoreIdRequest implements Message
{
public static final RequestMessageType MESSAGE_TYPE = RequestMessageType.STORE_ID;
}
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy.edge;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

import org.neo4j.coreedge.catchup.CatchupServerProtocol;

public class GetStoreIdRequestDecoder extends MessageToMessageDecoder<ByteBuf>
{
private final CatchupServerProtocol protocol;

public GetStoreIdRequestDecoder( CatchupServerProtocol protocol )
{
this.protocol = protocol;
}

@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
{
if ( protocol.isExpecting( CatchupServerProtocol.NextMessage.GET_STORE_ID ) )
{
out.add( new GetStoreIdRequest() );
}
else
{
out.add( Unpooled.copiedBuffer( msg ) );
}
}
}
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy.edge;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

public class GetStoreIdRequestEncoder extends MessageToMessageEncoder<GetStoreIdRequest>
{
@Override
protected void encode( ChannelHandlerContext ctx, GetStoreIdRequest msg, List<Object> out ) throws Exception
{
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeByte( 0 );
out.add( buffer );
}
}

0 comments on commit 93e697d

Please sign in to comment.