Skip to content

Commit

Permalink
Fixed logic for loading Read replica maps
Browse files Browse the repository at this point in the history
- Actually checks for null on the dbName map now
- Is much clearer
- Does logging similar to that found when loading core member maps
  • Loading branch information
hugofirth committed Oct 5, 2018
1 parent b5f02e0 commit faea7f0
Showing 1 changed file with 76 additions and 32 deletions.
Expand Up @@ -29,11 +29,11 @@
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiMap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -47,13 +47,13 @@
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log;
import org.neo4j.stream.Streams;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.of;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader;
import static org.neo4j.helpers.SocketAddressParser.socketAddress;
import static org.neo4j.helpers.collection.Iterables.asSet;
Expand All @@ -79,6 +79,13 @@ public final class HazelcastClusterTopology
static final String READ_REPLICAS_DB_NAME_MAP = "read_replicas_database_names";
static final String DB_NAME_LEADER_TERM_PREFIX = "leader_term_for_database_name_";

// the attributes used for reconstructing read replica information
private static Collection<String> simpleRRAttrMapKeys = asList( READ_REPLICA_BOLT_ADDRESS_MAP, READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP,
READ_REPLICA_MEMBER_ID_MAP, READ_REPLICAS_DB_NAME_MAP );

// the attributes used for reconstructing core member information
private static Collection<String> coreAttrKeys = asList( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER, CLIENT_CONNECTOR_ADDRESSES, MEMBER_DB_NAME );

private HazelcastClusterTopology()
{
}
Expand All @@ -89,7 +96,7 @@ static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastIn

if ( hazelcastInstance != null )
{
readReplicas = readReplicas( hazelcastInstance );
readReplicas = readReplicas( hazelcastInstance, log );
}
else
{
Expand Down Expand Up @@ -177,42 +184,82 @@ static boolean casClusterId( HazelcastInstance hazelcastInstance, ClusterId clus
return uuid == null || clusterId.uuid().equals( uuid );
}

private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastInstance )
private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastInstance, Log log )
{
Map<MemberId,ReadReplicaInfo> result = new HashMap<>();
List<String> missingAttrKeys = new ArrayList<>();
Map<String,IMap<String,String>> validatedSimpleAttrMaps = new HashMap<>();

IMap<String/*uuid*/,String/*boltAddress*/> clientAddressMap =
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP );
IMap<String,String> txServerMap = hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP );
IMap<String,String> memberIdMap = hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP );
MultiMap<String,String> serverGroups = hazelcastInstance.getMultiMap( SERVER_GROUPS_MULTIMAP );
IMap<String, String> memberDbMap = hazelcastInstance.getMap( READ_REPLICAS_DB_NAME_MAP );

if ( of( clientAddressMap, txServerMap, memberIdMap, serverGroups ).anyMatch( Objects::isNull ) )
for ( String attrMapKey: simpleRRAttrMapKeys )
{
return result;
IMap<String,String> attrMap = hazelcastInstance.getMap( attrMapKey );
if ( attrMap == null )
{
missingAttrKeys.add( attrMapKey );
}
else
{
validatedSimpleAttrMaps.put( attrMapKey, attrMap );
}
}

for ( String hzUUID : clientAddressMap.keySet() )
MultiMap<String,String> serverGroupsMap = hazelcastInstance.getMultiMap( SERVER_GROUPS_MULTIMAP );
if ( serverGroupsMap == null )
{
String sAddresses = clientAddressMap.get( hzUUID );
String sCatchupAddress = txServerMap.get( hzUUID );
String sMemberId = memberIdMap.get( hzUUID );
String dbName = memberDbMap.get( hzUUID );
Collection<String> sServerGroups = serverGroups.get( hzUUID );
missingAttrKeys.add( SERVER_GROUPS_MULTIMAP );
}

if ( concat( of( sServerGroups ), of( sAddresses, sCatchupAddress, sMemberId ) ).anyMatch( Objects::isNull ) )
int totalNumAttrMaps = simpleRRAttrMapKeys.size() + 1;
if ( !missingAttrKeys.isEmpty() )
{
if ( missingAttrKeys.size() != totalNumAttrMaps )
{
continue;
String missingAttrs = String.join( ", ", missingAttrKeys );
log.warn( "Some, but not all, of the read replica attribute maps are null, including %s", missingAttrs );
}

ClientConnectorAddresses clientConnectorAddresses = ClientConnectorAddresses.fromString( sAddresses );
AdvertisedSocketAddress catchupAddress = socketAddress( sCatchupAddress, AdvertisedSocketAddress::new );
return emptyMap();
}

ReadReplicaInfo readReplicaInfo = new ReadReplicaInfo( clientConnectorAddresses, catchupAddress, asSet( sServerGroups ), dbName );
result.put( new MemberId( UUID.fromString( sMemberId ) ), readReplicaInfo );
Stream<String> readReplicaHzIds = validatedSimpleAttrMaps.get( READ_REPLICA_BOLT_ADDRESS_MAP ).keySet().stream();

Map<MemberId,ReadReplicaInfo> validatedReadReplicas = readReplicaHzIds
.flatMap( hzId -> Streams.ofNullable( buildReadReplicaFromAttrMap( hzId, validatedSimpleAttrMaps, serverGroupsMap, log ) ) )
.collect( Collectors.toMap( Pair::first, Pair::other ) );

return validatedReadReplicas;
}

private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( String hzId, Map<String,IMap<String,String>> validatedSimpleAttrMaps,
MultiMap<String,String> serverGroupsMap, Log log )
{

Map<String,String> memberAttrs = validatedSimpleAttrMaps.entrySet().stream()
.collect( Collectors.toMap( Map.Entry::getKey, e -> e.getValue().get( hzId ) ) );
Collection<String> serverGroups = serverGroupsMap.get( hzId );

for ( Map.Entry<String,String> attr : memberAttrs.entrySet() )
{
if ( attr.getValue() == null )
{
log.warn( "Missing attribute %s for read replica with hz id %s", attr.getKey(), hzId );
return null;
}
}
return result;

if ( serverGroups == null )
{
log.warn( "Missing attribute %s for read replica with hz id %s", SERVER_GROUPS_MULTIMAP, hzId );
return null;
}

ClientConnectorAddresses boltAddresses = ClientConnectorAddresses.fromString( memberAttrs.get( READ_REPLICA_BOLT_ADDRESS_MAP ) );
AdvertisedSocketAddress catchupAddress = socketAddress( memberAttrs.get( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP ), AdvertisedSocketAddress::new );
MemberId memberId = new MemberId( UUID.fromString( memberAttrs.get( READ_REPLICA_MEMBER_ID_MAP ) ) );
String memberDbName = memberAttrs.get( READ_REPLICAS_DB_NAME_MAP );
Set<String> serverGroupSet = asSet( serverGroups );

ReadReplicaInfo rrInfo = new ReadReplicaInfo( boltAddresses, catchupAddress, serverGroupSet, memberDbName );
return Pair.of( memberId, rrInfo );
}

static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName, Log log )
Expand Down Expand Up @@ -271,12 +318,9 @@ static Map<MemberId,CoreServerInfo> toCoreMemberMap( Set<Member> members, Log lo

for ( Member member : members )
{
Collection<String> attrKeys = asList( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER,
CLIENT_CONNECTOR_ADDRESSES, MEMBER_DB_NAME );

Map<String,String> attrMap = new HashMap<>();
boolean incomplete = false;
for ( String attrKey : attrKeys )
for ( String attrKey : coreAttrKeys )
{
String attrValue = member.getStringAttribute( attrKey );
if ( attrValue == null )
Expand Down

0 comments on commit faea7f0

Please sign in to comment.