Skip to content

Commit

Permalink
Test attribute validtion for N RRs
Browse files Browse the repository at this point in the history
  • Loading branch information
hugofirth committed Oct 25, 2018
1 parent 29ad563 commit 036c2d9
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 32 deletions.
Expand Up @@ -80,11 +80,11 @@ public final class HazelcastClusterTopology
static final String DB_NAME_LEADER_TERM_PREFIX = "leader_term_for_database_name_";

// the attributes used for reconstructing read replica information
private static Set<String> simpleRRAttrMapKeys = Stream.of( READ_REPLICA_BOLT_ADDRESS_MAP, READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP,
static final Set<String> RR_ATTR_KEYS = Stream.of( READ_REPLICA_BOLT_ADDRESS_MAP, READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP,
READ_REPLICA_MEMBER_ID_MAP, READ_REPLICAS_DB_NAME_MAP ).collect( Collectors.toSet() );

// the attributes used for reconstructing core member information
private static Set<String> coreAttrKeys = Stream.of( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER, CLIENT_CONNECTOR_ADDRESSES,
static final Set<String> CORE_ATTR_KEYS = Stream.of( MEMBER_UUID, RAFT_SERVER, TRANSACTION_SERVER, CLIENT_CONNECTOR_ADDRESSES,
MEMBER_DB_NAME ).collect( Collectors.toSet() );

private HazelcastClusterTopology()
Expand Down Expand Up @@ -201,7 +201,7 @@ static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance hazelcastIn
if ( !missingAttrKeys.isEmpty() )
{
// We might well not have any read replicas, in which case missing maps is not an error, but we *can't* have some maps and not others
boolean missingAllKeys = missingAttrKeys.containsAll( simpleRRAttrMapKeys ) && missingAttrKeys.contains( SERVER_GROUPS_MULTIMAP );
boolean missingAllKeys = missingAttrKeys.containsAll( RR_ATTR_KEYS ) && missingAttrKeys.contains( SERVER_GROUPS_MULTIMAP );
if ( !missingAllKeys )
{
String missingAttrs = String.join( ", ", missingAttrKeys );
Expand All @@ -228,7 +228,7 @@ private static Pair<Set<String>,Map<String,IMap<String,String>>> validatedSimple
Set<String> missingAttrKeys = new HashSet<>();
Map<String,IMap<String,String>> validatedSimpleAttrMaps = new HashMap<>();

for ( String attrMapKey : simpleRRAttrMapKeys )
for ( String attrMapKey : RR_ATTR_KEYS )
{
IMap<String,String> attrMap = hazelcastInstance.getMap( attrMapKey );
if ( attrMap == null )
Expand All @@ -254,10 +254,10 @@ private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( Strin
{
Map<String,String> memberAttrs = simpleAttrMaps.entrySet().stream()
.map( e -> Pair.of( e.getKey(), e.getValue().get( hzId ) ) )
.filter( p -> loggingNonNullMemberAttrPredicate( p, hzId, log ) )
.filter( p -> hasAttribute( p, hzId, log ) )
.collect( CollectorsUtil.pairsToMap() );

if ( !memberAttrs.keySet().containsAll( simpleRRAttrMapKeys ) )
if ( !memberAttrs.keySet().containsAll( RR_ATTR_KEYS ) )
{
return null;
}
Expand All @@ -279,7 +279,7 @@ private static Pair<MemberId,ReadReplicaInfo> buildReadReplicaFromAttrMap( Strin
return Pair.of( memberId, rrInfo );
}

private static boolean loggingNonNullMemberAttrPredicate( Pair<String,String> memberAttr, String hzId, Log log )
private static boolean hasAttribute( Pair<String,String> memberAttr, String hzId, Log log )
{
if ( memberAttr.other() == null )
{
Expand Down Expand Up @@ -347,7 +347,7 @@ static Map<MemberId,CoreServerInfo> toCoreMemberMap( Set<Member> members, Log lo
{
Map<String,String> attrMap = new HashMap<>();
boolean incomplete = false;
for ( String attrKey : coreAttrKeys )
for ( String attrKey : CORE_ATTR_KEYS )
{
String attrValue = member.getStringAttribute( attrKey );
if ( attrValue == null )
Expand Down
Expand Up @@ -40,8 +40,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -51,6 +53,8 @@
import org.neo4j.causalclustering.helpers.CausalClusteringTestHelpers;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.CollectorsUtil;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.AssertableLogProvider;
Expand Down Expand Up @@ -78,6 +82,7 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.RR_ATTR_KEYS;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.buildMemberAttributesForCore;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.toCoreMemberMap;
import static org.neo4j.helpers.collection.Iterators.asSet;
Expand All @@ -104,6 +109,7 @@ public class HazelcastClusterTopologyTest
};

private final HazelcastInstance hzInstance = mock( HazelcastInstance.class );
private Map<String,IMap<String,String>> rrAttributeMaps;

@Before
public void setup()
Expand All @@ -112,6 +118,7 @@ public void setup()
MultiMap<String,String> serverGroupsMMap = mock( MultiMap.class );
when( serverGroupsMMap.get( any() ) ).thenReturn( GROUPS );
when( hzInstance.getMultiMap( anyString() ) ).thenReturn( (MultiMap) serverGroupsMMap );
rrAttributeMaps = RR_ATTR_KEYS.stream().map( k -> Pair.of( k, (IMap<String,String>) mock( IMap.class ) ) ).collect( CollectorsUtil.pairsToMap() );
}

private static List<Config> generateConfigs( int numConfigs )
Expand All @@ -129,31 +136,24 @@ public void shouldCollectReadReplicasAsMap()
{
// given
MemberId memberId = new MemberId( UUID.randomUUID() );
List<ClientConnectorAddresses.ConnectorUri> connectorUris = singletonList(
new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "losthost", 4444 ) ) );
ClientConnectorAddresses addresses = new ClientConnectorAddresses( connectorUris );
ReadReplicaInfo readReplicaInfo = new ReadReplicaInfo( addresses, new AdvertisedSocketAddress( "localhost", 1353 ), GROUPS, "foo" );
generateReadReplicaAttributes( memberId, readReplicaInfo );
ReadReplicaInfo readReplicaInfo = generateReadReplicaInfo();
Map<MemberId,ReadReplicaInfo> mockedRRs = singletonMap( memberId, readReplicaInfo );
mockReadReplicaAttributes( mockedRRs );

// when
Map<MemberId,ReadReplicaInfo> rrMap = HazelcastClusterTopology.readReplicas( hzInstance, NullLog.getInstance() );

// then
assertEquals( singletonMap( memberId, readReplicaInfo ), rrMap );
assertEquals( mockedRRs, rrMap );
}

@Test
public void shouldValidateNullReadReplicaAttrMaps()
{
// given
MemberId memberId = new MemberId( UUID.randomUUID() );
List<ClientConnectorAddresses.ConnectorUri> connectorUris = singletonList(
new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "losthost", 4444 ) ) );
ClientConnectorAddresses addresses = new ClientConnectorAddresses( connectorUris );
ReadReplicaInfo readReplicaInfo = new ReadReplicaInfo( addresses, new AdvertisedSocketAddress( "localhost", 1353 ), GROUPS, "foo" );
generateReadReplicaAttributes( memberId, readReplicaInfo, emptySet(), asSet( READ_REPLICAS_DB_NAME_MAP ) );
ReadReplicaInfo readReplicaInfo = generateReadReplicaInfo();
mockReadReplicaAttributes( singletonMap( memberId, readReplicaInfo ), singleton( READ_REPLICAS_DB_NAME_MAP ), emptyMap() );

// when
AssertableLogProvider logProvider = new AssertableLogProvider();
Expand All @@ -162,6 +162,34 @@ public void shouldValidateNullReadReplicaAttrMaps()

// then
assertEquals( emptyMap(), rrMap );
logProvider.assertContainsMessageContaining( "Some, but not all, of the read replica attribute maps are null" );
}

@Test
public void shouldValidateReadReplicaAttrMapNullValues()
{
// given
Map<MemberId,ReadReplicaInfo> mockedRRs = new HashMap<>();

MemberId validMemberId = new MemberId( UUID.randomUUID() );
MemberId invalidMemberId = new MemberId( UUID.randomUUID() );
ReadReplicaInfo validReadReplicaInfo = generateReadReplicaInfo();
ReadReplicaInfo invalidReadReplicaInfo = generateReadReplicaInfo();

mockedRRs.put( validMemberId, validReadReplicaInfo );
mockedRRs.put( invalidMemberId, invalidReadReplicaInfo );

Map<MemberId,Set<String>> nullAttrValues = singletonMap( invalidMemberId, singleton( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP ) );

mockReadReplicaAttributes( mockedRRs, emptySet(), nullAttrValues );

// when
AssertableLogProvider logProvider = new AssertableLogProvider();
Log log = logProvider.getLog( this.getClass() );
Map<MemberId,ReadReplicaInfo> rrMap = HazelcastClusterTopology.readReplicas( hzInstance, log );

// then
assertEquals( singletonMap( validMemberId, validReadReplicaInfo ), rrMap );
logProvider.assertContainsMessageContaining( "Missing attribute %s for read replica" );
}

Expand Down Expand Up @@ -308,35 +336,56 @@ public void shouldCorrectlyReturnCoreMemberRoles()
assertEquals( "First member was expected to be leader.", RoleInfo.LEADER, roleMap.get( chosenLeaderId ) );
}

private void generateReadReplicaAttributes( MemberId memberId, ReadReplicaInfo readReplicaInfo )
private void mockReadReplicaAttributes( Map<MemberId,ReadReplicaInfo> readReplicaInfos )
{
mockReadReplicaAttributes( readReplicaInfos, emptySet(), emptyMap() );
}

private void mockReadReplicaAttributes( Map<MemberId,ReadReplicaInfo> readReplicaInfos, Set<String> missingAttrsMaps, Map<MemberId,Set<String>> nullAttrs )
{
generateReadReplicaAttributes( memberId, readReplicaInfo, emptySet(), emptySet() );
Set<String> hzIds = new HashSet<>();
readReplicaInfos.forEach( ( memberId, readReplicaInfo ) ->
{
UUID hzId = UUID.randomUUID();
hzIds.add( hzId.toString() );
generateReadReplicaAttributes( hzId, memberId, readReplicaInfo, missingAttrsMaps, nullAttrs.getOrDefault( memberId, emptySet() ) );
} );
rrAttributeMaps.forEach( ( ignored, attrs ) -> when( attrs.keySet() ).thenReturn( hzIds ) );
}

private void generateReadReplicaAttributes( MemberId memberId, ReadReplicaInfo readReplicaInfo, Set<String> missingAttrs, Set<String> nullAttrs )
private void generateReadReplicaAttributes( UUID hzId, MemberId memberId, ReadReplicaInfo readReplicaInfo,
Set<String> missingAttrsMaps, Set<String> nullAttrs )
{
Map<String,BiFunction<MemberId,ReadReplicaInfo,String>> attributeFactories = new HashMap<>();
attributeFactories.put( READ_REPLICAS_DB_NAME_MAP, ( ignored, rr ) -> rr.getDatabaseName() );
attributeFactories.put( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP, ( ignored, rr ) -> rr.getCatchupServer().toString() );
attributeFactories.put( READ_REPLICA_MEMBER_ID_MAP, ( mId, ignored ) -> mId.getUuid().toString() );
attributeFactories.put( READ_REPLICA_BOLT_ADDRESS_MAP, ( ignored, rr ) -> rr.connectors().toString() );

UUID hzId = UUID.randomUUID();
attributeFactories.entrySet().stream()
.filter( e -> !missingAttrs.contains( e.getKey() ) )
.filter( e -> !missingAttrsMaps.contains( e.getKey() ) )
.forEach( e ->
{
String attrValue = nullAttrs.contains( e.getKey() ) ? null : e.getValue().apply( memberId, readReplicaInfo );
generateReadReplicaAttribute( e.getKey(), hzId, attrValue );
} );
mockReadReplicaAttribute( e.getKey(), hzId, attrValue );
});
}

private void generateReadReplicaAttribute( String attrKey, UUID hzId, String attrValue )
private void mockReadReplicaAttribute( String attrKey, UUID hzId, String attrValue )
{
IMap<String,String> attrs = (IMap<String, String>) mock( IMap.class );
when( attrs.keySet() ).thenReturn( singleton( hzId.toString() ) );
IMap<String,String> attrs = rrAttributeMaps.get( attrKey );
when( attrs.get( hzId.toString() ) ).thenReturn( attrValue );
// when( attrs.get( AdditionalMatchers.not( hzId.toString() ) ) ).thenReturn( null );
when( hzInstance.<String,String>getMap( attrKey ) ).thenReturn( attrs );
}

private ReadReplicaInfo generateReadReplicaInfo()
{
IntSupplier portFactory = () -> ThreadLocalRandom.current().nextInt( 1000, 10000 );

List<ClientConnectorAddresses.ConnectorUri> connectorUris = singletonList(
new ClientConnectorAddresses.ConnectorUri( ClientConnectorAddresses.Scheme.bolt,
new AdvertisedSocketAddress( "losthost", portFactory.getAsInt() ) ) );
ClientConnectorAddresses addresses = new ClientConnectorAddresses( connectorUris );
return new ReadReplicaInfo( addresses, new AdvertisedSocketAddress( "localhost", portFactory.getAsInt() ), GROUPS, "foo" );
}
}

0 comments on commit 036c2d9

Please sign in to comment.