Skip to content

Commit

Permalink
Addressed NPE in hazelcast discovery service
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Oct 25, 2018
1 parent 67df105 commit 29ad563
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 15 deletions.
Expand Up @@ -29,7 +29,6 @@
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import com.hazelcast.core.MultiMap; import com.hazelcast.core.MultiMap;


import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand All @@ -48,12 +47,12 @@
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.CollectorsUtil;
import org.neo4j.helpers.collection.Pair; import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.stream.Streams; import org.neo4j.stream.Streams;


import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader; import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader;
import static org.neo4j.helpers.SocketAddressParser.socketAddress; import static org.neo4j.helpers.SocketAddressParser.socketAddress;
Expand Down Expand Up @@ -81,11 +80,12 @@ public final class HazelcastClusterTopology
static final String DB_NAME_LEADER_TERM_PREFIX = "leader_term_for_database_name_"; static final String DB_NAME_LEADER_TERM_PREFIX = "leader_term_for_database_name_";


// the attributes used for reconstructing read replica information // the attributes used for reconstructing read replica information
private static Collection<String> simpleRRAttrMapKeys = asList( READ_REPLICA_BOLT_ADDRESS_MAP, READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP, private static Set<String> simpleRRAttrMapKeys = Stream.of( READ_REPLICA_BOLT_ADDRESS_MAP, READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP,
READ_REPLICA_MEMBER_ID_MAP, READ_REPLICAS_DB_NAME_MAP ); READ_REPLICA_MEMBER_ID_MAP, READ_REPLICAS_DB_NAME_MAP ).collect( Collectors.toSet() );


// the attributes used for reconstructing core member information // the attributes used for reconstructing core member information
private static Collection<String> coreAttrKeys = asList( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER, CLIENT_CONNECTOR_ADDRESSES, MEMBER_DB_NAME ); private static Set<String> coreAttrKeys = Stream.of( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER, CLIENT_CONNECTOR_ADDRESSES,
MEMBER_DB_NAME ).collect( Collectors.toSet() );


private HazelcastClusterTopology() private HazelcastClusterTopology()
{ {
Expand Down Expand Up @@ -185,7 +185,7 @@ static boolean casClusterId( HazelcastInstance hazelcastInstance, ClusterId clus
return uuid == null || clusterId.uuid().equals( uuid ); return uuid == null || clusterId.uuid().equals( uuid );
} }


private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastInstance, Log log ) static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastInstance, Log log )
{ {
Pair<Set<String>,Map<String,IMap<String,String>>> validatedSimpleAttrMaps = validatedSimpleAttrMaps( hazelcastInstance ); Pair<Set<String>,Map<String,IMap<String,String>>> validatedSimpleAttrMaps = validatedSimpleAttrMaps( hazelcastInstance );
Set<String> missingAttrKeys = validatedSimpleAttrMaps.first(); Set<String> missingAttrKeys = validatedSimpleAttrMaps.first();
Expand Down Expand Up @@ -252,19 +252,17 @@ private static Pair<Set<String>,Map<String,IMap<String,String>>> validatedSimple
private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( String hzId, Map<String,IMap<String,String>> simpleAttrMaps, private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( String hzId, Map<String,IMap<String,String>> simpleAttrMaps,
MultiMap<String,String> serverGroupsMap, Log log ) MultiMap<String,String> serverGroupsMap, Log log )
{ {
Map<String,String> memberAttrs = simpleAttrMaps.entrySet().stream()
.map( e -> Pair.of( e.getKey(), e.getValue().get( hzId ) ) )
.filter( p -> loggingNonNullMemberAttrPredicate( p, hzId, log ) )
.collect( CollectorsUtil.pairsToMap() );


Map<String,String> memberAttrs = simpleAttrMaps.entrySet().stream().collect( Collectors.toMap( Map.Entry::getKey, e -> e.getValue().get( hzId ) ) ); if ( !memberAttrs.keySet().containsAll( simpleRRAttrMapKeys ) )
Collection<String> memberServerGroups = serverGroupsMap.get( hzId );

for ( Map.Entry<String,String> attr : memberAttrs.entrySet() )
{ {
if ( attr.getValue() == null ) return null;
{
log.warn( "Missing attribute %s for read replica with hz id %s", attr.getKey(), hzId );
return null;
}
} }


Collection<String> memberServerGroups = serverGroupsMap.get( hzId );
if ( memberServerGroups == null ) if ( memberServerGroups == null )
{ {
log.warn( "Missing attribute %s for read replica with hz id %s", SERVER_GROUPS_MULTIMAP, hzId ); log.warn( "Missing attribute %s for read replica with hz id %s", SERVER_GROUPS_MULTIMAP, hzId );
Expand All @@ -281,6 +279,16 @@ private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( Strin
return Pair.of( memberId, rrInfo ); return Pair.of( memberId, rrInfo );
} }


private static boolean loggingNonNullMemberAttrPredicate( Pair<String,String> memberAttr, String hzId, Log log )
{
if ( memberAttr.other() == null )
{
log.warn( "Missing attribute %s for read replica with hz id %s", memberAttr.first(), hzId );
return false;
}
return true;
}

static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName, Log log ) static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName, Log log )
{ {
IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );
Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.IntFunction; import java.util.function.IntFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
Expand All @@ -56,6 +57,11 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog; import org.neo4j.logging.NullLog;


import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
Expand All @@ -68,6 +74,10 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.CLUSTER_UUID_DB_NAME_MAP; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.CLUSTER_UUID_DB_NAME_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.DB_NAME_LEADER_TERM_PREFIX; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.DB_NAME_LEADER_TERM_PREFIX;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICAS_DB_NAME_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributesForCore; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributesForCore;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.toCoreMemberMap; import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.toCoreMemberMap;
import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.helpers.collection.Iterators.asSet;
Expand Down Expand Up @@ -114,6 +124,47 @@ private static List<Config> generateConfigs( int numConfigs, IntFunction<HashMap
return IntStream.range(0, numConfigs).mapToObj( generator ).map( Config::defaults ).collect( Collectors.toList() ); return IntStream.range(0, numConfigs).mapToObj( generator ).map( Config::defaults ).collect( Collectors.toList() );
} }


@Test
public void shouldCollectReadReplicasAsMap()
{
// given
MemberId memberId = new MemberId( UUID.randomUUID() );
List<ClientConnectorAddresses.ConnectorUri> connectorUris = singletonList(
new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "losthost", 4444 ) ) );
ClientConnectorAddresses addresses = new ClientConnectorAddresses( connectorUris );
ReadReplicaInfo readReplicaInfo = new ReadReplicaInfo( addresses, new AdvertisedSocketAddress( "localhost", 1353 ), GROUPS, "foo" );
generateReadReplicaAttributes( memberId, readReplicaInfo );

// when
Map<MemberId,ReadReplicaInfo> rrMap = HazelcastClusterTopology.readReplicas( hzInstance, NullLog.getInstance() );

// then
assertEquals( singletonMap( memberId, readReplicaInfo ), rrMap );
}

@Test
public void shouldValidateNullReadReplicaAttrMaps()
{
// given
MemberId memberId = new MemberId( UUID.randomUUID() );
List<ClientConnectorAddresses.ConnectorUri> connectorUris = singletonList(
new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "losthost", 4444 ) ) );
ClientConnectorAddresses addresses = new ClientConnectorAddresses( connectorUris );
ReadReplicaInfo readReplicaInfo = new ReadReplicaInfo( addresses, new AdvertisedSocketAddress( "localhost", 1353 ), GROUPS, "foo" );
generateReadReplicaAttributes( memberId, readReplicaInfo, emptySet(), asSet( READ_REPLICAS_DB_NAME_MAP ) );

// when
AssertableLogProvider logProvider = new AssertableLogProvider();
Log log = logProvider.getLog( this.getClass() );
Map<MemberId,ReadReplicaInfo> rrMap = HazelcastClusterTopology.readReplicas( hzInstance, log );

// then
assertEquals( emptyMap(), rrMap );
logProvider.assertContainsMessageContaining( "Missing attribute %s for read replica" );
}

@Test @Test
public void shouldCollectMembersAsAMap() throws Exception public void shouldCollectMembersAsAMap() throws Exception
{ {
Expand Down Expand Up @@ -257,4 +308,35 @@ public void shouldCorrectlyReturnCoreMemberRoles()
assertEquals( "First member was expected to be leader.", RoleInfo.LEADER, roleMap.get( chosenLeaderId ) ); assertEquals( "First member was expected to be leader.", RoleInfo.LEADER, roleMap.get( chosenLeaderId ) );
} }


private void generateReadReplicaAttributes( MemberId memberId, ReadReplicaInfo readReplicaInfo )
{
generateReadReplicaAttributes( memberId, readReplicaInfo, emptySet(), emptySet() );
}

private void generateReadReplicaAttributes( MemberId memberId, ReadReplicaInfo readReplicaInfo, Set<String> missingAttrs, Set<String> nullAttrs )
{
Map<String,BiFunction<MemberId,ReadReplicaInfo,String>> attributeFactories = new HashMap<>();
attributeFactories.put( READ_REPLICAS_DB_NAME_MAP, ( ignored, rr ) -> rr.getDatabaseName() );
attributeFactories.put( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP, ( ignored, rr ) -> rr.getCatchupServer().toString() );
attributeFactories.put( READ_REPLICA_MEMBER_ID_MAP, ( mId, ignored ) -> mId.getUuid().toString() );
attributeFactories.put( READ_REPLICA_BOLT_ADDRESS_MAP, ( ignored, rr ) -> rr.connectors().toString() );

UUID hzId = UUID.randomUUID();
attributeFactories.entrySet().stream()
.filter( e -> !missingAttrs.contains( e.getKey() ) )
.forEach( e ->
{
String attrValue = nullAttrs.contains( e.getKey() ) ? null : e.getValue().apply( memberId, readReplicaInfo );
generateReadReplicaAttribute( e.getKey(), hzId, attrValue );
} );
}

private void generateReadReplicaAttribute( String attrKey, UUID hzId, String attrValue )
{
IMap<String,String> attrs = (IMap<String, String>) mock( IMap.class );
when( attrs.keySet() ).thenReturn( singleton( hzId.toString() ) );
when( attrs.get( hzId.toString() ) ).thenReturn( attrValue );
// when( attrs.get( AdditionalMatchers.not( hzId.toString() ) ) ).thenReturn( null );
when( hzInstance.<String,String>getMap( attrKey ) ).thenReturn( attrs );
}
} }

0 comments on commit 29ad563

Please sign in to comment.