HBASE-24055 Make AsyncFSWAL can run on EC cluster (apache#1437)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: stack <stack@apache.org>
Apache9 authored and thangTang committed Apr 16, 2020
1 parent 0bf54d5 commit 13d82fc
Showing 3 changed files with 71 additions and 31 deletions.
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -177,6 +175,11 @@ Object createObject(ClientProtocol instance, String src, FsPermission masked, St

private static final FileCreator FILE_CREATOR;

// CreateFlag.SHOULD_REPLICATE makes an OutputStream on an EC directory support hflush/hsync. EC
// was only introduced in hadoop 3.x, so the enum constant does not exist on 2.x; that is why we
// have to look it up by name at runtime instead of referencing it directly.
private static final CreateFlag SHOULD_REPLICATE_FLAG;

private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
@@ -272,6 +275,15 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
return createFileCreator2();
}

private static CreateFlag loadShouldReplicateFlag() {
try {
return CreateFlag.valueOf("SHOULD_REPLICATE");
} catch (IllegalArgumentException e) {
LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
return null;
}
}

// cancel the processing if DFSClient is already closed.
static final class CancelOnClose implements CancelableProgressable {

@@ -292,6 +304,7 @@ public boolean progress() {
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
FILE_CREATOR = createFileCreator();
SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
@@ -486,6 +499,18 @@ public NameNodeException(Throwable cause) {
}
}

private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
List<CreateFlag> flags = new ArrayList<>();
flags.add(CreateFlag.CREATE);
if (overwrite) {
flags.add(CreateFlag.OVERWRITE);
}
if (SHOULD_REPLICATE_FLAG != null) {
flags.add(SHOULD_REPLICATE_FLAG);
}
return new EnumSetWritable<>(EnumSet.copyOf(flags));
}

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
@@ -502,8 +527,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());
getCreateFlags(overwrite), createParent, replication, blockSize,
CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
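The change above boils down to two small helpers: loadShouldReplicateFlag() resolves CreateFlag.SHOULD_REPLICATE by name so the class still loads on Hadoop 2.x, and getCreateFlags() adds the flag to every WAL file creation when it is available. Below is a minimal self-contained sketch of the same pattern; the class name CreateFlagCompat is illustrative and not part of the commit.

import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;

public class CreateFlagCompat {

  // Resolved once at class load; null when running against Hadoop 2.x,
  // where CreateFlag has no SHOULD_REPLICATE constant.
  private static final CreateFlag SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      return null; // Hadoop 2.x: no such constant, fall back to plain create flags.
    }
  }

  // Build the create flags for a WAL file: always CREATE, optionally OVERWRITE,
  // and SHOULD_REPLICATE when available.
  public static EnumSet<CreateFlag> createFlags(boolean overwrite) {
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    return flags;
  }
}

On Hadoop 3.x, passing SHOULD_REPLICATE asks the NameNode to create the file with plain replication even when the parent directory carries an erasure-coding policy, which is what keeps hflush/hsync working for the WAL.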
@@ -19,9 +19,9 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
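The only change to this test is where assertThat comes from: org.junit.Assert.assertThat is deprecated as of JUnit 4.13 in favor of org.hamcrest.MatcherAssert.assertThat. Both share the same signature, so a call site such as assertThat(cause, instanceOf(FileNotFoundException.class)) (a representative example, not a line from this diff) compiles unchanged.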
@@ -20,10 +20,11 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -34,40 +35,49 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@Category(LargeTests.class)
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHBaseWalOnEC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);

private static final HBaseTestingUtility util = new HBaseTestingUtility();
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

@BeforeClass
public static void setup() throws Exception {
public static void setUpBeforeClass() throws Exception {
try {
MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
DistributedFileSystem fs = cluster.getFileSystem();

Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
DistributedFileSystem.class);
Method enableAllECPolicies =
DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
enableAllECPolicies.invoke(null, fs);

DFSClient client = fs.getClient();
Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
String.class, String.class);
Method setErasureCodingPolicy =
DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy

try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
@@ -80,38 +90,43 @@ public static void setup() throws Exception {
Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
}

util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
util.startMiniCluster();
UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);

}

@AfterClass
public static void tearDown() throws Exception {
util.shutdownMiniCluster();
@Parameter
public String walProvider;

@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
}

@Test
public void testStreamCreate() throws IOException {
try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
new Path("/testStreamCreate"), true)) {
assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
}
@Before
public void setUp() throws Exception {
UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
UTIL.startMiniCluster(3);
}

@After
public void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}

@Test
public void testFlush() throws IOException {
public void testReadWrite() throws IOException {
byte[] row = Bytes.toBytes("row");
byte[] cf = Bytes.toBytes("cf");
byte[] cq = Bytes.toBytes("cq");
byte[] value = Bytes.toBytes("value");

TableName name = TableName.valueOf(getClass().getSimpleName());

Table t = util.createTable(name, cf);
Table t = UTIL.createTable(name, cf);
t.put(new Put(row).addColumn(cf, cq, value));

util.getAdmin().flush(name);
UTIL.getAdmin().flush(name);

assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
}
}
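The test is now parameterized: testReadWrite runs once per WAL provider ("asyncfs" and "filesystem"), and the mini cluster is restarted around each run via @Before/@After because the WAL provider has to be set before the cluster starts. Assuming a standard HBase checkout, running just this suite with Maven Surefire, e.g. mvn test -Dtest=TestHBaseWalOnEC, should exercise both providers.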
