Permalink
Browse files

SERVER-8685: mongorestore oplogLimit fixes

Conflicts:

	src/mongo/tools/restore.cpp
  • Loading branch information...
1 parent 2d57d09 commit 49cb7cab30855e58ef506c97d1280f2ca7d4dc6e @scotthernandez scotthernandez committed with monkey101 Feb 28, 2013
Showing with 75 additions and 10 deletions.
  1. +75 −10 src/mongo/tools/restore.cpp
@@ -21,6 +21,8 @@
#include <boost/filesystem/convenience.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/program_options.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
#include <fcntl.h>
#include <fstream>
#include <set>
@@ -52,12 +54,16 @@ class Restore : public BSONTool {
string _curdb;
string _curcoll;
set<string> _users; // For restoring users with --drop
- auto_ptr<Matcher> _opmatcher; // For oplog replay
+ scoped_ptr<Matcher> _opmatcher; // For oplog replay
+ scoped_ptr<OpTime> _oplogLimitTS; // for oplog replay (limit)
+ int _oplogEntrySkips; // oplog entries skipped
+ int _oplogEntryApplies; // oplog entries applied
Restore() : BSONTool( "restore" ) , _drop(false) {
add_options()
("drop" , "drop each collection before import" )
("oplogReplay", "replay oplog for point-in-time restore")
- ("oplogLimit", po::value<string>(), "exclude oplog entries newer than provided timestamp (epoch[:ordinal])")
+ ("oplogLimit", po::value<string>(), "include oplog entries before the provided Timestamp "
+ "(seconds[:ordinal]) during the oplog replay; the ordinal value is optional")
("keepIndexVersion" , "don't upgrade indexes to newest version")
("noOptionsRestore" , "don't restore collection options")
("noIndexRestore" , "don't restore indexes")
@@ -140,9 +146,50 @@ class Restore : public BSONTool {
oplogLimit = oplogLimit.substr(0, i);
}
-
- if ( ! oplogLimit.empty() ) {
- _opmatcher.reset( new Matcher( fromjson( string("{ \"ts\": { \"$lt\": { \"$timestamp\": { \"t\": ") + oplogLimit + string(", \"i\": ") + oplogInc + string(" } } } }") ) ) );
+
+ try {
+ _oplogLimitTS.reset(new OpTime(
+ boost::lexical_cast<unsigned long>(oplogLimit.c_str()),
+ boost::lexical_cast<unsigned long>(oplogInc.c_str())));
+ } catch( const boost::bad_lexical_cast& error) {
+ log() << "Could not parse oplogLimit into Timestamp from values ( "
+ << oplogLimit << " , " << oplogInc << " )"
+ << endl;
+ return -1;
+ }
+
+ if (!oplogLimit.empty()) {
+ // Only for a replica set as master will have no-op entries so we would need to
+ // skip them all to find the real op
+ scoped_ptr<DBClientCursor> cursor(
+ conn().query("local.oplog.rs", Query().sort(BSON("$natural" << -1)),
+ 1 /*return first*/));
+ OpTime tsOptime;
+ // get newest oplog entry and make sure it is older than the limit to apply.
+ if (cursor->more()) {
+ tsOptime = cursor->next().getField("ts")._opTime();
+ if (tsOptime > *_oplogLimitTS.get()) {
+ log() << "The oplogLimit is not newer than"
+ << " the last oplog entry on the server."
+ << endl;
+ return -1;
+ }
+ }
+
+ BSONObjBuilder tsRestrictBldr;
+ if (tsOptime != NULL)
+ tsRestrictBldr << "$gt" << tsOptime;
+ tsRestrictBldr << "$lt" << *_oplogLimitTS.get();
+
+ BSONObj query = BSON("ts" << tsRestrictBldr.obj());
+
+ if (tsOptime != NULL) {
+ log() << "Latest oplog entry on the server is " << tsOptime.getSecs()
+ << ":" << tsOptime.getInc() << endl;
+ log() << "Only applying oplog entries matching this criteria: "
+ << query.jsonString() << endl;
+ }
+ _opmatcher.reset(new Matcher(query));
}
}
}
@@ -156,7 +203,7 @@ class Restore : public BSONTool {
* given either a root directory that contains only a single
* .bson file, or a single .bson file itself (a collection).
*/
- drillDown(root, _db != "", _coll != "", true);
+ drillDown(root, _db != "", _coll != "", !(_oplogLimitTS.get() == NULL), true);
// should this happen for oplog replay as well?
conn().getLastError(_db == "" ? "admin" : _db);
@@ -165,12 +212,20 @@ class Restore : public BSONTool {
log() << "\t Replaying oplog" << endl;
_curns = OPLOG_SENTINEL;
processFile( root / "oplog.bson" );
+ log() << "Applied " << _oplogEntryApplies << " oplog entries out of "
+ << _oplogEntryApplies + _oplogEntrySkips << " (" << _oplogEntrySkips
+ << " skipped)." << endl;
}
return EXIT_CLEAN;
}
- void drillDown( boost::filesystem::path root, bool use_db, bool use_coll, bool top_level=false ) {
+ void drillDown( boost::filesystem::path root,
+ bool use_db,
+ bool use_coll,
+ bool oplogReplayLimit,
+ bool top_level=false) {
+ bool json_metadata = false;
LOG(2) << "drillDown: " << root.string() << endl;
// skip hidden files and directories
@@ -210,12 +265,13 @@ class Restore : public BSONTool {
if ( p.leaf() == "system.indexes.bson" ) {
indexes = p;
} else {
- drillDown(p, use_db, use_coll);
+ drillDown(p, use_db, use_coll, oplogReplayLimit);
}
}
- if (!indexes.empty())
- drillDown(indexes, use_db, use_coll);
+ if (!indexes.empty() && !json_metadata) {
+ drillDown(indexes, use_db, use_coll, oplogReplayLimit);
+ }
return;
}
@@ -264,6 +320,13 @@ class Restore : public BSONTool {
ns += "." + oldCollName;
}
+ if (oplogReplayLimit) {
+ error() << "The oplogLimit option cannot be used if "
+ << "normal databases/collections exist in the dump directory."
+ << endl;
+ exit(EXIT_FAILURE);
+ }
+
log() << "\tgoing into namespace [" << ns << "]" << endl;
if ( _drop ) {
@@ -341,6 +404,7 @@ class Restore : public BSONTool {
// exclude operations that don't meet (timestamp) criteria
if ( _opmatcher.get() && ! _opmatcher->matches ( obj ) ) {
+ _oplogEntrySkips++;
return;
}
@@ -350,6 +414,7 @@ class Restore : public BSONTool {
BSONObj cmd = BSON( "applyOps" << BSON_ARRAY( obj ) );
BSONObj out;
conn().runCommand(db, cmd, out);
+ _oplogEntryApplies++;
// wait for ops to propagate to "w" nodes (doesn't warn if w used without replset)
if ( _w > 1 ) {

0 comments on commit 49cb7ca

Please sign in to comment.