Issue 3223: RetentionTest failure #3303

Merged 1 commit on Jan 22, 2019
@@ -50,7 +50,8 @@ public StreamCutService(final int bucketCount, String processId, final StreamMet

@Override
protected void doStart() {
- Futures.allOf(IntStream.range(0, bucketCount).boxed().map(this::tryTakeOwnership).collect(Collectors.toList()))
+ streamMetadataStore.createBucketsRoot()
+         .thenCompose(v -> Futures.allOf(IntStream.range(0, bucketCount).boxed().map(this::tryTakeOwnership).collect(Collectors.toList())))
.thenAccept(x -> streamMetadataStore.registerBucketOwnershipListener(this))
.whenComplete((r, e) -> {
if (e != null) {
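For context, here is a minimal sketch of how the reordered doStart() might read in full after this change. The notifyStarted()/notifyFailed() calls are an assumption based on typical Guava AbstractService usage, since the hunk above is truncated; only the chain shown in the diff is taken from the actual change.

@Override
protected void doStart() {
    // Create the bucket root zNodes first, then try to take ownership of every bucket,
    // and only register for ownership-change notifications once that has completed.
    streamMetadataStore.createBucketsRoot()
            .thenCompose(v -> Futures.allOf(IntStream.range(0, bucketCount).boxed()
                    .map(this::tryTakeOwnership).collect(Collectors.toList())))
            .thenAccept(x -> streamMetadataStore.registerBucketOwnershipListener(this))
            .whenComplete((r, e) -> {
                if (e != null) {
                    notifyFailed(e);    // assumption: report startup failure to the service framework
                } else {
                    notifyStarted();    // assumption: report successful startup
                }
            });
}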
@@ -36,6 +36,8 @@
import io.pravega.shared.metrics.OpStatsLogger;
import io.pravega.shared.metrics.StatsLogger;
import io.pravega.shared.metrics.StatsProvider;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -79,6 +81,8 @@ public abstract class AbstractStreamMetadataStore implements StreamMetadataStore
private static final OpStatsLogger DELETE_STREAM = STATS_LOGGER.createStats(MetricsNames.DELETE_STREAM);
private final static String RESOURCE_PART_SEPARATOR = "_%_";

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
protected final int bucketCount;

private final LoadingCache<String, Scope> scopeCache;
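For reference, a hand-written equivalent of what Lombok's @Getter(AccessLevel.PACKAGE) generates for the bucketCount field; this package-private getter is what the new unit test further down calls via getBucketCount():

// Equivalent of @Getter(AccessLevel.PACKAGE) on bucketCount (generated by Lombok, shown here for clarity).
int getBucketCount() {
    return bucketCount;
}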
@@ -348,6 +348,11 @@ CompletableFuture<Void> recordLastStreamSegment(final String scope, final String
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> createBucketsRoot() {
return CompletableFuture.completedFuture(null);
}

private String scopedStreamName(final String scopeName, final String streamName) {
return new StringBuilder(scopeName).append("/").append(streamName).toString();
}
@@ -1084,4 +1084,9 @@ CompletableFuture<Void> completeCommitTransactions(final String scope, final Str
* @return CompletableFuture which indicates completion of processing.
*/
CompletableFuture<Void> deleteWaitingRequestConditionally(String scope, String stream, String processorName, OperationContext context, ScheduledExecutorService executor);

/**
* This method performs the initialization tasks required for the correct operation of services working on Stream buckets.
*
* @return CompletableFuture which indicates completion of the initialization tasks.
*/
CompletableFuture<Void> createBucketsRoot();
}
@@ -13,6 +13,7 @@
import com.google.common.base.Preconditions;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.AtomicInt96;
import io.pravega.common.lang.Int96;
@@ -68,9 +69,9 @@ class ZKStreamMetadataStore extends AbstractStreamMetadataStore implements AutoC
static final int COUNTER_RANGE = 10000;
static final String COUNTER_PATH = "/counter";
static final String DELETED_STREAMS_PATH = "/lastActiveStreamSegment/%s";
- private static final String BUCKET_ROOT_PATH = "/buckets";
- private static final String BUCKET_OWNERSHIP_PATH = BUCKET_ROOT_PATH + "/ownership";
- private static final String BUCKET_PATH = BUCKET_ROOT_PATH + "/%d";
+ static final String BUCKET_ROOT_PATH = "/buckets";
+ static final String BUCKET_OWNERSHIP_PATH = BUCKET_ROOT_PATH + "/ownership";
+ static final String BUCKET_PATH = BUCKET_ROOT_PATH + "/%d";
private static final String RETENTION_PATH = BUCKET_PATH + "/%s";
private static final String TRANSACTION_ROOT_PATH = "/transactions";
private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
@@ -460,6 +461,40 @@ CompletableFuture<Void> recordLastStreamSegment(final String scope, final String
});
}

/**
* When managing the Controller's metadata in Zookeeper, we explicitly create the parent bucket zNodes as regular,
* persistent zNodes. Otherwise, Curator may inadvertently create them as Zookeeper "containers", which would make
* them candidates for automatic removal by Zookeeper once they become empty. For more information on the use of
* Zookeeper containers in Curator recipes, visit: https://curator.apache.org/curator-recipes/index.html
*/
@Override
public CompletableFuture<Void> createBucketsRoot() {
return initializeZNode(BUCKET_ROOT_PATH)
.thenCompose(v -> initializeZNode(BUCKET_OWNERSHIP_PATH))
.thenCompose(v -> {
List<CompletableFuture<Void>> initializationFutures = new ArrayList<>();
for (int bucket = 0; bucket < bucketCount; bucket++) {
final String bucketPath = String.format(BUCKET_PATH, bucket);
initializationFutures.add(initializeZNode(bucketPath));
}
return Futures.allOf(initializationFutures);
});
}

private CompletableFuture<Void> initializeZNode(String zNodePath) {
return storeHelper.createZNodeIfNotExist(zNodePath).handle(
(v, ex) -> {
if (ex == null) {
log.debug("Stream bucket correctly initialized: {}.", zNodePath);
} else if (Exceptions.unwrap(ex) instanceof StoreException.DataExistsException) {
log.debug("Stream bucket already initialized: {}.", zNodePath);
} else {
throw new CompletionException("Unexpected exception initializing Stream bucket.", ex);
}
return null;
});
}

private String encodedScopedStreamName(String scope, String stream) {
String scopedStreamName = getScopedStreamName(scope, stream);
return Base64.getEncoder().encodeToString(scopedStreamName.getBytes());
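To make the container issue described in the comment above concrete, here is a small standalone sketch that creates the bucket paths with Curator directly. It does not use Pravega's ZKStoreHelper; the connection string, class name and bucket count are illustrative only. Curator recipes typically create parent nodes with creatingParentContainersIfNeeded(), which yields CONTAINER zNodes that Zookeeper may remove once they become empty; creating the paths explicitly as PERSISTENT zNodes avoids that.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

public final class BucketZNodeInitSketch {

    // Creates the given path as a regular PERSISTENT zNode, tolerating concurrent creation.
    static void createPersistent(CuratorFramework client, String path) throws Exception {
        try {
            client.create()
                    .creatingParentsIfNeeded()          // parents become PERSISTENT zNodes, not containers
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path);
        } catch (KeeperException.NodeExistsException e) {
            // Idempotent: another Controller instance may have initialized this path already.
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        createPersistent(client, "/buckets");
        createPersistent(client, "/buckets/ownership");
        for (int bucket = 0; bucket < 10; bucket++) {   // bucket count is illustrative
            createPersistent(client, String.format("/buckets/%d", bucket));
        }
        client.close();
    }
}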
@@ -408,6 +408,21 @@ public void testCommittedTxnGc() {
assertTrue(batches.contains(thirdBatch));
}

@Test
public void verifyBucketInitialization() {
ZKStoreHelper zkStoreHelper = new ZKStoreHelper(cli, executor);
// Verify that buckets are not initialized.
assertFalse(zkStoreHelper.checkExists(ZKStreamMetadataStore.BUCKET_ROOT_PATH).join());
// Execute the initialization of buckets in ZKStreamMetadataStore.
store.createBucketsRoot().join();
// Verify that the expected buckets are created after the execution of createBucketsRoot().
assertTrue(zkStoreHelper.checkExists(ZKStreamMetadataStore.BUCKET_ROOT_PATH).join());
assertTrue(zkStoreHelper.checkExists(ZKStreamMetadataStore.BUCKET_OWNERSHIP_PATH).join());
for (int i = 0; i < ((AbstractStreamMetadataStore) store).getBucketCount(); i++) {
assertTrue(zkStoreHelper.checkExists(String.format(ZKStreamMetadataStore.BUCKET_PATH, i)).join());
}
}

private CompletableFuture<TxnStatus> createAndCommitTxn(UUID txnId, String scope, String stream) {
return store.createTransaction(scope, stream, txnId, 100, 100, null, executor)
.thenCompose(x -> store.setState(scope, stream, State.COMMITTING_TXN, null, executor))