Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Reduce allocations in VersionedEventStore#retentionEvents #7118

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private LockWatchEventLog(ClientLockWatchSnapshot snapshot, CacheMetrics metrics

CacheUpdate processUpdate(LockWatchStateUpdate update) {
if (latestVersion.isEmpty()
|| !update.logId().equals(latestVersion.get().id())) {
|| !update.logId().equals(latestVersion.orElseThrow().id())) {
return update.accept(new NewLeaderVisitor());
} else {
return update.accept(new ProcessingVisitor());
Expand Down Expand Up @@ -134,11 +134,9 @@ private ClientLogEvents getEventsBetweenVersionsInternal(VersionBounds versionBo
+ "transactions"));
return ClientLogEvents.builder()
.clearCache(false)
.events(LockWatchEvents.builder()
.addAllEvents(eventStore.getEventsBetweenVersionsInclusive(
Optional.of(startVersion.get().version()),
versionBounds.endVersion().version()))
.build())
.events(LockWatchEvents.of(eventStore.getEventsBetweenVersionsInclusive(
Optional.of(startVersion.get().version()),
versionBounds.endVersion().version())))
.build();
}
}
Expand Down Expand Up @@ -174,8 +172,7 @@ private LockWatchEvent getCompressedSnapshot(VersionBounds versionBounds) {
long snapshotVersion = versionBounds.snapshotVersion();
Collection<LockWatchEvent> collapsibleEvents =
eventStore.getEventsBetweenVersionsInclusive(Optional.empty(), snapshotVersion);
LockWatchEvents events =
LockWatchEvents.builder().addAllEvents(collapsibleEvents).build();
LockWatchEvents events = LockWatchEvents.of(collapsibleEvents);

return LockWatchCreatedEvent.fromSnapshot(snapshot.getSnapshotWithEvents(events, versionBounds.leader()));
}
Expand Down Expand Up @@ -211,7 +208,7 @@ private LockWatchVersion createStartVersion(LockWatchVersion startVersion) {

private LockWatchVersion getLatestVersionAndVerify(LockWatchVersion endVersion) {
Preconditions.checkState(latestVersion.isPresent(), "Cannot get events when log does not know its version");
LockWatchVersion currentVersion = latestVersion.get();
LockWatchVersion currentVersion = latestVersion.orElseThrow();
Preconditions.checkArgument(
endVersion.version() <= currentVersion.version(),
"Transactions' view of the world is more up-to-date than the log");
Expand Down Expand Up @@ -243,9 +240,8 @@ private void processSuccessInternal(LockWatchStateUpdate.Success success) {
+ " should only happen very rarely.");
}

if (success.lastKnownVersion() > latestVersion.get().version()) {
LockWatchEvents events =
LockWatchEvents.builder().events(success.events()).build();
if (success.lastKnownVersion() > latestVersion.orElseThrow().version()) {
LockWatchEvents events = LockWatchEvents.of(success.events());
if (events.events().isEmpty()) {
throw new TransactionLockWatchFailedException("Success event has a later version than the current "
+ "version, but has no events to bridge the gap. The transaction should be retried, but this "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.atlasdb.keyvalue.api.watch;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.palantir.lock.watch.LockWatchEvent;
import com.palantir.lock.watch.LockWatchVersion;
Expand All @@ -27,6 +28,8 @@

@Value.Immutable
public interface LockWatchEvents {

@Value.Parameter
List<LockWatchEvent> events();

@Value.Derived
Expand Down Expand Up @@ -55,7 +58,7 @@ default void contiguousSequence() {
@Value.Check
default void rangeOnlyPresentIffEventsAre() {
if (events().isEmpty()) {
Preconditions.checkState(!versionRange().isPresent(), "Cannot have a version range with no events");
Preconditions.checkState(versionRange().isEmpty(), "Cannot have a version range with no events");
} else {
Preconditions.checkState(versionRange().isPresent(), "Non-empty events must have a version range");
}
Expand All @@ -67,7 +70,7 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
}

if (latestVersion.isPresent()) {
long firstVersion = versionRange().get().lowerEndpoint();
long firstVersion = versionRange().orElseThrow().lowerEndpoint();
Preconditions.checkArgument(
firstVersion <= latestVersion.get().version()
|| latestVersion.get().version() + 1 == firstVersion,
Expand All @@ -80,4 +83,12 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
static ImmutableLockWatchEvents.Builder builder() {
return ImmutableLockWatchEvents.builder();
}

static LockWatchEvents of(Iterable<LockWatchEvent> events) {
return ImmutableLockWatchEvents.of(events);
}

static LockWatchEvents empty() {
return of(ImmutableList.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import com.palantir.logsafe.UnsafeArg;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class VersionedEventStore {
private static final boolean INCLUSIVE = true;
Expand Down Expand Up @@ -60,41 +61,31 @@ Collection<LockWatchEvent> getEventsBetweenVersionsInclusive(Optional<Long> mayb
}

LockWatchEvents retentionEvents(Optional<Sequence> earliestSequenceToKeep) {
if (eventMap.size() < minEvents) {
return LockWatchEvents.builder().build();
int numToRetention = eventMap.size() - minEvents;
if (numToRetention <= 0) {
return LockWatchEvents.empty();
}

// Guarantees that we remove some events while still also potentially performing further retention - note
// that each call to retentionEventsInternal modifies eventMap.
Stream<LockWatchEvent> events = retentionEvents(numToRetention, earliestSequenceToKeep.orElse(MAX_VERSION));

// Guarantees that we remove some events while still also potentially performing further retention.
// Note that consuming elements from retentionEvents stream removes them from eventMap.
if (eventMap.size() > maxEvents) {
List<LockWatchEvent> overMaxSizeEvents = retentionEventsInternal(eventMap.size() - maxEvents, MAX_VERSION);
List<LockWatchEvent> restOfEvents =
retentionEventsInternal(eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION));
return ImmutableLockWatchEvents.builder()
.addAllEvents(overMaxSizeEvents)
.addAllEvents(restOfEvents)
.build();
} else {
return ImmutableLockWatchEvents.builder()
.addAllEvents(retentionEventsInternal(
eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION)))
.build();
Stream<LockWatchEvent> overMaxSizeEvents = retentionEvents(eventMap.size() - maxEvents, MAX_VERSION);
return ImmutableLockWatchEvents.of(Stream.concat(overMaxSizeEvents, events)
.collect(Collectors.toCollection(() -> new ArrayList<>(maxEvents))));
}
return ImmutableLockWatchEvents.of(events.collect(Collectors.toCollection(() -> new ArrayList<>(minEvents))));
}

private List<LockWatchEvent> retentionEventsInternal(int numToRetention, Sequence maxVersion) {
List<LockWatchEvent> events = new ArrayList<>(numToRetention);

private Stream<LockWatchEvent> retentionEvents(int numToRetention, Sequence maxVersion) {
// The correctness of this depends upon eventMap's entrySet returning entries in ascending sorted order.
eventMap.entrySet().stream()
.takeWhile(entry -> entry.getKey().value() < maxVersion.value())
return eventMap.headMap(maxVersion).entrySet().stream()
.limit(numToRetention)
.forEachOrdered(entry -> {
.map(entry -> {
eventMap.remove(entry.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the original implementation also did this, but this arrangement of removing from the map seems worrying in terms of non-interference, especially since the entrySet() contract suggests that we shouldn't edit the map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can revert this portion if desired.

eventMap is a ConcurrentSkipListMap whose entrySet() iterator/stream
is weakly consistent. We can safely remove as we traverse the events to retention.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to keep this as a stream as it avoid some of the Object[] allocations due to intermediate List and the semantics are the same

events.add(entry.getValue());
return entry.getValue();
});

return events;
}

boolean containsEntryLessThanOrEqualTo(long version) {
Expand Down
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-7118.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Reduce allocations in VersionedEventStore#retentionEvents
links:
- https://github.com/palantir/atlasdb/pull/7118