Use disk-based pruning strategy #22
Conversation
- // memory hit is short-lived this is probably acceptable.
- txnIds := make(map[bson.ObjectId]struct{})
- collNames := make(map[string]struct{})
+ workingSetName := txns.Name + ".prunetemp"
axw
Mar 30, 2017
Member
Do we know what will happen with concurrent calls to PruneTxns? I'm just wondering what happens if we call PruneTxns and the mongo client request times out, but the op is still running in mongo. Maybe we could check for existence first?
According to the mongo docs, the output collection will be atomically replaced. I guess at worst we would invalidate the earlier call's iterators, if any are still active.
mjs
Mar 30, 2017
Concurrent calls are unlikely given how PruneTxns will ultimately be run, but it is worth considering. I'm reluctant to use the existence of the temp collection to block prune attempts because if a jujud gets killed for some reason during pruning then pruning will never happen again (until someone manually fixes the problem).
I'll try out concurrent calls and see what happens, I guess.
mjs
Mar 31, 2017
I've manually tested this quite a bit now. Depending on timing, concurrent runs tend to cause cursors to be killed. This is handled gracefully enough without doing anything bad to the collections or txn data. Pruning continues correctly the next time around.
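For illustration, the existence check floated above could look something like this with mgo (a sketch only; as discussed, it was not adopted because a killed jujud would leave the temp collection behind and block all future pruning):

names, err := db.CollectionNames()
if err != nil {
    return err
}
for _, name := range names {
    if name == workingSetName {
        return fmt.Errorf("pruning appears to already be in progress: %s exists", workingSetName)
    }
}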
| + {"$out": workingSetName}, | ||
| + }) | ||
| + var dummy bson.D | ||
| + if err := runPipe(pipe, &dummy); err != nil { |
- return err
- }
+func newBulkRemover(coll *mgo.Collection) *bulkRemover {
+ session := coll.Database.Session.Copy()
mjs
Mar 30, 2017
Thanks for picking this up. I intended to dig into this more. I wasn't sure if it was ok to perform DB operations on the same session that had an active iterator.
On reflection it's probably ok, but I'll make sure first. If so, I'll remove the Copy.
mjs
Mar 31, 2017
It doesn't look like the Copy was needed so I've removed it. That simplifies a few other aspects as well.
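For reference, a sketch of the pattern that was dropped (in mgo a copied session is normally paired with a deferred Close, and With rebinds the collection to it):

session := coll.Database.Session.Copy()
defer session.Close()
coll = coll.With(session)

With the Copy gone, the remover simply reuses whatever session coll is already bound to.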
jameinel
approved these changes
Mar 30, 2017
Being faster and more memory efficient is great. I had a few thoughts about the algorithm, which we can choose whether or not to address now.
- // memory hit is short-lived this is probably acceptable.
- txnIds := make(map[bson.ObjectId]struct{})
- collNames := make(map[string]struct{})
+ workingSetName := txns.Name + ".prunetemp"
jameinel
Mar 30, 2017
Owner
I take it, it was easier to project into another collection in the same DB rather than a second DB?
+ matchCompleted := bson.M{"$match": bson.M{"s": bson.M{"$in": []int{taborted, tapplied}}}}
+ pipe := txns.Pipe([]bson.M{
+ matchCompleted,
+ {"$project": bson.M{"_id": 1}},
jameinel
Mar 30, 2017
Owner
Nice that, because the transaction ids are the _id field, they map directly onto your target collection's primary key.
+ logger.Debugf("loading collections with transactions")
+ pipe = txns.Pipe([]bson.M{
+ matchCompleted,
+ {"$project": bson.M{"_id": 0, "c": "$o.c"}},
jameinel
Mar 30, 2017
Owner
I take it this is "give me only the things in the o.c field, as just the value c",
then "turn the arrays of c into a single top-level list of c values",
and then "aggregate all of the separate c values so that we have only unique c values".
Given that you don't have interesting documents (just a single field), is there something cheaper than "$group"? Maybe not.
Given that all this work only gives you the collection names, I'm curious why do it this way instead of just iterating all known collections and looking for 'txn-queue' fields.
mjs
Mar 30, 2017
I thought about this a bit. The code doesn't know which collections use transactions, so it works that out by looking in txns. There are a few collections like statushistory and sequence which don't use txns, so I was trying to avoid searching those unnecessarily for txn-queue fields.
This pipeline is fairly expensive though so maybe the algorithm should just look in all collections (except a few mgo/txn related ones). I'll play with that.
I don't think there's anything cheaper than $group that would work here.
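For reference, a sketch of the full pipeline being discussed (the $project stage appears in the diff; the $unwind and $group stages are assumed from the description above):

pipe = txns.Pipe([]bson.M{
    matchCompleted,
    {"$project": bson.M{"_id": 0, "c": "$o.c"}}, // keep only the per-operation collection names
    {"$unwind": "$c"},                           // flatten each txn's array of names
    {"$group": bson.M{"_id": "$c"}},             // one result document per unique collection name
})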
- return fmt.Errorf("failed to read all txns: %v", err)
+ var collNames []string
+ for _, doc := range collNameDocs {
+ if len(doc) == 1 {
mjs
Mar 30, 2017
The length should never be anything other than 1 but I wanted to avoid any chance of panic.
+ for _, doc := range collNameDocs {
+ if len(doc) == 1 {
+ if collName, ok := doc[0].Value.(string); ok {
+ collNames = append(collNames, collName)
jameinel
Mar 30, 2017
Owner
I'm curious if this is also available as something like doc.Id(), but maybe you'd have to use a struct like:
var collNameDocs struct {
    Collection string `bson:"_id"`
}
Would that be clearer to parse?
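A sketch of what that could look like when decoding the aggregation results (hypothetical names; assumes each result document carries only the collection name in _id):

var collNames []string
var doc struct {
    Collection string `bson:"_id"`
}
iter := pipe.Iter()
for iter.Next(&doc) {
    collNames = append(collNames, doc.Collection)
}
if err := iter.Close(); err != nil {
    return err // or whatever error handling the surrounding code uses
}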
coll := db.C(collName)
var tDoc struct {
Queue []string `bson:"txn-queue"`
}
- iter := coll.Find(nil).Select(bson.M{"txn-queue": 1}).Iter()
+ query := coll.Find(nil).Select(bson.M{"txn-queue": 1})
jameinel
Mar 30, 2017
Owner
I assume this is the equivalent of:
db.collection.find({}, {"txn-queue": 1})
I wonder if it would be helpful to do the same trick we did earlier with:
db.collection.find({}, {"_id": 0, "txn-queue": 1})
It also really feels like it wants to be another pipeline. Would it be sane to do the aggregation into another temp table so that we wouldn't have to iterate the docs?
Something like:
pipe = coll.Pipe([]bson.M{
    {"$project": bson.M{"_id": 0, "txn": "$txn-queue"}},
    {"$unwind": "$txn"},
    {"$group": bson.M{"_id": "$txn"}},
})
Feels really good here, as we don't end up double handling the same txn ids over and over again.
mjs
Mar 30, 2017
Unfortunately I don't think that helps because the txn-queue field values are <txn id>_<nonce>. I'll have to check if the nonce is reused but I suspect it's not.
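For illustration: because the queue entries are "<txn id>_<nonce>" tokens, the ids would have to be split out of the tokens before any grouping could de-duplicate them (hypothetical helper; assumes the id portion is the ObjectId's 24 hex characters):

func txnTokenToId(token string) bson.ObjectId {
    // Token format is "<txn id hex>_<nonce>"; the first 24 characters are the id.
    return bson.ObjectIdHex(token[:24])
}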
coll := db.C(collName)
var tDoc struct {
Queue []string `bson:"txn-queue"`
}
- iter := coll.Find(nil).Select(bson.M{"txn-queue": 1}).Iter()
+ query := coll.Find(nil).Select(bson.M{"txn-queue": 1})
+ query.Batch(batchBytes)
jameinel
Mar 30, 2017
Owner
https://docs.mongodb.com/manual/reference/method/cursor.batchSize/
Seems to say that the batch size is the number of documents to batch, not the number of bytes to batch.
https://godoc.org/gopkg.in/mgo.v2#Pipe.Batch
Says "sets the batch size when fetching documents" but that doesn't specify units either.
Given that you've run this with a batch size of 16M, maybe I'm wrong. This feels like something that might be Mongo Version dependent.
mjs
Mar 30, 2017
I did research this a bit when I wrote the code and convinced myself it was in bytes, but I've looked again and now I'm pretty sure it's in documents. It's really not very clear in the docs.
It seems that if you set it to a number of docs that results in a reply bigger than 16MB, it'll limit to the maximum anyway. According to the docs, 101 docs is about 1MB (not quite sure how that works out). I'll set the batch size to 1616 to get the maximum.
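A sketch of the adjusted call, assuming mgo's Query.Batch takes a document count rather than bytes:

query := coll.Find(nil).Select(bson.M{"txn-queue": 1})
query.Batch(1616) // ~101 docs per MB per the docs cited above, so ~1616 docs per 16MB reply
iter := query.Iter()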
+ iter := query.Iter()
+ var doc bson.D
+ for iter.Next(&doc) {
+ if err := remover.remove(doc[0].Value); err != nil {
jameinel
Mar 30, 2017
Owner
I'm curious why before you had to do the "if len() == 1" check, but here you don't
+}
+
+func (r *bulkRemover) remove(id interface{}) error {
+ r.chunk.Remove(bson.D{{"_id", id}})
jameinel
Mar 30, 2017
Owner
It feels like we're going to end up removing the same txn id over and over and over again.
And while I realize we don't really want to cache all the txn ids in memory, I wonder about having a limited txn id set. Maybe a per-chunk set, and we skip items that we know are already in the set? We could probably use some hit-rate tracking here, but just a simple:
if r.thisChunkSet.Contains(id) {
    // We've already queued this one for removal
    return nil
}
And then thisChunkSet gets reset to the empty Set() in newChunk.
Thoughts?
mjs
Mar 30, 2017
I guess some txns can involve a lot of documents, although most don't, so I don't know how much of an issue it really is. I'll try out the set and see how much it actually gets hit.
Note that this is only relevant when still-referenced txns are being removed from the working set. The actual removals from the txns collection (which also use bulkRemover) work from a unique set of txn ids.
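A rough sketch of the per-chunk set idea (thisChunkSet is a hypothetical field which would be reset to an empty map whenever a new chunk is started):

func (r *bulkRemover) remove(id interface{}) error {
    if _, ok := r.thisChunkSet[id]; ok {
        // Already queued for removal in this chunk; skip the duplicate.
        return nil
    }
    r.thisChunkSet[id] = struct{}{}
    r.chunk.Remove(bson.D{{"_id", id}})
    r.chunkSize++
    // ... chunk size check and flush as in the existing code ...
    return nil
}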
+func (r *bulkRemover) remove(id interface{}) error {
+ r.chunk.Remove(bson.D{{"_id", id}})
+ r.chunkSize++
+ if r.chunkSize == maxChunkSize {
jameinel
Mar 30, 2017
Owner
As a general peeve of mine, I really prefer these checks to be:
if r.chunkSize >= maxChunkSize {
}
It's a bit of belt-and-suspenders checking, but "==" always has the catastrophic failure mode where you accidentally increment twice and suddenly the trap never triggers because you walked right past it.
+ // Load the ids of all completed and aborted txns into a separate
+ // temporary collection.
+ logger.Debugf("loading all completed transactions")
+ matchCompleted := bson.M{"$match": bson.M{"s": bson.M{"$in": []int{taborted, tapplied}}}}
jameinel
Mar 30, 2017
Owner
Another thing I just noted:
time db.txns.find({"s": 5}).count()
and
time db.txns.find({"s": 6}).count()
are quite fast vs
time db.txns.find({"s": {"$in": [5, 6]}}).count()
is very slow (db.txns.find().explain() still said it was using the index).
So we might consider doing 2 passes with explicit values, which might be significantly faster than the "$in" version.
mjs
Mar 30, 2017
db.txns.find({"s": {$gte: 5}} is fast but using that in the aggregation pipeline is still just as slow as $in, even though explain: true says it'll use the index. Very strange.
I've tried with MongoDB 2.6 and 3.2. Same problem with both.
mjs
Mar 30, 2017
Doing two passes means a lot of extra work later (2 collections to remove txns from) so I'm just going to leave it using $gte.
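For reference, the $gte variant only changes the $match stage (assuming taborted and tapplied are the state constants 5 and 6 used in the shell queries above, so s >= taborted covers both completed states):

matchCompleted := bson.M{"$match": bson.M{"s": bson.M{"$gte": taborted}}}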
mjs merged this pull request Mar 31, 2017
mjs commented Mar 30, 2017
Previously, transaction pruning would consume unmanageable amounts of
memory when large numbers of transactions were present.
Instead of holding the working set of transaction ids in memory, it is
now kept in a temporary database collection (generated by an
aggregation pipeline query), which uses much less memory.
Also of note: the output produced during pruning has been improved,
resulting in less noise and more useful feedback.
As well as consuming less memory, the new approach is also much
faster. Using a real-world database dump containing 19 million
transactions where 18.5 million were to be pruned, the total prune
time dropped from 154 minutes to 9 minutes (-94%)!
No test changes were required as the external behaviour hasn't changed.
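A condensed sketch of the overall approach, pieced together from the diff fragments above (pruneTxns is a hypothetical wrapper, taborted is the mgo/txn "aborted" state constant, and the real diff uses a runPipe helper plus a bulkRemover for the later steps):

func pruneTxns(db *mgo.Database, txns *mgo.Collection) error {
    workingSetName := txns.Name + ".prunetemp"

    // 1. Write the ids of all completed/aborted txns to a temporary
    //    collection on disk instead of building a map in memory.
    matchCompleted := bson.M{"$match": bson.M{"s": bson.M{"$gte": taborted}}}
    pipe := txns.Pipe([]bson.M{
        matchCompleted,
        {"$project": bson.M{"_id": 1}},
        {"$out": workingSetName},
    })
    var dummy bson.D
    if err := pipe.All(&dummy); err != nil {
        return err
    }

    // 2. Work out which collections use transactions (the o.c aggregation
    //    discussed above), walk their txn-queue fields, and remove any
    //    still-referenced txn ids from the working set.
    // 3. Bulk-remove whatever remains in the working set from txns, then
    //    drop the temporary collection.
    return nil
}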