Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New storage: allow deletion of multiple repositories #6758

Merged
merged 1 commit into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.COL_REPO_ID;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.CREATE_TABLE_OBJS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.CREATE_TABLE_REFS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_OBJ;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_OBJS_SCAN;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_REF;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_REFS_SCAN;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.MAX_CONCURRENT_BATCH_READS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.MAX_CONCURRENT_DELETES;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.SELECT_BATCH_SIZE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.TABLE_OBJS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.TABLE_REFS;
Expand Down Expand Up @@ -438,4 +443,38 @@ public String configInfo() {
+ " DML timeout: "
+ config.dmlTimeout();
}

/**
 * Removes all reference rows and object rows that belong to any of the given repositories.
 *
 * <p>Returns immediately when {@code repositoryIds} is {@code null} or empty.
 *
 * @param repositoryIds IDs of the repositories to erase
 */
@Override
public void eraseRepositories(Set<String> repositoryIds) {
  if (repositoryIds == null || repositoryIds.isEmpty()) {
    return;
  }

  // Both scan statements receive the repository IDs as one bound list value.
  ArrayList<String> boundRepoIds = new ArrayList<>(repositoryIds);

  // Closing the limiter at the end of the try block happens before the pause below.
  try (LimitedConcurrentRequests pendingDeletes =
      new LimitedConcurrentRequests(MAX_CONCURRENT_DELETES)) {
    // Column 0 of each scanned row is the repository ID, column 1 the reference name.
    for (Row refRow : execute(ERASE_REFS_SCAN, boundRepoIds)) {
      pendingDeletes.submitted(
          executeAsync(ERASE_REF, refRow.getString(0), refRow.getString(1)));
    }

    // Column 0 of each scanned row is the repository ID, column 1 the object ID.
    for (Row objRow : execute(ERASE_OBJS_SCAN, boundRepoIds)) {
      pendingDeletes.submitted(
          executeAsync(ERASE_OBJ, objRow.getString(0), objRow.getString(1)));
    }
  }

  // Let the system clock move forward a bit before returning. In C*, a DELETE beats an
  // INSERT/UPDATE that carries the very same write-timestamp, so a write issued right after
  // the DELETEs above could silently be "overruled" by them. Without this pause, Nessie tests
  // fail on machines that are "fast enough".
  try {
    Thread.sleep(2L);
  } catch (InterruptedException interrupted) {
    Thread.currentThread().interrupt();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,24 @@ final class CassandraConstants {

static final String ERASE_OBJS_SCAN =
"SELECT "
+ COL_REPO_ID
+ ", "
+ COL_OBJ_ID
+ " FROM %s."
+ TABLE_OBJS
+ " WHERE "
+ COL_REPO_ID
+ "=? ALLOW FILTERING";
+ " IN ? ALLOW FILTERING";
static final String ERASE_REFS_SCAN =
"SELECT "
+ COL_REPO_ID
+ ", "
+ COL_REFS_NAME
+ " FROM %s."
+ TABLE_REFS
+ " WHERE "
+ COL_REPO_ID
+ "=? ALLOW FILTERING";
+ " IN ? ALLOW FILTERING";

// Erasing a single object row reuses the regular object DELETE statement.
static final String ERASE_OBJ = DELETE_OBJ;
static final String ERASE_REF =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.projectnessie.nessie.relocated.protobuf.UnsafeByteOperations.unsafeWrap;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ADD_REFERENCE;
Expand Down Expand Up @@ -49,10 +50,6 @@
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.COL_VALUE_DATA;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.COL_VALUE_PAYLOAD;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.DELETE_OBJ;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_OBJ;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_OBJS_SCAN;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_REF;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.ERASE_REFS_SCAN;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.FETCH_OBJ_TYPE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.FIND_OBJS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.FIND_OBJS_TYPED;
Expand All @@ -65,7 +62,6 @@
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.INSERT_OBJ_TAG;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.INSERT_OBJ_VALUE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.MARK_REFERENCE_AS_DELETED;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.MAX_CONCURRENT_DELETES;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.MAX_CONCURRENT_STORES;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.PURGE_REFERENCE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.SCAN_OBJS;
Expand Down Expand Up @@ -509,29 +505,7 @@ public void deleteObjs(@Nonnull @jakarta.annotation.Nonnull ObjId[] ids) {

/**
 * Erases all data of the repository this instance is configured for.
 *
 * <p>The work is delegated to the backend's {@code eraseRepositories}, passing only this
 * instance's repository ID.
 */
@Override
public void erase() {
  // The scan/delete logic lives in the backend only. Repeating it here would run the erase twice
  // and would read the scan rows with the wrong column layout (the scans return the repository
  // ID in column 0).
  backend.eraseRepositories(singleton(config().repositoryId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.storage.commontests;

import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic;

import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.versioned.storage.common.config.StoreConfig;
import org.projectnessie.versioned.storage.common.logic.RepositoryLogic;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.projectnessie.versioned.storage.common.persist.CloseableIterator;
import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.ObjType;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.common.persist.PersistFactory;
import org.projectnessie.versioned.storage.testextension.NessiePersist;
import org.projectnessie.versioned.storage.testextension.PersistExtension;

/** Basic {@link Persist} tests to be run by every implementation. */
@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class})
public class AbstractBackendRepositoryTests {
@InjectSoftAssertions protected SoftAssertions soft;

@NessiePersist protected Backend backend;
@NessiePersist protected PersistFactory persistFactory;

/**
 * Initializes one repository, erases it via {@code Persist.erase()} and checks that neither the
 * repository nor any object remains.
 */
@Test
public void createEraseRepoViaPersist() {
  Persist persist = newRepo();
  RepositoryLogic logic = repositoryLogic(persist);

  logic.initialize("foo-main");
  soft.assertThat(logic.repositoryExists()).isTrue();

  persist.erase();
  soft.assertThat(logic.repositoryExists()).isFalse();

  // A scan over every object type must not yield anything after the erase.
  try (CloseableIterator<Obj> remaining =
      persist.scanAllObjects(EnumSet.allOf(ObjType.class))) {
    soft.assertThat(remaining).isExhausted();
  }
}

@ParameterizedTest
@ValueSource(ints = {1, 3, 10})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we go above MAX_CONCURRENT_DELETES?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops - saw it too late - #6768

public void createEraseManyRepos(int numRepos) {
  // Create and initialize the requested number of independent repositories.
  List<Persist> persists =
      IntStream.range(0, numRepos).mapToObj(i -> newRepo()).collect(Collectors.toList());
  for (Persist persist : persists) {
    repositoryLogic(persist).initialize("foo-meep");
  }
  soft.assertThat(persists).allMatch(p -> repositoryLogic(p).repositoryExists());

  // Erase all of them with one backend call.
  backend.eraseRepositories(
      persists.stream().map(p -> p.config().repositoryId()).collect(Collectors.toSet()));

  // Neither the repositories themselves nor any of their objects may be left.
  soft.assertThat(persists).noneMatch(p -> repositoryLogic(p).repositoryExists());
  soft.assertThat(persists)
      .noneMatch(
          p -> {
            try (CloseableIterator<Obj> remaining =
                p.scanAllObjects(EnumSet.allOf(ObjType.class))) {
              return remaining.hasNext();
            }
          });
}

/** Creates a {@link Persist} for a new repository whose ID is {@code "repo-"} plus a random UUID. */
private Persist newRepo() {
  String repositoryId = "repo-" + UUID.randomUUID();
  return persistFactory.newPersist(StoreConfig.Adjustable.empty().withRepositoryId(repositoryId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public class ReferencesLogicTests extends AbstractReferenceLogicTests {}
/** Nested test class that inherits all of its tests from {@link AbstractRepositoryLogicTests}. */
@Nested
@SuppressWarnings("ClassCanBeStatic")
public class RepositoryLogicTests extends AbstractRepositoryLogicTests {}

/** Nested test class that inherits all of its tests from {@link AbstractBackendRepositoryTests}. */
@Nested
@SuppressWarnings("ClassCanBeStatic")
public class BackendRepositoryTests extends AbstractBackendRepositoryTests {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.projectnessie.versioned.storage.common.persist;

import java.util.Set;
import javax.annotation.Nonnull;

public interface Backend extends AutoCloseable {
Expand All @@ -26,4 +27,6 @@ public interface Backend extends AutoCloseable {
PersistFactory createFactory();

String configInfo();

/**
 * Erases the repositories identified by the given IDs.
 *
 * <p>NOTE(review): the Cassandra and DynamoDB implementations return without doing anything for
 * a {@code null} or empty set - confirm that all other implementations behave the same.
 *
 * @param repositoryIds IDs of the repositories to erase
 */
void eraseRepositories(Set<String> repositoryIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.storage.dynamodb;

import static java.util.Collections.singletonMap;
import static org.projectnessie.versioned.storage.dynamodb.DynamoDBConstants.BATCH_WRITE_MAX_REQUESTS;
import static org.projectnessie.versioned.storage.dynamodb.DynamoDBConstants.KEY_NAME;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

/**
 * Buffers DynamoDB put and delete requests for one table and sends them via {@code
 * batchWriteItem} in chunks of {@code BATCH_WRITE_MAX_REQUESTS} requests.
 *
 * <p>A full buffer is sent automatically. {@link #close()} sends whatever is still buffered, so
 * instances are meant to be used with try-with-resources.
 */
final class BatchWrite implements AutoCloseable {
  /** Upper bound of {@code batchWriteItem} calls issued for one buffered chunk. */
  private static final int MAX_FLUSH_ATTEMPTS = 10;

  /** Pause before the first retry of unprocessed items; doubled for every further retry. */
  private static final long INITIAL_RETRY_DELAY_MILLIS = 10L;

  /** Upper bound for the pause between two retries. */
  private static final long MAX_RETRY_DELAY_MILLIS = 1_000L;

  private final DynamoDBBackend backend;
  private final String tableName;
  private final List<WriteRequest> requestItems = new ArrayList<>();

  /**
   * @param backend backend providing the DynamoDB client
   * @param tableName name of the table all buffered requests are applied to
   */
  BatchWrite(DynamoDBBackend backend, String tableName) {
    this.backend = backend;
    this.tableName = tableName;
  }

  /** Buffers the request and sends the buffer once it holds the maximum batch size. */
  private void addRequest(WriteRequest.Builder request) {
    requestItems.add(request.build());
    if (requestItems.size() == BATCH_WRITE_MAX_REQUESTS) {
      flush();
    }
  }

  /** Buffers a delete of the item with the given key. */
  void addDelete(AttributeValue key) {
    addRequest(WriteRequest.builder().deleteRequest(b -> b.key(singletonMap(KEY_NAME, key))));
  }

  /** Buffers a put of the given item. */
  public void addPut(Map<String, AttributeValue> item) {
    addRequest(WriteRequest.builder().putRequest(b -> b.item(item)));
  }

  // close() is actually a flush, implementing AutoCloseable for easier use of BatchWrite using
  // try-with-resources.
  @Override
  public void close() {
    if (!requestItems.isEmpty()) {
      flush();
    }
  }

  /**
   * Sends all buffered requests and empties the buffer.
   *
   * <p>DynamoDB may accept only a part of a batch and hand the rest back as "unprocessed items".
   * Those are re-sent, with an increasing pause in between, until nothing is left or {@link
   * #MAX_FLUSH_ATTEMPTS} calls have been made.
   *
   * @throws IllegalStateException if requests are still unprocessed after the last attempt, or
   *     if the thread is interrupted while waiting for a retry
   */
  private void flush() {
    Map<String, List<WriteRequest>> pending = singletonMap(tableName, requestItems);
    long delayMillis = INITIAL_RETRY_DELAY_MILLIS;

    for (int attempt = 1; ; attempt++) {
      // Separate variable, because a lambda may only capture effectively final locals.
      Map<String, List<WriteRequest>> batch = pending;
      pending = backend.client().batchWriteItem(b -> b.requestItems(batch)).unprocessedItems();
      if (pending == null || pending.isEmpty()) {
        break;
      }

      if (attempt == MAX_FLUSH_ATTEMPTS) {
        int remaining = pending.values().stream().mapToInt(List::size).sum();
        throw new IllegalStateException(
            "DynamoDB left "
                + remaining
                + " write request(s) for table "
                + tableName
                + " unprocessed after "
                + MAX_FLUSH_ATTEMPTS
                + " attempts");
      }

      pauseBeforeRetry(delayMillis);
      delayMillis = Math.min(delayMillis * 2, MAX_RETRY_DELAY_MILLIS);
    }

    requestItems.clear();
  }

  /** Sleeps for the given time, converting an interruption into an unchecked exception. */
  private static void pauseBeforeRetry(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while retrying unprocessed write requests", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@
import static org.projectnessie.versioned.storage.dynamodb.DynamoDBConstants.TABLE_REFS;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.projectnessie.versioned.storage.common.persist.PersistFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
Expand Down Expand Up @@ -130,4 +136,39 @@ private static void verifyKeySchema(TableDescription description) {
public String configInfo() {
return "";
}

/**
 * Deletes all items of the given repositories from the references table and the objects table.
 *
 * <p>Each table is scanned page by page, and every item whose key starts with the key prefix of
 * one of the repositories is deleted. Returns immediately when {@code repositoryIds} is {@code
 * null} or empty.
 *
 * @param repositoryIds IDs of the repositories to erase
 */
@Override
public void eraseRepositories(Set<String> repositoryIds) {
  if (repositoryIds == null || repositoryIds.isEmpty()) {
    return;
  }

  List<String> keyPrefixes =
      repositoryIds.stream().map(DynamoDBBackend::keyPrefix).collect(Collectors.toList());

  @SuppressWarnings("resource")
  DynamoDbClient dynamo = client();

  Stream.of(TABLE_REFS, TABLE_OBJS)
      .forEach(table -> eraseItemsWithKeyPrefixes(dynamo, table, keyPrefixes));
}

/** Scans one table and batch-deletes every item whose key starts with one of the prefixes. */
private void eraseItemsWithKeyPrefixes(
    DynamoDbClient dynamo, String table, List<String> keyPrefixes) {
  try (BatchWrite deletes = new BatchWrite(this, table)) {
    dynamo
        .scanPaginator(scan -> scan.tableName(table))
        .forEach(
            page ->
                page.items().stream()
                    .map(item -> item.get(KEY_NAME))
                    .filter(
                        key -> {
                          String keyString = key.s();
                          return keyPrefixes.stream().anyMatch(keyString::startsWith);
                        })
                    .forEach(deletes::addDelete));
  }
}

/**
 * Returns the prefix shared by the keys of the given repository: the repository ID followed by a
 * colon.
 *
 * @param repositoryId ID of the repository
 * @return {@code repositoryId} with {@code ':'} appended
 */
static String keyPrefix(String repositoryId) {
  return repositoryId + ":";
}

/**
 * Builds a {@link Condition} from a comparison operator and its operand values.
 *
 * @param operator comparison operator of the condition
 * @param values attribute values the operator is applied to
 * @return the built condition
 */
static Condition condition(ComparisonOperator operator, AttributeValue... values) {
  Condition.Builder builder =
      Condition.builder().comparisonOperator(operator).attributeValueList(values);
  return builder.build();
}
}