forked from apache/hadoop
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FSImage.java
1559 lines (1414 loc) · 58.4 KB
/
FSImage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* FSImage handles checkpointing and logging of the namespace edits.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSImage implements Closeable {
/** Logger shared by FSImage and code that logs on its behalf. */
public static final Logger LOG =
LoggerFactory.getLogger(FSImage.class.getName());
/**
 * Priority of the FSImageSaver shutdown hook: {@value}.
 */
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
/** Edit log of this image; created by the constructor. */
protected FSEditLog editLog = null;
/**
 * Whether the last upgrade is finalized. Cleared by doUpgrade(), set by
 * doRollback() and finalizeUpgrade(), and refreshed from the storage
 * inspector each time an image is loaded.
 */
private boolean isUpgradeFinalized = false;
// If true, then image corruption was detected. The NameNode process will
// exit immediately after saving the image.
private AtomicBoolean exitAfterSave = new AtomicBoolean(false);
/** Storage directories backing this image; created by the constructor. */
protected NNStorage storage;
/**
 * The last transaction ID that was either loaded from an image
 * or loaded by loading edits files.
 */
protected long lastAppliedTxId = 0;
/** Configuration this image was constructed with. */
final private Configuration conf;
/** Retention manager used to purge old checkpoints; created by the constructor. */
protected NNStorageRetentionManager archivalManager;
/**
 * The collection of newly added storage directories. These are partially
 * formatted then later fully populated along with a VERSION file.
 * For HA, the second part is done when the next checkpoint is saved.
 * This set will be cleared once a VERSION file is created.
 * For non-HA, a new fsimage will be locally generated along with a new
 * VERSION file. This set is not used for non-HA mode.
 * Stays null until the first such directory is recorded.
 */
private Set<StorageDirectory> newDirs = null;
/*
 * Used to make sure there are no concurrent checkpoints for a given txid.
 * The checkpoint here could be one of the following operations:
 * a. checkpoint when NN is in standby.
 * b. admin saveNameSpace operation.
 * c. download checkpoint file from any remote checkpointer.
 */
private final Set<Long> currentlyCheckpointing =
Collections.<Long>synchronizedSet(new HashSet<Long>());
/** Limit logging about edit loading to every 5 seconds max. */
private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
/** Throttles edit-loading log messages to LOAD_EDIT_LOG_INTERVAL_MS. */
private final LogThrottlingHelper loadEditLogHelper =
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
/**
 * Creates an FSImage whose image and edits directories are the ones
 * {@link FSNamesystem} resolves from the given configuration.
 *
 * @param conf Configuration
 * @throws IOException if default directories are invalid.
 */
public FSImage(Configuration conf) throws IOException {
  this(conf, FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
}
/**
* Construct the FSImage. Set the default checkpoint directories.
*
* Setup storage and initialize the edit log.
*
* @param conf Configuration
* @param imageDirs Directories the image can be stored in.
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
protected FSImage(Configuration conf,
Collection<URI> imageDirs,
List<URI> editsDirs)
throws IOException {
this.conf = conf;
storage = new NNStorage(conf, imageDirs, editsDirs);
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
storage.setRestoreFailedStorage(true);
}
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}
/**
 * Formats the storage and the non-file journals with a freshly allocated
 * namespace info, then saves an initial image at transaction id 0.
 *
 * @param fsn namesystem to save; must contain only the root inode
 * @param clusterId cluster id stamped into the new namespace info
 * @param force passed through to the non-file journal formatting
 * @throws IOException if formatting or saving the image fails
 */
void format(FSNamesystem fsn, String clusterId, boolean force)
    throws IOException {
  final long totalFiles = fsn.getFilesTotal();
  // An uninitialized namesystem holds exactly one file: the root inode.
  Preconditions.checkState(totalFiles == 1,
      "FSImage.format should be called with an uninitialized namesystem, has " +
      totalFiles + " files");

  final NamespaceInfo nsInfo = NNStorage.newNamespaceInfo();
  LOG.info("Allocated new BlockPoolId: " + nsInfo.getBlockPoolID());
  nsInfo.clusterID = clusterId;

  storage.format(nsInfo);
  editLog.formatNonFileJournals(nsInfo, force);
  saveFSImageInAllDirs(fsn, 0);
}
/**
 * Decides whether formatting may proceed, given that storage directories
 * and non-file journals may already exist.
 *
 * Every storage directory and every format-confirmable journal reported by
 * the edit log is collected and handed to
 * {@link Storage#confirmFormat}, which makes the actual decision.
 *
 * @param force if true, format regardless of whether dirs exist
 * @param interactive prompt the user when a dir exists
 * @return true if formatting should proceed
 * @throws IOException if some storage cannot be accessed
 */
boolean confirmFormat(boolean force, boolean interactive) throws IOException {
  final List<FormatConfirmable> toConfirm = Lists.newArrayList();

  final Iterator<StorageDirectory> dirs =
      storage.dirIterable(null).iterator();
  while (dirs.hasNext()) {
    toConfirm.add(dirs.next());
  }
  toConfirm.addAll(editLog.getFormatConfirmables());

  return Storage.confirmFormat(toConfirm, force, interactive);
}
/**
 * Analyze storage directories.
 * Recover from previous transitions if required.
 * Perform fs state transition if necessary depending on the namespace info.
 * Read storage info.
 *
 * @param startOpt startup option of the NameNode; must not be FORMAT
 * @param target namesystem the image is loaded into
 * @param recovery metadata recovery context, passed through to image loading
 * @throws IOException if no storage directory is usable, the NameNode is not
 *         formatted, the on-disk layout requires an upgrade that was not
 *         requested, or a transition/load step fails
 * @return true if the image needs to be saved or false otherwise; always
 *         false for METADATAVERSION, UPGRADE, UPGRADEONLY and IMPORT
 */
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
MetaRecoveryContext recovery)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
Collection<URI> imageDirs = storage.getImageDirectories();
Collection<URI> editsDirs = editLog.getEditURIs();
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
Map<StorageDirectory, StorageState> dataDirStates =
new HashMap<StorageDirectory, StorageState>();
boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);
if (LOG.isTraceEnabled()) {
LOG.trace("Data dir states:\n " +
Joiner.on("\n ").withKeyValueSeparator(": ")
.join(dataDirStates));
}
// Only ROLLBACK and IMPORT may proceed without a formatted directory.
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.IMPORT) {
throw new IOException("NameNode is not formatted.");
}
int layoutVersion = storage.getLayoutVersion();
// METADATAVERSION only reports the versions and never loads the image.
if (startOpt == StartupOption.METADATAVERSION) {
System.out.println("HDFS Image Version: " + layoutVersion);
System.out.println("Software format version: " +
HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
return false;
}
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
// An on-disk layout that differs from the software layout is only
// acceptable when an upgrade (regular or rolling) was requested.
if (startOpt != StartupOption.UPGRADE
&& startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"\nFile system image contains an old layout version "
+ storage.getLayoutVersion() + ".\nAn upgrade to version "
+ HdfsServerConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+ "Please restart NameNode with the \""
+ RollingUpgradeStartupOption.STARTED.getOptionString()
+ "\" option if a rolling upgrade is already started;"
+ " or restart NameNode with the \""
+ StartupOption.UPGRADE.getName() + "\" option to start"
+ " a new upgrade.");
}
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// 2. Format unformatted dirs.
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
// recoverStorageDirs() already rejects NON_EXISTENT directories.
throw new IOException(StorageState.NON_EXISTENT +
" state cannot be here");
case NOT_FORMATTED:
// Create a dir structure, but not the VERSION file. The presence of
// VERSION is checked in the inspector's needToSave() method and
// saveNamespace is triggered if it is absent. This will bring
// the storage state up to date along with a new VERSION file.
// If HA is enabled, NNs start up as standby so saveNamespace is not
// triggered.
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current dir
// For non-HA, no further action is needed here, as saveNamespace will
// take care of the rest.
if (!target.isHaEnabled()) {
// 'continue' applies to the enclosing for loop: move on to the next dir.
continue;
}
// If HA is enabled, save the dirs to create a version file later when
// a checkpoint image is saved.
if (newDirs == null) {
newDirs = new HashSet<StorageDirectory>();
}
newDirs.add(sd);
break;
default:
break;
}
}
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
throw new AssertionError("Rollback is now a standalone command, " +
"NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
return loadFSImage(target, startOpt, recovery);
}
/**
 * Create a VERSION file in the newly added storage directories.
 *
 * Does nothing when no new directories were recorded. A directory whose
 * VERSION file cannot be written is logged together with the cause and
 * reported to the storage as failed; the remaining directories are still
 * processed. The set of new directories is discarded afterwards.
 */
private void initNewDirs() {
  if (newDirs == null) {
    return;
  }
  for (StorageDirectory sd : newDirs) {
    try {
      storage.writeProperties(sd);
      LOG.info("Wrote VERSION in the new storage, " + sd.getCurrentDir());
    } catch (IOException e) {
      // Failed to create a VERSION file. Keep the cause in the log before
      // reporting the error, otherwise the reason for the failure is lost.
      LOG.warn("Failed to write VERSION in the new storage, "
          + sd.getCurrentDir(), e);
      storage.reportErrorOnFile(sd.getVersionFile());
    }
  }
  newDirs.clear();
  newDirs = null;
}
/**
 * Analyzes every storage directory, recovers incomplete transitions
 * (eg. upgrade, rollback, checkpoint) where needed, and records each
 * directory's storage state in {@code dataDirStates}.
 *
 * For METADATAVERSION only the properties of the first directory are read
 * and the method returns immediately. If processing a directory fails with
 * an IOException, that directory is unlocked before the exception is
 * rethrown.
 *
 * @param startOpt startup option of the NameNode
 * @param storage storage whose directories are examined
 * @param dataDirStates output of storage directory states
 * @return true if there is at least one valid formatted storage directory
 * @throws IOException if a directory is missing or inaccessible, if an image
 *         already exists while importing, or if analysis/recovery fails
 */
public static boolean recoverStorageDirs(StartupOption startOpt,
    NNStorage storage, Map<StorageDirectory, StorageState> dataDirStates)
    throws IOException {
  boolean sawFormattedDir = false;
  // Walk all storage dirs, even shared dirs, so that their state is properly
  // examined; the shared dir is not mutated in the loop below.
  final Iterator<StorageDirectory> dirs = storage.dirIterator();
  while (dirs.hasNext()) {
    final StorageDirectory dir = dirs.next();
    StorageState state;
    if (startOpt == StartupOption.METADATAVERSION) {
      /* All we need is the layout version. */
      storage.readProperties(dir);
      return true;
    }

    try {
      state = dir.analyzeStorage(startOpt, storage);
      // dir is locked but not opened
      switch (state) {
      case NON_EXISTENT:
        // name-node fails if any of the configured storage dirs are missing
        throw new InconsistentFSStateException(dir.getRoot(),
            "storage directory does not exist or is not accessible.");
      case NOT_FORMATTED:
      case NORMAL:
        // nothing to recover
        break;
      default:
        // recovery is possible
        dir.doRecover(state);
      }

      final boolean hasReadableProperties =
          state != StorageState.NOT_FORMATTED
          && startOpt != StartupOption.ROLLBACK;
      if (hasReadableProperties) {
        // read and verify consistency with other directories
        storage.readProperties(dir, startOpt);
        sawFormattedDir = true;
      }

      if (startOpt == StartupOption.IMPORT && sawFormattedDir) {
        // import of a checkpoint is allowed only into empty image directories
        throw new IOException("Cannot import image from a checkpoint. "
            + " NameNode already contains an image in " + dir.getRoot());
      }
    } catch (IOException ioe) {
      dir.unlock();
      throw ioe;
    }
    dataDirStates.put(dir, state);
  }
  return sawFormattedDir;
}
/**
 * Verifies that an upgrade (or rolling upgrade) may start: none of the
 * local storage directories may still contain a previous fs state.
 *
 * @param storage storage whose directories are checked
 * @throws IOException (InconsistentFSStateException) if a directory still
 *         has a "previous" directory
 */
public static void checkUpgrade(NNStorage storage) throws IOException {
  final Iterator<StorageDirectory> dirs = storage.dirIterator(false);
  while (dirs.hasNext()) {
    final StorageDirectory dir = dirs.next();
    if (!dir.getPreviousDir().exists()) {
      continue;
    }
    throw new InconsistentFSStateException(dir.getRoot(),
        "previous fs state should not exist during upgrade. "
        + "Finalize or rollback first.");
  }
}
/**
 * Runs the upgrade precondition check against this image's own storage.
 *
 * @throws IOException if a previous fs state still exists
 */
void checkUpgrade() throws IOException {
  checkUpgrade(this.storage);
}
/**
 * Tells whether a rollback fsimage (for rolling upgrade) is present.
 *
 * @return true if there is rollback fsimage (for rolling upgrade) in NameNode
 *         directory; false if none is found, including when the inspector
 *         signals that with a FileNotFoundException.
 * @throws IOException if inspecting the storage directories fails
 */
public boolean hasRollbackFSImage() throws IOException {
  final FSImageStorageInspector rollbackInspector =
      new FSImageTransactionalStorageInspector(
          EnumSet.of(NameNodeFile.IMAGE_ROLLBACK));
  storage.inspectStorageDirs(rollbackInspector);

  try {
    final List<FSImageFile> rollbackImages =
        rollbackInspector.getLatestImages();
    if (rollbackImages == null) {
      return false;
    }
    return !rollbackImages.isEmpty();
  } catch (FileNotFoundException notFound) {
    return false;
  }
}
/**
 * Upgrades the storage directories to the current layout version.
 *
 * Steps, in order: verify no previous fs state exists, load the latest
 * image, stamp a new cTime and layout version on the storage, move the
 * pre-upgrade state aside in every directory, save a new image in all
 * directories, then complete the upgrade of every directory. With HA
 * enabled the shared edit log goes through the matching pre-upgrade and
 * upgrade steps.
 *
 * A directory that fails either per-directory step is logged and reported
 * to the storage as failed. If any storage directory has been removed by
 * the end, the whole upgrade fails.
 *
 * @param target namesystem to load the image into and save from
 * @throws IOException if a precondition fails, loading or saving fails, or
 *         any storage directory failed during the upgrade
 */
void doUpgrade(FSNamesystem target) throws IOException {
  checkUpgrade();

  // load the latest image
  this.loadFSImage(target, StartupOption.UPGRADE, null);
  target.checkRollingUpgrade("upgrade namenode");

  long oldCTime = storage.getCTime();
  storage.cTime = now();  // generate new cTime for the state
  int oldLV = storage.getLayoutVersion();
  storage.layoutVersion = HdfsServerConstants.NAMENODE_LAYOUT_VERSION;

  List<StorageDirectory> errorSDs =
      Collections.synchronizedList(new ArrayList<StorageDirectory>());
  assert !editLog.isSegmentOpen() : "Edits log must not be open.";
  LOG.info("Starting upgrade of local storage directories."
      + "\n old LV = " + oldLV
      + "; old CTime = " + oldCTime
      + ".\n new LV = " + storage.getLayoutVersion()
      + "; new CTime = " + storage.getCTime());

  // Do pre-upgrade for each directory
  for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      NNUpgradeUtil.doPreUpgrade(conf, sd);
    } catch (Exception e) {
      LOG.error("Failed to move aside pre-upgrade storage " +
          "in image directory " + sd.getRoot(), e);
      errorSDs.add(sd);
    }
  }
  if (target.isHaEnabled()) {
    editLog.doPreUpgradeOfSharedLog();
  }
  storage.reportErrorsOnDirectories(errorSDs);
  errorSDs.clear();

  saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());

  // upgrade shared edit storage first
  if (target.isHaEnabled()) {
    editLog.doUpgradeOfSharedLog();
  }
  // Do upgrade for each directory
  for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
    StorageDirectory sd = it.next();
    try {
      NNUpgradeUtil.doUpgrade(sd, storage);
    } catch (IOException ioe) {
      // The exception thrown below refers to failures as "previously
      // logged", so make sure the cause really is in the log.
      LOG.error("Failed to upgrade storage directory " + sd.getRoot(), ioe);
      errorSDs.add(sd);
    }
  }
  storage.reportErrorsOnDirectories(errorSDs);

  isUpgradeFinalized = false;
  if (!storage.getRemovedStorageDirs().isEmpty()) {
    // during upgrade, it's a fatal error to fail any storage directory
    throw new IOException("Upgrade failed in "
        + storage.getRemovedStorageDirs().size()
        + " storage directory(ies), previously logged.");
  }
}
/**
 * Rolls the storage directories (and, with HA, the shared edit log) back to
 * their previous fs state.
 *
 * First checks every directory, and the shared log when HA is enabled, to
 * see whether anything can be rolled back; fails if nothing can. Only then
 * is the rollback performed, so no directory is touched when the check
 * fails. A temporary FSImage built from the same configuration carries the
 * previous state's storage info and is always closed on exit.
 *
 * @param fsns namesystem, consulted for whether HA is enabled
 * @throws IOException if no directory contains a previous fs state or a
 *         rollback step fails
 */
void doRollback(FSNamesystem fsns) throws IOException {
// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
FSImage prevState = new FSImage(conf);
try {
prevState.getStorage().layoutVersion = HdfsServerConstants.NAMENODE_LAYOUT_VERSION;
// Phase 1: find out whether at least one directory can be rolled back.
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
HdfsServerConstants.NAMENODE_LAYOUT_VERSION)) {
continue;
}
LOG.info("Can perform rollback for " + sd);
canRollback = true;
}
if (fsns.isHaEnabled()) {
// If HA is enabled, check if the shared log can be rolled back as well.
editLog.initJournalsForWrite();
boolean canRollBackSharedEditLog = editLog.canRollBackSharedLog(
prevState.getStorage(), HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
if (canRollBackSharedEditLog) {
LOG.info("Can perform rollback for shared edit log.");
canRollback = true;
}
}
if (!canRollback)
throw new IOException("Cannot rollback. None of the storage "
+ "directories contain previous fs state.");
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
// Phase 2: doRollBack is invoked for every directory in the iteration.
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n new LV = " + prevState.getStorage().getLayoutVersion()
+ "; new CTime = " + prevState.getStorage().getCTime());
NNUpgradeUtil.doRollBack(sd);
}
if (fsns.isHaEnabled()) {
// If HA is enabled, try to roll back the shared log as well.
editLog.doRollback();
}
isUpgradeFinalized = true;
} finally {
// Always release the temporary image, even when the rollback fails.
prevState.close();
}
}
/**
 * Load image from a checkpoint directory and save it into the current one.
 *
 * The checkpoint is loaded through a temporary FSImage built over the
 * configured checkpoint image and edits directories. Its storage info,
 * block pool id and last written transaction id are then copied onto the
 * namesystem's real image, which is saved as a new namespace.
 *
 * @param target the NameSystem to import into
 * @throws IOException if the checkpoint image or checkpoint edits
 *         directories are not configured, or loading/saving fails
 */
void doImportCheckpoint(FSNamesystem target) throws IOException {
  Collection<URI> checkpointDirs =
      FSImage.getCheckpointDirs(conf, null);
  List<URI> checkpointEditsDirs =
      FSImage.getCheckpointEditsDirs(conf, null);

  if (checkpointDirs == null || checkpointDirs.isEmpty()) {
    throw new IOException("Cannot import image from a checkpoint. "
        + "\"dfs.namenode.checkpoint.dir\" is not set.");
  }

  if (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()) {
    // Name the edits-dir key here so the operator is pointed at the setting
    // that is actually missing.
    throw new IOException("Cannot import image from a checkpoint. "
        + "\"dfs.namenode.checkpoint.edits.dir\" is not set.");
  }

  FSImage realImage = target.getFSImage();
  FSImage ckptImage = new FSImage(conf,
      checkpointDirs, checkpointEditsDirs);
  // load from the checkpoint dirs
  try {
    ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
  } finally {
    ckptImage.close();
  }
  // return back the real image
  realImage.getStorage().setStorageInfo(ckptImage.getStorage());
  realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
  realImage.initEditLog(StartupOption.IMPORT);

  realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());

  // and save it but keep the same checkpointTime
  saveNamespace(target);
  updateStorageVersion();
}
/**
 * Finalizes the upgrade in every local storage directory and, when asked,
 * in the shared edit log, then marks the upgrade as finalized.
 *
 * @param finalizeEditLog whether the shared edit log must be finalized too
 * @throws IOException if finalizing a directory or the shared log fails
 */
void finalizeUpgrade(boolean finalizeEditLog) throws IOException {
  final String versionDetails;
  if (storage.getLayoutVersion() == 0) {
    versionDetails = "";
  } else {
    versionDetails = "\n cur LV = " + storage.getLayoutVersion()
        + "; cur CTime = " + storage.getCTime();
  }
  LOG.info("Finalizing upgrade for local dirs. " + versionDetails);

  final Iterator<StorageDirectory> dirs = storage.dirIterator(false);
  while (dirs.hasNext()) {
    NNUpgradeUtil.doFinalize(dirs.next());
  }

  if (finalizeEditLog) {
    // We only do this in the case that HA is enabled and we're active. In any
    // other case the NN will have done the upgrade of the edits directories
    // already by virtue of the fact that they're local.
    editLog.doFinalizeOfSharedLog();
  }
  isUpgradeFinalized = true;
}
/**
 * @return the current value of the upgrade-finalized flag
 */
boolean isUpgradeFinalized() {
  return this.isUpgradeFinalized;
}
/**
 * @return the edit log associated with this image
 */
public FSEditLog getEditLog() {
  return this.editLog;
}
/**
 * Opens the edit log for writing and records the transaction id of the
 * newly opened segment in the storage.
 *
 * @param layoutVersion layout version passed to the edit log when opening
 * @throws IOException if opening the log or writing the txid file fails
 */
void openEditLogForWrite(int layoutVersion) throws IOException {
  assert editLog != null : "editLog must be initialized";
  editLog.openForWrite(layoutVersion);

  final long segmentTxId = editLog.getCurSegmentTxId();
  storage.writeTransactionIdFileToStorage(segmentTxId);
}
/**
 * Discards the namesystem's current contents and reloads the namespace
 * from the given image file.
 *
 * @param file image file to load
 * @param target namesystem that is cleared and then repopulated
 * @throws IOException if loading the image fails
 */
void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
  target.clear();
  LOG.debug("Reloading namespace from " + file);
  this.loadFSImage(file, target, null, false);
}
/**
 * Choose latest image from one of the directories,
 * load it and merge with the edits.
 *
 * Saving and loading fsimage should never trigger symlink resolution.
 * The paths that are persisted do not have *intermediate* symlinks
 * because intermediate symlinks are resolved at the time files,
 * directories, and symlinks are created. All paths accessed while
 * loading or saving fsimage should therefore only see symlinks as
 * the final path component, and the functions called below do not
 * resolve symlinks that are the final path component.
 *
 * @param target namesystem the image and edits are loaded into
 * @param startOpt startup option of the NameNode
 * @param recovery metadata recovery context, passed to the edit log stream
 *        selection and to the image/edits loaders
 * @return whether the image should be saved
 * @throws IOException if no image file can be loaded or loading edits fails
 */
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
MetaRecoveryContext recovery)
throws IOException {
final boolean rollingRollback
= RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
final EnumSet<NameNodeFile> nnfs;
if (rollingRollback) {
// if it is rollback of rolling upgrade, only load from the rollback image
nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
} else {
// otherwise we can load from both IMAGE and IMAGE_ROLLBACK
nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
}
final FSImageStorageInspector inspector = storage
.readAndInspectDirs(nnfs, startOpt);
isUpgradeFinalized = inspector.isUpgradeFinalized();
List<FSImageFile> imageFiles = inspector.getLatestImages();
// Startup progress is reported against the first candidate image file.
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.LOADING_FSIMAGE);
File phaseFile = imageFiles.get(0).getFile();
prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
initEditLog(startOpt);
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
// In the meanwhile, for HA upgrade, we will still write editlog thus need
// this toAtLeastTxId to be set to the max-seen txid
// For rollback in rolling upgrade, we need to set the toAtLeastTxId to
// the txid right before the upgrade marker.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
.getMaxSeenTxId() : 0;
if (rollingRollback) {
// note that the first image in imageFiles is the special checkpoint
// for the rolling upgrade
toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
}
// Select the edits that follow the checkpoint of the first image.
editStreams = editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId() + 1,
toAtLeastTxId, recovery, false);
} else {
// Layouts without txid-based naming get their streams from the
// pre-transactional inspector.
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
}
int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
for (EditLogInputStream elis : editStreams) {
elis.setMaxOpSize(maxOpSize);
}
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " + l);
}
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
}
// Try the candidate images in order and stop at the first one that loads.
// imageFile stays null if every candidate fails.
FSImageFile imageFile = null;
for (int i = 0; i < imageFiles.size(); i++) {
try {
imageFile = imageFiles.get(i);
loadFSImageFile(target, recovery, imageFile, startOpt);
break;
} catch (IllegalReservedPathException ie) {
// A reserved-path problem aborts immediately; no other image is tried.
throw new IOException("Failed to load image from " + imageFile,
ie);
} catch (Exception e) {
// Any other failure: reset the namesystem and fall back to the next image.
LOG.error("Failed to load image from " + imageFile, e);
target.clear();
imageFile = null;
}
}
// Failed to load any images, error out
if (imageFile == null) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load FSImage file, see error(s) " +
"above for more info.");
}
prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) {
// Normal path: replay the edits on top of the loaded image.
long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
needToSave = false;
}
// New transactions continue right after the last applied one.
editLog.setNextTxId(lastAppliedTxId + 1);
return needToSave;
}
/**
 * Performs the rollback of a rolling upgrade.
 *
 * @param discardSegmentTxId edit log segments starting from this txid are
 *        discarded
 * @param ckptId txid of the rollback checkpoint that becomes the regular
 *        image
 * @throws IOException if discarding, renaming, purging or reopening the
 *         edit log fails
 */
private void rollingRollback(long discardSegmentTxId, long ckptId)
    throws IOException {
  // discard unnecessary editlog segments starting from the given id
  editLog.discardSegments(discardSegmentTxId);

  // rename the special checkpoint
  renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,
      true);

  // purge all the checkpoints after the marker
  archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
  // HDFS-7939: purge all old fsimage_rollback_*
  archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);

  final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
  final boolean haEnabled = HAUtil.isHAEnabled(conf, nsId);
  if (!haEnabled) {
    return;
  }
  // close the editlog since it is currently open for write
  editLog.close();
  // reopen the editlog for read
  editLog.initSharedJournalsForRead();
}
/**
 * Loads one image file into the namesystem, picking the checksum source
 * that matches the layout version read from the image's storage directory.
 *
 * @param target namesystem to load into
 * @param recovery metadata recovery context passed to the loader
 * @param imageFile image file (and its storage directory) to load
 * @param startupOption startup option of the NameNode
 * @throws IOException if reading the properties or the image fails, or a
 *         required message digest is missing
 */
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
    FSImageFile imageFile, StartupOption startupOption) throws IOException {
  LOG.info("Planning to load image: " + imageFile);
  final StorageDirectory propsDir = imageFile.sd;
  storage.readProperties(propsDir, startupOption);

  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
    // For txid-based layout, we should have a .md5 file
    // next to the image file
    final boolean rollbackOfRollingUpgrade =
        RollingUpgradeStartupOption.ROLLBACK.matches(startupOption);
    loadFSImage(imageFile.getFile(), target, recovery,
        rollbackOfRollingUpgrade);
    return;
  }

  if (!NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
    // We don't have any record of the md5sum
    loadFSImage(imageFile.getFile(), null, target, recovery, false);
    return;
  }

  // In 0.22, we have the checksum stored in the VERSION file.
  final String digest = storage.getDeprecatedProperty(
      NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
  if (digest == null) {
    throw new InconsistentFSStateException(propsDir.getRoot(),
        "Message digest property " +
        NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
        " not set for storage directory " + propsDir.getRoot());
  }
  loadFSImage(imageFile.getFile(), new MD5Hash(digest), target, recovery,
      false);
}
/**
 * Initializes the edit log journals according to the HA configuration and
 * the startup option.
 * <ul>
 *   <li>Non-HA: journals are initialized for write and unclosed streams are
 *       recovered.</li>
 *   <li>HA with UPGRADE, UPGRADEONLY or a rolling-upgrade ROLLBACK: journals
 *       are initialized for write and unclosed streams are recovered; for
 *       the two upgrade options the shared log ctime is checked first.</li>
 *   <li>HA otherwise: only the shared journals are initialized, for
 *       read.</li>
 * </ul>
 *
 * @param startOpt the startup option the NameNode was started with
 * @throws IllegalStateException if the namespace ID is still 0
 * @throws IOException if, during an upgrade, this NN's storage ctime is
 *                     older than the shared log ctime, or if an edit log
 *                     operation fails
 */
public void initEditLog(StartupOption startOpt) throws IOException {
  Preconditions.checkState(getNamespaceID() != 0,
      "Must know namespace ID before initting edit log");
  String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);

  if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
    // If this NN is not HA
    editLog.initJournalsForWrite();
    editLog.recoverUnclosedStreams();
    return;
  }

  // From here on HA is known to be enabled, so it is not checked again.
  final boolean isUpgrade = startOpt == StartupOption.UPGRADE
      || startOpt == StartupOption.UPGRADEONLY;

  if (!isUpgrade && !RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) {
    // This NN is HA and we're not doing an upgrade.
    editLog.initSharedJournalsForRead();
    return;
  }

  // This NN is HA, but we're doing an upgrade or a rollback of rolling
  // upgrade so init the edit log for write.
  editLog.initJournalsForWrite();
  if (isUpgrade) {
    long sharedLogCTime = editLog.getSharedLogCTime();
    if (storage.getCTime() < sharedLogCTime) {
      throw new IOException("It looks like the shared log is already " +
          "being upgraded but this NN has not been upgraded yet. You " +
          "should restart this NameNode with the '" +
          StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
          "this NN in sync with the other.");
    }
  }
  editLog.recoverUnclosedStreams();
}
/**
 * Decides whether the namespace should be saved automatically at startup
 * because the latest checkpoint is too old, either in wall-clock age or in
 * number of edits replayed on top of it.
 *
 * @param imageFile the image file that was loaded
 * @param numEditsLoaded the number of edits loaded from edits logs
 * @return true if the NameNode should automatically save the namespace
 * when it is started, due to the latest checkpoint being too old.
 */
private boolean needsResaveBasedOnStaleCheckpoint(
    File imageFile, long numEditsLoaded) {
  // Configured checkpoint period, read in seconds.
  final long checkpointPeriod = conf.getTimeDuration(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
  final long checkpointTxnCount = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);

  // toMillis saturates instead of overflowing, so a very large configured
  // period cannot wrap to a negative value and force a resave on every start.
  final long checkpointPeriodMs = TimeUnit.SECONDS.toMillis(checkpointPeriod);
  final long checkpointAge = Time.now() - imageFile.lastModified();

  return (checkpointAge > checkpointPeriodMs) ||
      (numEditsLoaded > checkpointTxnCount);
}
/**
 * Loads the given edit streams into the image without a transaction limit,
 * startup option or recovery context.
 *
 * @param editStreams the edit log streams to apply
 * @param target namesystem the edits are applied to
 * @return the value returned by the five-argument {@code loadEdits}
 * @throws IOException if loading the edits fails
 */
public long loadEdits(Iterable<EditLogInputStream> editStreams,
    FSNamesystem target) throws IOException {
  final long unlimitedTxns = Long.MAX_VALUE;
  final StartupOption noStartupOption = null;
  final MetaRecoveryContext noRecovery = null;
  return loadEdits(editStreams, target, unlimitedTxns, noStartupOption,
      noRecovery);
}
/**
 * Applies the given edit log streams, in iteration order, on top of the
 * currently loaded namespace and advances {@code lastAppliedTxId}
 * accordingly. All streams are closed before returning, whether or not
 * loading succeeded.
 *
 * @param editStreams the edit log streams to apply
 * @param target namesystem the edits are applied to
 * @param maxTxnsToRead limit handed to the edit log loader for each stream
 * @param startOpt startup option handed to the edit log loader, may be null
 * @param recovery recovery context; when non-null, {@code lastAppliedTxId}
 *                 is moved to each stream's last txid if that is valid
 * @return how far {@code lastAppliedTxId} advanced during this call
 * @throws IOException if loading any of the streams fails
 */
public long loadEdits(Iterable<EditLogInputStream> editStreams,
    FSNamesystem target, long maxTxnsToRead,
    StartupOption startOpt, MetaRecoveryContext recovery)
    throws IOException {
  // Building this message walks and stringifies every stream; only pay for
  // that when debug logging is actually enabled.
  if (LOG.isDebugEnabled()) {
    LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
  }
  StartupProgress prog = NameNode.getStartupProgress();
  prog.beginPhase(Phase.LOADING_EDITS);

  long prevLastAppliedTxId = lastAppliedTxId;
  try {
    FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);

    // Load latest edits
    for (EditLogInputStream editIn : editStreams) {
      // Rate-limited logging: report how many reads were suppressed since
      // the last message that was actually logged.
      LogAction logAction = loadEditLogHelper.record();
      if (logAction.shouldLog()) {
        String logSuppressed = "";
        if (logAction.getCount() > 1) {
          logSuppressed = "; suppressed logging for " +
              (logAction.getCount() - 1) + " edit reads";
        }
        LOG.info("Reading " + editIn + " expecting start txid #" +
            (lastAppliedTxId + 1) + logSuppressed);
      }
      try {
        loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
            startOpt, recovery);
      } finally {
        // Update lastAppliedTxId even in case of error, since some ops may
        // have been successfully applied before the error.
        lastAppliedTxId = loader.getLastAppliedTxId();
      }
      // If we are in recovery mode, we may have skipped over some txids.
      if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
          && recovery != null) {
        lastAppliedTxId = editIn.getLastTxId();
      }
    }
  } finally {
    FSEditLog.closeAllStreams(editStreams);
  }
  // Only reached when every stream loaded without throwing.
  prog.endPhase(Phase.LOADING_EDITS);
  return lastAppliedTxId - prevLastAppliedTxId;
}
/**
 * Loads the namespace from {@code imageFile}, checking it against the MD5
 * digest stored for that file.
 *
 * @param imageFile the image file to load
 * @param target namesystem the image is loaded into
 * @param recovery recovery context passed through to the loader
 * @param requireSameLayoutVersion passed through to the loader
 * @throws IOException if no stored MD5 is found for the image file, or if
 *                     loading fails
 */
private void loadFSImage(File imageFile, FSNamesystem target,
    MetaRecoveryContext recovery, boolean requireSameLayoutVersion)
    throws IOException {
  final MD5Hash storedDigest = MD5FileUtils.readStoredMd5ForFile(imageFile);
  if (storedDigest != null) {
    loadFSImage(imageFile, storedDigest, target, recovery,
        requireSameLayoutVersion);
    return;
  }
  throw new IOException("No MD5 file found corresponding to image file "
      + imageFile);
}
/**
 * Loads the filesystem image from {@code curFile} into {@code target} and
 * records the loaded transaction id as the last applied one.
 *
 * @param curFile the image file to load
 * @param expectedMd5 digest the loaded image must match; the check is
 *                    skipped when this is null
 * @param target namesystem the image is loaded into
 * @param recovery recovery context (not used by this method's body)
 * @param requireSameLayoutVersion passed through to the loader
 * @throws IOException if the loaded digest differs from the expected one,
 *                     or if loading fails
 */
private void loadFSImage(File curFile, MD5Hash expectedMd5,
    FSNamesystem target, MetaRecoveryContext recovery,
    boolean requireSameLayoutVersion) throws IOException {
  // The image loader needs the block pool id when it reads the rolling
  // upgrade information, so set it before loading.
  target.setBlockPoolId(getBlockPoolID());

  final FSImageFormat.LoaderDelegator imageLoader =
      FSImageFormat.newLoader(conf, target);
  imageLoader.load(curFile, requireSameLayoutVersion);

  // Compare the digest of what was actually read with the expected one.
  final MD5Hash actualMd5 = imageLoader.getLoadedImageMd5();
  final boolean digestAccepted =
      expectedMd5 == null || expectedMd5.equals(actualMd5);
  if (!digestAccepted) {
    throw new IOException("Image file " + curFile +
        " is corrupt with MD5 checksum of " + actualMd5 +
        " but expecting " + expectedMd5);
  }

  final long loadedTxId = imageLoader.getLoadedImageTxId();
  LOG.info("Loaded image for txid " + loadedTxId + " from " + curFile);
  lastAppliedTxId = loadedTxId;
  storage.setMostRecentCheckpointInfo(loadedTxId, curFile.lastModified());
}
/**
* Save the contents of the FS image to the file.
*/
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
NameNodeFile dstType) throws IOException {
long txid = context.getTxId();
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
conf);
FSImageCompression compression = FSImageCompression.createCompression(conf);
long numErrors = saver.save(newFile, compression);
if (numErrors > 0) {
// The image is likely corrupted.
LOG.error("Detected " + numErrors + " errors while saving FsImage " +
dstFile);
exitAfterSave.set(true);
}
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointInfo(txid, Time.now());