From ac11530d0eb82bdc5f66a94c2c8bd0171e53dc5c Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 2 Jun 2022 08:44:59 +0200 Subject: [PATCH] fix: Ensure all MongoDB cursors are being closed (#7940) --- mongodb-datastore/db/mongodb_event_repo.go | 26 ++++++++++++------- shipyard-controller/db/common.go | 16 ++++++++++++ shipyard-controller/db/mongodb.go | 2 ++ shipyard-controller/db/mongodb_event_repo.go | 3 ++- .../db/mongodb_eventqueue_repo.go | 4 +-- shipyard-controller/db/mongodb_log_repo.go | 1 + shipyard-controller/db/mongodb_state_repo.go | 1 + .../db/mongodb_task_sequence_repo.go | 1 + .../db/mongodb_uniform_repo.go | 3 ++- statistics-service/db/migrator.go | 7 ++++- .../db/statistics_mongodb_repo.go | 8 +++++- 11 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 shipyard-controller/db/common.go diff --git a/mongodb-datastore/db/mongodb_event_repo.go b/mongodb-datastore/db/mongodb_event_repo.go index 9bc5b3cc94..4e0565d867 100644 --- a/mongodb-datastore/db/mongodb_event_repo.go +++ b/mongodb-datastore/db/mongodb_event_repo.go @@ -476,17 +476,20 @@ func (mr *MongoDBEventRepo) aggregateFromDB(collectionName string, pipeline mong defer cancel() cur, err := collection.Aggregate(ctx, pipeline) - - if err != nil { - logger.WithError(err).Error("Could not retrieve events from collectiong elements in events collection") - return nil, err - } // close the cursor after the function has completed to avoid memory leaks defer func() { + if cur == nil { + return + } if err := cur.Close(ctx); err != nil { logger.WithError(err).Error("Could not close cursor") } }() + if err != nil { + logger.WithError(err).Error("Could not retrieve events from collectiong elements in events collection") + return nil, err + } + result.Events = formatEventResults(ctx, cur) return result, nil @@ -532,17 +535,20 @@ func (mr *MongoDBEventRepo) findInDB(collectionName string, pageSize int64, next } cur, err := collection.Find(ctx, searchOptions, sortOptions) - - if err != nil { - logger.WithError(err).Error("Could not retrieve elements from events collection") - return nil, err - } // close the cursor after the function has completed to avoid memory leaks defer func() { + if cur == nil { + return + } if err := cur.Close(ctx); err != nil { logger.WithError(err).Error("Could not close cursor.") } }() + if err != nil { + logger.WithError(err).Error("Could not retrieve elements from events collection") + return nil, err + } + result.Events = formatEventResults(ctx, cur) result.PageSize = pageSize diff --git a/shipyard-controller/db/common.go b/shipyard-controller/db/common.go new file mode 100644 index 0000000000..6187da22d6 --- /dev/null +++ b/shipyard-controller/db/common.go @@ -0,0 +1,16 @@ +package db + +import ( + "context" + log "github.com/sirupsen/logrus" + "go.mongodb.org/mongo-driver/mongo" +) + +func closeCursor(ctx context.Context, cur *mongo.Cursor) { + if cur == nil { + return + } + if err := cur.Close(ctx); err != nil { + log.Errorf("Could not close cursor: %v", err) + } +} diff --git a/shipyard-controller/db/mongodb.go b/shipyard-controller/db/mongodb.go index fa3da98b34..bea8d19b29 100644 --- a/shipyard-controller/db/mongodb.go +++ b/shipyard-controller/db/mongodb.go @@ -13,6 +13,8 @@ func SetupTTLIndex(ctx context.Context, propertyName string, duration time.Durat ttlInSeconds := int32(duration.Seconds()) indexName := propertyName + "_1" cur, err := collection.Indexes().List(ctx) + defer closeCursor(ctx, cur) + if err != nil { return fmt.Errorf("could not load list of indexes of collection %s: %w", collection.Name(), err) } diff --git a/shipyard-controller/db/mongodb_event_repo.go b/shipyard-controller/db/mongodb_event_repo.go index b0d1dbb9ba..552b438d46 100644 --- a/shipyard-controller/db/mongodb_event_repo.go +++ b/shipyard-controller/db/mongodb_event_repo.go @@ -51,6 +51,7 @@ func (mdbrepo *MongoDBEventsRepo) GetEvents(project string, filter common.EventF sortOptions := options.Find().SetSort(bson.D{{Key: "time", Value: -1}}) cur, err := collection.Find(ctx, searchOptions, sortOptions) + defer closeCursor(ctx, cur) if err != nil && err == mongo.ErrNoDocuments { return nil, ErrNoEventFound } else if err != nil { @@ -61,7 +62,6 @@ func (mdbrepo *MongoDBEventsRepo) GetEvents(project string, filter common.EventF events := []models.Event{} - defer cur.Close(ctx) for cur.Next(ctx) { event, err := decodeKeptnEvent(cur) if err != nil { @@ -114,6 +114,7 @@ func (mdbrepo *MongoDBEventsRepo) GetRootEvents(getRootParams models.GetRootEven } cur, err := collection.Find(ctx, searchOptions, sortOptions) + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return nil, err } diff --git a/shipyard-controller/db/mongodb_eventqueue_repo.go b/shipyard-controller/db/mongodb_eventqueue_repo.go index 131e9df14c..fd893c369d 100644 --- a/shipyard-controller/db/mongodb_eventqueue_repo.go +++ b/shipyard-controller/db/mongodb_eventqueue_repo.go @@ -184,6 +184,7 @@ func (m *MongoDBEventQueueRepo) GetEventQueueSequenceStates(filter models.EventQ searchOptions[stageScope] = filter.Scope.Stage } cur, err := collection.Find(ctx, searchOptions) + defer closeCursor(ctx, cur) if err != nil && err == mongo.ErrNoDocuments { return nil, ErrNoEventFound } else if err != nil { @@ -194,7 +195,6 @@ func (m *MongoDBEventQueueRepo) GetEventQueueSequenceStates(filter models.EventQ stateItems := []models.EventQueueSequenceState{} - defer cur.Close(ctx) for cur.Next(ctx) { stateItem := models.EventQueueSequenceState{} err := cur.Decode(&stateItem) @@ -249,6 +249,7 @@ func insertQueueItemIntoCollection(ctx context.Context, collection *mongo.Collec func getQueueItemsFromCollection(collection *mongo.Collection, ctx context.Context, searchOptions bson.M, opts ...*options.FindOptions) ([]models.QueueItem, error) { cur, err := collection.Find(ctx, searchOptions, opts...) + defer closeCursor(ctx, cur) if err != nil && err == mongo.ErrNoDocuments { return nil, ErrNoEventFound } else if err != nil { @@ -259,7 +260,6 @@ func getQueueItemsFromCollection(collection *mongo.Collection, ctx context.Conte queuedItems := []models.QueueItem{} - defer cur.Close(ctx) for cur.Next(ctx) { queueItem := models.QueueItem{} err := cur.Decode(&queueItem) diff --git a/shipyard-controller/db/mongodb_log_repo.go b/shipyard-controller/db/mongodb_log_repo.go index c7ef121136..5ef60f50ef 100644 --- a/shipyard-controller/db/mongodb_log_repo.go +++ b/shipyard-controller/db/mongodb_log_repo.go @@ -80,6 +80,7 @@ func (mdbrepo *MongoDBLogRepo) GetLogEntries(params models.GetLogParams) (*model } cur, err := collection.Find(ctx, searchOptions, sortOptions) + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return nil, err } diff --git a/shipyard-controller/db/mongodb_state_repo.go b/shipyard-controller/db/mongodb_state_repo.go index 08200cb76c..e5b6ee4b65 100644 --- a/shipyard-controller/db/mongodb_state_repo.go +++ b/shipyard-controller/db/mongodb_state_repo.go @@ -83,6 +83,7 @@ func (mdbrepo *MongoDBStateRepo) FindSequenceStates(filter models.StateFilter) ( } cur, err := collection.Find(ctx, searchOptions, sortOptions) + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return nil, err } diff --git a/shipyard-controller/db/mongodb_task_sequence_repo.go b/shipyard-controller/db/mongodb_task_sequence_repo.go index 78beac4075..d0f09f2b8d 100644 --- a/shipyard-controller/db/mongodb_task_sequence_repo.go +++ b/shipyard-controller/db/mongodb_task_sequence_repo.go @@ -32,6 +32,7 @@ func (mdbrepo *TaskSequenceMongoDBRepo) GetTaskExecutions(project string, filter collection := mdbrepo.getTaskSequenceCollection(project) cur, err := collection.Find(ctx, mdbrepo.getTaskSequenceMappingSearchOptions(filter)) + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return nil, err } diff --git a/shipyard-controller/db/mongodb_uniform_repo.go b/shipyard-controller/db/mongodb_uniform_repo.go index 63d7979998..d5d580631e 100644 --- a/shipyard-controller/db/mongodb_uniform_repo.go +++ b/shipyard-controller/db/mongodb_uniform_repo.go @@ -328,6 +328,7 @@ func (mdbrepo *MongoDBUniformRepo) getCollectionAndContext() (*mongo.Collection, func (mdbrepo *MongoDBUniformRepo) findIntegrations(searchParams models.GetUniformIntegrationsParams, collection *mongo.Collection, ctx context.Context) ([]models.Integration, error) { searchOptions := mdbrepo.getSearchOptions(searchParams) cur, err := collection.Find(ctx, searchOptions) + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return nil, err } @@ -361,7 +362,7 @@ func (mdbrepo *MongoDBUniformRepo) DeleteServiceFromSubscriptions(subscriptionNa } cur, err := collection.Find(ctx, filter) - + defer closeCursor(ctx, cur) if err != nil && err != mongo.ErrNoDocuments { return err } diff --git a/statistics-service/db/migrator.go b/statistics-service/db/migrator.go index b802f00ce9..d51db44210 100644 --- a/statistics-service/db/migrator.go +++ b/statistics-service/db/migrator.go @@ -63,10 +63,15 @@ func (m *Migrator) migrateBatch(ctx context.Context) (bool, error) { Limit: crateInt64P(m.batchSize), Skip: crateInt64P(skips), }) + defer func() { + if cur == nil { + return + } + cur.Close(ctx) + }() if err != nil { return false, err } - defer cur.Close(ctx) currBatchSize := 0 for cur.Next(ctx) { diff --git a/statistics-service/db/statistics_mongodb_repo.go b/statistics-service/db/statistics_mongodb_repo.go index 28422ab2e2..18c1902dae 100644 --- a/statistics-service/db/statistics_mongodb_repo.go +++ b/statistics-service/db/statistics_mongodb_repo.go @@ -42,12 +42,18 @@ func (s *StatisticsMongoDBRepo) GetStatistics(from, to time.Time) ([]operations. } cur, err := s.statsCollection.Find(ctx, searchOptions) + defer func() { + if cur == nil { + return + } + cur.Close(ctx) + }() if err != nil { return nil, err } result := []operations.Statistics{} - defer cur.Close(ctx) + if cur.RemainingBatchLength() == 0 { return nil, ErrNoStatisticsFound }