forked from kovyrin/hbase
/
HRegion.java
5473 lines (4998 loc) · 193 KB
/
HRegion.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
import com.google.common.io.Closeables;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
*
* <p>We maintain multiple HStores for a single HRegion.
*
* <p>A Store is a set of rows with some column data; together,
* they make up all the data for the rows.
*
* <p>Each HRegion has a 'startKey' and 'endKey'.
* <p>The first is inclusive, the second is exclusive (except for
* the final region). The endKey of region 0 is the same as
* startKey for region 1 (if it exists). The startKey for the
* first region is null. The endKey for the final region is null.
*
* <p>Locking at the HRegion level serves only one purpose: preventing the
* region from being closed (and consequently split) while other operations
* are ongoing. Each row level operation obtains both a row lock and a region
* read lock for the duration of the operation. While a scanner is being
* constructed, getScanner holds a read lock. If the scanner is successfully
* constructed, it holds a read lock until it is closed. A close takes out a
* write lock and consequently will block for ongoing operations and will block
* new operations from starting while the close is in progress.
*
* <p>An HRegion is defined by its table and its key extent.
*
* <p>It consists of at least one Store. The number of Stores should be
* configurable, so that data which is accessed together is stored in the same
* Store. Right now, we approximate that by building a single Store for
* each column family. (This config info will be communicated via the
* tabledesc.)
*
* <p>The HTableDescriptor contains metainfo about the HRegion's table.
* regionName is a unique identifier for this HRegion. [startKey, endKey)
* defines the keyspace for this HRegion (start inclusive, end exclusive).
*/
@InterfaceAudience.Private
public class HRegion implements HeapSize { // , Writable{
// Class-wide logger.
public static final Log LOG = LogFactory.getLog(HRegion.class);
// Name of the merges subdirectory under the region directory; it is deleted
// when the region is opened (see initializeRegionInternals).
private static final String MERGEDIR = ".merges";
// Reset to false at the end of a successful initializeRegionInternals().
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
* master as a region to close if the carrying regionserver is overloaded.
* The flag is only reset when the region is (re)initialized, e.g. when it is
* reopened after a failed split (see initializeRegionInternals).
*/
final AtomicBoolean closing = new AtomicBoolean(false);
// NOTE(review): not written anywhere in the visible part of the file; -1
// presumably means "no sequence id recorded yet" -- confirm against the writers.
protected long completeSequenceId = -1L;
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
// Row-lock bookkeeping. NOTE(review): presumably lockedRows maps a locked row
// key to the latch waiters block on, and lockIds maps an issued lock id back to
// its row key -- confirm against the row-lock methods.
private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
new ConcurrentHashMap<HashedBytes, CountDownLatch>();
private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
new ConcurrentHashMap<Integer, HashedBytes>();
// Counter for lock ids; starts at 1.
private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
static private Random rand = new Random();
// Stores of this region keyed by column family name bytes (raw byte order);
// populated while the region is initialized.
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
Bytes.BYTES_RAWCOMPARATOR);
// Registered region protocol handlers
private ClassToInstanceMap<CoprocessorProtocol>
protocolHandlers = MutableClassToInstanceMap.create();
private Map<String, Class<? extends CoprocessorProtocol>>
protocolHandlerNames = Maps.newHashMap();
// TODO: account for each registered handler in HeapSize computation
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
/**
* Temporary subdirectory of the region directory used for compaction output.
*/
public static final String REGION_TEMP_SUBDIR = ".tmp";
//These variables are just used for getting data out of the region, to test on
//client side
// private int numStores = 0;
// private int [] storeSize = null;
// private byte [] name = null;
// Running memstore size of this region; updated through
// addAndGetGlobalMemstoreSize().
public final AtomicLong memstoreSize = new AtomicLong(0);
// Debug possible data loss due to WAL off
final Counter numPutsWithoutWAL = new Counter();
final Counter dataInMemoryWithoutWAL = new Counter();
// Debug why CAS operations are taking a while.
final Counter checkAndMutateChecksPassed = new Counter();
final Counter checkAndMutateChecksFailed = new Counter();
//Number of requests
final Counter readRequestsCount = new Counter();
final Counter writeRequestsCount = new Counter();
//How long operations were blocked by a memstore over highwater.
final Counter updatesBlockedMs = new Counter();
/**
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
private final Path tableDir;
// Write-ahead log, filesystem and configuration handed to the constructor.
// 'conf' is the base configuration overlaid with the table descriptor's
// values; 'baseConf' is the untouched base configuration.
private final HLog log;
private final FileSystem fs;
private final Configuration conf;
private final Configuration baseConf;
// Read from "hbase.rowlock.wait.duration"; defaults to the constant below.
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
// we can release the IPC handler soon enough to improve the
// availability of the region server. It can be adjusted by
// tuning configuration "hbase.busy.wait.duration".
final long busyWaitDuration;
static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
// If updating multiple rows in one call, wait longer,
// i.e. waiting for busyWaitDuration * # of rows. However,
// we can limit the max multiplier.
final int maxBusyWaitMultiplier;
// Max busy wait duration. There is no point to wait longer than the RPC
// purge timeout, when a RPC call will be terminated by the RPC engine.
final long maxBusyWaitDuration;
// negative number indicates infinite timeout
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
private final HRegionInfo regionInfo;
// Directory of this region, derived from tableDir and the encoded region name.
private final Path regiondir;
// Taken from regionInfo in the main constructor; the no-arg test constructor
// leaves it unset.
KeyValue.KVComparator comparator;
// Read point held by each open scanner; also used as the monitor in
// getSmallestReadPoint().
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
 * Computes the lowest MVCC read point currently in use in this region.
 *
 * @return the minimum of the memstore read point and the read points of all
 *         registered scanners. Writes older than this value are visible to
 *         every read operation.
 */
public long getSmallestReadPoint() {
  // Hold the scannerReadPoints monitor for the whole computation so that no
  // new RegionScanner can grab a read point we have not accounted for.
  synchronized (scannerReadPoints) {
    long smallest = mvcc.memstoreReadPoint();
    for (Long scannerPoint : this.scannerReadPoints.values()) {
      smallest = Math.min(smallest, scannerPoint);
    }
    return smallest;
  }
}
/**
 * Write-state flags used to coordinate flushes, compactions and closes.
 *
 * <p>All flags are volatile so they can be read without locking;
 * {@link #setReadOnly(boolean)} is synchronized so that the two fields it
 * touches change together with respect to other synchronized users.
 */
static class WriteState {
  // Set while a memstore flush is happening.
  volatile boolean flushing = false;
  // Set when a flush has been requested.
  volatile boolean flushRequested = false;
  // Number of compactions running.
  volatile int compacting = 0;
  // Gets set in close. If set, cannot compact or flush again.
  volatile boolean writesEnabled = true;
  // Set if region is read-only
  volatile boolean readOnly = false;

  /**
   * Set flags that make this region read-only.
   *
   * @param onOff flip value for region r/o setting
   */
  synchronized void setReadOnly(final boolean onOff) {
    this.writesEnabled = !onOff;
    this.readOnly = onOff;
  }

  /** @return true if the region has been marked read-only. */
  boolean isReadOnly() {
    return this.readOnly;
  }

  /** @return true if a flush has been requested and not yet cleared. */
  boolean isFlushRequested() {
    return this.flushRequested;
  }

  // Heap footprint: object header plus the four boolean flags and the int
  // compaction counter declared above (previously counted as five booleans).
  static final long HEAP_SIZE = ClassSize.align(
      ClassSize.OBJECT + 4 * Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_INT);
}
// Flush/compaction/close coordination flags for this region.
final WriteState writestate = new WriteState();
// Memstore size that triggers a flush; computed in setHTableSpecificConf()
// from the table descriptor or, if that holds the default, from configuration.
long memstoreFlushSize;
// Read from "hbase.hregion.keyvalue.timestamp.slop.millisecs";
// HConstants.LATEST_TIMESTAMP disables the check.
final long timestampSlop;
// Read from "hbase.hregion.row.processor.timeout".
final long rowProcessorTimeout;
// Set to the current time when the region finishes initializing.
private volatile long lastFlushTime;
// May be null, e.g. when the region is not running inside a region server.
final RegionServerServices rsServices;
// Only assigned when rsServices is non-null.
private RegionServerAccounting rsAccounting;
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
// memstoreFlushSize multiplied by "hbase.hregion.memstore.block.multiplier"
// (default 2); computed in setHTableSpecificConf().
private long blockingMemStoreSize;
// Read from HConstants.THREAD_WAKE_FREQUENCY; defaults to 10 seconds.
final long threadWakeFrequency;
// Used to guard closes
final ReentrantReadWriteLock lock =
new ReentrantReadWriteLock();
// Stop updates lock
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private boolean splitRequest;
private byte[] explicitSplitPoint = null;
// Seeded during initialization with the highest memstore timestamp found in
// the stores, plus one.
private final MultiVersionConsistencyControl mvcc =
new MultiVersionConsistencyControl();
// Coprocessor host; only created when rsServices is non-null.
private RegionCoprocessorHost coprocessorHost;
/**
* Name of the region info file that resides just under the region directory.
*/
public final static String REGIONINFO_FILE = ".regioninfo";
private HTableDescriptor htableDescriptor = null;
// Created in initializeRegionInternals() via RegionSplitPolicy.create().
private RegionSplitPolicy splitPolicy;
// Both metrics objects are null when the main constructor is given no
// RegionServerServices.
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
/**
 * Bare constructor that wires up no collaborators. Should only be used for
 * testing purposes.
 */
public HRegion(){
  // Collaborators: none are available in this mode.
  this.conf = null;
  this.baseConf = null;
  this.fs = null;
  this.log = null;
  this.rsServices = null;
  this.coprocessorHost = null;

  // Region identity and location: unknown.
  this.tableDir = null;
  this.regiondir = null;
  this.regionInfo = null;
  this.htableDescriptor = null;

  // Sizes and timeouts: zero or the compiled-in defaults.
  this.memstoreFlushSize = 0L;
  this.blockingMemStoreSize = 0L;
  this.threadWakeFrequency = 0L;
  this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
  this.rowProcessorTimeout = DEFAULT_ROW_PROCESSOR_TIMEOUT;
  this.timestampSlop = HConstants.LATEST_TIMESTAMP;

  this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

  // From here on 'this' is handed to other objects, so the remaining
  // assignments keep their original relative order.
  this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
  this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
  this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
  this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
  this.maxBusyWaitMultiplier = 2;
}
/**
 * Copy constructor. Builds a new region from the table directory, log,
 * filesystem, base configuration, region info and table descriptor of
 * {@code other}; no RegionServerServices is passed on. Useful when reopening
 * a closed region (normally for unit tests).
 *
 * @param other original object
 */
public HRegion(HRegion other) {
  this(
      other.getTableDir(),
      other.getLog(),
      other.getFilesystem(),
      other.baseConf,
      other.getRegionInfo(),
      other.getTableDesc(),
      null);
}
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
* {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)} method.
*
* <p>The constructor only records its arguments and reads configuration
* values; it performs no filesystem access in the code shown here.
*
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
* @param confParam is global configuration settings. Must be the original
* base configuration, not a CompoundConfiguration.
* @param regionInfo HRegionInfo that describes the region
* @param htd descriptor of the region's table; its values are layered on top
* of confParam to form the configuration this region reads
* @param rsServices reference to {@link RegionServerServices} or null. When
* null, no coprocessor host and no metrics objects are created.
* @throws IllegalArgumentException if confParam is a CompoundConfiguration, or
* if hbase.busy.wait.duration times hbase.busy.wait.multiplier.max is not
* positive
*
* @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
*/
public HRegion(Path tableDir, HLog log, FileSystem fs,
Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, RegionServerServices rsServices) {
this.tableDir = tableDir;
this.comparator = regionInfo.getComparator();
this.log = log;
this.fs = fs;
// A compound configuration would be wrapped a second time below, so insist
// on the base configuration.
if (confParam instanceof CompoundConfiguration) {
throw new IllegalArgumentException("Need original base configuration");
}
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
this.baseConf = confParam;
// Base configuration first, then the table descriptor's values on top.
this.conf = new CompoundConfiguration()
.add(confParam)
.add(htd.getValues());
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
this.regionInfo = regionInfo;
this.htableDescriptor = htd;
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
10 * 1000);
String encodedNameStr = this.regionInfo.getEncodedName();
// Reads this.htableDescriptor and this.conf, so it must run after both have
// been assigned above.
setHTableSpecificConf();
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.busyWaitDuration = conf.getLong(
"hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
// Reject zero or negative settings for either value.
if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
+ busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
+ maxBusyWaitMultiplier + "). Their product should be positive");
}
this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
/*
* timestamp.slop provides a server-side constraint on the timestamp. This
* assumes that you base your TS around currentTimeMillis(). In this case,
* throw an error to the user if the user-specified TS is newer than now +
* slop. LATEST_TIMESTAMP == don't use this functionality
*/
this.timestampSlop = conf.getLong(
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
HConstants.LATEST_TIMESTAMP);
/**
* Timeout for the process time in processRowsWithLocks().
* Use -1 to switch off time bound.
*/
this.rowProcessorTimeout = conf.getLong(
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
} else {
// No region server: metrics stay null (rsAccounting and coprocessorHost are
// simply not assigned on this path).
this.metricsRegionWrapper = null;
this.metricsRegion = null;
}
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
LOG.debug("Instantiated " + this);
}
}
/**
 * Derives the per-table memstore thresholds. Does nothing when no table
 * descriptor is set. Otherwise the flush size comes from the descriptor,
 * unless it holds the descriptor default, in which case the configured
 * value is used. The blocking size is the flush size times the configured
 * block multiplier.
 */
void setHTableSpecificConf() {
  if (this.htableDescriptor != null) {
    final long tableFlushSize = this.htableDescriptor.getMemStoreFlushSize();
    final boolean tableUsesDefault =
        tableFlushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE;
    // A descriptor left at its default defers to the site configuration.
    this.memstoreFlushSize = tableUsesDefault
        ? conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
            HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE)
        : tableFlushSize;
    final long blockMultiplier =
        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
    this.blockingMemStoreSize = this.memstoreFlushSize * blockMultiplier;
  }
}
/**
 * Initialize this region without a progress reporter.
 *
 * @return What the next sequence (edit) id should be.
 * @throws IOException e
 */
public long initialize() throws IOException {
  final CancelableProgressable noReporter = null;
  return initialize(noReporter);
}
/**
 * Initialize this region, tracking progress in a monitored task. The task is
 * aborted if initialization does not produce a sequence id.
 *
 * @param reporter Tickle every so often if initialize is taking a while.
 * @return What the next sequence (edit) id should be.
 * @throws IOException e
 */
public long initialize(final CancelableProgressable reporter)
    throws IOException {
  final MonitoredTask status =
      TaskMonitor.get().createStatus("Initializing region " + this);
  // Stays at -1 unless initialization completes; a successful run yields at
  // least 0.
  long assignedSeqId = -1;
  try {
    assignedSeqId = initializeRegionInternals(reporter, status);
  } finally {
    if (assignedSeqId == -1) {
      status.abort(
          "Exception during region " + this.getRegionNameAsString() + " initialization.");
    }
  }
  return assignedSeqId;
}
/**
 * Does the actual work of opening the region: runs the coprocessor open
 * hooks, writes the region info file, clears temporary data, opens every
 * store in parallel, replays recovered edits and cleans up after interrupted
 * splits and merges.
 *
 * @param reporter progress callback handed to recovered-edits replay; may be null
 * @param status monitored task updated as each phase starts
 * @return the next sequence id to use: one more than the highest id found in
 *         the stores or in replayed edits
 * @throws InterruptedIOException if the thread is interrupted while waiting
 *         for a store to open
 * @throws IOException if a store fails to open or any filesystem step fails
 */
private long initializeRegionInternals(final CancelableProgressable reporter, MonitoredTask status)
    throws IOException, UnsupportedEncodingException {
  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-open hook");
    coprocessorHost.preOpen();
  }

  // Write HRI to a file in case we need to recover .META.
  status.setStatus("Writing region info on filesystem");
  checkRegioninfoOnFilesystem();

  // Remove temporary data left over from old regions
  status.setStatus("Cleaning up temporary data from old regions");
  cleanupTmpDir();

  // Load in all the HStores.
  //
  // Context: During replay we want to ensure that we do not lose any data. So, we
  // have to be conservative in how we replay logs. For each store, we calculate
  // the maxSeqId up to which the store was flushed. And, skip the edits which
  // is equal to or lower than maxSeqId for each store.
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
      Bytes.BYTES_COMPARATOR);
  long maxSeqId = -1;
  // initialized to -1 so that we pick up MemstoreTS from column families
  long maxMemstoreTS = -1;

  if (this.htableDescriptor != null &&
      !htableDescriptor.getFamilies().isEmpty()) {
    // initialize the thread pool for opening stores in parallel.
    ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
            "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
    // The try starts before the first submit so that the pool is shut down
    // even when submitting a task fails.
    try {
      CompletionService<HStore> completionService =
          new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          public HStore call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }

      for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
        Future<HStore> future = completionService.take();
        HStore store = future.get();

        // Encode the family name with Bytes (UTF-8) rather than the platform
        // default charset, so the map keys do not depend on JVM settings.
        byte[] familyName = Bytes.toBytes(store.getColumnFamilyName());
        this.stores.put(familyName, store);
        // Do not include bulk loaded files when determining seqIdForReplay
        long storeSeqIdForReplay = store.getMaxSequenceId(false);
        maxSeqIdInStores.put(familyName, storeSeqIdForReplay);
        // Include bulk loaded files when determining seqIdForAssignment
        long storeSeqIdForAssignment = store.getMaxSequenceId(true);
        if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
          maxSeqId = storeSeqIdForAssignment;
        }
        long maxStoreMemstoreTS = store.getMaxMemstoreTS();
        if (maxStoreMemstoreTS > maxMemstoreTS) {
          maxMemstoreTS = maxStoreMemstoreTS;
        }
      }
    } catch (InterruptedException e) {
      // Surface the interruption as such instead of a generic IOException.
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      storeOpenerThreadPool.shutdownNow();
    }
  }
  mvcc.initialize(maxMemstoreTS + 1);
  // Recover any edits if available.
  maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
      this.regiondir, maxSeqIdInStores, reporter, status));

  status.setStatus("Cleaning up detritus from prior splits");
  // Get rid of any splits or merges that were lost in-progress. Clean out
  // these directories here on open. We may be opening a region that was
  // being split but we crashed in the middle of it all.
  SplitTransaction.cleanupAnySplitDetritus(this);
  FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
  if (this.htableDescriptor != null) {
    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
  }

  this.writestate.flushRequested = false;
  this.writestate.compacting = 0;

  // Initialize split policy
  this.splitPolicy = RegionSplitPolicy.create(this, conf);

  this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
  // Use maximum of log sequenceid or that which was found in stores
  // (particularly if no recovered edits, seqid will be -1).
  long nextSeqid = maxSeqId + 1;
  LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);

  // A region can be reopened if failed a split; reset flags
  this.closing.set(false);
  this.closed.set(false);

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor post-open hooks");
    coprocessorHost.postOpen();
  }

  status.markComplete("Region opened successfully");
  return nextSeqid;
}
/*
 * Renames the given directory of initial store files onto the region
 * directory, if such a directory was passed and exists. A failed rename is
 * logged as a warning and otherwise ignored.
 * @param fs filesystem to operate on
 * @param initialFiles directory to move; may be null
 * @param regiondir rename target
 * @throws IOException
 */
static void moveInitialFilesIntoPlace(final FileSystem fs,
    final Path initialFiles, final Path regiondir)
    throws IOException {
  if (initialFiles == null || !fs.exists(initialFiles)) {
    // Nothing to pick up.
    return;
  }
  final boolean renamed = fs.rename(initialFiles, regiondir);
  if (!renamed) {
    LOG.warn("Unable to rename " + initialFiles + " to " + regiondir);
  }
}
/**
 * Checks every store file of every store for a reference file.
 *
 * @return True if this region has references.
 */
public boolean hasReferences() {
  boolean referenceFound = false;
  search:
  for (Store familyStore : this.stores.values()) {
    for (StoreFile file : familyStore.getStorefiles()) {
      if (file.isReference()) {
        // One reference is enough; stop scanning.
        referenceFound = true;
        break search;
      }
    }
  }
  return referenceFound;
}
/**
 * Aggregates the block distribution reported by each store file of this
 * region; the per-file distributions are based on the data captured when
 * the HFile was created.
 *
 * @return The HDFS blocks distribution for the region.
 */
public HDFSBlocksDistribution getHDFSBlocksDistribution() {
  final HDFSBlocksDistribution regionDistribution = new HDFSBlocksDistribution();
  // The store map's monitor is held while walking the stores.
  synchronized (this.stores) {
    for (Store familyStore : this.stores.values()) {
      for (StoreFile storeFile : familyStore.getStorefiles()) {
        regionDistribution.add(storeFile.getHDFSBlockDistribution());
      }
    }
  }
  return regionDistribution;
}
/**
 * This is a helper function to compute HDFS block distribution on demand,
 * by listing each column family's store directory on the filesystem.
 *
 * <p>A store directory that does not exist, or that disappears between the
 * existence check and the listing, contributes nothing to the result.
 *
 * @param conf configuration
 * @param tableDescriptor HTableDescriptor of the table
 * @param regionEncodedName encoded name of the region
 * @return The HDFS blocks distribution for the given region.
 * @throws IOException
 */
static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
    Configuration conf, HTableDescriptor tableDescriptor,
    String regionEncodedName) throws IOException {
  HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
  Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
      tableDescriptor.getName());
  FileSystem fs = tablePath.getFileSystem(conf);

  for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
    Path storeHomeDir = HStore.getStoreHomedir(tablePath, regionEncodedName,
        family.getName());
    if (!fs.exists(storeHomeDir)) continue;

    FileStatus[] hfilesStatus;
    try {
      hfilesStatus = fs.listStatus(storeHomeDir);
    } catch (FileNotFoundException e) {
      // The directory was removed after the exists() check; treat it the
      // same as a directory that was never there.
      continue;
    }
    // Some filesystem implementations report a missing directory by
    // returning null instead of throwing.
    if (hfilesStatus == null) continue;

    for (FileStatus hfileStatus : hfilesStatus) {
      HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0,
              hfileStatus.getLen());
      hdfsBlocksDistribution.add(storeFileBlocksDistribution);
    }
  }
  return hdfsBlocksDistribution;
}
/**
 * @return the counter object tracking this region's memstore size (the
 * field itself, not a copy)
 */
public AtomicLong getMemstoreSize() {
  return this.memstoreSize;
}
/**
 * Increase the size of mem store in this region and, when a region server
 * accounting object is attached, the size of the global mem store as well.
 * @param memStoreSize amount to add to both counters
 * @return the size of memstore in this region after the addition
 */
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
  if (this.rsAccounting != null) {
    rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
  }
  // addAndGet, not getAndAdd: the contract (and the method name) is to hand
  // back the updated size, whereas getAndAdd returns the value from before
  // the addition.
  return this.memstoreSize.addAndGet(memStoreSize);
}
/*
 * Make sure an up-to-date region info file exists under the region directory,
 * writing or rewriting it when needed. Useful when recovering mangled regions.
 * If a file is already there and its length equals the length of the freshly
 * serialized content, it is left alone. Otherwise the content is written to a
 * file under the tmp dir, any existing region info file is deleted, and the
 * tmp file is renamed into place.
 * @throws IOException if the existing file cannot be removed, if the rename
 * fails, or if an underlying filesystem call fails
 */
private void checkRegioninfoOnFilesystem() throws IOException {
Path regioninfoPath = new Path(this.regiondir, REGIONINFO_FILE);
// Compose the content of the file so we can compare to length in filesystem. If not same,
// rewrite it (it may have been written in the old format using Writables instead of pb). The
// pb version is much shorter -- we write now w/o the toString version -- so checking length
// only should be sufficient. I don't want to read the file every time to check if it is pb
// serialized.
byte [] content = getDotRegionInfoFileContent(this.getRegionInfo());
boolean exists = this.fs.exists(regioninfoPath);
// status stays null when the file is absent, which forces the write path below.
FileStatus status = exists? this.fs.getFileStatus(regioninfoPath): null;
if (status != null && status.getLen() == content.length) {
// Same length: assume the content is good and move on.
return;
}
// Create in tmpdir and then move into place in case we crash after
// create but before close. If we don't successfully close the file,
// subsequent region reopens will fail the below because create is
// registered in NN.
// First check to get the permissions
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// And then create the file
Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
// If datanode crashes or if the RS goes down just before the close is called while trying to
// close the created regioninfo file in the .tmp directory then on next
// creation we will be getting AlreadyCreatedException.
// Hence delete and create the file if exists.
if (FSUtils.isExists(fs, tmpPath)) {
FSUtils.delete(fs, tmpPath, true);
}
FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
try {
// We used to write out this file as serialized Writable followed by '\n\n' and then the
// toString of the HRegionInfo but now we just write out the pb serialized bytes so we can
// for sure tell whether the content has been pb'd or not just by looking at file length; the
// pb version will be shorter.
out.write(content);
} finally {
// Always close, even when the write failed, so the stream is not leaked.
out.close();
}
if (exists) {
// The old file has to go before the rename below can put the new one in its place.
// NOTE(review): between this delete and the rename there is a window in which no
// region info file exists under the region directory.
LOG.info("Rewriting .regioninfo file at " + regioninfoPath);
if (!fs.delete(regioninfoPath, false)) {
throw new IOException("Unable to remove existing " + regioninfoPath);
}
}
// Move the fully written tmp file into its final location.
if (!fs.rename(tmpPath, regioninfoPath)) {
throw new IOException("Unable to rename " + tmpPath + " to " + regioninfoPath);
}
}
/**
 * Serializes a region info into the bytes stored in the region info file;
 * this is simply the delimited byte array form of <code>hri</code>.
 * @param hri region info to serialize
 * @return Content of the file we write out to the filesystem under a region
 * @throws IOException if serialization fails
 */
private static byte [] getDotRegionInfoFileContent(final HRegionInfo hri) throws IOException {
  final byte [] serialized = hri.toDelimitedByteArray();
  return serialized;
}
/**
 * Reads and parses the <code>.regioninfo</code> file found directly under
 * the given region directory. The input stream is closed before returning,
 * whether or not parsing succeeded.
 * @param fs filesystem to read from
 * @param dir region directory expected to contain the file
 * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
 * @throws FileNotFoundException if the file does not exist
 * @throws IOException if opening, parsing or closing the file fails
 */
public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
throws IOException {
  final Path infoFile = new Path(dir, HRegion.REGIONINFO_FILE);
  if (!fs.exists(infoFile)) {
    throw new FileNotFoundException(infoFile.toString());
  }
  final FSDataInputStream stream = fs.open(infoFile);
  try {
    HRegionInfo parsed = HRegionInfo.parseFrom(stream);
    return parsed;
  } finally {
    stream.close();
  }
}
/**
 * @return the HRegionInfo object held by this region
 */
public HRegionInfo getRegionInfo() {
  return regionInfo;
}
/**
 * @return the {@link RegionServerServices} instance this HRegion was given;
 * may be null.
 */
RegionServerServices getRegionServerServices() {
  return rsServices;
}
/**
 * @return current value of the read requests counter of this region
 */
long getReadRequestsCount() {
  final long reads = readRequestsCount.get();
  return reads;
}
/**
 * @return current value of the write requests counter of this region
 */
long getWriteRequestsCount() {
  final long writes = writeRequestsCount.get();
  return writes;
}
/**
 * @return the MetricsRegion object held by this region
 */
MetricsRegion getMetrics() {
  return this.metricsRegion;
}
/**
 * @return true if the closed flag of this region is set
 */
public boolean isClosed() {
  final boolean closedNow = closed.get();
  return closedNow;
}
/**
 * @return True if the closing flag is set, i.e. the closing process has
 * started.
 */
public boolean isClosing() {
  final boolean closingNow = closing.get();
  return closingNow;
}
/**
 * @return true if region is available, meaning it is neither closed nor in
 * the process of closing
 */
public boolean isAvailable() {
  return !(isClosed() || isClosing());
}
/**
 * @return true if region is splittable: it must be available and must not
 * hold any reference files
 */
public boolean isSplittable() {
  if (!isAvailable()) {
    return false;
  }
  return !hasReferences();
}
/**
 * Reads the writesEnabled flag while holding the monitor of the write state
 * object.
 * @return true if writes are currently enabled for this region
 */
boolean areWritesEnabled() {
  final boolean enabled;
  synchronized (writestate) {
    enabled = writestate.writesEnabled;
  }
  return enabled;
}
/**
 * @return the MultiVersionConsistencyControl instance held by this region
 */
public MultiVersionConsistencyControl getMVCC() {
  return this.mvcc;
}
/**
 * Close down this HRegion without aborting; shorthand for
 * <code>close(false)</code>.
 *
 * <p>This method could take some time to execute, so don't call it from a
 * time-sensitive thread.
 *
 * @return List of the storage files that the HRegion's component HStores
 * make use of. Null if the region was already closed.
 *
 * @throws IOException e
 */
public List<StoreFile> close() throws IOException {
  final boolean abort = false;
  return close(abort);
}
// Monitor held by close(boolean) around the call to doClose(...) so that
// threads attempting to close this region run one at a time.
private final Object closeLock = new Object();
/**
 * Close down this HRegion, serializing concurrent callers on the close lock.
 * A monitored task is created to report progress and is cleaned up when the
 * call ends, whether it succeeds or throws.
 *
 * This method could take some time to execute, so don't call it from a
 * time-sensitive thread.
 *
 * @param abort true if server is aborting (only during testing)
 * @return List of the storage files that the HRegion's component HStores
 * make use of. Null if the region was already closed.
 *
 * @throws IOException e
 */
public List<StoreFile> close(final boolean abort) throws IOException {
  final TaskMonitor monitor = TaskMonitor.get();
  String description = "Closing region " + this;
  if (abort) {
    description = description + " due to abort";
  }
  final MonitoredTask status = monitor.createStatus(description);
  status.setStatus("Waiting for close lock");
  try {
    // One closer at a time; a second caller blocks here until the first
    // has finished.
    final List<StoreFile> closedFiles;
    synchronized (closeLock) {
      closedFiles = doClose(abort, status);
    }
    return closedFiles;
  } finally {
    status.cleanup();
  }
}
private List<StoreFile> doClose(
final boolean abort, MonitoredTask status)
throws IOException {
if (isClosed()) {
LOG.warn("Region " + this + " already closed");
return null;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-close hooks");
this.coprocessorHost.preClose(abort);
}
status.setStatus("Disabling compacts and flushes for region");
boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();