Skip to content

Commit

Permalink
Restructuring discoverMembers to return all the addresses
Browse files Browse the repository at this point in the history
And added acquireEndpoints which we can call to get back
read/write endpoints
  • Loading branch information
jimwebber authored and Mark Needham committed May 24, 2016
1 parent e5244d8 commit 9f6f693
Show file tree
Hide file tree
Showing 29 changed files with 752 additions and 419 deletions.
Expand Up @@ -458,6 +458,28 @@ public Code code()
}
}

enum Cluster implements Status
{
// transient errors
NoLeader( TransientError,
"No leader available at the moment. Retrying your request at a later time may succeed." ),

;

private final Code code;

@Override
public Code code()
{
return code;
}

Cluster( Classification classification, String description )
{
this.code = new Code( classification, this, description );
}
}

Code code();

class Code
Expand Down
Expand Up @@ -93,7 +93,8 @@ private Set<CoreMember> toCoreMembers( Set<Member> members )
{
coreMembers.add( new CoreMember(
new AdvertisedSocketAddress( member.getStringAttribute( TRANSACTION_SERVER ) ),
new AdvertisedSocketAddress( member.getStringAttribute( RAFT_SERVER ) )
new AdvertisedSocketAddress( member.getStringAttribute( RAFT_SERVER ) ),
new AdvertisedSocketAddress( member.getStringAttribute( BOLT_SERVER ) )
) );
}

Expand Down
Expand Up @@ -21,11 +21,11 @@

import java.util.Set;

import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftInstance.BootstrapException;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.membership.CoreMemberSet;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class RaftDiscoveryServiceConnector extends LifecycleAdapter implements CoreTopologyService.Listener
Expand Down
Expand Up @@ -19,6 +19,14 @@
*/
package org.neo4j.coreedge.raft;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
Expand All @@ -42,16 +50,9 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.lang.String.format;
import static java.util.Collections.singletonList;

import static org.neo4j.coreedge.raft.roles.Role.LEADER;

/**
Expand Down Expand Up @@ -203,7 +204,7 @@ public MEMBER getLeader() throws NoLeaderFoundException
return waitForLeader( 0, member -> member != null );
}

public MEMBER waitForLeader( long timeoutMillis, Predicate<MEMBER> predicate ) throws NoLeaderFoundException
private MEMBER waitForLeader( long timeoutMillis, Predicate<MEMBER> predicate ) throws NoLeaderFoundException
{
try
{
Expand Down
Expand Up @@ -145,8 +145,9 @@ private CoreMember retrieveMember( ByteBuf buffer ) throws UnsupportedEncodingEx

AdvertisedSocketAddress coreAddress = marshal.unmarshal( buffer );
AdvertisedSocketAddress raftAddress = marshal.unmarshal( buffer );
AdvertisedSocketAddress boltAddress = marshal.unmarshal( buffer );

return new CoreMember( coreAddress, raftAddress );
return new CoreMember( coreAddress, raftAddress, boltAddress );
}

@Override
Expand Down
Expand Up @@ -85,8 +85,8 @@ else if ( message instanceof RaftMessages.AppendEntries.Request )
}
else if ( message instanceof RaftMessages.AppendEntries.Response )
{
RaftMessages.AppendEntries.Response<CoreMember> appendResponse = (RaftMessages.AppendEntries
.Response<CoreMember>) message;
RaftMessages.AppendEntries.Response<CoreMember> appendResponse =
(RaftMessages.AppendEntries.Response<CoreMember>) message;

buf.writeLong( appendResponse.term() );
buf.writeBoolean( appendResponse.success() );
Expand Down Expand Up @@ -126,5 +126,6 @@ private void writeMember( CoreMember member, ByteBuf buffer ) throws Unsupported

marshal.marshal( member.getCoreAddress(), buffer );
marshal.marshal( member.getRaftAddress(), buffer );
marshal.marshal( member.getBoltAddress(), buffer );
}
}
Expand Up @@ -36,17 +36,21 @@ public class CoreMember
{
private final AdvertisedSocketAddress coreAddress;
private final AdvertisedSocketAddress raftAddress;
private final AdvertisedSocketAddress boltAddress;

public CoreMember( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress raftAddress )
public CoreMember( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress raftAddress,
AdvertisedSocketAddress boltAddress)
{
this.coreAddress = coreAddress;
this.raftAddress = raftAddress;
this.boltAddress = boltAddress;
}

@Override
public String toString()
{
return format( "CoreMember{coreAddress=%s, raftAddress=%s}", coreAddress, raftAddress );
return format( "CoreMember{coreAddress=%s, raftAddress=%s, boltAddress=%s}",
coreAddress, raftAddress, boltAddress );
}

@Override
Expand All @@ -62,13 +66,14 @@ public boolean equals( Object o )
}
CoreMember that = (CoreMember) o;
return Objects.equals( coreAddress, that.coreAddress ) &&
Objects.equals( raftAddress, that.raftAddress );
Objects.equals( raftAddress, that.raftAddress ) &&
Objects.equals( boltAddress, that.boltAddress );
}

@Override
public int hashCode()
{
return Objects.hash( coreAddress, raftAddress );
return Objects.hash( coreAddress, raftAddress, boltAddress );
}

public AdvertisedSocketAddress getCoreAddress()
Expand All @@ -81,6 +86,11 @@ public AdvertisedSocketAddress getRaftAddress()
return raftAddress;
}

public AdvertisedSocketAddress getBoltAddress()
{
return boltAddress;
}

/**
* Format:
* ┌────────────────────────────────────────────┐
Expand All @@ -95,6 +105,13 @@ public AdvertisedSocketAddress getRaftAddress()
* │ │port 4 bytes││
* │ └─────────────────────────────┘│
* └────────────────────────────────────────────┘
* │bolt address ┌─────────────────────────────┐│
* │ │hostnameLength 4 bytes││
* │ │hostnameBytes variable││
* │ │port 4 bytes││
* │ └─────────────────────────────┘│
* └────────────────────────────────────────────┘
*
* <p/>
* This Marshal implementation can also serialize and deserialize null values. They are encoded as a CoreMember
* with empty strings in the address fields, so they still adhere to the format displayed above.
Expand All @@ -116,11 +133,13 @@ public void marshal( CoreMember member, ByteBuffer buffer )
{
byteBufMarshal.marshal( NULL_ADDRESS, buffer );
byteBufMarshal.marshal( NULL_ADDRESS, buffer );
byteBufMarshal.marshal( NULL_ADDRESS, buffer );
}
else
{
byteBufMarshal.marshal( member.getCoreAddress(), buffer );
byteBufMarshal.marshal( member.getRaftAddress(), buffer );
byteBufMarshal.marshal( member.getBoltAddress(), buffer );
}
}

Expand All @@ -129,22 +148,25 @@ public void marshal( CoreMember member, ByteBuf buffer )
{
byteBufMarshal.marshal( member.getCoreAddress(), buffer );
byteBufMarshal.marshal( member.getRaftAddress(), buffer );
byteBufMarshal.marshal( member.getBoltAddress(), buffer );
}

public CoreMember unmarshal( ByteBuffer buffer )
{
AdvertisedSocketAddress coreAddress = byteBufMarshal.unmarshal( buffer );
AdvertisedSocketAddress raftAddress = byteBufMarshal.unmarshal( buffer );
AdvertisedSocketAddress boltAddress = byteBufMarshal.unmarshal( buffer );

return dealWithPossibleNullAddress( coreAddress, raftAddress );
return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress );
}

@Override
public CoreMember unmarshal( ByteBuf source )
{
AdvertisedSocketAddress coreAddress = byteBufMarshal.unmarshal( source );
AdvertisedSocketAddress raftAddress = byteBufMarshal.unmarshal( source );
return dealWithPossibleNullAddress( coreAddress, raftAddress );
AdvertisedSocketAddress boltAddress = byteBufMarshal.unmarshal( source );
return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress );
}

@Override
Expand All @@ -154,11 +176,13 @@ public void marshal( CoreMember member, WritableChannel channel ) throws IOExcep
{
channelMarshal.marshal( NULL_ADDRESS, channel );
channelMarshal.marshal( NULL_ADDRESS, channel );
channelMarshal.marshal( NULL_ADDRESS, channel );
}
else
{
channelMarshal.marshal( member.getCoreAddress(), channel );
channelMarshal.marshal( member.getRaftAddress(), channel );
channelMarshal.marshal( member.getBoltAddress(), channel );
}
}

Expand All @@ -167,21 +191,26 @@ public CoreMember unmarshal( ReadableChannel source ) throws IOException
{
AdvertisedSocketAddress coreAddress = channelMarshal.unmarshal( source );
AdvertisedSocketAddress raftAddress = channelMarshal.unmarshal( source );
return dealWithPossibleNullAddress( coreAddress, raftAddress );

AdvertisedSocketAddress boltAddress = channelMarshal.unmarshal( source );
return dealWithPossibleNullAddress( coreAddress, raftAddress, boltAddress );
}

private CoreMember dealWithPossibleNullAddress( AdvertisedSocketAddress coreAddress, AdvertisedSocketAddress
raftAddress )
private CoreMember dealWithPossibleNullAddress( AdvertisedSocketAddress coreAddress,
AdvertisedSocketAddress raftAddress,
AdvertisedSocketAddress boltAddress)
{
if ( coreAddress == null || raftAddress == null || (coreAddress.equals( NULL_ADDRESS ) && raftAddress
.equals( NULL_ADDRESS )) )
if ( coreAddress == null ||
raftAddress == null ||
boltAddress == null ||
(coreAddress.equals( NULL_ADDRESS ) &&
raftAddress.equals( NULL_ADDRESS ) &&
boltAddress.equals( NULL_ADDRESS )) )
{
return null;
}
else
{
return new CoreMember( coreAddress, raftAddress );
return new CoreMember( coreAddress, raftAddress, boltAddress );
}
}
}
Expand Down

0 comments on commit 9f6f693

Please sign in to comment.