Skip to content

Commit

Permalink
GEODE-5595: Fix DeltaPropagationDUnitTest flakiness (apache#4653)
Browse files Browse the repository at this point in the history
Improve testability of CacheClientProxy
* Extract inner classes
* Introduce CacheClientProxyFactory with support for property injection
  • Loading branch information
kirklund authored and mhansonp committed Mar 12, 2020
1 parent b9b30c6 commit 608efe7
Show file tree
Hide file tree
Showing 19 changed files with 2,710 additions and 2,069 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ public boolean getKeepAlive() {
return false;
}

@Override
public int getPrimaryPort() {
return 0;
}

@Override
public EndpointManager getEndpointManager() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ public boolean getKeepAlive() {
return false;
}

@Override
public int getPrimaryPort() {
return 0;
}

@Override
public Object execute(Op op, int retryAttempts) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public interface ClientRegionFactory<K, V> {
* @since GemFire 7.0
* @param concurrencyChecksEnabled whether to perform concurrency checks on operations
*/
void setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled);
ClientRegionFactory<K, V> setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled);

/**
* Sets the DiskStore name attribute. This causes the region to belong to the DiskStore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ public ClientRegionFactory<K, V> setConcurrencyLevel(int concurrencyLevel) {
}

@Override
public void setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled) {
public ClientRegionFactory<K, V> setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled) {
this.attrsFactory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.ScheduledExecutorService;

import org.apache.geode.CancelCriterion;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.internal.cache.PoolStats;

Expand All @@ -26,9 +27,9 @@
* connection source to access the cache and update the list of endpoints on the connection pool.
*
* @since GemFire 5.7
*
*/
public interface InternalPool extends Pool, ExecutablePool {

PoolStats getStats();

Map getEndpointMap();
Expand All @@ -46,4 +47,10 @@ public interface InternalPool extends Pool, ExecutablePool {
String getPoolOrCacheCancelInProgress();

boolean getKeepAlive();

/**
* Test hook that returns the port of the primary server. -1 is returned if we have no primary.
*/
@VisibleForTesting
int getPrimaryPort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
Expand Down Expand Up @@ -1019,10 +1020,8 @@ public String getPrimaryName() {
return result;
}

/**
* Test hook that returns an int which the port of the primary server. -1 is returned if we have
* no primary.
*/
@Override
@VisibleForTesting
public int getPrimaryPort() {
int result = -1;
ServerLocation sl = getPrimary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.apache.geode.internal.cache.PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME;
import static org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType.HEAP_MEMORY;
import static org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType.OFFHEAP_MEMORY;
import static org.apache.geode.internal.cache.util.UncheckedUtils.uncheckedRegion;
import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
import static org.apache.geode.internal.logging.CoreLoggingExecutors.newThreadPoolWithFixedFeed;
import static org.apache.geode.internal.tcp.ConnectionTable.threadWantsSharedResources;
import static org.apache.geode.logging.internal.executors.LoggingExecutors.newFixedThreadPool;
Expand Down Expand Up @@ -3022,7 +3022,7 @@ public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_
system.handleResourceEvent(ResourceEvent.REGION_CREATE, region);
}

return uncheckedRegion(region);
return cast(region);
}

@Override
Expand Down Expand Up @@ -3146,7 +3146,7 @@ private static void validatePath(String path) {

@Override
public <K, V> Region<K, V> getRegionByPath(String path) {
return uncheckedRegion(getInternalRegionByPath(path));
return cast(getInternalRegionByPath(path));
}

@Override
Expand Down Expand Up @@ -3203,7 +3203,7 @@ public <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion)
stopper.checkCancelInProgress(null);
return null;
}
return uncheckedRegion(result);
return cast(result);
}

String[] pathParts = parsePath(path);
Expand All @@ -3227,7 +3227,7 @@ public <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion)
logger.debug("GemFireCache.getRegion, calling getSubregion on rootRegion({}): {}",
pathParts[0], pathParts[1]);
}
return uncheckedRegion(rootRegion.getSubregion(pathParts[1], returnDestroyedRegion));
return cast(rootRegion.getSubregion(pathParts[1], returnDestroyedRegion));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.cache.util.UncheckedUtils.uncheckedRegion;
import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -162,7 +162,7 @@ public <K, V> Region<K, V> getInternalRegion(String path) {
public <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion) {
Region result = delegate.getRegion(path, returnDestroyedRegion);
checkForInternalRegion(result);
return uncheckedRegion(result);
return cast(result);
}

@Override
Expand All @@ -176,7 +176,7 @@ public InternalRegion getReinitializingRegion(String fullPath) {
public <K, V> Region<K, V> getRegionByPath(String path) {
InternalRegion result = delegate.getInternalRegionByPath(path);
checkForInternalRegion(result);
return uncheckedRegion(result);
return cast(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.internal.cache.PoolFactoryImpl.PoolAttributes;

public interface InternalPoolFactory extends PoolFactory {

/**
* Initializes the state of this factory for the given pool's state.
*/
void init(Pool cp);

/**
* Needed by test framework.
*/
@VisibleForTesting
PoolAttributes getPoolAttributes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
Expand All @@ -49,7 +50,7 @@
*
* @since GemFire 5.7
*/
public class PoolFactoryImpl implements PoolFactory {
public class PoolFactoryImpl implements InternalPoolFactory {
private static final Logger logger = LogService.getLogger();

/**
Expand Down Expand Up @@ -310,10 +311,7 @@ public PoolFactory reset() {
return this;
}


/**
* Initializes the state of this factory for the given pool's state.
*/
@Override
public void init(Pool cp) {
setSocketConnectTimeout(cp.getSocketConnectTimeout());
setFreeConnectionTimeout(cp.getFreeConnectionTimeout());
Expand Down Expand Up @@ -380,9 +378,8 @@ private static GemFireCacheImpl getInternalCache() {
return GemFireCacheImpl.getInstance();
}

/**
* Needed by test framework.
*/
@Override
@VisibleForTesting
public PoolAttributes getPoolAttributes() {
return attributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class CacheClientNotifier {

private final SocketMessageWriter socketMessageWriter = new SocketMessageWriter();
private final ClientRegistrationEventQueueManager clientRegistrationEventQueueManager;
private final CacheClientProxyFactory cacheClientProxyFactory;

/**
* Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
Expand All @@ -145,8 +146,8 @@ public static synchronized CacheClientNotifier getInstance(InternalCache cache,
boolean isGatewayReceiver) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache, clientRegistrationEventQueueManager,
statisticsClock, acceptorStats,
maximumMessageCount, messageTimeToLive, listener, isGatewayReceiver);
statisticsClock, acceptorStats, maximumMessageCount, messageTimeToLive, listener,
isGatewayReceiver, new CacheClientProxyFactory());
}

if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
Expand Down Expand Up @@ -297,9 +298,9 @@ void registerClientInternal(final ClientRegistrationMetadata clientRegistrationM
clientProxyMembershipID.getDurableId());
}
cacheClientProxy =
new CacheClientProxy(this, socket, clientProxyMembershipID, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription, cache.getSecurityService(),
subject, statisticsClock);
cacheClientProxyFactory.create(this, socket, clientProxyMembershipID, isPrimary,
clientConflation, clientVersion, acceptorId, notifyBySubscription,
cache.getSecurityService(), subject, statisticsClock);
successful = initializeProxy(cacheClientProxy);
} else {
cacheClientProxy.setSubject(subject);
Expand Down Expand Up @@ -1660,7 +1661,7 @@ public CacheClientNotifierStats getStats() {
/**
* Returns this {@code CacheClientNotifier}'s {@code InternalCache}.
*/
protected InternalCache getCache() {
public InternalCache getCache() {
if (cache != null && cache.isClosed()) {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
Expand Down Expand Up @@ -1702,9 +1703,13 @@ protected void handleInterestEvent(InterestRegistrationEvent event) {
private CacheClientNotifier(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats, int maximumMessageCount,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener, boolean isGatewayReceiver) {
ConnectionListener listener,
boolean isGatewayReceiver,
CacheClientProxyFactory cacheClientProxyFactory) {
this.cacheClientProxyFactory = cacheClientProxyFactory;
// Set the Cache
setCache(cache);
this.clientRegistrationEventQueueManager = clientRegistrationEventQueueManager;
Expand Down

0 comments on commit 608efe7

Please sign in to comment.