Permalink
...
Checking mergeability…
Don’t worry, you can still create the pull request.
Comparing changes
Open a pull request
- 4 commits
- 7 files changed
- 0 commit comments
- 1 contributor
Unified
Split
Showing
with
231 additions
and 71 deletions.
- +7 −10 state/export_test.go
- +65 −60 state/logs.go
- +62 −1 state/upgrades.go
- +79 −0 state/upgrades_test.go
- +5 −0 upgrades/backend.go
- +7 −0 upgrades/steps_22.go
- +6 −0 upgrades/steps_22_test.go
View
17
state/export_test.go
| @@ -32,7 +32,6 @@ import ( | ||
| "github.com/juju/juju/state/storage" | ||
| "github.com/juju/juju/status" | ||
| "github.com/juju/juju/testcharms" | ||
| - "github.com/juju/juju/version" | ||
| ) | ||
| const ( | ||
| @@ -473,15 +472,13 @@ func MakeLogDoc( | ||
| msg string, | ||
| ) *logDoc { | ||
| return &logDoc{ | ||
| - Id: bson.NewObjectId(), | ||
| - Time: t.UnixNano(), | ||
| - ModelUUID: modelUUID, | ||
| - Entity: entity.String(), | ||
| - Version: version.Current.String(), | ||
| - Module: module, | ||
| - Location: location, | ||
| - Level: int(level), | ||
| - Message: msg, | ||
| + Id: bson.NewObjectId(), | ||
| + Time: t.UnixNano(), | ||
| + Entity: entity.String(), | ||
| + Module: module, | ||
| + Location: location, | ||
| + Level: int(level), | ||
| + Message: msg, | ||
| } | ||
| } | ||
View
125
state/logs.go
| @@ -64,14 +64,18 @@ var logIndexes = [][]string{ | ||
| // This index needs to include _id because | ||
| // logTailer.processCollection uses _id to ensure log records with | ||
| // the same time have a consistent ordering. | ||
| - {"e", "t", "_id"}, | ||
| - {"e", "n"}, | ||
| + {"t", "_id"}, | ||
| + {"n"}, | ||
| } | ||
| // InitDbLogs sets up the indexes for the logs collection. It should | ||
| // be called as state is opened. It is idempotent. | ||
| func InitDbLogs(session *mgo.Session) error { | ||
| logsColl := session.DB(logsDB).C(logsC) | ||
| + return initLogCollectionIndices(logsColl) | ||
| +} | ||
| + | ||
| +func initLogCollectionIndices(logsColl *mgo.Collection) error { | ||
| for _, key := range logIndexes { | ||
| err := logsColl.EnsureIndex(mgo.Index{Key: key}) | ||
| if err != nil { | ||
| @@ -201,17 +205,14 @@ func (logger *LastSentLogTracker) Get() (int64, int64, error) { | ||
| // document includes the field names. | ||
| // (alesstimec) It would be really nice if we could store Time as int64 | ||
| // for increased precision. | ||
| -// TODO: remove version from this structure: https://pad.lv/1643743 | ||
| type logDoc struct { | ||
| - Id bson.ObjectId `bson:"_id"` | ||
| - Time int64 `bson:"t"` // unix nano UTC | ||
| - ModelUUID string `bson:"e"` | ||
| - Entity string `bson:"n"` // e.g. "machine-0" | ||
| - Version string `bson:"r"` | ||
| - Module string `bson:"m"` // e.g. "juju.worker.firewaller" | ||
| - Location string `bson:"l"` // "filename:lineno" | ||
| - Level int `bson:"v"` | ||
| - Message string `bson:"x"` | ||
| + Id bson.ObjectId `bson:"_id"` | ||
| + Time int64 `bson:"t"` // unix nano UTC | ||
| + Entity string `bson:"n"` // e.g. "machine-0" | ||
| + Module string `bson:"m"` // e.g. "juju.worker.firewaller" | ||
| + Location string `bson:"l"` // "filename:lineno" | ||
| + Level int `bson:"v"` | ||
| + Message string `bson:"x"` | ||
| } | ||
| type DbLogger struct { | ||
| @@ -221,6 +222,10 @@ type DbLogger struct { | ||
| func NewDbLogger(st ModelSessioner) *DbLogger { | ||
| _, logsColl := initLogsSession(st) | ||
| + // FIXME: don't panic | ||
| + if err := initLogCollectionIndices(logsColl); err != nil { | ||
| + panic(err) | ||
| + } | ||
| return &DbLogger{ | ||
| logsColl: logsColl, | ||
| modelUUID: st.ModelUUID(), | ||
| @@ -229,18 +234,15 @@ func NewDbLogger(st ModelSessioner) *DbLogger { | ||
| // Log writes a log message to the database. | ||
| func (logger *DbLogger) Log(t time.Time, entity string, module string, location string, level loggo.Level, msg string) error { | ||
| - // TODO(ericsnow) Use a controller-global int sequence for Id. | ||
| - | ||
| unixEpochNanoUTC := t.UnixNano() | ||
| return logger.logsColl.Insert(&logDoc{ | ||
| - Id: bson.NewObjectId(), | ||
| - Time: unixEpochNanoUTC, | ||
| - ModelUUID: logger.modelUUID, | ||
| - Entity: entity, | ||
| - Module: module, | ||
| - Location: location, | ||
| - Level: int(level), | ||
| - Message: msg, | ||
| + Id: bson.NewObjectId(), | ||
| + Time: unixEpochNanoUTC, | ||
| + Entity: entity, | ||
| + Module: module, | ||
| + Location: location, | ||
| + Level: int(level), | ||
| + Message: msg, | ||
| }) | ||
| } | ||
| @@ -254,8 +256,7 @@ func (logger *DbLogger) Close() { | ||
| // EntityDbLogger writes log records about one entity. | ||
| type EntityDbLogger struct { | ||
| DbLogger | ||
| - entity string | ||
| - version string | ||
| + entity string | ||
| } | ||
| // NewEntityDbLogger returns an EntityDbLogger instance which is used | ||
| @@ -265,25 +266,20 @@ func NewEntityDbLogger(st ModelSessioner, entity names.Tag, ver version.Number) | ||
| return &EntityDbLogger{ | ||
| DbLogger: *dbLogger, | ||
| entity: entity.String(), | ||
| - version: ver.String(), | ||
| } | ||
| } | ||
| // Log writes a log message to the database. | ||
| func (logger *EntityDbLogger) Log(t time.Time, module string, location string, level loggo.Level, msg string) error { | ||
| - // TODO(ericsnow) Use a controller-global int sequence for Id. | ||
| - | ||
| unixEpochNanoUTC := t.UnixNano() | ||
| return logger.logsColl.Insert(&logDoc{ | ||
| - Id: bson.NewObjectId(), | ||
| - Time: unixEpochNanoUTC, | ||
| - ModelUUID: logger.modelUUID, | ||
| - Entity: logger.entity, | ||
| - Version: logger.version, | ||
| - Module: module, | ||
| - Location: location, | ||
| - Level: int(level), | ||
| - Message: msg, | ||
| + Id: bson.NewObjectId(), | ||
| + Time: unixEpochNanoUTC, | ||
| + Entity: logger.entity, | ||
| + Module: module, | ||
| + Location: location, | ||
| + Level: int(level), | ||
| + Message: msg, | ||
| }) | ||
| } | ||
| @@ -319,7 +315,8 @@ type LogRecord struct { | ||
| // origin fields | ||
| ModelUUID string | ||
| Entity names.Tag | ||
| - Version version.Number | ||
| + // TODO: remove this and see if anything cares | ||
| + Version version.Number | ||
| // logging-specific fields | ||
| Level loggo.Level | ||
| @@ -380,12 +377,13 @@ func NewLogTailer(st LogTailerState, params *LogTailerParams) (LogTailer, error) | ||
| if !st.IsController() && params.AllModels { | ||
| return nil, errors.NewNotValid(nil, "not allowed to tail logs from all models: not a controller") | ||
| } | ||
| + // Explicitly ignoring ALL Models just now. | ||
| - session := st.MongoSession().Copy() | ||
| + session, logsColl := initLogsSession(st) | ||
| t := &logTailer{ | ||
| modelUUID: st.ModelUUID(), | ||
| session: session, | ||
| - logsColl: session.DB(logsDB).C(logsC).With(session), | ||
| + logsColl: logsColl, | ||
| params: params, | ||
| logCh: make(chan *LogRecord), | ||
| recentIds: newRecentIdTracker(maxRecentLogIds), | ||
| @@ -463,7 +461,7 @@ func (t *logTailer) processReversed(query *mgo.Query) error { | ||
| return errors.Errorf("too many lines requested (%d) maximum is %d", | ||
| t.params.InitialLines, maxInitialLines) | ||
| } | ||
| - query.Sort("-e", "-t", "-_id") | ||
| + query.Sort("-t", "-_id") | ||
| query.Limit(t.params.InitialLines) | ||
| iter := query.Iter() | ||
| queue := make([]logDoc, t.params.InitialLines) | ||
| @@ -523,7 +521,7 @@ func (t *logTailer) processCollection() error { | ||
| // Important: it is critical that the sort index includes _id, | ||
| // otherwise MongoDB won't use the index, which risks hitting | ||
| // MongoDB's 32MB sort limit. See https://pad.lv/1590605. | ||
| - iter := query.Sort("e", "t", "_id").Iter() | ||
| + iter := query.Sort("t", "_id").Iter() | ||
| for iter.Next(&doc) { | ||
| rec, err := logDocToRecord(&doc) | ||
| if err != nil { | ||
| @@ -546,6 +544,7 @@ func (t *logTailer) tailOplog() error { | ||
| newParams := t.params | ||
| newParams.StartID = t.lastID // (t.lastID + 1) once Id is a sequential int. | ||
| + logsC := logCollection(t.modelUUID) | ||
| oplogSel := append(t.paramsToSelector(newParams, "o."), | ||
| bson.DocElem{"ns", logsDB + "." + logsC}, | ||
| ) | ||
| @@ -604,9 +603,9 @@ func (t *logTailer) paramsToSelector(params *LogTailerParams, prefix string) bso | ||
| if !params.StartTime.IsZero() { | ||
| sel = append(sel, bson.DocElem{"t", bson.M{"$gte": params.StartTime.UnixNano()}}) | ||
| } | ||
| - if !params.AllModels { | ||
| - sel = append(sel, bson.DocElem{"e", t.modelUUID}) | ||
| - } | ||
| + // if !params.AllModels { | ||
| + // sel = append(sel, bson.DocElem{"e", t.modelUUID}) | ||
| + // } | ||
| if params.MinLevel > loggo.UNSPECIFIED { | ||
| sel = append(sel, bson.DocElem{"v", bson.M{"$gte": int(params.MinLevel)}}) | ||
| } | ||
| @@ -701,14 +700,14 @@ func (s *objectIdSet) Length() int { | ||
| } | ||
| func logDocToRecord(doc *logDoc) (*LogRecord, error) { | ||
| - var ver version.Number | ||
| - if doc.Version != "" { | ||
| - parsed, err := version.Parse(doc.Version) | ||
| - if err != nil { | ||
| - return nil, errors.Annotatef(err, "invalid version %q", doc.Version) | ||
| - } | ||
| - ver = parsed | ||
| - } | ||
| + // var ver version.Number | ||
| + // if doc.Version != "" { | ||
| + // parsed, err := version.Parse(doc.Version) | ||
| + // if err != nil { | ||
| + // return nil, errors.Annotatef(err, "invalid version %q", doc.Version) | ||
| + // } | ||
| + // ver = parsed | ||
| + // } | ||
| level := loggo.Level(doc.Level) | ||
| if level > loggo.CRITICAL { | ||
| @@ -724,9 +723,9 @@ func logDocToRecord(doc *logDoc) (*LogRecord, error) { | ||
| ID: doc.Time, | ||
| Time: time.Unix(0, doc.Time).UTC(), // not worth preserving TZ | ||
| - ModelUUID: doc.ModelUUID, | ||
| - Entity: entity, | ||
| - Version: ver, | ||
| + // ModelUUID: doc.ModelUUID, | ||
| + Entity: entity, | ||
| + // Version: ver, | ||
| Level: level, | ||
| Module: doc.Module, | ||
| @@ -740,10 +739,11 @@ func logDocToRecord(doc *logDoc) (*LogRecord, error) { | ||
| // logs collection. All logs older than minLogTime are | ||
| // removed. Further removal is also performed if the logs collection | ||
| // size is greater than maxLogsMB. | ||
| -func PruneLogs(st MongoSessioner, minLogTime time.Time, maxLogsMB int) error { | ||
| +func PruneLogs(st ModelSessioner, minLogTime time.Time, maxLogsMB int) error { | ||
| session, logsColl := initLogsSession(st) | ||
| defer session.Close() | ||
| + // TODO: fix pruning logs | ||
| modelUUIDs, err := getModelsInLogs(logsColl) | ||
| if err != nil { | ||
| return errors.Annotate(err, "failed to get log counts") | ||
| @@ -818,10 +818,14 @@ func PruneLogs(st MongoSessioner, minLogTime time.Time, maxLogsMB int) error { | ||
| return nil | ||
| } | ||
| +func logCollection(modelUUID string) string { | ||
| + return fmt.Sprintf("%s.%s", logsC, modelUUID) | ||
| +} | ||
| + | ||
| // initLogsSession creates a new session suitable for logging updates, | ||
| // returning the session and a logs mgo.Collection connected to that | ||
| // session. | ||
| -func initLogsSession(st MongoSessioner) (*mgo.Session, *mgo.Collection) { | ||
| +func initLogsSession(st ModelSessioner) (*mgo.Session, *mgo.Collection) { | ||
| // To improve throughput, only wait for the logs to be written to | ||
| // the primary. For some reason, this makes a huge difference even | ||
| // when the replicaset only has one member (i.e. a single primary). | ||
| @@ -830,6 +834,7 @@ func initLogsSession(st MongoSessioner) (*mgo.Session, *mgo.Collection) { | ||
| W: 1, | ||
| }) | ||
| db := session.DB(logsDB) | ||
| + logsC := logCollection(st.ModelUUID()) | ||
| return session, db.C(logsC).With(session) | ||
| } | ||
| @@ -889,14 +894,14 @@ func getLogCountForModel(coll *mgo.Collection, modelUUID string) (int, error) { | ||
| func removeModelLogs(session *mgo.Session, modelUUID string) error { | ||
| logsDB := session.DB(logsDB) | ||
| + logsC := logCollection(modelUUID) | ||
| logsColl := logsDB.C(logsC) | ||
| - _, err := logsColl.RemoveAll(bson.M{"e": modelUUID}) | ||
| - if err != nil { | ||
| + if err := logsColl.DropCollection(); err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
| // Also remove the tracked high-water times. | ||
| trackersColl := logsDB.C(forwardedC) | ||
| - _, err = trackersColl.RemoveAll(bson.M{"model-uuid": modelUUID}) | ||
| + _, err := trackersColl.RemoveAll(bson.M{"model-uuid": modelUUID}) | ||
| return errors.Trace(err) | ||
| } | ||
View
63
state/upgrades.go
| @@ -235,7 +235,7 @@ func DropOldLogIndex(st *State) error { | ||
| // If the log collection still has the old e,t index, remove it. | ||
| key := []string{"e", "t"} | ||
| db := st.MongoSession().DB(logsDB) | ||
| - collection := db.C(logsC) | ||
| + collection := db.C("logs") | ||
| err := collection.DropIndex(key...) | ||
| if err == nil { | ||
| return nil | ||
| @@ -715,6 +715,67 @@ func AddStatusHistoryPruneSettings(st *State) error { | ||
| return nil | ||
| } | ||
| +var splitBatchRemoveSize = 1000 | ||
| + | ||
| +// SplitLogCollections moves log entries from the old single log collection | ||
| +// to the log collection per environment. | ||
| +func SplitLogCollections(st *State) error { | ||
| + db := st.MongoSession().DB(logsDB) | ||
| + oldLogs := db.C("logs") | ||
| + | ||
| + // If we haven't seen any particular environment, we need to initialise | ||
| + // the logs collection with the right indices. | ||
| + seen := set.NewStrings() | ||
| + | ||
| + iter := oldLogs.Find(nil).Iter() | ||
| + var ( | ||
| + doc bson.M | ||
| + ids []string | ||
| + ) | ||
| + | ||
| + for iter.Next(&doc) { | ||
| + | ||
| + newCollName := logCollection(doc["e"].(string)) | ||
| + newLogs := db.C(newCollName) | ||
| + | ||
| + if !seen.Contains(newCollName) { | ||
| + if err := initLogCollectionIndices(newLogs); err != nil { | ||
| + return errors.Annotatef(err, "failed to init new logs collection %q", newCollName) | ||
| + } | ||
| + seen.Add(newCollName) | ||
| + } | ||
| + | ||
| + delete(doc, "e") // old env uuid | ||
| + delete(doc, "r") // version - not needed | ||
| + | ||
| + if err := newLogs.Insert(doc); err != nil { | ||
| + return errors.Annotate(err, "failed to insert log record") | ||
| + } | ||
| + ids = append(ids, doc["_id"].(string)) | ||
| + if len(ids) >= splitBatchRemoveSize { | ||
| + if err := oldLogs.Remove(bson.D{{"_id", bson.D{{"$in", ids}}}}); err != nil { | ||
| + return errors.Annotate(err, "failed to remove batch of logs") | ||
| + } | ||
| + ids = nil | ||
| + } | ||
| + | ||
| + doc = nil | ||
| + } | ||
| + | ||
| + // drop the old collection | ||
| + if err := oldLogs.DropCollection(); err != nil { | ||
| + // If the error is &mgo.QueryError{Code:26, Message:"ns not found", Assertion:false} | ||
| + // that's fine. | ||
| + if merr, ok := err.(*mgo.QueryError); ok { | ||
| + if merr.Code == 26 { | ||
| + return nil | ||
| + } | ||
| + } | ||
| + return errors.Annotate(err, "failed to drop old logs collection") | ||
| + } | ||
| + return nil | ||
| +} | ||
| + | ||
| // AddStorageInstanceConstraints sets the "constraints" field on | ||
| // storage instance docs. | ||
| func AddStorageInstanceConstraints(st *State) error { | ||
Oops, something went wrong.