Skip to content

Commit

Permalink
KAFKA-8802: ConcurrentSkipListMap shows performance regression in cac…
Browse files Browse the repository at this point in the history
…he and in-memory store (apache#7212)

Reverts the TreeMap -> ConcurrentSkipListMap change that caused a performance regression in 2.3, and fixes the ConcurrentModificationException by copying (just) the key set to iterate over

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
A. Sophie Blee-Goldman authored and guozhangwang committed Aug 16, 2019
1 parent 9cb27f5 commit 051d290
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 36 deletions.
Expand Up @@ -17,8 +17,10 @@
package org.apache.kafka.streams.state.internals;

import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand All @@ -27,13 +29,12 @@
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
private volatile boolean open = false;
private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it

Expand Down Expand Up @@ -71,12 +72,12 @@ public boolean isOpen() {
}

@Override
public byte[] get(final Bytes key) {
public synchronized byte[] get(final Bytes key) {
return map.get(key);
}

@Override
public void put(final Bytes key, final byte[] value) {
public synchronized void put(final Bytes key, final byte[] value) {
if (value == null) {
size -= map.remove(key) == null ? 0 : 1;
} else {
Expand All @@ -85,7 +86,7 @@ public void put(final Bytes key, final byte[] value) {
}

@Override
public byte[] putIfAbsent(final Bytes key, final byte[] value) {
public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
Expand All @@ -101,14 +102,14 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}

@Override
public byte[] delete(final Bytes key) {
public synchronized byte[] delete(final Bytes key) {
final byte[] oldValue = map.remove(key);
size -= oldValue == null ? 0 : 1;
return oldValue;
}

@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {

if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
Expand All @@ -119,14 +120,14 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {

return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator()));
new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet()));
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {
public synchronized KeyValueIterator<Bytes, byte[]> all() {
return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.entrySet().iterator()));
new InMemoryKeyValueIterator(map.keySet()));
}

@Override
Expand All @@ -146,11 +147,11 @@ public void close() {
open = false;
}

private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
private final Iterator<Map.Entry<Bytes, byte[]>> iter;
private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
private final Iterator<Bytes> iter;

private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes, byte[]>> iter) {
this.iter = iter;
private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
this.iter = new TreeSet<>(keySet).iterator();
}

@Override
Expand All @@ -160,8 +161,8 @@ public boolean hasNext() {

@Override
public KeyValue<Bytes, byte[]> next() {
final Map.Entry<Bytes, byte[]> entry = iter.next();
return new KeyValue<>(entry.getKey(), entry.getValue());
final Bytes key = iter.next();
return new KeyValue<>(key, map.get(key));
}

@Override
Expand Down
Expand Up @@ -17,7 +17,8 @@
package org.apache.kafka.streams.state.internals;

import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
Expand All @@ -39,7 +40,7 @@
class NamedCache {
private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
private final String name;
private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
private ThreadCache.DirtyEntryFlushListener listener;
private LRUNode tail;
Expand Down Expand Up @@ -270,12 +271,16 @@ public boolean isEmpty() {
return cache.isEmpty();
}

synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
return cache.subMap(from, true, to, true).entrySet().iterator();
synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
}

synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
return cache.entrySet().iterator();
private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
return new TreeSet<>(keySet).iterator();
}

synchronized Iterator<Bytes> allKeys() {
return keySetIterator(cache.navigableKeySet());
}

synchronized LRUCacheEntry first() {
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
import org.slf4j.Logger;

import java.util.Collections;
Expand Down Expand Up @@ -181,17 +180,17 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
}
return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
}

public MemoryLRUCacheBytesIterator all(final String namespace) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
}
return new MemoryLRUCacheBytesIterator(cache.allIterator());
return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}

public long size() {
Expand Down Expand Up @@ -261,11 +260,13 @@ private synchronized NamedCache getOrCreateCache(final String name) {
}

static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
private final Iterator<Bytes> keys;
private final NamedCache cache;
private KeyValue<Bytes, LRUCacheEntry> nextEntry;

MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
this.underlying = underlying;
MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
this.keys = keys;
this.cache = cache;
}

public Bytes peekNextKey() {
Expand All @@ -289,7 +290,7 @@ public boolean hasNext() {
return true;
}

while (underlying.hasNext() && nextEntry == null) {
while (keys.hasNext() && nextEntry == null) {
internalNext();
}

Expand All @@ -307,9 +308,8 @@ public KeyValue<Bytes, LRUCacheEntry> next() {
}

private void internalNext() {
final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
final Bytes cacheKey = mapEntry.getKey();
final LRUCacheEntry entry = mapEntry.getValue().entry();
final Bytes cacheKey = keys.next();
final LRUCacheEntry entry = cache.get(cacheKey);
if (entry == null) {
return;
}
Expand Down

0 comments on commit 051d290

Please sign in to comment.