forked from hazelcast/hazelcast
/
ClientRaftProxyFactory.java
126 lines (107 loc) · 5.7 KB
/
ClientRaftProxyFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.client.cp.internal.datastructures.proxy;
import com.hazelcast.client.cp.internal.datastructures.atomiclong.AtomicLongProxy;
import com.hazelcast.client.cp.internal.datastructures.atomicref.AtomicRefProxy;
import com.hazelcast.client.cp.internal.datastructures.countdownlatch.CountDownLatchProxy;
import com.hazelcast.client.cp.internal.datastructures.lock.FencedLockProxy;
import com.hazelcast.client.cp.internal.datastructures.semaphore.SessionAwareSemaphoreProxy;
import com.hazelcast.client.cp.internal.datastructures.semaphore.SessionlessSemaphoreProxy;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.CPGroupCreateCPGroupCodec;
import com.hazelcast.client.impl.protocol.codec.SemaphoreGetSemaphoreTypeCodec;
import com.hazelcast.client.impl.spi.ClientContext;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.cp.ISemaphore;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.datastructures.atomiclong.AtomicLongService;
import com.hazelcast.cp.internal.datastructures.atomicref.AtomicRefService;
import com.hazelcast.cp.internal.datastructures.countdownlatch.CountDownLatchService;
import com.hazelcast.cp.internal.datastructures.lock.LockService;
import com.hazelcast.cp.internal.datastructures.semaphore.SemaphoreService;
import com.hazelcast.cp.lock.FencedLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.hazelcast.cp.internal.RaftService.getObjectNameForProxy;
import static com.hazelcast.cp.internal.RaftService.withoutDefaultGroupName;
/**
* Creates client-side proxies of the CP data structures
*/
public class ClientRaftProxyFactory {
private final HazelcastClientInstanceImpl client;
private final ConcurrentMap<String, FencedLockProxy> lockProxies
= new ConcurrentHashMap<String, FencedLockProxy>();
private ClientContext context;
public ClientRaftProxyFactory(HazelcastClientInstanceImpl client) {
this.client = client;
}
public void init(ClientContext context) {
this.context = context;
}
public <T extends DistributedObject> T createProxy(String serviceName, String proxyName) {
proxyName = withoutDefaultGroupName(proxyName);
String objectName = getObjectNameForProxy(proxyName);
RaftGroupId groupId = getGroupId(proxyName, objectName);
if (serviceName.equals(AtomicLongService.SERVICE_NAME)) {
return (T) new AtomicLongProxy(context, groupId, proxyName, objectName);
} else if (serviceName.equals(AtomicRefService.SERVICE_NAME)) {
return (T) new AtomicRefProxy(context, groupId, proxyName, objectName);
} else if (serviceName.equals(CountDownLatchService.SERVICE_NAME)) {
return (T) new CountDownLatchProxy(context, groupId, proxyName, objectName);
} else if (serviceName.equals(LockService.SERVICE_NAME)) {
return (T) createFencedLock(groupId, proxyName, objectName);
} else if (serviceName.equals(SemaphoreService.SERVICE_NAME)) {
return (T) createSemaphore(groupId, proxyName, objectName);
} else {
throw new IllegalArgumentException();
}
}
private FencedLock createFencedLock(RaftGroupId groupId, String proxyName, String objectName) {
while (true) {
FencedLockProxy proxy = lockProxies.get(proxyName);
if (proxy != null) {
if (!proxy.getGroupId().equals(groupId)) {
lockProxies.remove(proxyName, proxy);
} else {
return proxy;
}
}
proxy = new FencedLockProxy(context, groupId, proxyName, objectName);
FencedLockProxy existing = lockProxies.putIfAbsent(proxyName, proxy);
if (existing == null) {
return proxy;
} else if (existing.getGroupId().equals(groupId)) {
return existing;
}
groupId = getGroupId(proxyName, objectName);
}
}
private ISemaphore createSemaphore(RaftGroupId groupId, String proxyName, String objectName) {
ClientMessage request = SemaphoreGetSemaphoreTypeCodec.encodeRequest(proxyName);
ClientMessage response = new ClientInvocation(client, request, objectName).invoke().join();
boolean jdkCompatible = SemaphoreGetSemaphoreTypeCodec.decodeResponse(response).response;
return jdkCompatible
? new SessionlessSemaphoreProxy(context, groupId, proxyName, objectName)
: new SessionAwareSemaphoreProxy(context, groupId, proxyName, objectName);
}
private RaftGroupId getGroupId(String proxyName, String objectName) {
ClientMessage request = CPGroupCreateCPGroupCodec.encodeRequest(proxyName);
ClientMessage response = new ClientInvocation(client, request, objectName).invoke().join();
return CPGroupCreateCPGroupCodec.decodeResponse(response).groupId;
}
}