/
KeySetCommand.java
146 lines (122 loc) · 4.41 KB
/
KeySetCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package org.infinispan.commands.read;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.CacheStream;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.Visitor;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableSpliterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.stream.impl.local.KeyStreamSupplier;
import org.infinispan.stream.impl.local.LocalCacheStream;
import org.infinispan.util.DataContainerRemoveIterator;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
/**
 * Command implementation for {@link java.util.Map#keySet()} functionality.
 * <p>
 * Performing this command does not copy any keys; it returns a {@link CacheSet} view that delegates
 * every operation to the cache handed to the constructor.
 *
 * @author Galder Zamarreño
 * @author Mircea.Markus@jboss.com
 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 * @author William Burns
 * @since 4.0
 */
public class KeySetCommand<K, V> extends AbstractLocalCommand implements VisitableCommand {
   /** Cache backing the returned key set; already decorated with the command flags when flags were given. */
   private final Cache<K, V> cache;

   /**
    * Creates a key set command for the given cache.
    *
    * @param cache the cache whose keys are exposed
    * @param flags flags recorded on this command via {@code setFlags}; when non-{@code null} they are
    *              also applied to the cache used by the returned set, when {@code null} the cache is
    *              used unchanged
    */
   public KeySetCommand(Cache<K, V> cache, Set<Flag> flags) {
      setFlags(flags);
      if (flags != null) {
         this.cache = cache.getAdvancedCache().withFlags(flags.toArray(new Flag[flags.size()]));
      } else {
         this.cache = cache;
      }
   }

   /**
    * Dispatches this command to {@link Visitor#visitKeySetCommand}.
    *
    * @param ctx     the invocation context passed through to the visitor
    * @param visitor the visitor to invoke
    * @return whatever the visitor returns
    * @throws Throwable anything thrown by the visitor
    */
   @Override
   public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
      return visitor.visitKeySetCommand(ctx, this);
   }

   /**
    * @return always {@code false}
    */
   @Override
   public boolean readsExistingValues() {
      return false;
   }

   /**
    * Creates the key set view over the cache.
    *
    * @param ctx the invocation context; not consulted by this implementation
    * @return a new set backed by the cache of this command
    */
   @Override
   public Set<K> perform(InvocationContext ctx) throws Throwable {
      return new BackingKeySet<>(cache);
   }

   @Override
   public String toString() {
      return "KeySetCommand{" +
            "cache=" + cache.getName() +
            '}';
   }

   /**
    * Key set view that forwards all operations to the wrapped cache and its data container.
    */
   private static class BackingKeySet<K, V> extends AbstractCloseableIteratorCollection<K, K, V> implements CacheSet<K> {

      public BackingKeySet(Cache<K, V> cache) {
         super(cache);
      }

      /**
       * @return an iterator over the keys of the entries produced by a {@link DataContainerRemoveIterator}
       *         created for the cache
       */
      @Override
      public CloseableIterator<K> iterator() {
         // Typed local + diamond instead of the former raw EntryToKeyIterator, so the compiler
         // verifies the key type end to end rather than relying on an unchecked conversion.
         Iterator<? extends CacheEntry<K, V>> entries = new DataContainerRemoveIterator<>(cache);
         return new EntryToKeyIterator<>(entries);
      }

      /**
       * @return a spliterator over {@link #iterator()}, created with the current data container size and
       *         the {@code CONCURRENT}, {@code DISTINCT} and {@code NONNULL} characteristics
       */
      @Override
      public CloseableSpliterator<K> spliterator() {
         return Closeables.spliterator(iterator(), cache.getAdvancedCache().getDataContainer().size(),
               Spliterator.CONCURRENT | Spliterator.DISTINCT | Spliterator.NONNULL);
      }

      /**
       * @return the size reported by the data container of the cache
       */
      @Override
      public int size() {
         return cache.getAdvancedCache().getDataContainer().size();
      }

      /**
       * @return the result of {@code cache.containsKey(o)}
       */
      @Override
      public boolean contains(Object o) {
         return cache.containsKey(o);
      }

      /**
       * Removes the key through the cache.
       *
       * @return {@code true} when {@code cache.remove(o)} returned a non-{@code null} value
       */
      @Override
      public boolean remove(Object o) {
         return cache.remove(o) != null;
      }

      @Override
      public CacheStream<K> stream() {
         return createStream(false);
      }

      @Override
      public CacheStream<K> parallelStream() {
         return createStream(true);
      }

      /**
       * Builds the local cache stream shared by {@link #stream()} and {@link #parallelStream()}.
       * <p>
       * The stream supplier always creates a sequential stream from {@link #spliterator()}; the requested
       * mode is only passed to {@link LocalCacheStream} as a flag.
       *
       * @param parallel the parallel flag handed to {@link LocalCacheStream}
       * @return a new stream over the keys of the cache
       */
      private CacheStream<K> createStream(boolean parallel) {
         DistributionManager dm = cache.getAdvancedCache().getDistributionManager();
         // No distribution manager means no consistent hash is available for the supplier.
         return new LocalCacheStream<>(new KeyStreamSupplier<>(cache, dm != null ? dm.getConsistentHash() : null,
               () -> StreamSupport.stream(spliterator(), false)), parallel,
               cache.getAdvancedCache().getComponentRegistry());
      }
   }

   /**
    * Adapts an iterator of cache entries into a closeable iterator over their keys.
    */
   private static class EntryToKeyIterator<K, V> implements CloseableIterator<K> {

      private final Iterator<? extends CacheEntry<K, V>> iterator;

      /**
       * @param iterator the entry iterator to adapt; any entry subtype is accepted because entries are
       *                 only read from it
       */
      public EntryToKeyIterator(Iterator<? extends CacheEntry<K, V>> iterator) {
         this.iterator = iterator;
      }

      @Override
      public boolean hasNext() {
         return iterator.hasNext();
      }

      @Override
      public K next() {
         return iterator.next().getKey();
      }

      /** Forwards the removal to the wrapped entry iterator. */
      @Override
      public void remove() {
         iterator.remove();
      }

      @Override
      public void close() {
         // Do nothing as we can't close regular iterator
      }
   }
}