Skip to content

Commit

Permalink
re apache#3724: tablet transaction log
Browse files Browse the repository at this point in the history
* Added configuration for the max log size
* Added configuration for the action to take when out of sync
* Created intial handler function
  • Loading branch information
ivakegg committed Aug 26, 2023
1 parent 44a0ff0 commit 75e0858
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 122 deletions.
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ public enum Property {
+ " data will be reported but queries will still run possibly returning a"
+ " subset of the data.",
"1.3.5"),

TABLE_DEFAULT_SCANTIME_VISIBILITY("table.security.scan.visibility.default", "",
PropertyType.STRING,
"The security label that will be assumed at scan time if an entry does"
Expand Down Expand Up @@ -1191,6 +1192,18 @@ public enum Property {
"Determines whether index block cache is enabled for a table.", "1.3.5"),
TABLE_BLOCKCACHE_ENABLED("table.cache.block.enable", "false", PropertyType.BOOLEAN,
"Determines whether data block cache is enabled for a table.", "1.3.5"),
TABLE_OPERATION_LOG_MAX_SIZE("table.operation.log.max.size", "0", PropertyType.COUNT,
"The maximum number of logged operations logged in memory for diagnostic purposes. "
+ "The log will can be used for diagnosing if and when in-memory file list diverges from metadata.",
"2.1.3"),
TABLE_OPERATION_LOG_RECOVERY("tablet.operation.log.recovery.action", "log",
PropertyType.DATAFILE_RECOVERY_ACTION,
"The action to take when the in-memory file list is found to be different than the metadata. "
+ "1) log: simply log the operation log, "
+ "2) logsync: sync in-memory and metadata with the log, "
+ "3) metasync: sync memory with what the metadata holds, "
+ "4) memsyn: sync metadata with what in-memory holds.",
"2.1.3"),
TABLE_ITERATOR_PREFIX("table.iterator.", null, PropertyType.PREFIX,
"Properties in this category specify iterators that are applied at"
+ " various stages (scopes) of interaction with a table. These properties"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public enum PropertyType {
GC_POST_ACTION("gc_post_action", in(true, null, "none", "flush", "compact"),
"One of 'none', 'flush', or 'compact'."),

DATAFILE_RECOVERY_ACTION("datafile_recovery_action",
in(true, null, "log", "logsync", "metasync", "memsync"),
"One of 'log', 'logsync', 'metasync', 'memsync',"),

LAST_LOCATION_MODE("last_location_mode", in(true, null, "assignment", "compaction"),
"Defines how to update the last location. One of 'assignment', or 'compaction'."),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.ManagerMetadataUtil;
Expand Down Expand Up @@ -88,8 +90,8 @@ class DatafileManager {
this.tablet = tablet;
this.metadataUpdateCount =
new AtomicReference<>(new MetadataUpdateCount(tablet.getExtent(), 0L, 0L));
// TODO make the max size configurable
this.tabletLog = new DatafileTransactionLog(tablet.getExtent(), datafileSizes.keySet(), 1000);
this.tabletLog = new DatafileTransactionLog(tablet.getExtent(), datafileSizes.keySet(),
tablet.getTableConfiguration());
}

private final Set<TabletFile> filesToDeleteAfterScan = new HashSet<>();
Expand Down Expand Up @@ -580,8 +582,55 @@ public MetadataUpdateCount getUpdateCount() {
return metadataUpdateCount.get();
}

public DatafileTransactionLog getTransactionLog() {
return tabletLog;
public void handleMetadataDiff(ServerContext context) {
String action = tablet.getTableConfiguration().get(Property.TABLE_OPERATION_LOG_RECOVERY);
synchronized (tablet) {
// always log the operation log regardless of the requested action
log.error("Operation log: " + tabletLog.dumpLog());

// clear the log
tabletLog.flush(0);

// rescan for the tablet metadata
var tabletMeta =
context.getAmple().readTablet(tablet.getExtent(), TabletMetadata.ColumnType.FILES);
if (tabletMeta == null) {
String msg = "Tablet " + tablet.getExtent() + " not found in metadata";
log.error(msg);
} else {
Set<StoredTabletFile> metadata = new HashSet<>(tabletMeta.getFiles());
Set<StoredTabletFile> memory = new HashSet<>(datafileSizes.keySet());
Set<StoredTabletFile> expected = tabletLog.getExpectedFiles();

// verify we are still out of sync
if (metadata.equals(memory)) {
log.debug("Metadata and in-memory file list are back in-sync: " + metadata);
if (!expected.equals(memory)) {
log.error(
"Resetting operation log " + expected + " with metadata and memory " + memory);
tabletLog.reset(memory);
}
} else {
if (action.equals("logsync")) {
if (expected.equals(memory)) {
action = "metasync";
} else if (expected.equals(metadata)) {
action = "memsync";
} else {
log.error("Operation log " + expected + " does not agree with metadata " + metadata
+ " or memory " + memory);
tabletLog.reset(memory);
}
}

if (action.equals("metasync")) {
// TODO
} else if (action.equals("memsync")) {
// TODO
}
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,86 @@
*/
package org.apache.accumulo.tserver.tablet;

import org.apache.accumulo.core.metadata.StoredTabletFile;

import java.time.Instant;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import org.apache.accumulo.core.metadata.StoredTabletFile;

public abstract class DatafileTransaction {

protected final long ts = System.currentTimeMillis();
protected final long ts = System.currentTimeMillis();

public void apply(Set<StoredTabletFile> files) {}

public Date getDate() {
return Date.from(Instant.ofEpochSecond(ts));
}

public static class Compacted extends DatafileTransaction {
private final Set<StoredTabletFile> compactedFiles = new HashSet<>();
private final Optional<StoredTabletFile> destination;

public Compacted(Set<StoredTabletFile> files, Optional<StoredTabletFile> destination) {
this.compactedFiles.addAll(files);
this.destination = destination;
}

@Override
public void apply(Set<StoredTabletFile> files) {
files.removeAll(compactedFiles);
if (destination.isPresent()) {
files.add(destination.orElseThrow());
}
}

@Override
public String toString() {
return String.format("%s: Compacted %s into %s", getDate(), compactedFiles, destination);
}
}

static class Flushed extends DatafileTransaction {
private final Optional<StoredTabletFile> flushFile;

public Flushed(Optional<StoredTabletFile> flushFile) {
this.flushFile = flushFile;
}

public void apply(Set<StoredTabletFile> files) {}
public Flushed() {
this.flushFile = null;
}

public Date getDate() {
return Date.from(Instant.ofEpochSecond(ts));
@Override
public void apply(Set<StoredTabletFile> files) {
if (flushFile.isPresent()) {
files.add(flushFile.orElseThrow());
}
}

public static class Compacted extends DatafileTransaction {
private final Set<StoredTabletFile> compactedFiles = new HashSet<>();
private final Optional<StoredTabletFile> destination;
public Compacted(Set<StoredTabletFile> files, Optional<StoredTabletFile> destination) {
this.compactedFiles.addAll(files);
this.destination = destination;
}

@Override
public void apply(Set<StoredTabletFile> files) {
files.removeAll(compactedFiles);
if (destination.isPresent()) {
files.add(destination.orElseThrow());
}
}

@Override
public String toString() {
return String.format("%s: Compacted %s into %s", getDate(), compactedFiles, destination);
}
@Override
public String toString() {
return String.format("%s: Flushed into %s", getDate(), flushFile);
}
}

static class BulkImported extends DatafileTransaction {
private final StoredTabletFile importFile;

public BulkImported(StoredTabletFile importFile) {
this.importFile = importFile;
}

static class Flushed extends DatafileTransaction {
private final Optional<StoredTabletFile> flushFile;
public Flushed(Optional<StoredTabletFile> flushFile) {
this.flushFile = flushFile;
}
public Flushed() {
this.flushFile = null;
}
@Override
public void apply(Set<StoredTabletFile> files) {
if (flushFile.isPresent()) {
files.add(flushFile.orElseThrow());
}
}

@Override
public String toString() {
return String.format("%s: Flushed into %s", getDate(), flushFile);
}
@Override
public void apply(Set<StoredTabletFile> files) {
files.add(importFile);
}

static class BulkImported extends DatafileTransaction {
private final StoredTabletFile importFile;
public BulkImported(StoredTabletFile importFile) {
this.importFile = importFile;
}

@Override
public void apply(Set<StoredTabletFile> files) {
files.add(importFile);
}

@Override
public String toString() {
return String.format("%s: Imported %s", getDate(), importFile);
}
@Override
public String toString() {
return String.format("%s: Imported %s", getDate(), importFile);
}
}
}

0 comments on commit 75e0858

Please sign in to comment.