Skip to content

Commit

Permalink
Bulk-fetch contents for get-keys
Browse files Browse the repository at this point in the history
Fixes #7428
  • Loading branch information
snazy committed Aug 30, 2023
1 parent ea21cc8 commit 751874a
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ as necessary. Empty sections will not end in the release notes.

### Changes
- Content Generator tool: commit hashes are now printed in full when running the `commits` command.
- For a "get-keys" operation that also requests the content objects, the content objects are now
fetched in batches using bulk requests.

### Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import static org.projectnessie.versioned.storage.versionstore.TypeMapping.toCommitMeta;
import static org.projectnessie.versioned.store.DefaultStoreWorker.contentTypeForPayload;

import com.google.common.collect.AbstractIterator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand All @@ -73,6 +75,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Content;
Expand Down Expand Up @@ -139,6 +142,7 @@

public class VersionStoreImpl implements VersionStore {

public static final int GET_KEYS_CONTENT_BATCH_SIZE = 50;
private final Persist persist;

@SuppressWarnings("unused")
Expand Down Expand Up @@ -554,23 +558,8 @@ public PaginationIterator<KeyEntry> getKeys(
index.iterator(keyRanges.beginStoreKey(), keyRanges.endStoreKey(), false);
ContentMapping contentMapping = new ContentMapping(persist);

Predicate<StoreIndexElement<CommitOp>> keyPredicate =
indexElement ->
indexElement.content().action().exists()
&& indexElement.key().endsWithElement(CONTENT_DISCRIMINATOR);
BiPredicate<ContentKey, Content.Type> contentKeyPredicate =
keyRestrictions.contentKeyPredicate();
if (contentKeyPredicate != null) {
keyPredicate =
keyPredicate.and(
indexElement -> {
ContentKey key = storeKeyToKey(indexElement.key());
// Note: key==null, if not the "main universe" or not a "content" discriminator
return key != null
&& contentKeyPredicate.test(
key, contentTypeForPayload(indexElement.content().payload()));
});
}

Predicate<StoreIndexElement<CommitOp>> stopPredicate;
ContentKey prefixKey = keyRestrictions.prefixKey();
Expand All @@ -581,44 +570,99 @@ public PaginationIterator<KeyEntry> getKeys(
stopPredicate = x -> false;
}

return new FilteringPaginationIterator<>(
result,
indexElement -> {
try {
ContentKey key = storeKeyToKey(indexElement.key());
CommitOp commitOp = indexElement.content();
// "Base" iterator, which maps StoreIndexElement objects to ContentKey and CommitOp and also
// filters out non-content keys and non-live CommitOps.
Iterator<ContentKeyWithCommitOp> keyAndOp =
new AbstractIterator<>() {
@CheckForNull
@Override
protected ContentKeyWithCommitOp computeNext() {
while (true) {
if (!result.hasNext()) {
return endOfData();
}

if (withContent) {
Content c =
contentMapping.fetchContent(
requireNonNull(commitOp.value(), "Required value pointer is null"));
return KeyEntry.of(buildIdentifiedKey(key, index, c, x -> null), c);
}
StoreIndexElement<CommitOp> indexElement = result.next();

UUID contentId = commitOp.contentId();
String contentIdString;
if (contentId == null) {
// this should only be hit by imported legacy nessie repos
Content c =
contentMapping.fetchContent(
requireNonNull(commitOp.value(), "Required value pointer is null"));
contentIdString = c.getId();
} else {
contentIdString = contentId.toString();
if (stopPredicate.test(indexElement)) {
return endOfData();
}

StoreKey storeKey = indexElement.key();

if (!indexElement.content().action().exists()
|| !indexElement.key().endsWithElement(CONTENT_DISCRIMINATOR)) {
continue;
}

ContentKey key = storeKeyToKey(storeKey);

if (contentKeyPredicate != null
&& !contentKeyPredicate.test(
key, contentTypeForPayload(indexElement.content().payload()))) {
continue;
}

return new ContentKeyWithCommitOp(storeKey, key, indexElement.content());
}
Content.Type contentType = contentTypeForPayload(commitOp.payload());
return KeyEntry.of(
buildIdentifiedKey(key, index, contentType, contentIdString, x -> null));
} catch (ObjNotFoundException e) {
throw new RuntimeException("Could not fetch or map content", e);
}
},
keyPredicate,
stopPredicate) {
};

// "Fetch content" iterator - same as the "base" iterator when not fetching the content,
// fetches contents in batches if 'withContent == true'.
Iterator<ContentKeyWithCommitOp> fetchContent =
withContent
? new AbstractIterator<>() {
final List<ContentKeyWithCommitOp> batch =
new ArrayList<>(GET_KEYS_CONTENT_BATCH_SIZE);

Iterator<ContentKeyWithCommitOp> current;

@CheckForNull
@Override
protected ContentKeyWithCommitOp computeNext() {
Iterator<ContentKeyWithCommitOp> c = current;
if (c != null && c.hasNext()) {
return c.next();
}

for (int i = 0; i < GET_KEYS_CONTENT_BATCH_SIZE; i++) {
if (!keyAndOp.hasNext()) {
break;
}
batch.add(keyAndOp.next());
}

if (batch.isEmpty()) {
current = null;
return endOfData();
}

try {
Map<ContentKey, Content> contents =
contentMapping.fetchContents(
index, batch.stream().map(op -> op.key).collect(Collectors.toList()));
for (ContentKeyWithCommitOp op : batch) {
op.content = contents.get(op.key);
}
} catch (ObjNotFoundException e) {
throw new RuntimeException("Could not fetch or map content", e);
}
current = new ArrayList<>(batch).iterator();
batch.clear();
return current.next();
}
}
: keyAndOp;

// "Final" iterator, adding functionality for paging. Needs to be a separate instance, because
// we cannot use the "base" iterator to provide the token for the "current" entry.
return new PaginationIterator<>() {
ContentKeyWithCommitOp current;

@Override
protected String computeTokenForCurrent() {
StoreIndexElement<CommitOp> c = current();
return c != null ? token(c.key()) : null;
public String tokenForCurrent() {
return token(current.storeKey);
}

@Override
Expand All @@ -629,9 +673,60 @@ public String tokenForEntry(KeyEntry entry) {
private String token(StoreKey storeKey) {
return pagingToken(copyFromUtf8(storeKey.rawString())).asString();
}

@Override
public boolean hasNext() {
return fetchContent.hasNext();
}

@Override
public KeyEntry next() {
ContentKeyWithCommitOp c = current = fetchContent.next();

if (c.content != null) {
return KeyEntry.of(buildIdentifiedKey(c.key, index, c.content, x -> null), c.content);
}

try {
UUID contentId = c.commitOp.contentId();
String contentIdString;
if (contentId == null) {
// this should only be hit by imported legacy nessie repos
if (c.content == null) {
c.content =
contentMapping.fetchContent(
requireNonNull(c.commitOp.value(), "Required value pointer is null"));
}
contentIdString = c.content.getId();
} else {
contentIdString = contentId.toString();
}
Content.Type contentType = contentTypeForPayload(c.commitOp.payload());
return KeyEntry.of(
buildIdentifiedKey(c.key, index, contentType, contentIdString, x -> null));
} catch (ObjNotFoundException e) {
throw new RuntimeException("Could not fetch or map content", e);
}
}

@Override
public void close() {}
};
}

/**
 * Simple holder that groups a {@link StoreKey}, the {@link ContentKey} derived for it and the
 * associated {@link CommitOp}.
 *
 * <p>The three constructor-supplied references are immutable; {@link #content} is deliberately
 * mutable so that it can be attached to an already created instance at a later point.
 */
static final class ContentKeyWithCommitOp {
  /** Store-level key of the entry. */
  final StoreKey storeKey;

  /** Content key corresponding to {@link #storeKey}. */
  final ContentKey key;

  /** Commit operation recorded for the entry. */
  final CommitOp commitOp;

  /** Not populated by the constructor, stays {@code null} until explicitly assigned. */
  Content content;

  /**
   * Creates a holder without content.
   *
   * @param storeKey store-level key of the entry
   * @param key content key corresponding to {@code storeKey}
   * @param commitOp commit operation recorded for the entry
   */
  ContentKeyWithCommitOp(StoreKey storeKey, ContentKey key, CommitOp commitOp) {
    this.commitOp = commitOp;
    this.key = key;
    this.storeKey = storeKey;
  }
}

@Override
public ContentResult getValue(Ref ref, ContentKey key) throws ReferenceNotFoundException {
RefMapping refMapping = new RefMapping(persist);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@

import static com.google.common.collect.Lists.newArrayList;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.assertj.core.groups.Tuple.tuple;
import static org.projectnessie.model.IdentifiedContentKey.IdentifiedElement.identifiedElement;
import static org.projectnessie.versioned.VersionStore.KeyRestrictions.NO_KEY_RESTRICTIONS;
import static org.projectnessie.versioned.testworker.OnRefOnly.newOnRef;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.assertj.core.groups.Tuple;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IdentifiedContentKey;
import org.projectnessie.model.Namespace;
Expand All @@ -39,6 +44,7 @@
import org.projectnessie.versioned.VersionStore;
import org.projectnessie.versioned.VersionStore.KeyRestrictions;
import org.projectnessie.versioned.paging.PaginationIterator;
import org.projectnessie.versioned.testworker.OnRefOnly;

@ExtendWith(SoftAssertionsExtension.class)
public abstract class AbstractEntries extends AbstractNestedVersionStore {
Expand Down Expand Up @@ -163,6 +169,37 @@ List<KeyEntry> keysAsList(Ref ref, KeyRestrictions keyRestrictions) throws Excep
}
}

/**
 * Commits {@code numKeys} keys in a single commit (no commit at all for {@code 0}) and checks
 * that {@code getKeys} with content retrieval enabled yields exactly those key/content pairs.
 *
 * <p>The parameter values cover 0, 1 and counts on both sides of 50 and 100.
 */
@ParameterizedTest
@ValueSource(ints = {0, 1, 49, 50, 51, 100, 101})
void getKeys(int numKeys) throws Exception {
  assumeThat(isNewStorageModel()).isTrue();

  BranchName fooBranch = BranchName.of("foo");
  Hash tip = store().create(fooBranch, Optional.empty()).getHash();

  // Expected (content-key, content) pairs, filled while building the commit.
  List<Tuple> expectedPairs = new ArrayList<>();
  if (numKeys > 0) {
    CommitBuilder builder = commit("Initial Commit");
    int idx = 0;
    while (idx < numKeys) {
      String name = "key" + idx;
      ContentKey contentKey = ContentKey.of(name);
      OnRefOnly content = newOnRef(name);
      builder.put(contentKey, content);
      expectedPairs.add(tuple(contentKey, content));
      idx++;
    }
    // The branch head moves only when something was actually committed.
    tip = builder.toBranch(fooBranch);
  }

  try (PaginationIterator<KeyEntry> keys =
      store().getKeys(tip, null, true, NO_KEY_RESTRICTIONS)) {
    soft.assertThat(keys)
        .toIterable()
        .extracting(
            entry -> entry.getKey().contentKey(),
            // Content IDs are cleared before comparing against the expected values.
            entry -> entry.getContent().withId(null))
        .containsExactlyInAnyOrderElementsOf(expectedPairs);
  }
}

@Test
void entries() throws Exception {
BranchName branch = BranchName.of("foo");
Expand All @@ -185,15 +222,15 @@ void entries() throws Exception {
ContentResult content23a = store().getValue(commit, key23a);

try (PaginationIterator<KeyEntry> iter =
store.getKeys(commit, null, false, NO_KEY_RESTRICTIONS)) {
store.getKeys(commit, null, true, NO_KEY_RESTRICTIONS)) {
soft.assertThat(iter)
.toIterable()
.extracting(KeyEntry::getKey)
.extracting(KeyEntry::getKey, KeyEntry::getContent)
.containsExactlyInAnyOrder(
content2.identifiedKey(),
content2a.identifiedKey(),
content23.identifiedKey(),
content23a.identifiedKey());
tuple(content2.identifiedKey(), content2.content()),
tuple(content2a.identifiedKey(), content2a.content()),
tuple(content23.identifiedKey(), content23.content()),
tuple(content23a.identifiedKey(), content23a.content()));
}

if (isNewStorageModel()) {
Expand Down

0 comments on commit 751874a

Please sign in to comment.