Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
ARUHA-1693: improced deletion logic;
Browse files Browse the repository at this point in the history
  • Loading branch information
v-stepanov committed Jun 25, 2018
1 parent ba36bcf commit 07427f7
Showing 1 changed file with 18 additions and 31 deletions.
49 changes: 18 additions & 31 deletions src/main/java/org/zalando/nakadi/service/EventTypeService.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.zalando.nakadi.service;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.everit.json.schema.Schema;
import org.everit.json.schema.SchemaException;
import org.everit.json.schema.loader.SchemaLoader;
Expand All @@ -14,6 +12,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.TransactionTemplate;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.CompatibilityMode;
Expand Down Expand Up @@ -70,7 +69,6 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.CHECK_PARTITIONS_KEYS;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS;

Expand Down Expand Up @@ -269,35 +267,24 @@ private Multimap<TopicRepository, String> deleteEventTypeIfNoSubscriptions(final
return transactionTemplate.execute(action -> deleteEventType(eventType));
}

@SuppressWarnings("unchecked")
private Multimap<TopicRepository, String> deleteEventTypeWithSubscriptions(final String eventType) {
return executeWithRetry(() -> {
return transactionTemplate.execute(action -> {
final List<Subscription> subscriptions = subscriptionRepository.listSubscriptions(
ImmutableSet.of(eventType), Optional.empty(), 0, 100000);
subscriptions.forEach(s -> {
try {
subscriptionRepository.deleteSubscription(s.getId());
} catch (final NoSuchSubscriptionException e) {
// probably somebody deleted subscription just now
LOG.warn("Subscription to be deleted is not found {}", s.getId());
}
});
try {
return deleteEventType(eventType);
} catch (final EventTypeDeletionException e) {
if (hasSubscriptions(eventType)) {
// somebody already created a new subscription before we deleted ET
// we need to return empty value that will cause a retry
return HashMultimap.<TopicRepository, String>create();
} else {
throw e;
}
}
});
},
new RetryForSpecifiedTimeStrategy<Multimap<TopicRepository, String>>(1000)
.withResultsThatForceRetry(HashMultimap.create()));
try {
return transactionTemplate.execute(action -> {
final List<Subscription> subscriptions = subscriptionRepository.listSubscriptions(
ImmutableSet.of(eventType), Optional.empty(), 0, 100000);
subscriptions.forEach(s -> {
try {
subscriptionRepository.deleteSubscription(s.getId());
} catch (final NoSuchSubscriptionException e) {
// should not happen as we are inside transaction
throw new InconsistentStateException("Subscription to be deleted is not found", e);
}
});
return deleteEventType(eventType);
});
} catch (final TransactionException e) {
throw new InconsistentStateException("Failed to delete event-type because of race condition in DB", e);
}
}

private boolean hasSubscriptions(final String eventTypeName) {
Expand Down

0 comments on commit 07427f7

Please sign in to comment.