/
SocketTimeoutErrorTest.java
97 lines (82 loc) · 4.17 KB
/
SocketTimeoutErrorTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package org.infinispan.client.hotrod;
import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;
import static org.infinispan.test.TestingUtil.k;
import static org.testng.AssertJUnit.assertEquals;
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.SingleHotRodServerTest;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Test;
/**
* Tests the behaviour of the client upon a socket timeout exception and any invocation after that.
*
* @author Galder Zamarreño
* @since 4.2
*/
@Test(groups = "functional", testName = "client.hotrod.SocketTimeoutErrorTest")
public class SocketTimeoutErrorTest extends SingleHotRodServerTest {
@Override
protected EmbeddedCacheManager createCacheManager() throws Exception {
GlobalConfigurationBuilder global = new GlobalConfigurationBuilder().nonClusteredDefault();
TestCacheManagerFactory.addInterceptor(global, TestCacheManagerFactory.DEFAULT_CACHE_NAME::equals, new TimeoutInducingInterceptor(), TestCacheManagerFactory.InterceptorPosition.AFTER, EntryWrappingInterceptor.class);
ConfigurationBuilder builder = new ConfigurationBuilder();
return TestCacheManagerFactory.createCacheManager(global, hotRodCacheConfiguration(builder));
}
@Override
protected HotRodServer createHotRodServer() {
HotRodServerConfigurationBuilder builder = new HotRodServerConfigurationBuilder();
return HotRodClientTestingUtil.startHotRodServer(cacheManager, builder);
}
@Override
protected RemoteCacheManager getRemoteCacheManager() {
org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder =
HotRodClientTestingUtil.newRemoteConfigurationBuilder();
builder.addServer().host("127.0.0.1").port(hotrodServer.getPort());
builder.socketTimeout(2000);
builder.maxRetries(0);
return new RemoteCacheManager(builder.build());
}
public void testErrorWhileDoingPut(Method m) {
RemoteCache<String, Integer> cache = remoteCacheManager.getCache();
cache.put(k(m), 1);
assertEquals(1, cache.get(k(m)).intValue());
Exceptions.expectException(TransportException.class, SocketTimeoutException.class, () -> cache.put("FailFailFail", 2));
cache.put("dos", 2);
assertEquals(2, cache.get("dos").intValue());
TestingUtil.extractInterceptorChain(this.cache)
.findInterceptorWithClass(TimeoutInducingInterceptor.class)
.stopBlocking();
}
public static class TimeoutInducingInterceptor extends BaseCustomAsyncInterceptor {
public final CompletableFuture<Void> delay = new CompletableFuture<>();
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (unmarshall(command.getKey()).equals("FailFailFail")) {
return asyncValue(delay);
}
return super.visitPutKeyValueCommand(ctx, command);
}
private String unmarshall(Object key) throws Exception {
return (String) new ProtoStreamMarshaller().objectFromByteBuffer(((WrappedByteArray) key).getBytes());
}
private void stopBlocking() {
delay.complete(null);
}
}
}