Skip to content

Commit

Permalink
ISPN-14187 SoftIndexFileStore not removing log files with expiration
Browse files Browse the repository at this point in the history
* Unset schedule flag when completing file - set from expiration
* Make sure nextExpirationTime is set properly if stats already exist
  • Loading branch information
wburns committed Oct 5, 2022
1 parent d855ad4 commit a1efa76
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 124 deletions.
Expand Up @@ -13,6 +13,7 @@ public class ControlledTimeService extends DefaultTimeService {
private static final Log log = LogFactory.getLog(ControlledTimeService.class);

private final Object id;
private TimeService actualTimeService;
protected volatile long currentMillis;

public ControlledTimeService() {
Expand Down Expand Up @@ -51,6 +52,14 @@ public void advance(long delta, TimeUnit timeUnit) {
advance(timeUnit.toMillis(delta));
}

public TimeService getActualTimeService() {
return actualTimeService;
}

public void setActualTimeService(TimeService actualTimeService) {
this.actualTimeService = actualTimeService;
}

public synchronized void advance(long deltaMillis) {
if (deltaMillis <= 0) {
throw new IllegalArgumentException("Argument must be greater than 0");
Expand Down
32 changes: 28 additions & 4 deletions core/src/main/java/org/infinispan/persistence/sifs/Compactor.java
Expand Up @@ -119,6 +119,8 @@ public void free(int file, int size) {
public void completeFile(int file, int currentSize, long nextExpirationTime) {
Stats stats = getStats(file, currentSize, nextExpirationTime);
stats.setCompleted();
// It is possible this was a logFile that was compacted
stats.scheduled = false;
if (stats.readyToBeScheduled(compactionThreshold, stats.getFree())) {
schedule(file, stats);
}
Expand Down Expand Up @@ -199,6 +201,7 @@ private Stats getStats(int file, int currentSize, long expirationTime) {
if (fileSize >= 0) {
stats.setTotal(fileSize);
}
stats.setNextExpirationTime(ExpiryHelper.mostRecentExpirationTime(stats.nextExpirationTime, expirationTime));
}
return stats;
}
Expand Down Expand Up @@ -353,13 +356,18 @@ public void accept(Object o) throws Throwable {
// that can expire yet
// Note that log files do not set the expiration time, so it is always -1 in that case, but we still
// want to check just in case some files are expired there.
// Note that we when compacting an expired entry from the log file we first write to the compacted
// file and then notify the subscriber. Assuming the subscriber then invokes remove expired it
// will actually cause two writes for the same expired entry. This is required though in case if
// the entry is not removed from the listener as we don't want to keep returning the same entry
// to the listener that it has expired.
if (stats.markedForDeletion() || (!isLogFile && stats.nextExpirationTime == -1) || stats.nextExpirationTime > currentTimeMilliseconds) {
log.tracef("Skipping expiration for file %d since it is marked for deletion: %s or its expiration time %s is not yet",
(Object) fileId, stats.markedForDeletion(), stats.nextExpirationTime);
continue;
}
// Make sure we don't start another compactoin for this file while performing expiration
stats.scheduled = true;
// Make sure we don't start another compaction for this file while performing expiration
stats.setScheduled();
}
compactSingleFile(fileId, isLogFile, subscriber, currentTimeMilliseconds);
}
Expand Down Expand Up @@ -471,7 +479,7 @@ private void compactSingleFile(int scheduledFile, boolean isLogFile, CompactionE
} else if (entry.file == scheduledFile && entry.offset == ~scheduledOffset) {
// The temporary table doesn't know how many entries we have for a key, so we shouldn't truncate
// or drop
log.tracef("Key for %d:%d ignored as it was expired");
log.tracef("Key for %d:%d ignored as it was expired but was in temporary table");
scheduledOffset += header.totalLength();
continue;
} else {
Expand Down Expand Up @@ -663,7 +671,7 @@ private void compactSingleFile(int scheduledFile, boolean isLogFile, CompactionE
static class Stats {
private final AtomicInteger free;
private volatile int total;
private final long nextExpirationTime;
private volatile long nextExpirationTime;
/* File is not 'completed' when we have not loaded that yet completely.
Files created by log appender/compactor are completed as soon as it closes them.
File cannot be scheduled for compaction until it's completed.
Expand Down Expand Up @@ -699,6 +707,10 @@ public long getNextExpirationTime() {
return nextExpirationTime;
}

public void setNextExpirationTime(long nextExpirationTime) {
this.nextExpirationTime = nextExpirationTime;
}

public boolean readyToBeScheduled(double compactionThreshold, int free) {
int total = this.total;
return completed && !scheduled && total >= 0 && free >= total * compactionThreshold;
Expand Down Expand Up @@ -727,5 +739,17 @@ public void markForDeletion() {
public boolean markedForDeletion() {
return this.markedForDeletion;
}

@Override
public String toString() {
return "Stats{" +
"free=" + free +
", total=" + total +
", nextExpirationTime=" + nextExpirationTime +
", completed=" + completed +
", scheduled=" + scheduled +
", markedForDeletion=" + markedForDeletion +
'}';
}
}
}
Expand Up @@ -4,6 +4,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.infinispan.commons.util.Util;

/**
* Helper for reading/writing entries into file.
*
Expand Down Expand Up @@ -47,7 +49,18 @@ public long getCreated() {
}

public long getLastUsed() {
return meta == null ? -1 :meta.getLastUsed();
return meta == null ? -1 : meta.getLastUsed();
}

@Override
public String toString() {
return "EntryRecord{" +
"header=" + header +
", key=" + Util.printArray(key) +
", value=" + Util.printArray(value) +
", meta=" + meta +
", internalMetadata=" + Util.printArray(internalMetadata) +
'}';
}

public EntryRecord loadMetadataAndValue(FileProvider.Handle handle, int offset, boolean saveValue) throws IOException {
Expand Down
Expand Up @@ -43,7 +43,7 @@ public class LogAppender implements Consumer<LogAppender.WriteOperation> {
private final java.nio.ByteBuffer REUSED_BUFFER = java.nio.ByteBuffer.allocate(EntryHeader.HEADER_SIZE_11_0);

// These variables are only ever read from the provided executor and rxjava guarantees visibility
// to it so they don't need to be volatile or synchronized
// to them, so they don't need to be volatile or synchronized
private int currentOffset = 0;
private long seqId = 0;
private int receivedCount = 0;
Expand Down

0 comments on commit a1efa76

Please sign in to comment.