Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Use exporter positions to calculate compactable index in snapshots #4363

Merged
merged 1 commit into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ZeebeRaftStateMachine implements RaftStateMachine {
private final RaftContext raft;
Expand All @@ -38,7 +39,9 @@ public ZeebeRaftStateMachine(final RaftContext raft) {
this.reader = raft.getLog().openReader(1, RaftLogReader.Mode.COMMITS);

this.lastEnqueued = reader.getFirstIndex() - 1;
this.logger = LoggerFactory.getLogger(this.getClass());
this.logger =
ContextualLoggerFactory.getLogger(
getClass(), LoggerContext.builder(getClass()).add("partition", raft.getName()).build());
this.metrics = new RaftServiceMetrics(raft.getName());
}

Expand All @@ -56,15 +59,19 @@ public ThreadContext executor() {
@Override
public CompletableFuture<Void> compact() {
raft.checkThread();

final var log = raft.getLog();
if (log.isCompactable(compactableIndex)) {
final var index = log.getCompactableIndex(compactableIndex);
if (index > reader.getFirstIndex()) {
final var future = new CompletableFuture<Void>();
logger.debug("Compacting log up from {} up to {}", reader.getFirstIndex(), index);
compact(index, future);
return future;
}
final var future = new CompletableFuture<Void>();
logger.debug("Compacting log up from {} up to {}", reader.getFirstIndex(), index);
compact(index, future);
return future;
} else {
logger.debug(
"Skipping compaction of non-compactable index {} (first log index: {})",
compactableIndex,
reader.getFirstIndex());
}

return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -99,9 +106,23 @@ public void close() {
reader.close();
}

private void compact(final long index, final CompletableFuture<Void> future) {
logger.debug("Compacting up to index {}", index);
@Override
public long getCompactableIndex() {
return compactableIndex;
}

@Override
public void setCompactableIndex(final long index) {
this.compactableIndex = index;
}

@Override
public long getCompactableTerm() {
throw new UnsupportedOperationException(
"getCompactableTerm is not required by this implementation");
}

private void compact(final long index, final CompletableFuture<Void> future) {
try {
final var startTime = System.currentTimeMillis();
raft.getLog().compact(index);
Expand Down Expand Up @@ -170,20 +191,4 @@ private <T> void applyIndexed(
// mark as applied regardless of result
raft.setLastApplied(indexed.index(), indexed.entry().term());
}

@Override
public void setCompactableIndex(final long index) {
this.compactableIndex = index;
}

@Override
public long getCompactableIndex() {
return compactableIndex;
}

@Override
public long getCompactableTerm() {
throw new UnsupportedOperationException(
"getCompactableTerm is not required by this implementation");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.zeebe.broker.clustering.atomix.storage;

import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.raft.zeebe.ZeebeEntry;
import io.atomix.storage.journal.Indexed;
import io.zeebe.protocol.record.Record;
Expand All @@ -18,7 +19,7 @@
*/
@FunctionalInterface
public interface AtomixRecordEntrySupplier extends AutoCloseable {
Optional<Indexed<ZeebeEntry>> getIndexedEntry(long position);
Optional<Indexed<? extends RaftLogEntry>> getIndexedEntry(long position);

@Override
default void close() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,39 @@
*/
package io.zeebe.broker.clustering.atomix.storage.snapshot;

import io.atomix.raft.zeebe.ZeebeEntry;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.storage.journal.Indexed;
import io.zeebe.broker.clustering.atomix.storage.AtomixRecordEntrySupplier;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorageReader;
import io.zeebe.logstreams.storage.atomix.ZeebeIndexMapping;
import java.util.Optional;

public final class AtomixRecordEntrySupplierImpl implements AtomixRecordEntrySupplier {
private final AtomixLogStorageReader reader;
private final ZeebeIndexMapping indexMapping;
private final RaftLogReader reader;

public AtomixRecordEntrySupplierImpl(final AtomixLogStorageReader reader) {
public AtomixRecordEntrySupplierImpl(
final ZeebeIndexMapping indexMapping, final RaftLogReader reader) {
this.indexMapping = indexMapping;
this.reader = reader;
}

@Override
public Optional<Indexed<ZeebeEntry>> getIndexedEntry(final long position) {
final var index = reader.lookUpApproximateAddress(position);
// since Atomix assumes that a snapshot for index Y means Y is processed, return for the
// previous index
return reader.findEntry(index - 1);
public Optional<Indexed<? extends RaftLogEntry>> getIndexedEntry(final long position) {
final var index = indexMapping.lookupPosition(position);
if (index == -1) {
return Optional.empty();
}

reader.reset(index - 1);
if (reader.hasNext()) {
final var indexedEntry = reader.next();
if (indexedEntry.index() < index) {
return Optional.of(indexedEntry);
}
}

return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/
package io.zeebe.broker.clustering.atomix.storage.snapshot;

import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.raft.storage.snapshot.SnapshotListener;
import io.atomix.raft.storage.snapshot.SnapshotStore;
import io.atomix.raft.zeebe.ZeebeEntry;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.time.WallClockTimestamp;
import io.zeebe.broker.clustering.atomix.storage.AtomixRecordEntrySupplier;
Expand Down Expand Up @@ -60,7 +60,13 @@ public AtomixSnapshotStorage(
@Override
public Optional<Snapshot> getPendingSnapshotFor(final long snapshotPosition) {
final var optionalIndexed = entrySupplier.getIndexedEntry(snapshotPosition);
return optionalIndexed.map(this::getSnapshot);

final Long previousSnapshotIndex =
getLatestSnapshot().map(Snapshot::getCompactionBound).orElse(-1L);

return optionalIndexed
.filter(indexed -> indexed.index() != previousSnapshotIndex)
.map(this::getSnapshot);
}

@Override
Expand Down Expand Up @@ -162,7 +168,7 @@ private Path getPendingDirectoryFor(final DbSnapshotMetadata metadata) {
return pendingDirectory.resolve(metadata.getFileName());
}

private Path getPendingDirectoryFor(final Indexed<ZeebeEntry> entry) {
private Path getPendingDirectoryFor(final Indexed<? extends RaftLogEntry> entry) {
final var metadata =
new DbSnapshotMetadata(
entry.index(),
Expand All @@ -186,7 +192,7 @@ private void observeExistingSnapshots() {
metrics.setSnapshotCount(snapshots.size());
}

private Snapshot getSnapshot(final Indexed<ZeebeEntry> indexed) {
private Snapshot getSnapshot(final Indexed<? extends RaftLogEntry> indexed) {
final var pending = getPendingDirectoryFor(indexed);
return new SnapshotImpl(indexed.index(), pending);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,14 @@

import io.zeebe.broker.exporter.stream.ExportersState;
import io.zeebe.db.ZeebeDb;
import io.zeebe.engine.state.DefaultZeebeDbFactory;
import io.zeebe.engine.state.LastProcessedPositionState;
import io.zeebe.engine.state.ZbColumnFamilies;
import java.nio.file.Path;
import org.slf4j.Logger;

public class StatePositionSupplier {

private final Logger logger;

public StatePositionSupplier(final Logger log) {
this.logger = log;
}

public long getLowestPosition(final Path directory) {
long processedPosition = -1;
long exportedPosition = -1;

try (final var db = open(directory)) {
processedPosition = getLastProcessedPosition(directory, db);
exportedPosition = getMinimumExportedPosition(directory, db);
} catch (final Exception e) {
logger.error(
"Unexpected error occurred while obtaining the processed and exported position from the snapshot {}",
directory,
e);
}

return Math.min(processedPosition, exportedPosition);
}

public long getLastProcessedPosition(final Path directory) {
long processedPosition = -1;

try (final var db = open(directory)) {
processedPosition = getLastProcessedPosition(directory, db);
} catch (final Exception e) {
logger.error(
"Unexpected error occurred while obtaining the processed position from the given snapshot {}",
directory,
e);
}

return processedPosition;
}

private long getMinimumExportedPosition(
final Path directory, final ZeebeDb<ZbColumnFamilies> zeebeDb) {
public static long getHighestExportedPosition(final ZeebeDb zeebeDb) {
final ExportersState exporterState = new ExportersState(zeebeDb, zeebeDb.createContext());

if (exporterState.hasExporters()) {
final long lowestPosition = exporterState.getLowestPosition();
logger.debug("The lowest exported position for snapshot {} is {}", directory, lowestPosition);

return lowestPosition;
return exporterState.getLowestPosition();
} else {
logger.debug("No exporters present in snapshot {}", directory);
return Long.MAX_VALUE;
}
}

private long getLastProcessedPosition(
final Path directory, final ZeebeDb<ZbColumnFamilies> zeebeDb) {
final var lastProcessedState = new LastProcessedPositionState(zeebeDb, zeebeDb.createContext());
final long lowestPosition = lastProcessedState.getPosition();
logger.debug("The last processed position for snapshot {} is {}", directory, lowestPosition);
return lowestPosition;
}

private ZeebeDb<ZbColumnFamilies> open(final Path directory) {
return DefaultZeebeDbFactory.defaultFactory(ZbColumnFamilies.class)
.createDb(directory.toFile());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.zeebe.broker.logstreams.AtomixLogCompactor;
import io.zeebe.broker.logstreams.LogCompactor;
import io.zeebe.broker.logstreams.LogDeletionService;
import io.zeebe.broker.logstreams.state.StatePositionSupplier;
import io.zeebe.broker.system.configuration.BrokerCfg;
import io.zeebe.broker.system.configuration.DataCfg;
import io.zeebe.broker.system.monitoring.HealthMetrics;
Expand All @@ -44,7 +45,6 @@
import io.zeebe.logstreams.state.SnapshotStorage;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorage;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorageReader;
import io.zeebe.logstreams.storage.atomix.ZeebeIndexMapping;
import io.zeebe.protocol.impl.encoding.BrokerInfo;
import io.zeebe.util.FileUtil;
Expand Down Expand Up @@ -424,23 +424,24 @@ private StateSnapshotController createSnapshotController() {
: new NoneSnapshotReplication();

return new StateSnapshotController(
DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, snapshotStorage, stateReplication);
DefaultZeebeDbFactory.DEFAULT_DB_FACTORY,
snapshotStorage,
stateReplication,
StatePositionSupplier::getHighestExportedPosition);
}

// sonar warns that we should use AtomixRecordEntrySupplierImpl in a try-with-resources, which is
// not applicable here; it is safe to ignore as we will close the object once we close the storage
@SuppressWarnings("squid:S2095")
private SnapshotStorage createSnapshotStorage(final Path pendingDirectory) {
final var reader =
new AtomixLogStorageReader(
zeebeIndexMapping, atomixRaftPartition.getServer().openReader(-1, Mode.COMMITS));
final var reader = atomixRaftPartition.getServer().openReader(-1, Mode.COMMITS);
final var runtimeDirectory = atomixRaftPartition.dataDirectory().toPath().resolve("runtime");

return new AtomixSnapshotStorage(
runtimeDirectory,
pendingDirectory,
atomixRaftPartition.getServer().getSnapshotStore(),
new AtomixRecordEntrySupplierImpl(reader),
new AtomixRecordEntrySupplierImpl(zeebeIndexMapping, reader),
new SnapshotMetrics(partitionId));
}

Expand Down