From bcdf5c8d5ef2171aa3b9126008960ae4a96fef51 Mon Sep 17 00:00:00 2001 From: 0xbuidl <74317911+0x-buidl@users.noreply.github.com> Date: Tue, 5 Sep 2023 06:22:58 -0400 Subject: [PATCH] chore: cleanups --- internal/find.go | 146 +++++++++++++++++++++++++++++++++++++++++++- internal/options.go | 108 -------------------------------- model.go | 32 +--------- 3 files changed, 147 insertions(+), 139 deletions(-) delete mode 100644 internal/options.go diff --git a/internal/find.go b/internal/find.go index b09a54a..2c93406 100644 --- a/internal/find.go +++ b/internal/find.go @@ -10,10 +10,56 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) +type UnionFindOpts interface { + *mopt.FindOptions | *mopt.FindOneOptions +} + +func BuildPopulatePipeline[P UnionFindOpts]( + d any, q bson.M, opt P, +) (mongo.Pipeline, *options.AggregateOptions, error) { + pipelineOpts, aggrOpts, queryOpts := mergeFindOptsWithAggregatOpts(opt) + pipeline := append(mongo.Pipeline{bson.D{{Key: "$match", Value: q}}}, pipelineOpts...) + var wg sync.WaitGroup + var mu sync.Mutex + var err error + for _, pop := range *queryOpts.PopulateOption { + wg.Add(1) + func(pop *mopt.PopulateOptions) { + defer wg.Done() + mu.Lock() + if err != nil { + mu.Unlock() + return + } + mu.Unlock() + + pipe, pErr := getPopulateStages(d, pop) + + mu.Lock() + if pErr != nil { + err = pErr + mu.Unlock() + return + } + pipeline = append(pipeline, pipe...) + mu.Unlock() + }(pop) + } + wg.Wait() + + if err != nil { + return nil, nil, err + } + + return pipeline, aggrOpts, nil +} + // TODO: custom populate errors -func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, error) { + +func getPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, error) { lookupPipeline := mongo.Pipeline{ bson.D{ { @@ -42,7 +88,7 @@ func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, erro } mu.Unlock() - pipe, pErr := GetPopulateStages(opt.Schema, p) + pipe, pErr := getPopulateStages(opt.Schema, p) mu.Lock() if pErr != nil { @@ -184,6 +230,102 @@ func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, erro return populatePipeline, nil } +// TODO: merge these options with aggregate options if possible +// if opt.AllowPartialResults != nil { +// } +// if opt.CursorType != nil { +// } +// if opt.Max != nil { +// } +// if opt.Min != nil { +// } +// if opt.NoCursorTimeout != nil { +// } +// if opt.OplogReplay != nil { +// } +// if opt.ReturnKey != nil { +// } +// if opt.ShowRecordID != nil { +// } +// if opt.Snapshot != nil { +// } + +func mergeFindOptsWithAggregatOpts[T UnionFindOpts]( + opt T, +) (mongo.Pipeline, *options.AggregateOptions, *mopt.QueryOptions) { + aggOpts, pipelineOpts, queryOpts := options.Aggregate(), mongo.Pipeline{}, mopt.Query() + switch opt := any(opt).(type) { + case *mopt.FindOptions: + if opt.AllowDiskUse != nil { + aggOpts.SetAllowDiskUse(*opt.AllowDiskUse) + } + if opt.BatchSize != nil { + aggOpts.SetBatchSize(*opt.BatchSize) + } + if opt.Collation != nil { + aggOpts.SetCollation(opt.Collation) + } + if opt.Comment != nil { + aggOpts.SetComment(*opt.Comment) + } + if opt.Hint != nil { + aggOpts.SetHint(opt.Hint) + } + if opt.MaxAwaitTime != nil { + aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime) + } + if opt.MaxTime != nil { + aggOpts.SetMaxTime(*opt.MaxTime) + } + if opt.Let != nil { + aggOpts.SetLet(opt.Let) + } + if opt.Sort != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}}) + } + if opt.Limit != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$limit", Value: *opt.Limit}}) + } + if opt.Skip != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}}) + } + if opt.Projection != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}}) + } + queryOpts = opt.QueryOptions + case *mopt.FindOneOptions: + if opt.BatchSize != nil { + aggOpts.SetBatchSize(*opt.BatchSize) + } + if opt.Collation != nil { + aggOpts.SetCollation(opt.Collation) + } + if opt.Comment != nil { + aggOpts.SetComment(*opt.Comment) + } + if opt.Hint != nil { + aggOpts.SetHint(opt.Hint) + } + if opt.MaxAwaitTime != nil { + aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime) + } + if opt.MaxTime != nil { + aggOpts.SetMaxTime(*opt.MaxTime) + } + if opt.Sort != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}}) + } + if opt.Skip != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}}) + } + if opt.Projection != nil { + pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}}) + } + queryOpts = opt.QueryOptions + } + return pipelineOpts, aggOpts, queryOpts +} + func getStructFields(t reflect.Type) map[string]reflect.StructField { fields := make(map[string]reflect.StructField) for i := 0; i < t.NumField(); i++ { diff --git a/internal/options.go b/internal/options.go deleted file mode 100644 index 5eab7a4..0000000 --- a/internal/options.go +++ /dev/null @@ -1,108 +0,0 @@ -package internal - -import ( - mopt "github.com/0x-buidl/mgs/options" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -type UnionFindOpts interface { - *mopt.FindOptions | *mopt.FindOneOptions -} - -// TODO: merge these options with aggregate options if possible -// if opt.AllowPartialResults != nil { -// } -// if opt.CursorType != nil { -// } -// if opt.Max != nil { -// } -// if opt.Min != nil { -// } -// if opt.NoCursorTimeout != nil { -// } -// if opt.OplogReplay != nil { -// } -// if opt.ReturnKey != nil { -// } -// if opt.ShowRecordID != nil { -// } -// if opt.Snapshot != nil { -// } - -func MergeFindOptsWithAggregatOpts[T UnionFindOpts]( - opt T, -) (mongo.Pipeline, *options.AggregateOptions, *mopt.QueryOptions) { - aggOpts, pipelineOpts, queryOpts := options.Aggregate(), mongo.Pipeline{}, mopt.Query() - switch opt := any(opt).(type) { - case *mopt.FindOptions: - if opt.AllowDiskUse != nil { - aggOpts.SetAllowDiskUse(*opt.AllowDiskUse) - } - if opt.BatchSize != nil { - aggOpts.SetBatchSize(*opt.BatchSize) - } - if opt.Collation != nil { - aggOpts.SetCollation(opt.Collation) - } - if opt.Comment != nil { - aggOpts.SetComment(*opt.Comment) - } - if opt.Hint != nil { - aggOpts.SetHint(opt.Hint) - } - if opt.MaxAwaitTime != nil { - aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime) - } - if opt.MaxTime != nil { - aggOpts.SetMaxTime(*opt.MaxTime) - } - if opt.Let != nil { - aggOpts.SetLet(opt.Let) - } - if opt.Sort != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}}) - } - if opt.Limit != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$limit", Value: *opt.Limit}}) - } - if opt.Skip != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}}) - } - if opt.Projection != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}}) - } - queryOpts = opt.QueryOptions - case *mopt.FindOneOptions: - if opt.BatchSize != nil { - aggOpts.SetBatchSize(*opt.BatchSize) - } - if opt.Collation != nil { - aggOpts.SetCollation(opt.Collation) - } - if opt.Comment != nil { - aggOpts.SetComment(*opt.Comment) - } - if opt.Hint != nil { - aggOpts.SetHint(opt.Hint) - } - if opt.MaxAwaitTime != nil { - aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime) - } - if opt.MaxTime != nil { - aggOpts.SetMaxTime(*opt.MaxTime) - } - if opt.Sort != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}}) - } - if opt.Skip != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}}) - } - if opt.Projection != nil { - pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}}) - } - queryOpts = opt.QueryOptions - } - return pipelineOpts, aggOpts, queryOpts -} diff --git a/model.go b/model.go index ccfdf17..9898077 100644 --- a/model.go +++ b/model.go @@ -3,7 +3,6 @@ package mgs import ( "context" "reflect" - "sync" "time" int "github.com/0x-buidl/mgs/internal" @@ -502,35 +501,10 @@ func findWithPopulate[U int.UnionFindOpts, T Schema, P IDefaultSchema]( ctx context.Context, c *mongo.Collection, q bson.M, d T, opt U, ) ([]*Document[T, P], error) { - pipelineOpts, aggrOpts, queryOpts := int.MergeFindOptsWithAggregatOpts(opt) - pipeline := append(mongo.Pipeline{bson.D{{Key: "$match", Value: q}}}, pipelineOpts...) - var wg sync.WaitGroup - var mu sync.Mutex - var err error - for _, pop := range *queryOpts.PopulateOption { - wg.Add(1) - func(pop *mopt.PopulateOptions) { - defer wg.Done() - mu.Lock() - if err != nil { - mu.Unlock() - return - } - mu.Unlock() - - pipe, pErr := int.GetPopulateStages(d, pop) - - mu.Lock() - if pErr != nil { - err = pErr - mu.Unlock() - return - } - pipeline = append(pipeline, pipe...) - mu.Unlock() - }(pop) + pipeline, aggrOpts, err := int.BuildPopulatePipeline(d, q, opt) + if err != nil { + return nil, err } - wg.Wait() docs := make([]*Document[T, P], 0) cursor, err := c.Aggregate(ctx, pipeline, aggrOpts)