/
mongodb.go
83 lines (76 loc) · 3.79 KB
/
mongodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//go:generate mockgen -source=${GOFILE} -package=${GOPACKAGE} -destination=${GOPACKAGE}_mock.go
package mongodb
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
type (
Database interface {
Name() string
URI() string
Collection(name string) Collection
}
Collection interface {
Name() string
InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
InsertMany(ctx context.Context, documents []interface{}, opts ...*options.InsertManyOptions) (*mongo.InsertManyResult, error)
DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
DeleteMany(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
UpdateByID(ctx context.Context, id interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
UpdateMany(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
ReplaceOne(ctx context.Context, filter interface{}, replacement interface{}, opts ...*options.ReplaceOptions) (*mongo.UpdateResult, error)
Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error)
CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error)
EstimatedDocumentCount(ctx context.Context, opts ...*options.EstimatedDocumentCountOptions) (int64, error)
Distinct(ctx context.Context, fieldName string, filter interface{}, opts ...*options.DistinctOptions) ([]interface{}, error)
Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (cur *mongo.Cursor, err error)
FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult
FindOneAndDelete(ctx context.Context, filter interface{}, opts ...*options.FindOneAndDeleteOptions) *mongo.SingleResult
FindOneAndReplace(ctx context.Context, filter interface{}, replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *mongo.SingleResult
FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Indexes() mongo.IndexView
Drop(ctx context.Context) error
}
mongodbImpl struct {
client *mongo.Client
database *mongo.Database
uri string
}
)
func New(opts Options) (db Database, err error) {
opts.SetDefaults()
ctx, cancel := context.WithTimeout(context.Background(), opts.CtxTimeout)
defer cancel()
connString, err := opts.ExtractConnString()
if err != nil {
return nil, ErrInvalidConnString(err)
}
clientOpts := options.Client().ApplyURI(opts.URI)
if opts.IsReader {
clientOpts.SetReadPreference(readpref.SecondaryPreferred())
}
var dbImpl mongodbImpl
if dbImpl.client, err = mongo.Connect(ctx, clientOpts); err != nil {
return nil, ErrCouldNotConnect(err)
}
if err = dbImpl.client.Ping(ctx, readpref.Primary()); err != nil {
return nil, ErrCantPing(err)
}
dbImpl.uri = clientOpts.GetURI()
dbImpl.database = dbImpl.client.Database(connString.Database)
return dbImpl, nil
}
func (i mongodbImpl) Name() string {
return i.database.Name()
}
func (i mongodbImpl) Collection(name string) Collection {
return i.database.Collection(name)
}
func (i mongodbImpl) URI() string {
return i.uri
}