Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events notification system for Nessie - VersionStore changes #6647

Merged
merged 26 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
import org.projectnessie.quarkus.cli.ExportRepository.Format;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.CommitMetaSerializer;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.CommitResult;
import org.projectnessie.versioned.ReferenceAlreadyExistsException;
import org.projectnessie.versioned.ReferenceConflictException;
import org.projectnessie.versioned.ReferenceNotFoundException;
import org.projectnessie.versioned.persist.adapter.CommitLogEntry;
import org.projectnessie.versioned.persist.adapter.ContentId;
import org.projectnessie.versioned.persist.adapter.DatabaseAdapter;
import org.projectnessie.versioned.persist.adapter.ImmutableCommitParams;
Expand Down Expand Up @@ -232,7 +233,7 @@ private static void populateRepository(DatabaseAdapter adapter)
ContentKey key = ContentKey.of("namespace123", "table123");
String namespaceId = UUID.randomUUID().toString();
String tableId = UUID.randomUUID().toString();
Hash main =
CommitResult<CommitLogEntry> main =
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(branchMain)
Expand All @@ -256,7 +257,7 @@ private static void populateRepository(DatabaseAdapter adapter)
.toStoreOnReferenceState(
IcebergTable.of("meta", 42, 43, 44, 45, tableId))))
.build());
adapter.create(branchFoo, main);
adapter.create(branchFoo, main.getCommitHash());
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(branchFoo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.projectnessie.versioned.storage.common.indexes.StoreKey;
import org.projectnessie.versioned.storage.common.logic.CommitLogic;
import org.projectnessie.versioned.storage.common.logic.ReferenceLogic;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.common.persist.Reference;
import org.projectnessie.versioned.store.DefaultStoreWorker;
Expand Down Expand Up @@ -251,7 +251,7 @@ private void populateRepository(Persist persist) throws Exception {

soft.assertThat(persist.storeObj(valueMain)).isTrue();
StoreKey key = key("namespace123", "table123");
ObjId main =
CommitObj main =
commitLogic.doCommit(
newCommitBuilder()
.parentCommitId(EMPTY_OBJ_ID)
Expand All @@ -260,14 +260,14 @@ private void populateRepository(Persist persist) throws Exception {
.headers(EMPTY_COMMIT_HEADERS)
.build(),
emptyList());
referenceLogic.assignReference(refMain, requireNonNull(main));
referenceLogic.assignReference(refMain, requireNonNull(main).id());

Reference refFoo = referenceLogic.createReference("refs/heads/branch-foo", main);
Reference refFoo = referenceLogic.createReference("refs/heads/branch-foo", main.id());
soft.assertThat(persist.storeObj(valueFoo)).isTrue();
ObjId foo =
CommitObj foo =
commitLogic.doCommit(
newCommitBuilder()
.parentCommitId(main)
.parentCommitId(main.id())
.addAdds(
commitAdd(
key,
Expand All @@ -279,6 +279,6 @@ private void populateRepository(Persist persist) throws Exception {
.headers(EMPTY_COMMIT_HEADERS)
.build(),
emptyList());
referenceLogic.assignReference(refFoo, requireNonNull(foo));
referenceLogic.assignReference(refFoo, requireNonNull(foo).id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ public void testNoAncestorHash(LaunchResult result, DatabaseAdapter adapter) {
public void testMainHash(QuarkusMainLauncher launcher, DatabaseAdapter adapter)
throws ReferenceNotFoundException, ReferenceConflictException {
Hash hash =
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(BranchName.of("main"))
.commitMetaSerialized(ByteString.copyFrom(new byte[] {1, 2, 3}))
.build());
adapter
.commit(
ImmutableCommitParams.builder()
.toBranch(BranchName.of("main"))
.commitMetaSerialized(ByteString.copyFrom(new byte[] {1, 2, 3}))
.build())
.getCommitHash();

LaunchResult result = launcher.launch("info");
assertThat(result.getOutput()).contains(hash.asString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException;
import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.persist.Persist;

@QuarkusMainTest
Expand Down Expand Up @@ -71,7 +71,7 @@ public void testMainHash(QuarkusMainLauncher launcher, Persist persist)
ObjNotFoundException,
RefNotFoundException,
RefConditionFailedException {
ObjId head =
CommitObj head =
commitLogic(persist)
.doCommit(
newCommitBuilder()
Expand All @@ -81,12 +81,13 @@ public void testMainHash(QuarkusMainLauncher launcher, Persist persist)
.build(),
emptyList());
referenceLogic(persist)
.assignReference(reference("refs/heads/main", EMPTY_OBJ_ID, false), requireNonNull(head));
.assignReference(
reference("refs/heads/main", EMPTY_OBJ_ID, false), requireNonNull(head).id());

LaunchResult result = launcher.launch("info");
assertThat(result.getOutput())
.contains("Repository created:")
.contains("Default branch head commit ID: " + head)
.contains("Default branch head commit ID: " + head.id())
.contains("Default branch commit count: 1")
.contains("Version-store type: " + MONGODB.name())
.contains("Default branch: main")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ private Hash commit(
.rewriteSingle(CommitMeta.fromMessage(commitMsg)),
Collections.singletonList(contentOperation),
validator,
(k, c) -> {});
(k, c) -> {})
.getCommitHash();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public Reference createReference(
check.checkAndThrow();

try {
Hash hash = getStore().create(namedReference, toHash(targetHash, false));
Hash hash = getStore().create(namedReference, toHash(targetHash, false)).getHash();
return RefUtil.toReference(namedReference, hash);
} catch (ReferenceNotFoundException e) {
throw new NessieReferenceNotFoundException(e.getMessage(), e);
Expand Down Expand Up @@ -346,7 +346,7 @@ public Reference deleteReference(

startAccessCheck().canDeleteReference(ref).checkAndThrow();

Hash deletedAthash = getStore().delete(ref, toHash(expectedHash, true));
Hash deletedAthash = getStore().delete(ref, toHash(expectedHash, true)).getHash();
return RefUtil.toReference(ref, deletedAthash);
} catch (ReferenceNotFoundException e) {
throw new NessieReferenceNotFoundException(e.getMessage(), e);
Expand Down Expand Up @@ -569,8 +569,10 @@ public MergeResponse transplantCommitsIntoBranch(

BranchName targetBranch = BranchName.of(branchName);
String lastHash = hashesToTransplant.get(hashesToTransplant.size() - 1);
WithHash<NamedRef> namedRefWithHash = namedRefWithHashOrThrow(fromRefName, lastHash);

startAccessCheck()
.canViewReference(namedRefWithHashOrThrow(fromRefName, lastHash).getValue())
.canViewReference(namedRefWithHash.getValue())
.canCommitChangeAgainstReference(targetBranch)
.checkAndThrow();

Expand All @@ -590,6 +592,7 @@ public MergeResponse transplantCommitsIntoBranch(
MergeResult<Commit> result =
getStore()
.transplant(
namedRefWithHash.getValue(),
targetBranch,
into,
transplants,
Expand Down Expand Up @@ -641,8 +644,9 @@ public MergeResponse mergeRefIntoBranch(
validateCommitMeta(commitMeta);

BranchName targetBranch = BranchName.of(branchName);
WithHash<NamedRef> namedRefWithHash = namedRefWithHashOrThrow(fromRefName, fromHash);
startAccessCheck()
.canViewReference(namedRefWithHashOrThrow(fromRefName, fromHash).getValue())
.canViewReference(namedRefWithHash.getValue())
.canCommitChangeAgainstReference(targetBranch)
.checkAndThrow();

Expand All @@ -652,6 +656,7 @@ public MergeResponse mergeRefIntoBranch(
MergeResult<Commit> result =
getStore()
.merge(
namedRefWithHash.getValue(),
toHash(fromRefName, fromHash),
targetBranch,
into,
Expand Down Expand Up @@ -950,7 +955,8 @@ public CommitResponse commitMultipleOperations(
() -> null,
(key, cid) -> {
commitResponse.addAddedContents(addedContent(key, cid));
});
})
.getCommitHash();

return commitResponse.targetBranch(Branch.of(branch, newHash.asString())).build();
} catch (ReferenceNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.projectnessie.model.ContentKey;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.Hashable;

/** Represents a commit-log-entry stored in the database. */
@Value.Immutable
public interface CommitLogEntry {
public interface CommitLogEntry extends Hashable {
/** Creation timestamp in microseconds since epoch. */
long getCreatedTime();

@Override
Hash getHash();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@
import javax.annotation.Nonnull;
import org.projectnessie.model.ContentKey;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.CommitResult;
import org.projectnessie.versioned.Diff;
import org.projectnessie.versioned.GetNamedRefsParams;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.MergeResult;
import org.projectnessie.versioned.NamedRef;
import org.projectnessie.versioned.RefLogNotFoundException;
import org.projectnessie.versioned.ReferenceAlreadyExistsException;
import org.projectnessie.versioned.ReferenceAssignedResult;
import org.projectnessie.versioned.ReferenceConflictException;
import org.projectnessie.versioned.ReferenceCreatedResult;
import org.projectnessie.versioned.ReferenceDeletedResult;
import org.projectnessie.versioned.ReferenceInfo;
import org.projectnessie.versioned.ReferenceNotFoundException;

Expand Down Expand Up @@ -148,7 +152,7 @@ Stream<KeyListEntry> keys(Hash commit, KeyFilterPredicate keyFilter)
* branch due to a conflicting change or if the expected hash in {@link
* CommitParams#getToBranch()}is not its expected hEAD
*/
Hash commit(CommitParams commitParams)
CommitResult<CommitLogEntry> commit(CommitParams commitParams)
throws ReferenceConflictException, ReferenceNotFoundException;

/**
Expand Down Expand Up @@ -218,11 +222,11 @@ Stream<ReferenceInfo<ByteString>> namedRefs(GetNamedRefsParams params)
* @param target The already existing named reference with an optional hash on that branch. This
* parameter can be {@code null} for the edge case when the default branch is re-created after
* it has been dropped.
* @return the current HEAD of the created branch or tag
* @return A {@link ReferenceCreatedResult} containing the head of the created reference
* @throws ReferenceAlreadyExistsException if the reference {@code ref} already exists.
* @throws ReferenceNotFoundException if {@code target} does not exist.
*/
Hash create(NamedRef ref, Hash target)
ReferenceCreatedResult create(NamedRef ref, Hash target)
throws ReferenceAlreadyExistsException, ReferenceNotFoundException;

/**
Expand All @@ -231,12 +235,12 @@ Hash create(NamedRef ref, Hash target)
* @param reference named-reference to delete. If a value for the hash is specified, it must be
* equal to the current HEAD.
* @param expectedHead if present, {@code reference}'s current HEAD must be equal to this value
* @return head of deleted reference
* @return A {@link ReferenceDeletedResult} containing the head of the deleted reference
* @throws ReferenceNotFoundException if the named reference in {@code reference} does not exist.
* @throws ReferenceConflictException if the named reference's HEAD is not equal to the expected
* HEAD
*/
Hash delete(NamedRef reference, Optional<Hash> expectedHead)
ReferenceDeletedResult delete(NamedRef reference, Optional<Hash> expectedHead)
throws ReferenceNotFoundException, ReferenceConflictException;

/**
Expand All @@ -245,13 +249,15 @@ Hash delete(NamedRef reference, Optional<Hash> expectedHead)
* @param assignee named reference to re-assign
* @param expectedHead if present, {@code assignee}'s current HEAD must be equal to this value
* @param assignTo commit to update {@code assignee}'s HEAD to
* @return A {@link ReferenceAssignedResult} containing the previous and current head of the
* reference
* @throws ReferenceNotFoundException if either the named reference in {@code assignTo} or the
* commit on that reference, if specified, does not exist or if the named reference specified
* in {@code assignee} does not exist.
* @throws ReferenceConflictException if the HEAD of the named reference {@code assignee} is not
* equal to the expected HEAD
*/
void assign(NamedRef assignee, Optional<Hash> expectedHead, Hash assignTo)
ReferenceAssignedResult assign(NamedRef assignee, Optional<Hash> expectedHead, Hash assignTo)
throws ReferenceNotFoundException, ReferenceConflictException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.MergeType;
import org.projectnessie.versioned.MetadataRewriter;
import org.projectnessie.versioned.NamedRef;

public interface MetadataRewriteParams extends ToBranchParams {

/** Ref to merge or transplant from. */
NamedRef getFromRef();

/** Whether to keep the individual commits and do not squash the commits to merge. */
@Value.Default
default boolean keepIndividualCommits() {
Expand All @@ -47,6 +51,8 @@ default MergeType getDefaultMergeType() {

@SuppressWarnings({"override", "UnusedReturnValue"})
interface Builder<B> extends ToBranchParams.Builder<B> {
B fromRef(NamedRef fromBranch);

B keepIndividualCommits(boolean keepIndividualCommits);

B defaultMergeType(MergeType defaultMergeType);
Expand Down