forked from hazelcast/hazelcast
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FlakeIdGeneratorProxy.java
278 lines (246 loc) · 11.4 KB
/
FlakeIdGeneratorProxy.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.flakeidgen.impl;
import com.hazelcast.cluster.Member;
import com.hazelcast.config.FlakeIdGeneratorConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.flakeidgen.impl.AutoBatcher.IdBatchSupplier;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.ThreadLocalRandomProvider;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.AbstractDistributedObject;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.hazelcast.internal.util.ExceptionUtil.rethrow;
import static com.hazelcast.internal.util.Preconditions.checkPositive;
import static com.hazelcast.internal.util.Preconditions.checkTrue;
import static java.lang.Thread.currentThread;
import static java.util.Collections.newSetFromMap;
import static java.util.concurrent.TimeUnit.SECONDS;
public class FlakeIdGeneratorProxy
extends AbstractDistributedObject<FlakeIdGeneratorService>
implements FlakeIdGenerator {
static final long NODE_ID_UPDATE_INTERVAL_NS = SECONDS.toNanos(2);
private static final int NODE_ID_NOT_YET_SET = -1;
private static final int NODE_ID_OUT_OF_RANGE = -2;
private static final int MAX_BIT_LENGTH = 63;
private final String name;
private final UUID source;
private final long epochStart;
private final long nodeIdOffset;
private final int bitsTimestamp;
private final int bitsSequence;
private final int bitsNodeId;
private final long allowedFutureMillis;
private volatile int nodeId = NODE_ID_NOT_YET_SET;
private volatile long nextNodeIdUpdate = Long.MIN_VALUE;
private final long increment;
private final ILogger logger;
/**
* The next timestamp|seq value to be returned. The value is not shifted to most significant bits.
*/
private final AtomicLong generatedValue = new AtomicLong(Long.MIN_VALUE);
private volatile Member randomMember;
private AutoBatcher batcher;
/**
* Set of member UUIDs of which we know have node IDs out of range. These members are never again used
* to generate unique IDs, because this error is unrecoverable.
*/
private final Set<UUID> outOfRangeMembers = newSetFromMap(new ConcurrentHashMap<>());
FlakeIdGeneratorProxy(String name, NodeEngine nodeEngine, FlakeIdGeneratorService service, UUID source) {
super(nodeEngine, service);
this.name = name;
this.logger = nodeEngine.getLogger(getClass());
this.source = source;
FlakeIdGeneratorConfig config = nodeEngine.getConfig().findFlakeIdGeneratorConfig(getName());
bitsSequence = config.getBitsSequence();
bitsNodeId = config.getBitsNodeId();
bitsTimestamp = MAX_BIT_LENGTH - (bitsSequence + bitsNodeId);
checkTrue(bitsTimestamp >= 0, "Configuration error, no bits left for the timestamp");
allowedFutureMillis = config.getAllowedFutureMillis();
increment = 1 << bitsNodeId;
epochStart = config.getEpochStart();
nodeIdOffset = config.getNodeIdOffset();
batcher = new AutoBatcher(config.getPrefetchCount(), config.getPrefetchValidityMillis(),
new IdBatchSupplier() {
@Override
public IdBatch newIdBatch(int batchSize) {
IdBatchAndWaitTime result = FlakeIdGeneratorProxy.this.newIdBatch(batchSize);
if (result.waitTimeMillis > 0) {
try {
Thread.sleep(result.waitTimeMillis);
} catch (InterruptedException e) {
currentThread().interrupt();
throw rethrow(e);
}
}
return result.idBatch;
}
});
if (logger.isFinestEnabled()) {
logger.finest("Created FlakeIdGeneratorProxy, name='" + name + "'");
}
}
@Override
public long newId() {
// The cluster version is checked when ClusterService.getMemberListJoinVersion() is called. This always happens
// before first ID is generated.
return batcher.newId();
}
public IdBatchAndWaitTime newIdBatch(int batchSize) {
int nodeId = getNodeId();
// if we have valid node ID, generate ID locally
if (nodeId >= 0) {
return newIdBaseLocal(Clock.currentTimeMillis(), nodeId, batchSize);
}
// Remote call otherwise. Loop will end when getRandomMember() throws that all members overflowed.
while (true) {
NewIdBatchOperation op = new NewIdBatchOperation(name, batchSize);
op.setCallerUuid(source);
Member target = getRandomMember();
InvocationFuture<Long> future = getNodeEngine().getOperationService()
.invokeOnTarget(getServiceName(), op, target.getAddress());
try {
long base = future.joinInternal();
return new IdBatchAndWaitTime(new IdBatch(base, increment, batchSize), 0);
} catch (NodeIdOutOfRangeException e) {
outOfRangeMembers.add(target.getUuid());
randomMember = null;
}
}
}
IdBatchAndWaitTime newIdBaseLocal(int batchSize) {
return newIdBaseLocal(Clock.currentTimeMillis(), getNodeId(), batchSize);
}
/**
* The layout of the ID is as follows (starting from most significant bits):<ul>
* <li>timestamp bits (41 by default)
* <li>sequence bits (6 by default)
* <li>node ID bits (16 by default)
* </ul>
*
* This order is important: timestamp must be first to keep IDs ordered. Sequence must be second for
* implementation reasons (it's included in {@link #generatedValue}). Node is just an appendix to make
* IDs unique.
*
* @param now Current time (currentTimeMillis() normally or other value in tests)
*/
// package-private for testing
IdBatchAndWaitTime newIdBaseLocal(long now, int nodeId, int batchSize) {
checkPositive(batchSize, "batchSize");
if (nodeId == NODE_ID_OUT_OF_RANGE) {
throw new NodeIdOutOfRangeException("NodeID overflow, this member cannot generate IDs");
}
assert (nodeId & -1 << bitsNodeId) == 0 : "nodeId out of range: " + nodeId;
now -= epochStart;
if (now < -(1L << bitsTimestamp) || now >= (1L << bitsTimestamp)) {
throw new HazelcastException("Current time out of allowed range");
}
now <<= bitsSequence;
long oldGeneratedValue;
long base;
do {
oldGeneratedValue = generatedValue.get();
base = Math.max(now, oldGeneratedValue);
} while (!generatedValue.compareAndSet(oldGeneratedValue, base + batchSize));
long waitTime = Math.max(0, ((base + batchSize - now) >> bitsSequence) - allowedFutureMillis);
base = base << bitsNodeId | nodeId;
getService().updateStatsForBatch(name, batchSize);
return new IdBatchAndWaitTime(new IdBatch(base, increment, batchSize), waitTime);
}
/**
* Three possible return outcomes of this call:<ul>
* <li>returns current node ID of this member that it not out of range (a positive value)
* <li>returns {@link #NODE_ID_OUT_OF_RANGE}
* <li>throws {@link IllegalStateException}, if node ID is not yet available, with description why.
* </ul>
*/
private int getNodeId() {
return getNodeId(System.nanoTime());
}
// package-visible for tests
int getNodeId(long nanoTime) {
// Check if it is a time to check for updated nodeId. We need to recheck, because if duplicate node ID
// is assigned during a network split, this will be resolved after a cluster merge.
// We throttle the calls to avoid contention due to the lock+unlock call in getMemberListJoinVersion().
int nodeId = this.nodeId;
if (nodeId != NODE_ID_OUT_OF_RANGE && nextNodeIdUpdate <= nanoTime) {
int newNodeId = getNodeEngine().getClusterService().getMemberListJoinVersion();
assert newNodeId >= 0 : "newNodeId=" + newNodeId;
newNodeId += nodeIdOffset;
nextNodeIdUpdate = nanoTime + NODE_ID_UPDATE_INTERVAL_NS;
if (newNodeId != nodeId) {
nodeId = newNodeId;
// If our node ID is out of range, assign NODE_ID_OUT_OF_RANGE to nodeId
if ((nodeId & -1 << bitsNodeId) != 0) {
outOfRangeMembers.add(getNodeEngine().getClusterService().getLocalMember().getUuid());
logger.severe("Node ID is out of range (" + nodeId + "), this member won't be able to generate IDs. "
+ "Cluster restart is recommended.");
nodeId = NODE_ID_OUT_OF_RANGE;
}
// we ignore possible double initialization
this.nodeId = nodeId;
if (logger.isFineEnabled()) {
logger.fine("Node ID assigned to '" + name + "': " + nodeId);
}
}
}
return nodeId;
}
private Member getRandomMember() {
Member member = randomMember;
if (member == null) {
// if local member is in outOfRangeMembers, use random member
Set<Member> members = getNodeEngine().getClusterService().getMembers();
List<Member> filteredMembers = new ArrayList<>(members.size());
for (Member m : members) {
if (!outOfRangeMembers.contains(m.getUuid())) {
filteredMembers.add(m);
}
}
if (filteredMembers.isEmpty()) {
throw new HazelcastException("All members have node ID out of range. Cluster restart is required");
}
member = filteredMembers.get(ThreadLocalRandomProvider.get().nextInt(filteredMembers.size()));
randomMember = member;
}
return member;
}
@Override
public String getName() {
return name;
}
@Override
public String getServiceName() {
return FlakeIdGeneratorService.SERVICE_NAME;
}
@SuppressWarnings("checkstyle:visibilitymodifier")
public static class IdBatchAndWaitTime {
public final IdBatch idBatch;
public final long waitTimeMillis;
IdBatchAndWaitTime(IdBatch idBatch, long waitTimeMillis) {
this.idBatch = idBatch;
this.waitTimeMillis = waitTimeMillis;
}
}
}