From cc9a773cfa55012e357d13a5a7e90e8a6d6744b6 Mon Sep 17 00:00:00 2001 From: wd-kang Date: Mon, 15 Nov 2021 11:45:41 +0900 Subject: [PATCH] [#8372] Replace HTableMultiplexer with BufferedMutator --- .../pinpoint/collector/dao/TraceDao.java | 4 +- .../collector/dao/hbase/HbaseTraceDaoV2.java | 21 +- .../monitor/BufferedMutatorMetrics.java | 43 ++++ .../applicationContext-collector-hbase.xml | 7 + .../src/main/resources/hbase-root.properties | 7 +- commons-hbase/pom.xml | 3 + .../common/hbase/SimpleBatchWriter.java | 10 + .../batch/BufferedMutatorConfiguration.java | 46 ++++ .../hbase/batch/BufferedMutatorWriter.java | 209 ++++++++++++++++++ .../batch/DelegateSimpleBatchWriter.java | 34 +++ .../common/hbase/batch/HbaseBatchWriter.java | 12 + .../hbase/util/HBaseExceptionUtils.java | 21 ++ .../hbase/util/SharedExecutorService.java | 90 ++++++++ .../hbase/client/BufferedMutatorUtils.java | 10 + 14 files changed, 502 insertions(+), 15 deletions(-) create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/monitor/BufferedMutatorMetrics.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/SimpleBatchWriter.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorConfiguration.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorWriter.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/HbaseBatchWriter.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/HBaseExceptionUtils.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/SharedExecutorService.java create mode 100644 commons-hbase/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorUtils.java diff --git 
a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/TraceDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/TraceDao.java index 7bf5e922d82a..022f2934f066 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/TraceDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/TraceDao.java @@ -7,7 +7,7 @@ * @author Woonduk Kang(emeroad) */ public interface TraceDao { - boolean insert(SpanBo span); + void insert(SpanBo span); - boolean insertSpanChunk(SpanChunkBo spanChunk); + void insertSpanChunk(SpanChunkBo spanChunk); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java index ef8252181cd8..b5433a6d6928 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseTraceDaoV2.java @@ -18,8 +18,8 @@ import com.navercorp.pinpoint.collector.dao.TraceDao; import com.navercorp.pinpoint.collector.util.CollectorUtils; +import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; -import com.navercorp.pinpoint.common.hbase.HbaseOperations2; import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.profiler.util.TransactionId; import com.navercorp.pinpoint.common.server.bo.SpanBo; @@ -49,7 +49,7 @@ public class HbaseTraceDaoV2 implements TraceDao { private static final HbaseColumnFamily.Trace descriptor = HbaseColumnFamily.TRACE_V2_SPAN; - private final HbaseOperations2 hbaseTemplate; + private final SimpleBatchWriter writer; private final TableNameProvider tableNameProvider; private final SpanSerializerV2 spanSerializer; @@ -58,12 +58,12 @@ public class HbaseTraceDaoV2 implements TraceDao { private final RowKeyEncoder rowKeyEncoder; - public 
HbaseTraceDaoV2(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate, + public HbaseTraceDaoV2(SimpleBatchWriter writer, TableNameProvider tableNameProvider, @Qualifier("traceRowKeyEncoderV2") RowKeyEncoder rowKeyEncoder, SpanSerializerV2 spanSerializer, SpanChunkSerializerV2 spanChunkSerializer) { - this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); + this.writer = Objects.requireNonNull(writer, "writer"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); this.rowKeyEncoder = Objects.requireNonNull(rowKeyEncoder, "rowKeyEncoder"); this.spanSerializer = Objects.requireNonNull(spanSerializer, "spanSerializer"); @@ -71,7 +71,7 @@ public HbaseTraceDaoV2(@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbas } @Override - public boolean insert(final SpanBo spanBo) { + public void insert(final SpanBo spanBo) { Objects.requireNonNull(spanBo, "spanBo"); if (logger.isDebugEnabled()) { logger.debug("insert trace: {}", spanBo); @@ -91,12 +91,11 @@ public boolean insert(final SpanBo spanBo) { this.spanSerializer.serialize(spanBo, put, null); TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable()); - - return hbaseTemplate.asyncPut(traceTableName, put); + writer.write(traceTableName, put); } @Override - public boolean insertSpanChunk(SpanChunkBo spanChunkBo) { + public void insertSpanChunk(SpanChunkBo spanChunkBo) { Objects.requireNonNull(spanChunkBo, "spanChunkBo"); TransactionId transactionId = spanChunkBo.getTransactionId(); @@ -107,16 +106,14 @@ public boolean insertSpanChunk(SpanChunkBo spanChunkBo) { final List spanEventBoList = spanChunkBo.getSpanEventBoList(); if (CollectionUtils.isEmpty(spanEventBoList)) { - return true; + return; } this.spanChunkSerializer.serialize(spanChunkBo, put, null); if (!put.isEmpty()) { TableName traceTableName = tableNameProvider.getTableName(descriptor.getTable()); - return hbaseTemplate.asyncPut(traceTableName, put); + 
writer.write(traceTableName, put); } - - return false; } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/monitor/BufferedMutatorMetrics.java b/collector/src/main/java/com/navercorp/pinpoint/collector/monitor/BufferedMutatorMetrics.java new file mode 100644 index 000000000000..614bd24447e0 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/monitor/BufferedMutatorMetrics.java @@ -0,0 +1,43 @@ +package com.navercorp.pinpoint.collector.monitor; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.navercorp.pinpoint.common.hbase.batch.BufferedMutatorWriter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class BufferedMutatorMetrics implements MetricSet { + private static final String HBASE_ASYNC_OPS = "hbase.bufferedmutator.ops"; + + private static final String COUNT = HBASE_ASYNC_OPS + ".success.count"; + private static final String REJECTED_COUNT = HBASE_ASYNC_OPS + ".rejected.count"; + + private final BufferedMutatorWriter writer; + + public BufferedMutatorMetrics(BufferedMutatorWriter writer) { + this.writer = Objects.requireNonNull(writer, "writer"); + } + + @Override + public Map getMetrics() { + + final Map gauges = new HashMap<>(3); + gauges.put(COUNT, new Gauge() { + @Override + public Long getValue() { + return writer.getSuccessCount(); + } + }); + gauges.put(REJECTED_COUNT, new Gauge() { + @Override + public Long getValue() { + return writer.getErrorCount(); + } + }); + + return gauges; + } +} diff --git a/collector/src/main/resources/applicationContext-collector-hbase.xml b/collector/src/main/resources/applicationContext-collector-hbase.xml index 15921fcf786e..9dfd2c46feb9 100644 --- a/collector/src/main/resources/applicationContext-collector-hbase.xml +++ b/collector/src/main/resources/applicationContext-collector-hbase.xml @@ -88,6 +88,13 @@ + + + + + + + diff --git 
a/collector/src/main/resources/hbase-root.properties b/collector/src/main/resources/hbase-root.properties index f2157249570f..e731355f91f9 100644 --- a/collector/src/main/resources/hbase-root.properties +++ b/collector/src/main/resources/hbase-root.properties @@ -24,4 +24,9 @@ hbase.client.async.flush.period.ms=100 # the max number of the retry attempts to insert queue before dropping the request. default:10000 hbase.client.async.max.retries.in.queue=10000 -hbase.client.compatibility-check=true \ No newline at end of file +hbase.client.compatibility-check=true + +collector.batchwrite.enable=true +collector.batchwrite.timertick=100 +collector.batchwrite.writebuffer.size=5012 +collector.batchwrite.writebuffer.heaplimit=100MB \ No newline at end of file diff --git a/commons-hbase/pom.xml b/commons-hbase/pom.xml index d4fc047e1917..bdbceeb61a69 100644 --- a/commons-hbase/pom.xml +++ b/commons-hbase/pom.xml @@ -34,6 +34,7 @@ java18 ${log4j2-jdk8.version} + ${spring5.version} @@ -71,6 +72,7 @@ org.springframework spring-core + ${spring.version} commons-logging @@ -81,6 +83,7 @@ org.springframework spring-tx + ${spring.version} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/SimpleBatchWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/SimpleBatchWriter.java new file mode 100644 index 000000000000..a5402ec0112b --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/SimpleBatchWriter.java @@ -0,0 +1,10 @@ +package com.navercorp.pinpoint.common.hbase; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; + + +public interface SimpleBatchWriter { + + void write(TableName tableName, Put mutation); +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorConfiguration.java new file mode 100644 index 
000000000000..2326c7233da1 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorConfiguration.java @@ -0,0 +1,46 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.util.unit.DataSize; + +public class BufferedMutatorConfiguration { + + @Value("${collector.batchwrite.enable:true}") + private boolean batchWriter = true; + + @Value("${collector.batchwrite.timertick:100}") + private long writeBufferPeriodicFlushTimerTickMs = 100; + + @Value("${collector.batchwrite.writebuffer.size:5012}") + private long writeBufferSize = 1024 * 5; + + // for OOM prevent + @Value("${collector.batchwrite.writebuffer.heaplimit:100MB}") + private DataSize writeBufferHeapLimit = DataSize.ofMegabytes(100); + + public boolean isBatchWriter() { + return batchWriter; + } + + public long getWriteBufferPeriodicFlushTimerTickMs() { + return writeBufferPeriodicFlushTimerTickMs; + } + + public long getWriteBufferSize() { + return writeBufferSize; + } + + public long getWriteBufferHeapLimit() { + return writeBufferHeapLimit.toBytes(); + } + + @Override + public String toString() { + return "BufferedMutatorConfiguration{" + + "batchWriter=" + batchWriter + + ", writeBufferPeriodicFlushTimerTickMs=" + writeBufferPeriodicFlushTimerTickMs + + ", writeBufferSize=" + writeBufferSize + + ", writeBufferHeapLimit=" + writeBufferHeapLimit + + '}'; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorWriter.java new file mode 100644 index 000000000000..46e999734685 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/BufferedMutatorWriter.java @@ -0,0 +1,209 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import com.navercorp.pinpoint.common.hbase.HbaseSystemException; 
+import com.navercorp.pinpoint.common.hbase.util.HBaseExceptionUtils; +import com.navercorp.pinpoint.common.hbase.util.SharedExecutorService; +import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory; +import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; +import com.navercorp.pinpoint.common.util.CpuUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorImpl; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.BufferedMutatorUtils; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; + +public class BufferedMutatorWriter implements DisposableBean, HbaseBatchWriter { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final long writeBufferPeriodicFlushTimerTickMs; + private final long writeBufferLimit; + private final long writeBufferSize; + + // private final ReadWriteLock lock = new ReentrantReadWriteLock(); +// private final Striped lock = Striped.lock(128); + + private final ConcurrentMap mutatorMap = new ConcurrentHashMap<>(); + private final Function mutatorSupplier; + + private final 
LongAdder successCounter = new LongAdder(); + private final LongAdder errorCounter = new LongAdder(); + + private final ExecutorService pool; + private final ExecutorService sharedPool; + + + public BufferedMutatorWriter(Connection connection, BufferedMutatorConfiguration configuration) { + Objects.requireNonNull(connection, "connection"); + logger.info("{}", configuration); + + this.writeBufferPeriodicFlushTimerTickMs = configuration.getWriteBufferPeriodicFlushTimerTickMs(); + this.writeBufferSize = configuration.getWriteBufferSize(); + this.writeBufferLimit = configuration.getWriteBufferHeapLimit(); + + this.mutatorSupplier = new MutatorFactory(connection); + + this.pool = newExecutorService(); + this.sharedPool = new SharedExecutorService(this.pool); + } + + private ExecutorService newExecutorService() { + ThreadFactory factory = PinpointThreadFactory.createThreadFactory("BufferedMutatorWriter"); + final ThreadPoolExecutor pool = ExecutorFactory.newFixedThreadPool(CpuUtils.cpuCount(), 1024, factory); + pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + // error log + logger.error("Async batch job rejected job:{} ", r); + } + }); + return pool; + } + + private BufferedMutatorParams newBufferedMutatorParams(TableName tableName) { + BufferedMutatorParams params = new BufferedMutatorParams(tableName); + params.writeBufferSize(writeBufferSize); + params.setWriteBufferPeriodicFlushTimeoutMs(writeBufferPeriodicFlushTimerTickMs); + params.setWriteBufferPeriodicFlushTimerTickMs(writeBufferPeriodicFlushTimerTickMs); + params.pool(this.sharedPool); + + params.listener(new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator bufferedMutator) throws RetriesExhaustedWithDetailsException { + + errorCounter.increment(); + // fail count + TableName tableName = bufferedMutator.getName(); + int 
numExceptions = e.getNumExceptions(); + if (e.mayHaveClusterIssues()) { + String hosts = HBaseExceptionUtils.getErrorHost(e); + logger.error("Batch write error(mayHaveClusterIssues) {} numExceptions:{} {}", tableName, numExceptions, hosts); + } else { + logger.warn("Batch write error {} numExceptions:{}", tableName, numExceptions); + } + if (logger.isDebugEnabled()) { + String exhaustiveDescription = e.getExhaustiveDescription(); + logger.debug("ExhaustiveDescription {}", exhaustiveDescription); + } + } + + }); + return params; + } + + @Override + public void write(TableName tableName, List mutations) { + Objects.requireNonNull(tableName, "tableName"); + + final BufferedMutatorImpl mutator = getBufferedMutator(tableName); + + final long currentWriteBufferSize = BufferedMutatorUtils.getCurrentWriteBufferSize(mutator); + if (currentWriteBufferSize > writeBufferLimit) { + this.errorCounter.increment(); + return; + } + try { + mutator.mutate(mutations); + this.successCounter.increment(); + } catch (IOException e) { + this.errorCounter.increment(); + throw new HbaseSystemException(e); + } + } + + private BufferedMutatorImpl getBufferedMutator(TableName tableName) { + // workaround https://bugs.openjdk.java.net/browse/JDK-8161372 + final BufferedMutatorImpl mutator = this.mutatorMap.get(tableName); + if (mutator!= null) { + return mutator; + } + + return this.mutatorMap.computeIfAbsent(tableName, this.mutatorSupplier); +// final Lock lock = this.lock.get(tableName); +// try { +// lock.lock(); +// return this.mutatorMap.computeIfAbsent(tableName, this.mutatorSupplier); +// } finally { +// lock.unlock(); +// } + } + + @Override + public void write(TableName tableName, Mutation mutation) { + Objects.requireNonNull(tableName, "tableName"); + Objects.requireNonNull(mutation, "mutation"); + + write(tableName, Collections.singletonList(mutation)); + } + + private class MutatorFactory implements Function { + private final Connection connection; + + public 
MutatorFactory(Connection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + } + + @Override + public BufferedMutatorImpl apply(TableName tableName) { + try { + BufferedMutatorParams params = newBufferedMutatorParams(tableName); + return (BufferedMutatorImpl) this.connection.getBufferedMutator(params); + } catch (IOException e) { + throw new HbaseSystemException(e); + } + } + } + + public long getSuccessCount() { + return successCounter.sum(); + } + + public long getErrorCount() { + return errorCounter.sum(); + } + + @Override + public void destroy() throws Exception { + logger.info("destroy {}", this.mutatorMap.size()); +// final Lock lock = this.lock.get(tableName); +// lock.lock(); +// try { +// closeMutator(); +// } finally { +// lock.unlock(); +// } + closeMutator(); + this.pool.shutdown(); + this.pool.awaitTermination(1000 * 3, TimeUnit.MILLISECONDS); + + } + + private void closeMutator() { + for (BufferedMutator mutator : this.mutatorMap.values()) { + try { + mutator.close(); + } catch (IOException ignore) { + // + } + } + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java new file mode 100644 index 000000000000..1783e1986a07 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java @@ -0,0 +1,34 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import com.navercorp.pinpoint.common.hbase.HbaseOperations2; +import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.springframework.beans.factory.annotation.Qualifier; + +import java.util.Objects; + +public class DelegateSimpleBatchWriter implements SimpleBatchWriter { + private final boolean batchWriter; + + private final HbaseBatchWriter
hbaseBatchWriter; + private final HbaseOperations2 hbaseTemplate; + + public DelegateSimpleBatchWriter(BufferedMutatorConfiguration configuration, + HbaseBatchWriter hbaseBatchWriter, + @Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate) { + this.batchWriter = configuration.isBatchWriter(); + this.hbaseBatchWriter = hbaseBatchWriter; + this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); + } + + + @Override + public void write(TableName tableName, Put put) { + if (batchWriter) { + hbaseBatchWriter.write(tableName, put); + } else { + hbaseTemplate.asyncPut(tableName, put); + } + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/HbaseBatchWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/HbaseBatchWriter.java new file mode 100644 index 000000000000..aa29a3d40cd6 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/HbaseBatchWriter.java @@ -0,0 +1,12 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Mutation; + +import java.util.List; + +public interface HbaseBatchWriter { + void write(TableName tableName, List mutations); + + void write(TableName tableName, Mutation mutation); +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/HBaseExceptionUtils.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/HBaseExceptionUtils.java new file mode 100644 index 000000000000..b4a65c6bd73c --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/HBaseExceptionUtils.java @@ -0,0 +1,21 @@ +package com.navercorp.pinpoint.common.hbase.util; + +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; + +import java.util.HashSet; +import java.util.Set; + +public final class HBaseExceptionUtils { + private HBaseExceptionUtils() { + } + + public 
static String getErrorHost(RetriesExhaustedWithDetailsException e) { + final int numExceptions = e.getNumExceptions(); + Set hostErrors = new HashSet<>(numExceptions); + for (int i = 0; i < numExceptions; i++) { + String hostnamePort = e.getHostnamePort(i); + hostErrors.add(hostnamePort); + } + return hostErrors.toString(); + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/SharedExecutorService.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/SharedExecutorService.java new file mode 100644 index 000000000000..d7df8cf42243 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/SharedExecutorService.java @@ -0,0 +1,90 @@ +package com.navercorp.pinpoint.common.hbase.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SharedExecutorService implements ExecutorService { + private final ExecutorService e; + private boolean shutdown = false; + + public SharedExecutorService(ExecutorService e) { + this.e = Objects.requireNonNull(e, "e"); + } + + @Override + public void shutdown() { + this.shutdown = true; +// e.shutdown(); + } + + @Override + public List shutdownNow() { + this.shutdown = true; +// return e.shutdownNow(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return this.shutdown; + } + + @Override + public boolean isTerminated() { + return this.shutdown; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { +// return e.awaitTermination(timeout, unit); + return true; + } + + @Override + public Future submit(Callable task) { + return 
e.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return e.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return e.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return e.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return e.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return e.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return e.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + e.execute(command); + } +} diff --git a/commons-hbase/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorUtils.java b/commons-hbase/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorUtils.java new file mode 100644 index 000000000000..7b0942822ec6 --- /dev/null +++ b/commons-hbase/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorUtils.java @@ -0,0 +1,10 @@ +package org.apache.hadoop.hbase.client; + +public final class BufferedMutatorUtils { + private BufferedMutatorUtils() { + } + + public static long getCurrentWriteBufferSize(BufferedMutatorImpl mutator) { + return mutator.getCurrentWriteBufferSize(); + } +}