Skip to content

Commit

Permalink
[HUDI-4526] Improve spillableMapBasePath when disk directory is full (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
XuQianJin-Stars authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 235b178 commit f7e5065
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.defaultValue("/tmp/")
.withDocumentation("Default file path prefix for spillable map");
.withInferFunction(cfg -> {
String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty();
})
.withDocumentation("Default file path for spillable map");

public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
.key("hoodie.memory.writestatus.failure.fraction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hudi.common.table.log;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
Expand All @@ -34,12 +37,7 @@
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.internal.schema.InternalSchema;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -94,6 +92,7 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

this.maxMemorySizeInBytes = maxMemorySizeInBytes;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,40 @@ public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
return readDataFromPath(fileSystem, detailPath, false);
}

/**
* Return the configured local directories where hudi can write files. This
* method does not create any directories on its own, it only encapsulates the
* logic of locating the local directories according to deployment mode.
*/
public static String[] getConfiguredLocalDirs() {
if (isRunningInYarnContainer()) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
return getYarnLocalDirs().split(",");
} else if (System.getProperty("java.io.tmpdir") != null) {
return System.getProperty("java.io.tmpdir").split(",");
} else {
return null;
}
}

private static boolean isRunningInYarnContainer() {
// These environment variables are set by YARN.
return System.getenv("CONTAINER_ID") != null;
}

/**
* Get the Yarn approved local directories.
*/
private static String getYarnLocalDirs() {
String localDirs = Option.of(System.getenv("LOCAL_DIRS")).orElse("");

if (localDirs.isEmpty()) {
throw new HoodieIOException("Yarn Local dirs can't be empty");
}
return localDirs;
}
}

0 comments on commit f7e5065

Please sign in to comment.