This repository has been archived by the owner on Jan 27, 2021. It is now read-only.
/
mongo.go
93 lines (76 loc) · 2.35 KB
/
mongo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package mongo
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/whhe/mongo-replicator/model"
"github.com/whhe/mongo-replicator/operator"
)
type mongoOperator struct {
*mongo.Client
}
// NewOperator creates a mongoOperator instance.
func NewOperator(uri string) (operator.Operator, error) {
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &mongoOperator{client}, nil
}
func (m *mongoOperator) Insert(e model.ChangeEvent) error {
_, err := m.Database(e.Namespace.Database).
Collection(e.Namespace.Collection).
InsertOne(context.Background(), e.FullDocument)
return err
}
func (m *mongoOperator) Delete(e model.ChangeEvent) error {
_, err := m.Database(e.Namespace.Database).
Collection(e.Namespace.Collection).
DeleteOne(context.Background(), e.DocumentKey)
return err
}
func (m *mongoOperator) Replace(e model.ChangeEvent) error {
_, err := m.Database(e.Namespace.Database).
Collection(e.Namespace.Collection).
ReplaceOne(context.Background(), e.DocumentKey, e.FullDocument)
return err
}
func (m *mongoOperator) Update(e model.ChangeEvent) error {
update := bson.M{}
if len(e.UpdateDescription.UpdatedFields) != 0 {
update["$set"] = e.UpdateDescription.UpdatedFields
}
if len(e.UpdateDescription.RemovedFields) != 0 {
unset := bson.M{}
for _, field := range e.UpdateDescription.RemovedFields {
unset[field] = ""
}
update["$unset"] = unset
}
_, err := m.Database(e.Namespace.Database).
Collection(e.Namespace.Collection).
UpdateOne(context.Background(), e.DocumentKey, update)
return err
}
func (m *mongoOperator) Drop(e model.ChangeEvent) error {
return m.Database(e.Namespace.Database).
Collection(e.Namespace.Collection).
Drop(context.Background())
}
func (m *mongoOperator) Rename(e model.ChangeEvent) error {
from := e.Namespace.String()
to := e.To.String()
result := m.Database("admin").
RunCommand(
context.Background(),
bson.D{{Key: "renameCollection", Value: from}, {Key: "to", Value: to}},
)
return result.Err()
}
func (m *mongoOperator) DropDatabase(e model.ChangeEvent) error {
return m.Database(e.Namespace.Database).Drop(context.Background())
}
func (m *mongoOperator) Invalidate(e model.ChangeEvent) error {
panic("watch invalidate")
}