GEODE-8029: Allow OplogEntryIdSet to Overflow (apache#5329)
Do not delete drf files during member startup, as that should only be
done by the compactor thread. Instead, allow the OplogEntryIdSet to
grow beyond its default capacity and log a warning message instructing
the user to manually compact the disk-stores.

- Added unit tests.
- Replaced usages of 'junit.Assert' with 'assertj'.
- Modified DiskStoreImpl.deadRecordCount to return long instead of int.
- Added an internal overflow implementation to the OplogEntryIdSet so it
  can grow above the default limit (a sketch of the approach follows below).
jujoramos committed Jul 2, 2020
1 parent ffeabe0 commit fdc4401
Showing 5 changed files with 239 additions and 446 deletions.
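
To make the approach concrete before the diff: the idea is to keep adding ids to the current fastutil hash set and, once it refuses to grow any further, roll over to a fresh set and search across all sets on lookup. Below is a minimal standalone sketch of that pattern (illustrative only, not the Geode sources: the class and member names are invented, and the real OplogEntryIdSet additionally keeps separate int- and long-backed sets behind an AtomicReference and logs a warning on overflow).

import java.util.ArrayList;
import java.util.List;

import it.unimi.dsi.fastutil.longs.LongOpenHashSet;

class OverflowingIdSet {
  // Every set ever allocated; the last one receives new ids.
  private final List<LongOpenHashSet> sets = new ArrayList<>();

  OverflowingIdSet() {
    sets.add(new LongOpenHashSet());
  }

  void add(long id) {
    try {
      sets.get(sets.size() - 1).add(id);
    } catch (IllegalArgumentException tooLarge) {
      // fastutil throws once the backing array cannot be resized any further
      // ("Too large (N expected elements with load factor 0.75)").
      // Instead of aborting recovery, open a fresh set and continue.
      LongOpenHashSet overflow = new LongOpenHashSet();
      overflow.add(id);
      sets.add(overflow);
    }
  }

  boolean contains(long id) {
    // A lookup has to consult every set, not just the current one.
    return sets.stream().anyMatch(s -> s.contains(id));
  }

  long size() {
    return sets.stream().mapToLong(LongOpenHashSet::size).sum();
  }
}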
@@ -75,6 +75,7 @@
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Cache;
@@ -3517,33 +3518,85 @@ public String toString() {
}

/**
- * Set of OplogEntryIds (longs). Memory is optimized by using an int[] for ids in the unsigned int
- * range.
+ * Set of OplogEntryIds (longs).
+ * Memory is optimized by using an int[] for ids in the unsigned int range.
+ * By default the set cannot hold more than 805306401 ids for a load factor of 0.75; the internal
+ * lists are used to overcome this limit, allowing the disk-store to recover successfully (the
+ * internal class is **only** used during recovery to read all deleted entries).
*/
static class OplogEntryIdSet {
- private final IntOpenHashSet ints = new IntOpenHashSet((int) INVALID_ID);
- private final LongOpenHashSet longs = new LongOpenHashSet((int) INVALID_ID);
+ private final List<IntOpenHashSet> allInts;
+ private final List<LongOpenHashSet> allLongs;
+ private final AtomicReference<IntOpenHashSet> currentInts;
+ private final AtomicReference<LongOpenHashSet> currentLongs;

// For testing purposes only.
@VisibleForTesting
OplogEntryIdSet(List<IntOpenHashSet> allInts, List<LongOpenHashSet> allLongs) {
this.allInts = allInts;
this.currentInts = new AtomicReference<>(this.allInts.get(0));

this.allLongs = allLongs;
this.currentLongs = new AtomicReference<>(this.allLongs.get(0));
}

public OplogEntryIdSet() {
IntOpenHashSet intHashSet = new IntOpenHashSet((int) INVALID_ID);
this.allInts = new ArrayList<>();
this.allInts.add(intHashSet);
this.currentInts = new AtomicReference<>(intHashSet);

LongOpenHashSet longHashSet = new LongOpenHashSet((int) INVALID_ID);
this.allLongs = new ArrayList<>();
this.allLongs.add(longHashSet);
this.currentLongs = new AtomicReference<>(longHashSet);
}

public void add(long id) {
if (id == 0) {
throw new IllegalArgumentException();
- } else if (id > 0 && id <= 0x00000000FFFFFFFFL) {
-   this.ints.add((int) id);
- } else {
-   this.longs.add(id);
}

try {
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
this.currentInts.get().add((int) id);
} else {
this.currentLongs.get().add(id);
}
} catch (IllegalArgumentException illegalArgumentException) {
// See GEODE-8029.
// Too many entries on the accumulated drf files, overflow and continue.
logger.warn(
"There is a large number of deleted entries within the disk-store, please execute an offline compaction.");

// Overflow to the next [Int|Long]OpenHashSet and continue.
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
allInts.add(overflownHashSet);
currentInts.set(overflownHashSet);

currentInts.get().add((int) id);
} else {
LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
allLongs.add(overflownHashSet);
currentLongs.set(overflownHashSet);

currentLongs.get().add(id);
}
}
}

public boolean contains(long id) {
if (id >= 0 && id <= 0x00000000FFFFFFFFL) {
- return this.ints.contains((int) id);
+ return allInts.stream().anyMatch(ints -> ints.contains((int) id));
} else {
- return this.longs.contains(id);
+ return allLongs.stream().anyMatch(longs -> longs.contains(id));
}
}

- public int size() {
-   return this.ints.size() + this.longs.size();
+ public long size() {
+   return allInts.stream().mapToInt(IntOpenHashSet::size).sum()
+       + allLongs.stream().mapToInt(LongOpenHashSet::size).sum();
}
}

@@ -3973,13 +4026,13 @@ public int getLiveEntryCount() {
return this.liveEntryCount;
}

- private int deadRecordCount;
+ private long deadRecordCount;

- void incDeadRecordCount(int count) {
+ void incDeadRecordCount(long count) {
this.deadRecordCount += count;
}

- public int getDeadRecordCount() {
+ public long getDeadRecordCount() {
return this.deadRecordCount;
}

@@ -937,17 +937,10 @@ void initAfterRecovery(boolean offline) {
// this.crf.raf.seek(this.crf.currSize);
} else if (!offline) {
// drf exists but crf has been deleted (because it was empty).
+ // I don't think the drf needs to be opened. It is only used during recovery.
+ // At some point the compacter may identify that it can be deleted.
this.crf.RAFClosed = true;
deleteCRF();

- // The drf file needs to be deleted (see GEODE-8029).
- // If compaction is not enabled, or if the compaction-threshold is never reached, there
- // will be orphaned drf files that are not automatically deleted (unless a manual
- // compaction is executed), in which case a later recovery might fail when the amount of
- // deleted records is too high (805306401).
- setHasDeletes(false);
- deleteDRF();

this.closed = true;
this.deleted.set(true);
}

This file was deleted.

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.junit.Test;
import org.mockito.stubbing.Answer;

import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;

/**
* Tests DiskStoreImpl.OplogEntryIdSet
*/
public class OplogEntryIdSetTest {

@Test
public void testBasics() {
OplogEntryIdSet s = new OplogEntryIdSet();

LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isFalse());
LongStream.range(1, 777777).forEach(s::add);
LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());

assertThatThrownBy(() -> s.add(DiskStoreImpl.INVALID_ID))
.isInstanceOf(IllegalArgumentException.class);
assertThat(s.contains(0)).isFalse();

assertThat(s.contains(0x00000000FFFFFFFFL)).isFalse();
s.add(0x00000000FFFFFFFFL);
assertThat(s.contains(0x00000000FFFFFFFFL)).isTrue();

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
.forEach(i -> assertThat(s.contains(i)).isFalse());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777).forEach(s::add);
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
.forEach(i -> assertThat(s.contains(i)).isTrue());

LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());

assertThat(s.contains(Long.MAX_VALUE)).isFalse();
s.add(Long.MAX_VALUE);
assertThat(s.contains(Long.MAX_VALUE)).isTrue();
assertThat(s.contains(Long.MIN_VALUE)).isFalse();
s.add(Long.MIN_VALUE);
assertThat(s.contains(Long.MIN_VALUE)).isTrue();
}

@Test
public void addMethodOverflowsWhenInternalAddThrowsIllegalArgumentException() {
int testEntries = 1000;
int magicInt = testEntries + 1;
long magicLong = 0x00000000FFFFFFFFL + testEntries + 1;

Answer<Void> answer = invocationOnMock -> {
Number value = invocationOnMock.getArgument(0);
if ((value.intValue() == magicInt) || (value.longValue() == magicLong)) {
throw new IllegalArgumentException(
"Too large (XXXXXXXX expected elements with load factor Y.YY)");
}
invocationOnMock.callRealMethod();
return null;
};

IntOpenHashSet intOpenHashSet = spy(IntOpenHashSet.class);
doAnswer(answer).when(intOpenHashSet).add(anyInt());
LongOpenHashSet longOpenHashSet = spy(LongOpenHashSet.class);
doAnswer(answer).when(longOpenHashSet).add(anyLong());
List<IntOpenHashSet> intOpenHashSets =
new ArrayList<>(Collections.singletonList(intOpenHashSet));
List<LongOpenHashSet> longOpenHashSets =
new ArrayList<>(Collections.singletonList(longOpenHashSet));
OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);

// Insert some entries.
assertThat(intOpenHashSets).hasSize(1);
assertThat(longOpenHashSets).hasSize(1);
IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(oplogEntryIdSet::add);

// Insert an entry that would cause an overflow for ints and longs.
oplogEntryIdSet.add(magicInt);
oplogEntryIdSet.add(magicLong);

// Entries should exist and no exception should be thrown (even those that caused the exception)
assertThat(intOpenHashSets).hasSize(2);
assertThat(longOpenHashSets).hasSize(2);
IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
assertThat(oplogEntryIdSet.contains(magicInt)).isTrue();
assertThat(oplogEntryIdSet.contains(magicLong)).isTrue();
}

@Test
public void sizeShouldIncludeOverflownSets() {
int testEntries = 1000;
List<IntOpenHashSet> intHashSets = new ArrayList<>();
List<LongOpenHashSet> longHashSets = new ArrayList<>();

IntStream.range(1, testEntries + 1).forEach(value -> {
IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
intOpenHashSet.add(value);
intHashSets.add(intOpenHashSet);
});

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries + 1)
.forEach(value -> {
LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
longOpenHashSet.add(value);
longHashSets.add(longOpenHashSet);
});

OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
assertThat(oplogEntryIdSet.size()).isEqualTo(testEntries * 2);
}

@Test
public void containsShouldSearchAcrossOverflownSets() {
int testEntries = 1000;
List<IntOpenHashSet> intHashSets = new ArrayList<>();
List<LongOpenHashSet> longHashSets = new ArrayList<>();

IntStream.range(1, testEntries).forEach(value -> {
IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
intOpenHashSet.add(value);
intHashSets.add(intOpenHashSet);
});

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries).forEach(value -> {
LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
longOpenHashSet.add(value);
longHashSets.add(longOpenHashSet);
});

OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);

// All entries should be searchable across overflown sets
IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
}
}
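
For reference, the warning logged by OplogEntryIdSet#add asks the user to run a manual offline compaction; with gfsh that is typically done along these lines (the disk-store name and directory below are placeholders):

compact offline-disk-store --name=myDiskStore --disk-dirs=/path/to/disk-dir

Running it removes the accumulated deleted records from the oplogs, so a later recovery no longer needs an oversized OplogEntryIdSet.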
