Skip to content

Commit

Permalink
Make TmpFileIOPeon to create files in output dir, rather than java.io…
Browse files Browse the repository at this point in the history
….tmpdir
  • Loading branch information
leventov committed Mar 3, 2017
1 parent 94807b5 commit b0ed3a1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 45 deletions.
41 changes: 20 additions & 21 deletions processing/src/main/java/io/druid/segment/IndexMerger.java
Expand Up @@ -60,13 +60,12 @@
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.LongSupplierSerializer;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.LongSupplierSerializer;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
Expand All @@ -83,7 +82,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
Expand Down Expand Up @@ -629,24 +627,13 @@ public Metadata apply(IndexableAdapter input)
Closer closer = Closer.create();
final Interval dataInterval;
final File v8OutDir = new File(outDir, "v8-tmp");
v8OutDir.mkdirs();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(v8OutDir);
}
});
final IOPeon ioPeon = new TmpFileIOPeon();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.close();
}
});
FileUtils.forceMkdir(v8OutDir);
registerDeleteDirectory(closer, v8OutDir);
File tmpPeonFilesDir = new File(v8OutDir, "tmpPeonFiles");
FileUtils.forceMkdir(tmpPeonFilesDir);
registerDeleteDirectory(closer, tmpPeonFilesDir);
final IOPeon ioPeon = new TmpFileIOPeon(tmpPeonFilesDir, true);
closer.register(ioPeon);
try {
/************* Main index.drd file **************/
progress.progress();
Expand Down Expand Up @@ -908,6 +895,18 @@ public String apply(@Nullable String input)
}
}

static void registerDeleteDirectory(Closer closer, final File dir)
{
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(dir);
}
});
}

protected DimensionHandler[] makeDimensionHandlers(final List<String> mergedDimensions, final List<ColumnCapabilitiesImpl> dimCapabilities)
{
final DimensionHandler[] handlers = new DimensionHandler[mergedDimensions.size()];
Expand Down
25 changes: 7 additions & 18 deletions processing/src/main/java/io/druid/segment/IndexMergerV9.java
Expand Up @@ -134,27 +134,16 @@ public Metadata apply(IndexableAdapter input)
}

Closer closer = Closer.create();
final IOPeon ioPeon = new TmpFileIOPeon(false);
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.close();
}
});
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
final File v9TmpDir = new File(outDir, "v9-tmp");
v9TmpDir.mkdirs();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(v9TmpDir);
}
});
FileUtils.forceMkdir(v9TmpDir);
registerDeleteDirectory(closer, v9TmpDir);
log.info("Start making v9 index files, outDir:%s", outDir);
File tmpPeonFilesDir = new File(v9TmpDir, "tmpPeonFiles");
FileUtils.forceMkdir(tmpPeonFilesDir);
registerDeleteDirectory(closer, tmpPeonFilesDir);
final IOPeon ioPeon = new TmpFileIOPeon(tmpPeonFilesDir, false);
closer.register(ioPeon);
try {
long startTime = System.currentTimeMillis();
ByteStreams.write(
Expand Down
Expand Up @@ -64,7 +64,6 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
private static final Logger log = new Logger(StringDimensionMergerLegacy.class);

private VSizeIndexedWriter encodedValueWriterV8;
private IOPeon spatialIoPeon;
private File dictionaryFile;

public StringDimensionMergerLegacy(
Expand Down Expand Up @@ -131,12 +130,11 @@ public void close() throws IOException
RTree tree = null;
spatialWriter = null;
boolean hasSpatial = capabilities.hasSpatialIndexes();
spatialIoPeon = new TmpFileIOPeon();
if (hasSpatial) {
BitmapFactory bmpFactory = bitmapSerdeFactory.getBitmapFactory();
String spatialFilename = String.format("%s.spatial", dimensionName);
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
spatialIoPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
ioPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
Expand Down Expand Up @@ -242,7 +240,6 @@ public OutputStream getOutput() throws IOException
spatialWriter.close();
serializerUtils.writeString(spatialIndexFile, dimensionName);
ByteStreams.copy(spatialWriter.combineStreams(), spatialIndexFile);
spatialIoPeon.close();
}
}
}
Expand Up @@ -34,8 +34,9 @@
*/
public class TmpFileIOPeon implements IOPeon
{
private final File dir;
private final boolean allowOverwrite;
Map<String, File> createdFiles = Maps.newLinkedHashMap();
private final Map<String, File> createdFiles = Maps.newLinkedHashMap();

public TmpFileIOPeon()
{
Expand All @@ -44,6 +45,12 @@ public TmpFileIOPeon()

public TmpFileIOPeon(boolean allowOverwrite)
{
this(null, allowOverwrite);
}

public TmpFileIOPeon(File dir, boolean allowOverwrite)
{
this.dir = dir;
this.allowOverwrite = allowOverwrite;
}

Expand All @@ -52,7 +59,7 @@ public OutputStream makeOutputStream(String filename) throws IOException
{
File retFile = createdFiles.get(filename);
if (retFile == null) {
retFile = File.createTempFile("filePeon", filename);
retFile = File.createTempFile("filePeon", filename, dir);
createdFiles.put(filename, retFile);
return new BufferedOutputStream(new FileOutputStream(retFile));
} else if (allowOverwrite) {
Expand Down

0 comments on commit b0ed3a1

Please sign in to comment.