-
Notifications
You must be signed in to change notification settings - Fork 615
/
RemoteCommandsFactory.java
388 lines (383 loc) · 17.9 KB
/
RemoteCommandsFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
package org.infinispan.commands;
import java.util.Map;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.TxReadOnlyKeyCommand;
import org.infinispan.commands.functional.TxReadOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.module.ModuleCommandFactory;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.remote.RenewBiasCommand;
import org.infinispan.commands.remote.RevokeBiasCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand;
import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand;
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.triangle.MultiEntriesFunctionalBackupWriteCommand;
import org.infinispan.commands.triangle.MultiKeyFunctionalBackupWriteCommand;
import org.infinispan.commands.triangle.PutMapBackupWriteCommand;
import org.infinispan.commands.triangle.SingleKeyBackupWriteCommand;
import org.infinispan.commands.triangle.SingleKeyFunctionalBackupWriteCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderCommitCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderNonVersionedPrepareCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderRollbackCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderVersionedCommitCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderVersionedPrepareCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.RemoveExpiredCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.manager.impl.ReplicableManagerFunctionCommand;
import org.infinispan.manager.impl.ReplicableRunnableCommand;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.stream.impl.StreamIteratorCloseCommand;
import org.infinispan.stream.impl.StreamIteratorNextCommand;
import org.infinispan.stream.impl.StreamIteratorRequestCommand;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.stream.impl.StreamResponseCommand;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.topology.HeartBeatCommand;
import org.infinispan.util.ByteString;
import org.infinispan.xsite.SingleXSiteRpcCommand;
import org.infinispan.xsite.XSiteAdminCommand;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;
/**
* Specifically used to create un-initialized {@link org.infinispan.commands.ReplicableCommand}s from a byte stream.
* This is a {@link Scopes#GLOBAL} component and doesn't have knowledge of initializing a command by injecting
* cache-specific components into it.
* <p />
* Usually a second step to unmarshalling a command from a byte stream (after
* creating an un-initialized version using this factory) is to pass the command though {@link CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)}.
*
* @see CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)
* @author Manik Surtani
* @author Mircea.Markus@jboss.com
* @since 4.0
*/
@Scope(Scopes.GLOBAL)
public class RemoteCommandsFactory {
@Inject EmbeddedCacheManager cacheManager;
@Inject GlobalComponentRegistry globalComponentRegistry;
@Inject @ComponentName(KnownComponentNames.MODULE_COMMAND_FACTORIES)
Map<Byte,ModuleCommandFactory> commandFactories;
/**
* Creates an un-initialized command. Un-initialized in the sense that parameters will be set, but any components
* specific to the cache in question will not be set.
* <p/>
* You would typically set these parameters using {@link CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)}
* <p/>
*
*
* @param id id of the command
* @param type type of the command
* @return a replicable command
*/
public ReplicableCommand fromStream(byte id, byte type) {
ReplicableCommand command;
if (type == 0) {
switch (id) {
case PutKeyValueCommand.COMMAND_ID:
command = new PutKeyValueCommand();
break;
case PutMapCommand.COMMAND_ID:
command = new PutMapCommand();
break;
case RemoveCommand.COMMAND_ID:
command = new RemoveCommand();
break;
case ReplaceCommand.COMMAND_ID:
command = new ReplaceCommand();
break;
case ComputeCommand.COMMAND_ID:
command = new ComputeCommand();
break;
case ComputeIfAbsentCommand.COMMAND_ID:
command = new ComputeIfAbsentCommand();
break;
case GetKeyValueCommand.COMMAND_ID:
command = new GetKeyValueCommand();
break;
case ClearCommand.COMMAND_ID:
command = new ClearCommand();
break;
case InvalidateCommand.COMMAND_ID:
command = new InvalidateCommand();
break;
case InvalidateL1Command.COMMAND_ID:
command = new InvalidateL1Command();
break;
case CacheTopologyControlCommand.COMMAND_ID:
command = new CacheTopologyControlCommand();
break;
case GetKeysInGroupCommand.COMMAND_ID:
command = new GetKeysInGroupCommand();
break;
case GetCacheEntryCommand.COMMAND_ID:
command = new GetCacheEntryCommand();
break;
case ReadWriteKeyCommand.COMMAND_ID:
command = new ReadWriteKeyCommand<>();
break;
case ReadWriteKeyValueCommand.COMMAND_ID:
command = new ReadWriteKeyValueCommand<>();
break;
case ReadWriteManyCommand.COMMAND_ID:
command = new ReadWriteManyCommand<>();
break;
case ReadWriteManyEntriesCommand.COMMAND_ID:
command = new ReadWriteManyEntriesCommand<>();
break;
case WriteOnlyKeyCommand.COMMAND_ID:
command = new WriteOnlyKeyCommand<>();
break;
case WriteOnlyKeyValueCommand.COMMAND_ID:
command = new WriteOnlyKeyValueCommand<>();
break;
case WriteOnlyManyCommand.COMMAND_ID:
command = new WriteOnlyManyCommand<>();
break;
case WriteOnlyManyEntriesCommand.COMMAND_ID:
command = new WriteOnlyManyEntriesCommand<>();
break;
case RemoveExpiredCommand.COMMAND_ID:
command = new RemoveExpiredCommand();
break;
case ReplicableRunnableCommand.COMMAND_ID:
command = new ReplicableRunnableCommand();
break;
case ReplicableManagerFunctionCommand.COMMAND_ID:
command = new ReplicableManagerFunctionCommand();
break;
case ReadOnlyKeyCommand.COMMAND_ID:
command = new ReadOnlyKeyCommand();
break;
case ReadOnlyManyCommand.COMMAND_ID:
command = new ReadOnlyManyCommand<>();
break;
case TxReadOnlyKeyCommand.COMMAND_ID:
command = new TxReadOnlyKeyCommand<>();
break;
case TxReadOnlyManyCommand.COMMAND_ID:
command = new TxReadOnlyManyCommand<>();
break;
case HeartBeatCommand.COMMAND_ID:
command = HeartBeatCommand.INSTANCE;
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
} else {
ModuleCommandFactory mcf = commandFactories.get(id);
if (mcf != null)
return mcf.fromStream(id);
else
throw new CacheException("Unknown command id " + id + "!");
}
return command;
}
/**
* Resolve an {@link CacheRpcCommand} from the stream.
*
* @param id id of the command
* @param type type of command (whether internal or user defined)
* @param cacheName cache name at which this command is directed
* @return an instance of {@link CacheRpcCommand}
*/
public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
CacheRpcCommand command;
if (type == 0) {
switch (id) {
case LockControlCommand.COMMAND_ID:
command = new LockControlCommand(cacheName);
break;
case PrepareCommand.COMMAND_ID:
command = new PrepareCommand(cacheName);
break;
case VersionedPrepareCommand.COMMAND_ID:
command = new VersionedPrepareCommand(cacheName);
break;
case TotalOrderNonVersionedPrepareCommand.COMMAND_ID:
command = new TotalOrderNonVersionedPrepareCommand(cacheName);
break;
case TotalOrderVersionedPrepareCommand.COMMAND_ID:
command = new TotalOrderVersionedPrepareCommand(cacheName);
break;
case CommitCommand.COMMAND_ID:
command = new CommitCommand(cacheName);
break;
case VersionedCommitCommand.COMMAND_ID:
command = new VersionedCommitCommand(cacheName);
break;
case TotalOrderCommitCommand.COMMAND_ID:
command = new TotalOrderCommitCommand(cacheName);
break;
case TotalOrderVersionedCommitCommand.COMMAND_ID:
command = new TotalOrderVersionedCommitCommand(cacheName);
break;
case RollbackCommand.COMMAND_ID:
command = new RollbackCommand(cacheName);
break;
case TotalOrderRollbackCommand.COMMAND_ID:
command = new TotalOrderRollbackCommand(cacheName);
break;
case SingleRpcCommand.COMMAND_ID:
command = new SingleRpcCommand(cacheName);
break;
case ClusteredGetCommand.COMMAND_ID:
command = new ClusteredGetCommand(cacheName);
break;
case StateRequestCommand.COMMAND_ID:
command = new StateRequestCommand(cacheName);
break;
case StateResponseCommand.COMMAND_ID:
command = new StateResponseCommand(cacheName);
break;
case TxCompletionNotificationCommand.COMMAND_ID:
command = new TxCompletionNotificationCommand(cacheName);
break;
case GetInDoubtTransactionsCommand.COMMAND_ID:
command = new GetInDoubtTransactionsCommand(cacheName);
break;
case GetInDoubtTxInfoCommand.COMMAND_ID:
command = new GetInDoubtTxInfoCommand(cacheName);
break;
case CompleteTransactionCommand.COMMAND_ID:
command = new CompleteTransactionCommand(cacheName);
break;
case CreateCacheCommand.COMMAND_ID:
command = new CreateCacheCommand(cacheName);
break;
case XSiteAdminCommand.COMMAND_ID:
command = new XSiteAdminCommand(cacheName);
break;
case CancelCommand.COMMAND_ID:
command = new CancelCommand(cacheName);
break;
case XSiteStateTransferControlCommand.COMMAND_ID:
command = new XSiteStateTransferControlCommand(cacheName);
break;
case XSiteStatePushCommand.COMMAND_ID:
command = new XSiteStatePushCommand(cacheName);
break;
case SingleXSiteRpcCommand.COMMAND_ID:
command = new SingleXSiteRpcCommand(cacheName);
break;
case ClusteredGetAllCommand.COMMAND_ID:
command = new ClusteredGetAllCommand(cacheName);
break;
case StreamRequestCommand.COMMAND_ID:
command = new StreamRequestCommand(cacheName);
break;
case StreamResponseCommand.COMMAND_ID:
command = new StreamResponseCommand(cacheName);
break;
case StreamIteratorRequestCommand.COMMAND_ID:
command = new StreamIteratorRequestCommand<>(cacheName);
break;
case StreamIteratorNextCommand.COMMAND_ID:
command = new StreamIteratorNextCommand(cacheName);
break;
case StreamIteratorCloseCommand.COMMAND_ID:
command = new StreamIteratorCloseCommand(cacheName);
break;
case SingleKeyBackupWriteCommand.COMMAND_ID:
command = new SingleKeyBackupWriteCommand(cacheName);
break;
case SingleKeyFunctionalBackupWriteCommand.COMMAND_ID:
command = new SingleKeyFunctionalBackupWriteCommand(cacheName);
break;
case PutMapBackupWriteCommand.COMMAND_ID:
command = new PutMapBackupWriteCommand(cacheName);
break;
case MultiEntriesFunctionalBackupWriteCommand.COMMAND_ID:
command = new MultiEntriesFunctionalBackupWriteCommand(cacheName);
break;
case MultiKeyFunctionalBackupWriteCommand.COMMAND_ID:
command = new MultiKeyFunctionalBackupWriteCommand(cacheName);
break;
case InvalidateVersionsCommand.COMMAND_ID:
command = new InvalidateVersionsCommand(cacheName);
break;
case RevokeBiasCommand.COMMAND_ID:
command = new RevokeBiasCommand(cacheName);
break;
case RenewBiasCommand.COMMAND_ID:
command = new RenewBiasCommand(cacheName);
break;
case RetrieveLastAccessCommand.COMMAND_ID:
command = new RetrieveLastAccessCommand(cacheName);
break;
case UpdateLastAccessCommand.COMMAND_ID:
command = new UpdateLastAccessCommand(cacheName);
break;
case ReductionPublisherRequestCommand.COMMAND_ID:
command = new ReductionPublisherRequestCommand<>(cacheName);
break;
case MultiClusterEventCommand.COMMAND_ID:
command = new MultiClusterEventCommand<>(cacheName);
break;
case InitialPublisherCommand.COMMAND_ID:
command = new InitialPublisherCommand<>(cacheName);
break;
case NextPublisherCommand.COMMAND_ID:
command = new NextPublisherCommand(cacheName);
break;
case CancelPublisherCommand.COMMAND_ID:
command = new CancelPublisherCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
} else {
ModuleCommandFactory mcf = commandFactories.get(id);
if (mcf != null)
return mcf.fromStream(id, cacheName);
else
throw new CacheException("Unknown command id " + id + "!");
}
return command;
}
}