diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 38f62d5d4631..9e4ac51ec378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -397,6 +397,7 @@ public void setRestoredRegion(boolean restoredRegion) { final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); final ConcurrentHashMap scannerReadPoints; + final ReadPointCalculationLock smallestReadPointCalcLock; /** * The sequence ID that was enLongAddered when this region was opened. @@ -435,19 +436,18 @@ public void setRestoredRegion(boolean restoredRegion) { * this readPoint, are included in every read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; // We need to ensure that while we are calculating the smallestReadPoint // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. 
- synchronized (scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); + smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); + try { + long minimumReadPoint = mvcc.getReadPoint(); for (Long readPoint : this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } + minimumReadPoint = Math.min(minimumReadPoint, readPoint); } + return minimumReadPoint; + } finally { + smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); } - return minimumReadPoint; } /* @@ -812,6 +812,7 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); + this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf); this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2); @@ -6994,7 +6995,8 @@ public RegionInfo getRegionInfo() { // getSmallestReadPoint, before scannerReadPoints is updated. 
IsolationLevel isolationLevel = scan.getIsolationLevel(); long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); - synchronized (scannerReadPoints) { + region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK); + try { if (mvccReadPoint > 0) { this.readPt = mvccReadPoint; } else if ( @@ -7005,6 +7007,8 @@ public RegionInfo getRegionInfo() { this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce); } scannerReadPoints.put(this, this.readPt); + } finally { + region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK); } initializeScanners(scan, additionalScanners); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java new file mode 100644 index 000000000000..380b82de93a6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Lock to manage concurrency between {@link RegionScanner} and + * {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the + * smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve + * this by synchronizing on the scannerReadPoints object, but that may block read threads and + * reduce read performance. Because the scannerReadPoints object is a + * {@link java.util.concurrent.ConcurrentHashMap}, which is thread-safe, + * {@link RegionScanner}s can record their read points concurrently; each one only needs to + * acquire a shared lock. When we calculate the smallest read point, we need to acquire an + * exclusive lock. This can improve read performance in most scenarios, except when there are a lot + * of delta operations, like {@link org.apache.hadoop.hbase.client.Append} or + * {@link org.apache.hadoop.hbase.client.Increment}. So we introduce a flag to enable/disable this + * feature ({@code hbase.region.readpoints.read.write.lock.enable}, disabled by default).
+ */
+@InterfaceAudience.Private
+public class ReadPointCalculationLock {
+
+  /** Identifies the role a caller plays when taking this lock. */
+  public enum LockType {
+    /** Held while computing the smallest read point; exclusive in read/write mode. */
+    CALCULATION_LOCK,
+    /** Held while a scanner records its read point; shared in read/write mode. */
+    RECORDING_LOCK
+  }
+
+  /** Lock used for {@link LockType#CALCULATION_LOCK}. */
+  private final Lock calculationLock;
+  /** Lock used for any other lock type; the same object as above in single-lock mode. */
+  private final Lock recordingLock;
+
+  /**
+   * Picks the locking strategy once, from configuration.
+   * @param conf supplies {@code hbase.region.readpoints.read.write.lock.enable} (default false)
+   */
+  ReadPointCalculationLock(Configuration conf) {
+    if (conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false)) {
+      ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+      this.calculationLock = readWriteLock.writeLock();
+      this.recordingLock = readWriteLock.readLock();
+    } else {
+      // Single-lock mode: both roles share one mutex, so a holder excludes every other thread.
+      Lock mutex = new ReentrantLock();
+      this.calculationLock = mutex;
+      this.recordingLock = mutex;
+    }
+  }
+
+  /**
+   * Acquires the lock that backs {@code lockType}, waiting until it is available.
+   */
+  void lock(LockType lockType) {
+    lockFor(lockType).lock();
+  }
+
+  /** Releases the lock that backs {@code lockType}; pass the type given to {@code lock}. */
+  void unlock(LockType lockType) {
+    lockFor(lockType).unlock();
+  }
+
+  /** Returns the calculation lock for CALCULATION_LOCK, else the recording lock. */
+  private Lock lockFor(LockType lockType) {
+    return lockType == LockType.CALCULATION_LOCK ? calculationLock : recordingLock;
+  }
+}