Skip to content

Commit

Permalink
ISPN-3797 DataContainer should interact atomically with Persistence
Browse files Browse the repository at this point in the history
* Changed DefaultDataContainer implementation to use named inner classes.
It avoids duplicate code.
  • Loading branch information
pruivo authored and danberindei committed Apr 1, 2014
1 parent 95125aa commit be6531f
Showing 1 changed file with 92 additions and 141 deletions.
233 changes: 92 additions & 141 deletions core/src/main/java/org/infinispan/container/DefaultDataContainer.java
Expand Up @@ -64,117 +64,15 @@ public DefaultDataContainer(int concurrencyLevel) {
// If no comparing implementations passed, could fallback on JDK CHM
entries = CollectionFactory.makeConcurrentParallelMap(128, concurrencyLevel);
evictionListener = null;
extendedMap = new ExtendedMap<K, V>() {
@Override
public void evict(K key) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.computeIfPresent(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K o, InternalCacheEntry<K, V> entry) {
passivator.passivate(entry);
return null;
}
});
}

@Override
public void compute(K key, final ComputeAction<K, V> action) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K key, InternalCacheEntry<K, V> oldEntry) {
InternalCacheEntry<K, V> newEntry = action.compute(key, oldEntry, entryFactory);
if (newEntry == oldEntry) {
return oldEntry;
} else if (newEntry == null) {
return null;
}
if (oldEntry == null) {
//new entry. need to activate the key.
activator.activate(key);
}
if (trace)
log.tracef("Store %s in container", newEntry);
return newEntry;
}
});
}

@Override
public void putAndActivate(final InternalCacheEntry<K, V> newEntry) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(newEntry.getKey(), new EquivalentConcurrentHashMapV8.BiFun<Object, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry apply(Object key, InternalCacheEntry<K, V> entry) {
if (entry == null) {
//entry does not exists before. we need to activate it.
activator.activate(key);
}

return newEntry;
}
});
}
};
extendedMap = new EquivalentConcurrentExtendedMap();
}

public DefaultDataContainer(int concurrencyLevel,
Equivalence<? super K> keyEq) {
// If at least one comparing implementation give, use ComparingCHMv8
entries = CollectionFactory.makeConcurrentParallelMap(128, concurrencyLevel, keyEq, AnyEquivalence.getInstance());
evictionListener = null;
extendedMap = new ExtendedMap<K, V>() {
@Override
public void evict(K key) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.computeIfPresent(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K o, InternalCacheEntry<K, V> entry) {
passivator.passivate(entry);
return null;
}
});
}

@Override
public void compute(K key, final ComputeAction<K, V> action) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K key, InternalCacheEntry<K, V> oldEntry) {
InternalCacheEntry<K, V> newEntry = action.compute(key, oldEntry, entryFactory);
if (newEntry == oldEntry) {
return oldEntry;
} else if (newEntry == null) {
return null;
}
if (oldEntry == null) {
//new entry. need to activate the key.
activator.activate(key);
}
if (trace)
log.tracef("Store %s in container", newEntry);
return newEntry;
}
});
}

@Override
public void putAndActivate(final InternalCacheEntry<K, V> newEntry) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(newEntry.getKey(), new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K key, InternalCacheEntry<K, V> entry) {
if (entry == null) {
//entry does not exists before. we need to activate it.
activator.activate(key);
}

return newEntry;
}
});
}
};
extendedMap = new EquivalentConcurrentExtendedMap();
}

protected DefaultDataContainer(int concurrencyLevel, int maxEntries,
Expand Down Expand Up @@ -204,43 +102,9 @@ protected DefaultDataContainer(int concurrencyLevel, int maxEntries,
throw new IllegalArgumentException("No such eviction strategy " + strategy);
}

final BoundedConcurrentHashMap<K, InternalCacheEntry<K, V>> boundedMap =
new BoundedConcurrentHashMap<K, InternalCacheEntry<K, V>>(maxEntries, concurrencyLevel, eviction, evictionListener,
keyEquivalence, AnyEquivalence.getInstance());
entries = boundedMap;
extendedMap = new ExtendedMap<K, V>() {
@Override
public void evict(K key) {
boundedMap.evict(key);
}

@Override
public void compute(K key, final ComputeAction<K, V> action) {
boundedMap.lock(key);
try {
InternalCacheEntry<K, V> oldEntry = boundedMap.get(key);
InternalCacheEntry<K, V> newEntry = action.compute(key, oldEntry, entryFactory);
if (oldEntry == newEntry) {
return;
} else if (newEntry == null) {
boundedMap.remove(key);
return;
}
if (trace)
log.tracef("Store %s in container", newEntry);
//put already activate the entry if it is new.
boundedMap.put(key, newEntry);
} finally {
boundedMap.unlock(key);
}
}

@Override
public void putAndActivate(InternalCacheEntry<K, V> newEntry) {
//put already activate the entry if it is new.
boundedMap.put(newEntry.getKey(), newEntry);
}
};
entries = new BoundedConcurrentHashMap<K, InternalCacheEntry<K, V>>(maxEntries, concurrencyLevel, eviction, evictionListener,
keyEquivalence, AnyEquivalence.getInstance());
extendedMap = new BoundedConcurrentExtendedMap();
}

@Inject
Expand Down Expand Up @@ -544,5 +408,92 @@ private static interface ExtendedMap<K, V> {
void putAndActivate(InternalCacheEntry<K, V> newEntry);
}

private class EquivalentConcurrentExtendedMap implements ExtendedMap<K, V> {
@Override
public void evict(K key) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.computeIfPresent(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K o, InternalCacheEntry<K, V> entry) {
passivator.passivate(entry);
return null;
}
});
}

@Override
public void compute(K key, final ComputeAction<K, V> action) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(key, new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K key, InternalCacheEntry<K, V> oldEntry) {
InternalCacheEntry<K, V> newEntry = action.compute(key, oldEntry, entryFactory);
if (newEntry == oldEntry) {
return oldEntry;
} else if (newEntry == null) {
return null;
}
if (oldEntry == null) {
//new entry. need to activate the key.
activator.activate(key);
}
if (trace)
log.tracef("Store %s in container", newEntry);
return newEntry;
}
});
}

@Override
public void putAndActivate(final InternalCacheEntry<K, V> newEntry) {
((EquivalentConcurrentHashMapV8<K, InternalCacheEntry<K, V>>) entries)
.compute(newEntry.getKey(), new EquivalentConcurrentHashMapV8.BiFun<K, InternalCacheEntry<K, V>, InternalCacheEntry<K, V>>() {
@Override
public InternalCacheEntry<K, V> apply(K key, InternalCacheEntry<K, V> entry) {
if (entry == null) {
//entry does not exists before. we need to activate it.
activator.activate(key);
}

return newEntry;
}
});
}
}

private class BoundedConcurrentExtendedMap implements ExtendedMap<K, V> {
@Override
public void evict(K key) {
((BoundedConcurrentHashMap<Object, InternalCacheEntry<K, V>>) entries).evict(key);
}

@Override
public void compute(K key, final ComputeAction<K, V> action) {
final BoundedConcurrentHashMap<K, InternalCacheEntry<K, V>> boundedMap =
((BoundedConcurrentHashMap<K, InternalCacheEntry<K, V>>) entries);
boundedMap.lock(key);
try {
InternalCacheEntry<K, V> oldEntry = boundedMap.get(key);
InternalCacheEntry<K, V> newEntry = action.compute(key, oldEntry, entryFactory);
if (oldEntry == newEntry) {
return;
} else if (newEntry == null) {
boundedMap.remove(key);
return;
}
if (trace)
log.tracef("Store %s in container", newEntry);
//put already activate the entry if it is new.
boundedMap.put(key, newEntry);
} finally {
boundedMap.unlock(key);
}
}

@Override
public void putAndActivate(InternalCacheEntry<K, V> newEntry) {
//put already activate the entry if it is new.
entries.put(newEntry.getKey(), newEntry);
}
}
}

0 comments on commit be6531f

Please sign in to comment.