Skip to content

Commit

Permalink
HIVE-27833: Hive Acid Replication Support for Dell Powerscale (apache…
Browse files Browse the repository at this point in the history
…#4841) (Harshal Patel, reviewed by Teddy Choi)
  • Loading branch information
harshal-16 authored and tarak271 committed Dec 19, 2023
1 parent 0f4d6ce commit 506a0e4
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 13 deletions.
24 changes: 19 additions & 5 deletions common/src/java/org/apache/hadoop/hive/common/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hive.common;

import static org.apache.hadoop.hive.shims.Utils.RAW_RESERVED_VIRTUAL_PATH;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -61,11 +63,13 @@
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hive.common.util.ShutdownHookManager;
Expand Down Expand Up @@ -767,7 +771,7 @@ static boolean copy(FileSystem srcFS, Path src,
// is tried and it fails. We depend upon that behaviour in cases like replication,
// wherein if distcp fails, there is good reason to not plod along with a trivial
// implementation, and fail instead.
copied = doIOUtilsCopyBytes(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, overwrite, shouldPreserveXAttrs(conf, srcFS, dstFS), conf, copyStatistics);
copied = doIOUtilsCopyBytes(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, overwrite, shouldPreserveXAttrs(conf, srcFS, dstFS, src), conf, copyStatistics);
}
return copied;
}
Expand Down Expand Up @@ -895,11 +899,21 @@ private static void checkDependencies(FileSystem srcFS, Path src, FileSystem dst
}
}

public static boolean shouldPreserveXAttrs(HiveConf conf, FileSystem srcFS, FileSystem dstFS) throws IOException {
if (!Utils.checkFileSystemXAttrSupport(srcFS) || !Utils.checkFileSystemXAttrSupport(dstFS)){
return false;
public static boolean shouldPreserveXAttrs(HiveConf conf, FileSystem srcFS, FileSystem dstFS, Path path) throws IOException {
Preconditions.checkNotNull(path);
if (conf.getBoolVar(ConfVars.DFS_XATTR_ONLY_SUPPORTED_ON_RESERVED_NAMESPACE)) {

if (!(path.toUri().getPath().startsWith(RAW_RESERVED_VIRTUAL_PATH)
&& Utils.checkFileSystemXAttrSupport(srcFS, new Path(RAW_RESERVED_VIRTUAL_PATH))
&& Utils.checkFileSystemXAttrSupport(dstFS, new Path(RAW_RESERVED_VIRTUAL_PATH)))) {
return false;
}
} else {
if (!Utils.checkFileSystemXAttrSupport(srcFS) || !Utils.checkFileSystemXAttrSupport(dstFS)) {
return false;
}
}
for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(Utils.DISTCP_OPTIONS_PREFIX).entrySet()) {
for (Map.Entry<String, String> entry : conf.getPropsWithPrefix(Utils.DISTCP_OPTIONS_PREFIX).entrySet()) {
String distCpOption = entry.getKey();
if (distCpOption.startsWith("p")) {
return distCpOption.contains("x");
Expand Down
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ public static enum ConfVars {
MSC_CACHE_RECORD_STATS("hive.metastore.client.cache.v2.recordStats", false,
"This property enables recording metastore client cache stats in DEBUG logs"),
// QL execution stuff
DFS_XATTR_ONLY_SUPPORTED_ON_RESERVED_NAMESPACE("dfs.xattr.supported.only.on.reserved.namespace", false,
"DFS supports xattr only on Reserved Name space (/.reserved/raw)"),
SCRIPTWRAPPER("hive.exec.script.wrapper", null, ""),
PLAN("hive.exec.plan", "", ""),
STAGINGDIR("hive.exec.stagingdir", ".hive-staging",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ public void testXAttrsPreserved() throws Exception {
verifyXAttrsPreserved(src, new Path(dst, src.getName()));
}

/**
 * Exercises {@code FileUtils.shouldPreserveXAttrs} with
 * {@code DFS_XATTR_ONLY_SUPPORTED_ON_RESERVED_NAMESPACE} switched on and then off.
 *
 * <p>With the flag on, a file under {@code basePath} is expected to report {@code false}
 * and a file under {@code /.reserved/raw/} is expected to report {@code true}. With the
 * flag off, both paths are expected to report {@code true}.
 *
 * @throws Exception if creating the test files or evaluating the check fails
 */
@Test
public void testShouldPreserveXAttrs() throws Exception {
  final String reservedOnlyKey = HiveConf.ConfVars.DFS_XATTR_ONLY_SUPPORTED_ON_RESERVED_NAMESPACE.varname;
  // conf is declared outside this method; remember the current flag so that a failing
  // assertion cannot leave an altered value behind for other users of the same conf.
  final boolean previousValue = conf.getBoolean(reservedOnlyKey, false);
  try {
    conf.setBoolean(reservedOnlyKey, true);
    Path filePath = new Path(basePath, "src.txt");
    fs.create(filePath).close();
    Assert.assertFalse(FileUtils.shouldPreserveXAttrs(conf, fs, fs, filePath));
    Path reservedRawPath = new Path("/.reserved/raw/", "src1.txt");
    fs.create(reservedRawPath).close();
    Assert.assertTrue(FileUtils.shouldPreserveXAttrs(conf, fs, fs, reservedRawPath));

    conf.setBoolean(reservedOnlyKey, false);
    Assert.assertTrue(FileUtils.shouldPreserveXAttrs(conf, fs, fs, filePath));
    Assert.assertTrue(FileUtils.shouldPreserveXAttrs(conf, fs, fs, reservedRawPath));
  } finally {
    // Always put the flag back, whether the assertions passed or not.
    conf.setBoolean(reservedOnlyKey, previousValue);
  }
}

private void verifyXAttrsPreserved(Path src, Path dst) throws Exception {
FileStatus srcStatus = fs.getFileStatus(src);
FileStatus dstStatus = fs.getFileStatus(dst);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void copyFilesBetweenFS(FileSystem srcFS, Path[] paths, FileSystem dstFS,
Path dst, boolean deleteSource, boolean overwrite,
DataCopyStatistics copyStatistics) throws IOException {
retryableFxn(() -> {
boolean preserveXAttrs = FileUtils.shouldPreserveXAttrs(hiveConf, srcFS, dstFS);
boolean preserveXAttrs = FileUtils.shouldPreserveXAttrs(hiveConf, srcFS, dstFS, paths[0]);
FileUtils.copy(srcFS, paths, dstFS, dst, deleteSource, overwrite, preserveXAttrs, hiveConf,
copyStatistics);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.test.MiniTezCluster;

import static org.apache.hadoop.hive.shims.Utils.RAW_RESERVED_VIRTUAL_PATH;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;

/**
Expand Down Expand Up @@ -1047,7 +1048,7 @@ public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) throws IOException {
// -update and -delete are mandatory options for directory copy to work.
List<String> params = constructDistCpDefaultParams(conf, dst.getFileSystem(conf),
srcPaths.get(0).getFileSystem(conf));
srcPaths.get(0).getFileSystem(conf), srcPaths);
if (!params.contains("-delete")) {
params.add("-delete");
}
Expand All @@ -1059,7 +1060,7 @@ List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration
}

private List<String> constructDistCpDefaultParams(Configuration conf, FileSystem dstFs,
FileSystem sourceFs) throws IOException {
FileSystem sourceFs, List<Path> srcPaths) throws IOException {
List<String> params = new ArrayList<String>();
boolean needToAddPreserveOption = true;
for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(Utils.DISTCP_OPTIONS_PREFIX).entrySet()){
Expand All @@ -1074,8 +1075,15 @@ private List<String> constructDistCpDefaultParams(Configuration conf, FileSystem
}
}
if (needToAddPreserveOption) {
params.add((Utils.checkFileSystemXAttrSupport(dstFs)
&& Utils.checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
if (conf.getBoolean("dfs.xattr.supported.only.on.reserved.namespace", false)) {
boolean shouldCopyXAttrs = srcPaths.get(0).toUri().getPath().startsWith(RAW_RESERVED_VIRTUAL_PATH)
&& Utils.checkFileSystemXAttrSupport(sourceFs, new Path(RAW_RESERVED_VIRTUAL_PATH))
&& Utils.checkFileSystemXAttrSupport(dstFs, new Path(RAW_RESERVED_VIRTUAL_PATH));
params.add(shouldCopyXAttrs ? "-pbx" : "-pb");
} else {
params.add((Utils.checkFileSystemXAttrSupport(dstFs)
&& Utils.checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
}
}
if (!params.contains("-update")) {
params.add("-update");
Expand All @@ -1097,7 +1105,7 @@ List<String> constructDistCpWithSnapshotParams(List<Path> srcPaths, Path dst, St
Configuration conf, String diff) throws IOException {
// Get the default distcp params
List<String> params = constructDistCpDefaultParams(conf, dst.getFileSystem(conf),
srcPaths.get(0).getFileSystem(conf));
srcPaths.get(0).getFileSystem(conf), srcPaths);
if (params.contains("-delete")) {
params.remove("-delete");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class Utils {

private static final Logger LOG = LoggerFactory.getLogger(Utils.class);

public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
private static final boolean IBM_JAVA = System.getProperty("java.vendor")
.contains("IBM");

Expand Down Expand Up @@ -165,8 +165,12 @@ public static Filter getXSRFFilter() {
}

/**
 * Checks extended-attribute support for {@code fs} using the path built from
 * {@link Path#SEPARATOR} as the probe location.
 *
 * @param fs file system to probe
 * @return whatever {@link #checkFileSystemXAttrSupport(FileSystem, Path)} reports for that path
 * @throws IOException propagated from the delegated check
 */
public static boolean checkFileSystemXAttrSupport(FileSystem fs) throws IOException {
  final Path probePath = new Path(Path.SEPARATOR);
  return checkFileSystemXAttrSupport(fs, probePath);
}

public static boolean checkFileSystemXAttrSupport(FileSystem fs, Path path) throws IOException {
try {
fs.getXAttrs(new Path(Path.SEPARATOR));
fs.getXAttrs(path);
return true;
} catch (UnsupportedOperationException e) {
LOG.warn("XAttr won't be preserved since it is not supported for file system: " + fs.getUri());
Expand Down

0 comments on commit 506a0e4

Please sign in to comment.