Skip to content

Commit

Permalink
Replace guava multimap in PCBC with custom impl (Cherry-pick)
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

(cherry-pick apache#1569)

For a long time PerChannelBookieClient (PCBC) has used Guava's
LinkedListMultimap to store conflicting V2 completion keys and
values. This is problematic, though. Completion keys are pooled
objects. When a key-value pair is stored in a LinkedListMultimap, if
it is the first value for that key, a collection is created for the
values and added to a top-level map using the key, and then the key
and the value are added to the collection. When a second value is
added for the same key, the key and value are simply added to the
collection. The problem occurs when the first key is removed. PCBC
will recycle the key object, but this object is still being used in
the multimap's top-level map. This causes all sorts of fun like
NullPointerException and IllegalStateException.

Because of this, this patch introduces a very simple multimap
implementation that only stores the key one time (in the collection)
and uses the hashCode of the key to separate the collections into
buckets. It's pretty inefficient, but this code is only hit in the
rare case where a client is trying to read or write the same entry
from the same ledger more than once at the same time.

Author: Ivan Kelly <ivan@ivankelly.net>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>

This closes apache#1569 from ivankelly/conc-test-flake

Master Issue: apache#1569

Author: Ivan Kelly <ivan@ivankelly.net>

Reviewers: Ivan Kelly <ivank@apache.org>, Enrico Olivelli <eolivelli@gmail.com>

This closes apache#1618 from sijie/cherry-pick-pcbc

(cherry picked from commit 83d3abe)
Signed-off-by: JV Jujjuri <vjujjuri@salesforce.com>
  • Loading branch information
ivankelly authored and jvrao committed Sep 7, 2018
1 parent 1486899 commit 81878ad
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.google.common.base.Joiner;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
Expand Down Expand Up @@ -71,8 +69,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -127,6 +125,7 @@
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -165,8 +164,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {

// Map that holds duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
LinkedListMultimap.create();
private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
new SynchronizedHashMultiMap<>();

private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
Expand Down Expand Up @@ -787,9 +786,7 @@ private void readEntryInternal(final long ledgerId,
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}

writeAndFlush(channel, completionKey, request, allowFastFail);
Expand Down Expand Up @@ -826,16 +823,7 @@ public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object
public void checkTimeoutOnPendingOperations() {
int timedOutOperations = completionObjects.removeIf(timeoutCheck);

synchronized (this) {
Iterator<CompletionValue> iterator = completionObjectsV2Conflicts.values().iterator();
while (iterator.hasNext()) {
CompletionValue value = iterator.next();
if (value.maybeTimeout()) {
++timedOutOperations;
iterator.remove();
}
}
}
timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);

if (timedOutOperations > 0) {
LOG.info("Timed-out {} operations to channel {} for {}",
Expand Down Expand Up @@ -1001,6 +989,9 @@ void errorOut(final CompletionKey key) {
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut();
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
}
}

Expand All @@ -1013,14 +1004,7 @@ void errorOut(final CompletionKey key, final int rc) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
synchronized (completionObjectsV2Conflicts) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completion = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completion);

completion.errorOut(rc);
}
}
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
}
}

Expand Down Expand Up @@ -1049,16 +1033,10 @@ void errorOutPendingOps(int rc) {
*/

void errorOutOutstandingEntries(int rc) {
// DO NOT rewrite these using Map.Entry iterations. We want to iterate
// on keys and see if we are successfully able to remove the key from
// the map. Because the add and the read methods also do the same thing
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
while (completionObjectsV2Conflicts.get(key).size() > 0) {
errorOut(key, rc);
}
Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
while (multikey.isPresent()) {
multikey.ifPresent(k -> errorOut(k, rc));
multikey = completionObjectsV2Conflicts.getAnyKey();
}
for (CompletionKey key : completionObjects.keys()) {
errorOut(key, rc);
Expand Down Expand Up @@ -1179,12 +1157,7 @@ private void readV2Response(final BookieProtocol.Response response) {
key.release();
if (completionValue == null) {
// If there's no completion object here, try in the multimap
synchronized (this) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completionValue = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completionValue);
}
}
completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
}

if (null == completionValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.util.collections;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiPredicate;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Simple multimap implementation that only stores each key reference once.
 *
 * <p>The implementation is aimed at storing PerChannelBookieClient completions when there
 * are duplicates. If the key is a pooled object, it must not be referenced by the map once
 * its value has been removed, which can happen with Guava multimap implementations.
 *
 * <p>Entries are grouped into buckets by the {@code hashCode()} of the key. Each bucket is a
 * set of (key, value) pairs, and keys inside a bucket are matched with {@code equals()}.
 * A bucket is dropped from the map as soon as it becomes empty, so no stale empty buckets
 * are left behind for {@link #getAnyKey()} to stumble over.
 *
 * <p>This map is implemented with pretty heavy locking (every method is synchronized on the
 * instance), but this shouldn't be an issue as the multimap only needs to be used in rare
 * cases, i.e. when a user tries to read or write the same entry twice at the same time.
 * This class should *NOT* be used in critical path code.
 *
 * <p>A unique key-value pair will only be stored once. Null keys are not supported.
 */
public class SynchronizedHashMultiMap<K, V> {

    // Buckets indexed by key.hashCode(). Invariant: a bucket stored here is never empty.
    HashMap<Integer, Set<Pair<K, V>>> map = new HashMap<>();

    /**
     * Adds the (k, v) pair to the bucket for {@code k.hashCode()}, creating the bucket if needed.
     * Adding a pair that is already present (by {@code equals()}) has no effect.
     *
     * @param k the key, must not be null
     * @param v the value to associate with the key
     */
    public synchronized void put(K k, V v) {
        map.computeIfAbsent(k.hashCode(), (ignore) -> new HashSet<>()).add(Pair.of(k, v));
    }

    /**
     * Returns the key of an arbitrary pair stored in the map.
     *
     * @return some key currently in the map, or an empty Optional if the map holds no pairs
     */
    public synchronized Optional<K> getAnyKey() {
        return map.values().stream()
            // Defensive: never let an empty bucket mask pairs held in other buckets.
            .filter(pairs -> !pairs.isEmpty())
            .findAny()
            .flatMap(pairs -> pairs.stream().findAny())
            .map(Pair::getLeft);
    }

    /**
     * Removes and returns an arbitrary value stored under a key equal to {@code k}.
     *
     * <p>If the removal empties the bucket, the bucket itself is removed from the map.
     * Previously the empty bucket was left in place, which could make {@link #getAnyKey()}
     * report an empty map while other buckets still held pairs.
     *
     * @param k the key to look up, must not be null
     * @return the removed value, or an empty Optional if no pair matches the key
     */
    public synchronized Optional<V> removeAny(K k) {
        final Integer hash = k.hashCode();
        Set<Pair<K, V>> bucket = map.getOrDefault(hash, Collections.emptySet());
        Optional<Pair<K, V>> match = bucket.stream().filter(p -> p.getLeft().equals(k)).findAny();
        if (match.isPresent()) {
            bucket.remove(match.get());
            if (bucket.isEmpty()) {
                map.remove(hash);
            }
        }
        return match.map(Pair::getRight);
    }

    /**
     * Removes every pair for which the predicate returns true.
     *
     * @param predicate test applied to the key and value of each stored pair
     * @return the number of pairs removed
     */
    public synchronized int removeIf(BiPredicate<K, V> predicate) {
        int removed = 0;
        Iterator<Set<Pair<K, V>>> buckets = map.values().iterator();
        while (buckets.hasNext()) {
            Set<Pair<K, V>> bucket = buckets.next();
            // Can't use Collection.removeIf on the bucket because we need the count
            Iterator<Pair<K, V>> iter = bucket.iterator();
            while (iter.hasNext()) {
                Pair<K, V> kv = iter.next();
                if (predicate.test(kv.getLeft(), kv.getRight())) {
                    iter.remove();
                    removed++;
                }
            }
            if (bucket.isEmpty()) {
                buckets.remove();
            }
        }
        return removed;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.util.collections;

import java.util.Optional;

import org.junit.Assert;
import org.junit.Test;

/**
 * Test for SynchronizedHashMultiMap.
 *
 * <p>Assertions follow the JUnit argument order (expected first, actual second) so that
 * failure messages report the right values.
 */
public class SynchronizedHashMultiMapTest {
    /**
     * getAnyKey must be empty on an empty map and otherwise return one of the stored keys.
     */
    @Test
    public void testGetAnyKey() {
        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
        Assert.assertFalse(map.getAnyKey().isPresent());

        map.put(1, 2);
        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());

        map.put(1, 3);
        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());

        map.put(2, 4);
        int res = map.getAnyKey().get();
        Assert.assertTrue(res == 1 || res == 2);

        map.removeIf((k, v) -> k == 1);
        Assert.assertEquals(Integer.valueOf(2), map.getAnyKey().get());
    }

    /**
     * removeAny must hand back each value stored under a key exactly once, in any order.
     */
    @Test
    public void testRemoveAny() {
        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
        Assert.assertFalse(map.removeAny(1).isPresent());

        map.put(1, 2);
        map.put(1, 3);
        map.put(2, 4);
        // Duplicate pair: must only be stored once
        map.put(2, 4);

        Optional<Integer> v = map.removeAny(1);
        int firstVal = v.get();
        Assert.assertTrue(firstVal == 2 || firstVal == 3);

        v = map.removeAny(1);
        int secondVal = v.get();
        Assert.assertTrue(secondVal == 2 || secondVal == 3);
        Assert.assertNotEquals(firstVal, secondVal);

        v = map.removeAny(2);
        Assert.assertTrue(v.isPresent());
        Assert.assertEquals(Integer.valueOf(4), v.get());

        Assert.assertFalse(map.removeAny(1).isPresent());
        Assert.assertFalse(map.removeAny(2).isPresent());
        Assert.assertFalse(map.removeAny(3).isPresent());
    }

    /**
     * removeIf must return the number of pairs that matched the predicate and were removed.
     */
    @Test
    public void testRemoveIf() {
        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
        Assert.assertEquals(0, map.removeIf((k, v) -> true));

        map.put(1, 2);
        map.put(1, 3);
        map.put(2, 4);
        // Duplicate pair: must only be stored (and counted) once
        map.put(2, 4);

        Assert.assertEquals(1, map.removeIf((k, v) -> v == 4));
        Assert.assertEquals(2, map.removeIf((k, v) -> k == 1));

        map.put(1, 2);
        map.put(1, 3);
        map.put(2, 4);

        Assert.assertEquals(0, map.removeIf((k, v) -> false));
        Assert.assertEquals(3, map.removeIf((k, v) -> true));
    }
}

0 comments on commit 81878ad

Please sign in to comment.