Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
fix: Ensure all MongoDB cursors are being closed (#7940)
Browse files Browse the repository at this point in the history
  • Loading branch information
bacherfl committed Jun 2, 2022
1 parent 5021189 commit ac11530
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 16 deletions.
26 changes: 16 additions & 10 deletions mongodb-datastore/db/mongodb_event_repo.go
Expand Up @@ -476,17 +476,20 @@ func (mr *MongoDBEventRepo) aggregateFromDB(collectionName string, pipeline mong
defer cancel()

cur, err := collection.Aggregate(ctx, pipeline)

if err != nil {
logger.WithError(err).Error("Could not retrieve events from collectiong elements in events collection")
return nil, err
}
// close the cursor after the function has completed to avoid memory leaks
defer func() {
if cur == nil {
return
}
if err := cur.Close(ctx); err != nil {
logger.WithError(err).Error("Could not close cursor")
}
}()
if err != nil {
logger.WithError(err).Error("Could not retrieve events from collectiong elements in events collection")
return nil, err
}

result.Events = formatEventResults(ctx, cur)

return result, nil
Expand Down Expand Up @@ -532,17 +535,20 @@ func (mr *MongoDBEventRepo) findInDB(collectionName string, pageSize int64, next
}

cur, err := collection.Find(ctx, searchOptions, sortOptions)

if err != nil {
logger.WithError(err).Error("Could not retrieve elements from events collection")
return nil, err
}
// close the cursor after the function has completed to avoid memory leaks
defer func() {
if cur == nil {
return
}
if err := cur.Close(ctx); err != nil {
logger.WithError(err).Error("Could not close cursor.")
}
}()
if err != nil {
logger.WithError(err).Error("Could not retrieve elements from events collection")
return nil, err
}

result.Events = formatEventResults(ctx, cur)

result.PageSize = pageSize
Expand Down
16 changes: 16 additions & 0 deletions shipyard-controller/db/common.go
@@ -0,0 +1,16 @@
package db

import (
"context"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
)

func closeCursor(ctx context.Context, cur *mongo.Cursor) {
if cur == nil {
return
}
if err := cur.Close(ctx); err != nil {
log.Errorf("Could not close cursor: %v", err)
}
}
2 changes: 2 additions & 0 deletions shipyard-controller/db/mongodb.go
Expand Up @@ -13,6 +13,8 @@ func SetupTTLIndex(ctx context.Context, propertyName string, duration time.Durat
ttlInSeconds := int32(duration.Seconds())
indexName := propertyName + "_1"
cur, err := collection.Indexes().List(ctx)
defer closeCursor(ctx, cur)

if err != nil {
return fmt.Errorf("could not load list of indexes of collection %s: %w", collection.Name(), err)
}
Expand Down
3 changes: 2 additions & 1 deletion shipyard-controller/db/mongodb_event_repo.go
Expand Up @@ -51,6 +51,7 @@ func (mdbrepo *MongoDBEventsRepo) GetEvents(project string, filter common.EventF
sortOptions := options.Find().SetSort(bson.D{{Key: "time", Value: -1}})

cur, err := collection.Find(ctx, searchOptions, sortOptions)
defer closeCursor(ctx, cur)
if err != nil && err == mongo.ErrNoDocuments {
return nil, ErrNoEventFound
} else if err != nil {
Expand All @@ -61,7 +62,6 @@ func (mdbrepo *MongoDBEventsRepo) GetEvents(project string, filter common.EventF

events := []models.Event{}

defer cur.Close(ctx)
for cur.Next(ctx) {
event, err := decodeKeptnEvent(cur)
if err != nil {
Expand Down Expand Up @@ -114,6 +114,7 @@ func (mdbrepo *MongoDBEventsRepo) GetRootEvents(getRootParams models.GetRootEven
}

cur, err := collection.Find(ctx, searchOptions, sortOptions)
defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions shipyard-controller/db/mongodb_eventqueue_repo.go
Expand Up @@ -184,6 +184,7 @@ func (m *MongoDBEventQueueRepo) GetEventQueueSequenceStates(filter models.EventQ
searchOptions[stageScope] = filter.Scope.Stage
}
cur, err := collection.Find(ctx, searchOptions)
defer closeCursor(ctx, cur)
if err != nil && err == mongo.ErrNoDocuments {
return nil, ErrNoEventFound
} else if err != nil {
Expand All @@ -194,7 +195,6 @@ func (m *MongoDBEventQueueRepo) GetEventQueueSequenceStates(filter models.EventQ

stateItems := []models.EventQueueSequenceState{}

defer cur.Close(ctx)
for cur.Next(ctx) {
stateItem := models.EventQueueSequenceState{}
err := cur.Decode(&stateItem)
Expand Down Expand Up @@ -249,6 +249,7 @@ func insertQueueItemIntoCollection(ctx context.Context, collection *mongo.Collec

func getQueueItemsFromCollection(collection *mongo.Collection, ctx context.Context, searchOptions bson.M, opts ...*options.FindOptions) ([]models.QueueItem, error) {
cur, err := collection.Find(ctx, searchOptions, opts...)
defer closeCursor(ctx, cur)
if err != nil && err == mongo.ErrNoDocuments {
return nil, ErrNoEventFound
} else if err != nil {
Expand All @@ -259,7 +260,6 @@ func getQueueItemsFromCollection(collection *mongo.Collection, ctx context.Conte

queuedItems := []models.QueueItem{}

defer cur.Close(ctx)
for cur.Next(ctx) {
queueItem := models.QueueItem{}
err := cur.Decode(&queueItem)
Expand Down
1 change: 1 addition & 0 deletions shipyard-controller/db/mongodb_log_repo.go
Expand Up @@ -80,6 +80,7 @@ func (mdbrepo *MongoDBLogRepo) GetLogEntries(params models.GetLogParams) (*model
}

cur, err := collection.Find(ctx, searchOptions, sortOptions)
defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions shipyard-controller/db/mongodb_state_repo.go
Expand Up @@ -83,6 +83,7 @@ func (mdbrepo *MongoDBStateRepo) FindSequenceStates(filter models.StateFilter) (
}

cur, err := collection.Find(ctx, searchOptions, sortOptions)
defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions shipyard-controller/db/mongodb_task_sequence_repo.go
Expand Up @@ -32,6 +32,7 @@ func (mdbrepo *TaskSequenceMongoDBRepo) GetTaskExecutions(project string, filter

collection := mdbrepo.getTaskSequenceCollection(project)
cur, err := collection.Find(ctx, mdbrepo.getTaskSequenceMappingSearchOptions(filter))
defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion shipyard-controller/db/mongodb_uniform_repo.go
Expand Up @@ -328,6 +328,7 @@ func (mdbrepo *MongoDBUniformRepo) getCollectionAndContext() (*mongo.Collection,
func (mdbrepo *MongoDBUniformRepo) findIntegrations(searchParams models.GetUniformIntegrationsParams, collection *mongo.Collection, ctx context.Context) ([]models.Integration, error) {
searchOptions := mdbrepo.getSearchOptions(searchParams)
cur, err := collection.Find(ctx, searchOptions)
defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
Expand Down Expand Up @@ -361,7 +362,7 @@ func (mdbrepo *MongoDBUniformRepo) DeleteServiceFromSubscriptions(subscriptionNa
}

cur, err := collection.Find(ctx, filter)

defer closeCursor(ctx, cur)
if err != nil && err != mongo.ErrNoDocuments {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion statistics-service/db/migrator.go
Expand Up @@ -63,10 +63,15 @@ func (m *Migrator) migrateBatch(ctx context.Context) (bool, error) {
Limit: crateInt64P(m.batchSize),
Skip: crateInt64P(skips),
})
defer func() {
if cur == nil {
return
}
cur.Close(ctx)
}()
if err != nil {
return false, err
}
defer cur.Close(ctx)

currBatchSize := 0
for cur.Next(ctx) {
Expand Down
8 changes: 7 additions & 1 deletion statistics-service/db/statistics_mongodb_repo.go
Expand Up @@ -42,12 +42,18 @@ func (s *StatisticsMongoDBRepo) GetStatistics(from, to time.Time) ([]operations.
}

cur, err := s.statsCollection.Find(ctx, searchOptions)
defer func() {
if cur == nil {
return
}
cur.Close(ctx)
}()
if err != nil {
return nil, err
}

result := []operations.Statistics{}
defer cur.Close(ctx)

if cur.RemainingBatchLength() == 0 {
return nil, ErrNoStatisticsFound
}
Expand Down

0 comments on commit ac11530

Please sign in to comment.