/
TxDistributionInterceptor.java
402 lines (353 loc) · 18.7 KB
/
TxDistributionInterceptor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
/*
* JBoss, Home of Professional Open Source
* Copyright 2012 Red Hat Inc. and/or its affiliates and other contributors
* as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
package org.infinispan.interceptors.distribution;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DataLocality;
import org.infinispan.distribution.L1Manager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.transaction.LocalTransaction;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
/**
 * Handles the distribution of the transactional caches: performs remote gets before
 * writes where needed, replicates transaction boundary commands (prepare/commit/rollback)
 * to the affected owners, and maintains/invalidates L1 cache entries.
 *
 * @author Mircea Markus
 * @since 5.2
 */
public class TxDistributionInterceptor extends BaseDistributionInterceptor {

   // FIX: logger made final (it was mutable; the trace flag below already assumed a constant logger).
   private static final Log log = LogFactory.getLog(TxDistributionInterceptor.class);
   private static final boolean trace = log.isTraceEnabled();

   private boolean isPessimisticCache;
   private boolean useClusteredWriteSkewCheck;

   private L1Manager l1Manager;
   private boolean isL1CacheEnabled;

   // A ClearCommand affects every key in the cache: returning a null recipient list tells
   // the RPC layer to broadcast to all members, and there are no individual keys to report.
   private static final RecipientGenerator CLEAR_COMMAND_GENERATOR = new RecipientGenerator() {

      @Override
      public List<Address> generateRecipients() {
         return null;
      }

      @Override
      public Collection<Object> getKeys() {
         return InfinispanCollections.emptySet();
      }
   };

   @Inject
   public void init(L1Manager l1Manager) {
      this.l1Manager = l1Manager;
   }

   @Override
   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
      try {
         return super.visitReplaceCommand(ctx, command);
      } finally {
         // Decide after local execution whether backup owners must skip re-checking the
         // previous value at commit time (see ignorePreviousValueOnBackup for rationale).
         boolean ignorePreviousValues = ignorePreviousValueOnBackup(command, ctx);
         command.setIgnorePreviousValue(ignorePreviousValues);
      }
   }

   @Override
   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
      try {
         return super.visitRemoveCommand(ctx, command);
      } finally {
         // Same post-processing as visitReplaceCommand: flag the command so backups do not
         // re-run the conditional check that was already performed at operation time.
         boolean ignorePreviousValues = ignorePreviousValueOnBackup(command, ctx);
         command.setIgnorePreviousValue(ignorePreviousValues);
      }
   }

   /**
    * For conditional operations (replace, remove, put if absent) Used only for optimistic transactional caches, to solve the following situation:
    * <pre>
    * - node A (owner, tx originator) does a successful replace
    * - the actual value changes
    * - tx commits. The value is applied on A (the check was performed at operation time) but is not applied on
    * B (check is performed at commit time).
    * In such situations (optimistic caches) the remote conditional command should not re-check the old value.
    * </pre>
    */
   @Override // FIX: annotation added — this overrides BaseDistributionInterceptor's method (see the super call).
   protected boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
      // Only optimistic caches without clustered write skew check skip the re-check:
      // with write skew checking enabled the version comparison must still run on backups.
      return super.ignorePreviousValueOnBackup(command, ctx)
            && cacheConfiguration.transaction().lockingMode() == LockingMode.OPTIMISTIC && !useClusteredWriteSkewCheck;
   }

   @Start
   public void start() {
      // Cache the configuration-derived flags once; they do not change at runtime.
      isPessimisticCache = cacheConfiguration.transaction().lockingMode() == LockingMode.PESSIMISTIC;
      isL1CacheEnabled = cacheConfiguration.clustering().l1().enabled();
      // Clustered write skew check only applies to optimistic caches with versioning enabled.
      useClusteredWriteSkewCheck = !isPessimisticCache &&
            cacheConfiguration.versioning().enabled() && cacheConfiguration.locking().writeSkewCheck();
   }

   @Override
   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
      SingleKeyRecipientGenerator skrg = new SingleKeyRecipientGenerator(command.getKey());
      Object returnValue = handleWriteCommand(ctx, command, skrg, command.hasFlag(Flag.PUT_FOR_STATE_TRANSFER), false);
      // See ignorePreviousValueOnBackup: backups must not re-run the putIfAbsent check.
      if (ignorePreviousValueOnBackup(command, ctx)) {
         command.setPutIfAbsent(false);
      }
      // If this was a remote put record that which sent it, so its L1 entry can later be invalidated.
      if (isL1CacheEnabled && !ctx.isOriginLocal() && !skrg.generateRecipients().contains(ctx.getOrigin()))
         l1Manager.addRequestor(command.getKey(), ctx.getOrigin());
      return returnValue;
   }

   @Override
   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
      // Clear touches every key: no remote get needed, broadcast recipients, skip L1 invalidation here.
      return handleWriteCommand(ctx, command, CLEAR_COMMAND_GENERATOR, false, true);
   }

   @Override
   public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
      try {
         Object returnValue = invokeNextInterceptor(ctx, command);
         // If L1 caching is enabled, this is a remote command, and we found a value in our cache
         // we store it so that we can later invalidate it
         if (returnValue != null && isL1CacheEnabled && !ctx.isOriginLocal()) l1Manager.addRequestor(command.getKey(), ctx.getOrigin());
         // need to check in the context as well since a null retval is not necessarily an indication of the entry not being
         // available. It could just have been removed in the same tx beforehand. Also don't bother with a remote get if
         // the entry is mapped to the local node.
         if (returnValue == null) {
            Object key = command.getKey();
            if (needsRemoteGet(ctx, command)) {
               returnValue = remoteGetAndStoreInL1(ctx, key, false, command);
            }
            if (returnValue == null) {
               returnValue = localGet(ctx, key, false, command);
            }
         }
         return returnValue;
      } catch (SuspectException e) {
         // The target node left the cluster mid-request: retry the whole get against the new topology.
         return visitGetKeyValueCommand(ctx, command);
      }
   }

   /**
    * Acquires the lock for {@code key} (honouring the command's skip-locking flag and
    * lock timeout) and wraps the remotely-fetched entry into the context for writing.
    */
   private void lockAndWrap(InvocationContext ctx, Object key, InternalCacheEntry ice, FlagAffectedCommand command) throws InterruptedException {
      boolean skipLocking = hasSkipLocking(command);
      long lockTimeout = getLockAcquisitionTimeout(command, skipLocking);
      lockManager.acquireLock(ctx, key, lockTimeout, skipLocking);
      entryFactory.wrapEntryForPut(ctx, key, ice, false, command);
   }

   @Override
   public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
      if (ctx.isOriginLocal()) {
         // Acquire the locks on every node owning one of the affected keys, and remember
         // those nodes so the locks can be released on rollback/commit.
         final Collection<Address> affectedNodes = dm.getAffectedNodes(command.getKeys());
         ((LocalTxInvocationContext) ctx).remoteLocksAcquired(affectedNodes);
         log.tracef("Registered remote locks acquired %s", affectedNodes);
         rpcManager.invokeRemotely(affectedNodes, command, true, true);
      }
      return invokeNextInterceptor(ctx, command);
   }

   // ---- TX boundary commands
   @Override
   public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
      if (shouldInvokeRemoteTxCommand(ctx)) {
         // Start L1 invalidation in parallel with the commit RPC, then wait on it if configured.
         Future<?> f = flushL1Caches(ctx);
         sendCommitCommand(ctx, command);
         blockOnL1FutureIfNeeded(f);
      } else if (isL1CacheEnabled && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) {
         // We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys.
         // it is still our responsibility to invalidate L1 caches in the cluster.
         blockOnL1FutureIfNeeded(flushL1Caches(ctx));
      }
      return invokeNextInterceptor(ctx, command);
   }

   /**
    * Waits for the L1 invalidation future when the commit phase is synchronous;
    * failures are logged but never fail the transaction.
    */
   private void blockOnL1FutureIfNeeded(Future<?> f) {
      if (f != null && cacheConfiguration.transaction().syncCommitPhase()) {
         try {
            f.get();
         } catch (Exception e) {
            // Ignore SuspectExceptions - if the node has gone away then there is nothing to invalidate anyway.
            if (!(e.getCause() instanceof SuspectException)) {
               getLog().failedInvalidatingRemoteCache(e);
            }
         }
      }
   }

   @Override
   public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
      Object retVal = invokeNextInterceptor(ctx, command);
      if (shouldInvokeRemoteTxCommand(ctx)) {
         if (command.isOnePhaseCommit()) flushL1Caches(ctx); // if we are one-phase, don't block on this future.
         // A transaction containing a ClearCommand must be prepared on every member,
         // not just the owners of the explicitly affected keys.
         boolean affectsAllNodes = ctx.getCacheTransaction().hasModification(ClearCommand.class);
         Collection<Address> recipients = affectsAllNodes ? dm.getWriteConsistentHash().getMembers() : dm.getAffectedNodes(ctx.getAffectedKeys());
         prepareOnAffectedNodes(ctx, command, recipients, defaultSynchronous);
         ((LocalTxInvocationContext) ctx).remoteLocksAcquired(recipients);
      } else if (isL1CacheEnabled && command.isOnePhaseCommit() && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) {
         // We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys.
         // it is still our responsibility to invalidate L1 caches in the cluster.
         flushL1Caches(ctx);
      }
      return retVal;
   }

   protected void prepareOnAffectedNodes(TxInvocationContext ctx, PrepareCommand command, Collection<Address> recipients, boolean sync) {
      // this method will return immediately if we're the only member (because exclude_self=true)
      rpcManager.invokeRemotely(recipients, command, sync);
   }

   @Override
   public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
      if (shouldInvokeRemoteTxCommand(ctx)) {
         rpcManager.invokeRemotely(getCommitNodes(ctx), command, cacheConfiguration.transaction().syncRollbackPhase(), true);
      }
      return invokeNextInterceptor(ctx, command);
   }

   /**
    * Computes the set of nodes the second-phase command (commit/rollback) must be sent to,
    * taking the current topology into account via the local transaction's bookkeeping.
    */
   private Collection<Address> getCommitNodes(TxInvocationContext ctx) {
      LocalTransaction localTx = (LocalTransaction) ctx.getCacheTransaction();
      Collection<Address> affectedNodes = dm.getAffectedNodes(ctx.getAffectedKeys());
      List<Address> members = dm.getConsistentHash().getMembers();
      return localTx.getCommitNodes(affectedNodes, rpcManager.getTopologyId(), members);
   }

   protected void sendCommitCommand(TxInvocationContext ctx, CommitCommand command) throws TimeoutException, InterruptedException {
      Collection<Address> recipients = getCommitNodes(ctx);
      boolean syncCommitPhase = cacheConfiguration.transaction().syncCommitPhase();
      rpcManager.invokeRemotely(recipients, command, syncCommitPhase, true);
   }

   /**
    * During a rehash the local data container may not yet hold keys this node is about to own;
    * with clustered write skew check enabled those values must be fetched so versions can be compared.
    */
   private boolean shouldFetchRemoteValuesForWriteSkewCheck(InvocationContext ctx, WriteCommand cmd) {
      if (useClusteredWriteSkewCheck && ctx.isInTxScope() && dm.isRehashInProgress()) {
         for (Object key : cmd.getAffectedKeys()) {
            if (dm.isAffectedByRehash(key) && !dataContainer.containsKey(key)) return true;
         }
      }
      return false;
   }

   /**
    * If we are within one transaction we won't do any replication as replication would only be performed at commit
    * time. If the operation didn't originate locally we won't do any replication either.
    */
   protected Object handleWriteCommand(InvocationContext ctx, WriteCommand command, RecipientGenerator recipientGenerator, boolean skipRemoteGet, boolean skipL1Invalidation) throws Throwable {
      // see if we need to load values from remote sources first
      if (ctx.isOriginLocal() && !skipRemoteGet || command.isConditional() || shouldFetchRemoteValuesForWriteSkewCheck(ctx, command))
         remoteGetBeforeWrite(ctx, command, recipientGenerator);
      // FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally do we attempt to distribute.
      return invokeNextInterceptor(ctx, command);
   }

   /**
    * Looks the key up in the local data container and, if found, registers it in the
    * invocation context (locking it first for writes in pessimistic transactions).
    * Returns the entry (for GetCacheEntryCommand) or its value, or null if absent locally.
    */
   private Object localGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
      InternalCacheEntry ice = dataContainer.get(key);
      if (ice != null) {
         if (isWrite && isPessimisticCache && ctx.isInTxScope()) {
            ((TxInvocationContext) ctx).addAffectedKey(key);
         }
         if (!ctx.replaceValue(key, ice)) {
            if (isWrite)
               lockAndWrap(ctx, key, ice, command);
            else
               ctx.putLookedUpEntry(key, ice);
         }
         return command instanceof GetCacheEntryCommand ? ice : ice.getValue();
      }
      return null;
   }

   private void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, KeyGenerator keygen) throws Throwable {
      // this should only happen if:
      // a) unsafeUnreliableReturnValues is false
      // b) unsafeUnreliableReturnValues is true, we are in a TX and the command is conditional
      if (isNeedReliableReturnValues(command) || command.isConditional() || shouldFetchRemoteValuesForWriteSkewCheck(ctx, command)) {
         for (Object k : keygen.getKeys()) {
            Object returnValue = remoteGetAndStoreInL1(ctx, k, true, command);
            if (returnValue == null) {
               // Not retrieved remotely (e.g. local owner): fall back to the local container.
               localGet(ctx, k, true, command);
            }
         }
      }
   }

   // True when the key cannot possibly be served from L1 (L1 disabled or key not present locally).
   private boolean isNotInL1(Object key) {
      return !isL1CacheEnabled || !dataContainer.containsKey(key);
   }

   /**
    * Fetches the entry for {@code key} from its remote owners (when this node is not an owner
    * and does not already hold it), optionally acquiring the remote lock for pessimistic writes,
    * and caches the result in L1 when safe. Returns the remote value or null.
    */
   private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
      // todo [anistor] fix locality checks in StateTransferManager (ISPN-2401) and use them here
      DataLocality locality = dm.getReadConsistentHash().isKeyLocalToNode(rpcManager.getAddress(), key) ? DataLocality.LOCAL : DataLocality.NOT_LOCAL;

      if (ctx.isOriginLocal() && !locality.isLocal() && isNotInL1(key) || dm.isAffectedByRehash(key) && !dataContainer.containsKey(key)) {
         if (trace) log.tracef("Doing a remote get for key %s", key);

         boolean acquireRemoteLock = false;
         if (ctx.isInTxScope()) {
            TxInvocationContext txContext = (TxInvocationContext) ctx;
            // Pessimistic writes lock eagerly on the owner, but only once per key per tx.
            acquireRemoteLock = isWrite && isPessimisticCache && !txContext.getAffectedKeys().contains(key);
         }
         // attempt a remote lookup
         InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command);

         if (acquireRemoteLock) {
            ((TxInvocationContext) ctx).addAffectedKey(key);
         }

         if (ice != null) {
            if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
               // Remember the remote version so the write skew check can compare against it at prepare time.
               ((TxInvocationContext)ctx).getCacheTransaction().putLookedUpRemoteVersion(key, ice.getVersion());
            }

            if (isL1CacheEnabled) {
               // We've requested the key only from the owners current (read) CH.
               // If the intersection of owners in the current and pending CHs is empty,
               // the requestor information might be lost, so we shouldn't store the entry in L1.
               // TODO We don't have access to the pending CH here, so we just check if the owners list changed.
               List<Address> readOwners = dm.getReadConsistentHash().locateOwners(key);
               List<Address> writeOwners = dm.getWriteConsistentHash().locateOwners(key);
               if (!readOwners.equals(writeOwners)) {
                  // todo [anistor] this check is not optimal and can yield false positives. here we should use StateTransferManager.isStateTransferInProgressForKey(key) after ISPN-2401 is fixed
                  // FIX: the format string referenced %s but no argument was supplied; pass the key.
                  if (trace) log.tracef("State transfer in progress for key %s, not storing to L1", key);
                  return ice.getValue();
               }

               if (trace) log.tracef("Caching remotely retrieved entry for key %s in L1", key);
               // This should be fail-safe
               try {
                  long l1Lifespan = cacheConfiguration.clustering().l1().lifespan();
                  // Never extend the entry's own lifespan: cap it at the configured L1 lifespan.
                  long lifespan = ice.getLifespan() < 0 ? l1Lifespan : Math.min(ice.getLifespan(), l1Lifespan);
                  PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1, command.getFlags());
                  lockAndWrap(ctx, key, ice, command);
                  invokeNextInterceptor(ctx, put);
               } catch (Exception e) {
                  // Couldn't store in L1 for some reason. But don't fail the transaction!
                  log.infof("Unable to store entry %s in L1 cache", key);
                  log.debug("Inability to store in L1 caused by", e);
               }
            } else {
               if (!ctx.replaceValue(key, ice)) {
                  if (isWrite)
                     lockAndWrap(ctx, key, ice, command);
                  else
                     ctx.putLookedUpEntry(key, ice);
               }
            }
            return ice.getValue();
         }
      } else {
         if (trace) log.tracef("Not doing a remote get for key %s since entry is mapped to current node (%s), or is in L1. Owners are %s", key, rpcManager.getAddress(), dm.locate(key));
      }
      return null;
   }

   protected Future<?> flushL1Caches(InvocationContext ctx) {
      // TODO how do we tell the L1 manager which keys are removed and which keys may still exist in remote L1?
      return isL1CacheEnabled ? l1Manager.flushCacheWithSimpleFuture(ctx.getLockedKeys(), null, ctx.getOrigin(), true) : null;
   }
}