Skip to content
Permalink
Browse files

fixes apache#1033 optimize default compaction strategy

  • Loading branch information...
keith-turner committed Mar 15, 2019
1 parent 77eacde commit c04a24022ea63c6a3068c24e4ffad71c252cc6e5
@@ -17,17 +17,115 @@
package org.apache.accumulo.tserver.compaction; package org.apache.accumulo.tserver.compaction;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.TreeSet;


import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.FileRef;


import com.google.common.base.Preconditions;

public class DefaultCompactionStrategy extends CompactionStrategy { public class DefaultCompactionStrategy extends CompactionStrategy {


/**
 * Keeps track of the sum of the size of all files within a window. The files are sorted from
 * largest to smallest. Supports efficiently creating sub windows, sliding the window, and
 * shrinking the window.
 *
 * <p>
 * The window covers the half-open index range {@code [first, last)} of the sorted file list.
 * Windows created by {@link #tail(int)} share the same backing list as their parent; only the
 * indexes and the running sum are per-window.
 */
private static class SizeWindow {

  // all candidate files, sorted from largest to smallest; shared between a window and its tails
  List<CompactionFile> files;
  // sum of the sizes of the files at indexes [first, last)
  long sum = 0;

  // index of the largest file in the window (inclusive)
  int first;
  // index one past the smallest file in the window (exclusive)
  int last;

  // only used by tail(int), which fills in every field itself
  SizeWindow() {}

  /**
   * Creates a window that covers every file in the passed in map.
   *
   * @param allFiles
   *          the files to track along with the values their sizes are read from
   */
  SizeWindow(Map<FileRef,DataFileValue> allFiles) {
    files = new ArrayList<>(allFiles.size());
    for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
      CompactionFile file = new CompactionFile(entry.getKey(), entry.getValue().getSize());
      files.add(file);
      // the total does not depend on order, so accumulate it while building the list
      sum += file.size;
    }

    // reverse of the natural ordering puts the largest file at index 0
    Collections.sort(files, Comparator.reverseOrder());

    first = 0;
    last = files.size();
  }

  /**
   * Shrinks the window by dropping its largest file.
   *
   * @throws IllegalStateException
   *           if the window is empty
   */
  void pop() {
    if (first >= last)
      throw new IllegalStateException("Can not pop");

    sum -= files.get(first).size;
    first++;
  }

  /**
   * Returns the size of the largest file in the window. Must only be called on a non-empty
   * window; no emptiness check is done here.
   */
  long topSize() {
    return files.get(first).size;
  }

  /**
   * Moves the window one position toward the larger files while keeping the number of files in
   * the window the same.
   *
   * @return false, leaving the window unchanged, when the window already starts at the largest
   *         file; true otherwise
   */
  boolean slideUp() {
    if (first == 0)
      return false;

    first--;
    last--;

    // after the decrements, first is the file that entered the window and last is the file that
    // left it
    sum += files.get(first).size;
    sum -= files.get(last).size;

    return true;
  }

  /**
   * Creates a sub window made up of the smallest files in this window.
   *
   * @param windowSize
   *          the maximum number of files in the sub window, must be greater than zero
   * @return a new window over the same backing list containing at most {@code windowSize} files
   *         and never extending above the start of this window
   */
  SizeWindow tail(int windowSize) {
    Preconditions.checkArgument(windowSize > 0);

    SizeWindow sub = new SizeWindow();

    sub.files = files;
    sub.first = Math.max(last - windowSize, first);
    sub.last = last;
    sub.sum = 0;

    for (int i = sub.first; i < sub.last; i++) {
      sub.sum += files.get(i).size;
    }

    return sub;
  }

  /**
   * Returns the sum of the sizes of the files in the window.
   */
  long sum() {
    return sum;
  }

  /**
   * Returns the number of files in the window.
   */
  int size() {
    return (last - first);
  }

  /**
   * Returns the files in the window, ordered from largest to smallest, as a new list.
   */
  public List<FileRef> getFiles() {
    List<FileRef> windowFiles = new ArrayList<>(size());
    for (int i = first; i < last; i++) {
      windowFiles.add(files.get(i).file);
    }
    return windowFiles;
  }

  @Override
  public String toString() {
    // An empty window has no top file. Calling topSize() on it would either throw
    // IndexOutOfBoundsException or report a file outside the window, so print a placeholder.
    String top = size() > 0 ? Long.toString(topSize()) : "none";
    return "size:" + size() + " sum:" + sum() + " first:" + first + " last:" + last + " topSize:"
        + top;
  }
}

@Override @Override
public boolean shouldCompact(MajorCompactionRequest request) { public boolean shouldCompact(MajorCompactionRequest request) {
CompactionPlan plan = getCompactionPlan(request); CompactionPlan plan = getCompactionPlan(request);
@@ -45,7 +143,7 @@ public CompactionPlan getCompactionPlan(MajorCompactionRequest request) {
return result; return result;
} }


private static class CompactionFile { private static class CompactionFile implements Comparable<CompactionFile> {
public FileRef file; public FileRef file;
public long size; public long size;


@@ -54,89 +152,80 @@ public CompactionFile(FileRef file, long size) {
this.file = file; this.file = file;
this.size = size; this.size = size;
} }

@Override
public int compareTo(CompactionFile o2) {
if (this == o2)
return 0;
if (this.size < o2.size)
return -1;
if (this.size > o2.size)
return 1;
return this.file.compareTo(o2.file);
}
} }


private List<FileRef> findMapFilesToCompact(MajorCompactionRequest request) { private List<FileRef> findMapFilesToCompact(MajorCompactionRequest request) {
MajorCompactionReason reason = request.getReason(); MajorCompactionReason reason = request.getReason();
if (reason == MajorCompactionReason.USER) { if (reason == MajorCompactionReason.USER) {
return new ArrayList<>(request.getFiles().keySet()); return new ArrayList<>(request.getFiles().keySet());
} }

if (reason == MajorCompactionReason.CHOP) { if (reason == MajorCompactionReason.CHOP) {
// should not happen, but this is safe // should not happen, but this is safe
return new ArrayList<>(request.getFiles().keySet()); return new ArrayList<>(request.getFiles().keySet());
} }


if (request.getFiles().size() <= 1) if (request.getFiles().size() <= 1)
return null; return null;
TreeSet<CompactionFile> candidateFiles = new TreeSet<>((o1, o2) -> {
if (o1 == o2)
return 0;
if (o1.size < o2.size)
return -1;
if (o1.size > o2.size)
return 1;
return o1.file.compareTo(o2.file);
});


double ratio = Double.parseDouble(request.getTableConfig(Property.TABLE_MAJC_RATIO.getKey())); double ratio = Double.parseDouble(request.getTableConfig(Property.TABLE_MAJC_RATIO.getKey()));
int maxFilesToCompact = Integer int maxFilesToCompact = Integer
.parseInt(request.getTableConfig(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())); .parseInt(request.getTableConfig(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
int maxFilesPerTablet = request.getMaxFilesPerTablet(); int maxFilesPerTablet = request.getMaxFilesPerTablet();


for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { int minFilesToCompact = 0;
candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue().getSize())); if (request.getFiles().size() > maxFilesPerTablet)
} minFilesToCompact = request.getFiles().size() - maxFilesPerTablet + 1;


long totalSize = 0; minFilesToCompact = Math.min(minFilesToCompact, maxFilesToCompact);
for (CompactionFile mfi : candidateFiles) {
totalSize += mfi.size;
}


List<FileRef> files = new ArrayList<>(); SizeWindow all = new SizeWindow(request.getFiles());


while (candidateFiles.size() > 1) { List<FileRef> files = null;
CompactionFile max = candidateFiles.last();
if (max.size * ratio <= totalSize) {
files.clear();
for (CompactionFile mfi : candidateFiles) {
files.add(mfi.file);
if (files.size() >= maxFilesToCompact)
break;
}


break; // Look within a window of size maxFilesToCompact to see if any files meet the compaction ratio
// criteria.
SizeWindow window = all.tail(maxFilesToCompact);
while (window.size() > 1 && files == null) {
if (window.topSize() * ratio <= window.sum()) {
files = window.getFiles();
} }
totalSize -= max.size;
candidateFiles.remove(max);
}


int totalFilesToCompact = 0; window.pop();
if (request.getFiles().size() > maxFilesPerTablet) }
totalFilesToCompact = request.getFiles().size() - maxFilesPerTablet + 1;

totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);


if (files.size() < totalFilesToCompact) { // Previous search was fruitless. If there are more files than maxFilesToCompact, then try
// moving the window up looking for files that meet the criteria.
if (files == null || files.size() < minFilesToCompact) {
window = all.tail(maxFilesToCompact);


TreeMap<FileRef,Long> tfc = new TreeMap<>(); // When moving the window up there is no need to pop/shrink the window. All possible sets are
for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { // covered without doing this. Proof is left as an exercise for the reader.
tfc.put(entry.getKey(), entry.getValue().getSize()); while (window.slideUp() && files == null) {
if (window.topSize() * ratio <= window.sum()) {
files = window.getFiles();
}
} }
tfc.keySet().removeAll(files); }

// put data in candidateFiles to sort it
candidateFiles.clear();
for (Entry<FileRef,Long> entry : tfc.entrySet())
candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue()));


for (CompactionFile mfi : candidateFiles) { // Ensure the minimum number of files are compacted.
files.add(mfi.file); if ((files != null && files.size() < minFilesToCompact)
if (files.size() >= totalFilesToCompact) || (files == null && minFilesToCompact > 0)) {
break; // get the smallest files of size minFilesToCompact
} files = all.tail(minFilesToCompact).getFiles();
} }


return files; return files;
} }

} }
@@ -155,6 +155,9 @@ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
static final DefaultConfiguration dfault = DefaultConfiguration.getInstance(); static final DefaultConfiguration dfault = DefaultConfiguration.getInstance();


private static class TestCompactionRequest extends MajorCompactionRequest { private static class TestCompactionRequest extends MajorCompactionRequest {

Integer mfpt = null;

@Override @Override
public FileSKVIterator openReader(FileRef ref) { public FileSKVIterator openReader(FileRef ref) {
return new TestFileSKVIterator(ref.toString()); return new TestFileSKVIterator(ref.toString());
@@ -166,13 +169,24 @@ public FileSKVIterator openReader(FileRef ref) {
setFiles(files); setFiles(files);
} }


public void setMaxFilesPerTablet(int mfpt) {
this.mfpt = mfpt;
}

@Override
public int getMaxFilesPerTablet() {
if (mfpt != null)
return mfpt;
return super.getMaxFilesPerTablet();
}

} }


private MajorCompactionRequest createRequest(MajorCompactionReason reason, Object... objs) { private TestCompactionRequest createRequest(MajorCompactionReason reason, Object... objs) {
return createRequest(new KeyExtent(TableId.of("0"), null, null), reason, objs); return createRequest(new KeyExtent(TableId.of("0"), null, null), reason, objs);
} }


private MajorCompactionRequest createRequest(KeyExtent extent, MajorCompactionReason reason, private TestCompactionRequest createRequest(KeyExtent extent, MajorCompactionReason reason,
Object... objs) { Object... objs) {
Map<FileRef,DataFileValue> files = new HashMap<>(); Map<FileRef,DataFileValue> files = new HashMap<>();
for (int i = 0; i < objs.length; i += 2) { for (int i = 0; i < objs.length; i += 2) {
@@ -206,7 +220,7 @@ public void testGetCompactionPlan() throws Exception {
DefaultCompactionStrategy s = new DefaultCompactionStrategy(); DefaultCompactionStrategy s = new DefaultCompactionStrategy();


// do nothing // do nothing
MajorCompactionRequest request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2", TestCompactionRequest request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2",
10); 10);
s.gatherInformation(request); s.gatherInformation(request);
CompactionPlan plan = s.getCompactionPlan(request); CompactionPlan plan = s.getCompactionPlan(request);
@@ -232,5 +246,38 @@ public void testGetCompactionPlan() throws Exception {
assertEquals(3, plan.inputFiles.size()); assertEquals(3, plan.inputFiles.size());
assertEquals(asStringSet(plan.inputFiles), asSet("file1,file2,file3".split(","))); assertEquals(asStringSet(plan.inputFiles), asSet("file1,file2,file3".split(",")));


// Two windows (of size 10 or less) meet the compaction criteria. Should select the smallest set
// of files that meet the criteria.
request = createRequest(MajorCompactionReason.NORMAL, "file0", 100, "file1", 100, "file2", 100,
"file3", 10, "file4", 10, "file5", 10, "file6", 10, "file7", 10, "file8", 10, "file9", 10,
"fileA", 10);
s.gatherInformation(request);
plan = s.getCompactionPlan(request);
assertEquals(8, plan.inputFiles.size());
assertEquals(asStringSet(plan.inputFiles),
asSet("file3,file4,file5,file6,file7,file8,file9,fileA".split(",")));

// The last 10 files do not meet compaction ratio criteria. Should move window of 10 files up
// looking for files that meet criteria.
request = createRequest(MajorCompactionReason.NORMAL, "file0", 19683, "file1", 19683, "file2",
19683, "file3", 6561, "file4", 2187, "file5", 729, "file6", 243, "file7", 81, "file8", 27,
"file9", 9, "fileA", 3, "fileB", 1);
s.gatherInformation(request);
plan = s.getCompactionPlan(request);
assertEquals(10, plan.inputFiles.size());
assertEquals(asStringSet(plan.inputFiles),
asSet("file0,file1,file2,file3,file4,file5,file6,file7,file8,file9".split(",")));

// No window of files meets the compaction criteria, but there are more files than the max
// allowed. Should compact the smallest 2.
request = createRequest(MajorCompactionReason.NORMAL, "file1", 19683, "file2", 19683, "file3",
6561, "file4", 2187, "file5", 729, "file6", 243, "file7", 81, "file8", 27, "file9", 9,
"fileA", 3, "fileB", 1);
request.setMaxFilesPerTablet(10);
s.gatherInformation(request);
plan = s.getCompactionPlan(request);
assertEquals(2, plan.inputFiles.size());
assertEquals(asStringSet(plan.inputFiles), asSet("fileA,fileB".split(",")));

} }
} }

0 comments on commit c04a240

Please sign in to comment.
You can’t perform that action at this time.