-
Notifications
You must be signed in to change notification settings - Fork 0
/
set.go
102 lines (85 loc) · 2.11 KB
/
set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package mongorec
import (
"context"
"fmt"
"github.com/ischenkx/kantoku/pkg/common/data/record"
"github.com/ischenkx/kantoku/pkg/common/data/record/ops"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Compile-time check that *Set satisfies the record.Set interface.
var _ record.Set[int] = (*Set[int])(nil)
// Set is a view over the records of a Storage, narrowed by an accumulated
// filter. Its methods use value receivers, and Filter returns a modified copy
// rather than changing the receiver.
type Set[Item any] struct {
	// storage is the backing Storage whose collection the operations run on.
	storage *Storage[Item]
	// filter holds the conditions accumulated through Filter; it is left
	// unset (nil) by newSet.
	filter record.R
}
// newSet returns a Set bound to the given storage with no filter applied.
func newSet[Item any](storage *Storage[Item]) Set[Item] {
	var fresh Set[Item]
	fresh.storage = storage
	return fresh
}
// Filter returns a copy of the set whose filter is the receiver's filter
// merged with rec. The receiver's own filter map is not modified: the merge is
// done into a freshly allocated record. When a key is present in both, the two
// conditions are combined with ops.And(newCondition, existingCondition).
func (set Set[Item]) Filter(rec record.R) record.Set[Item] {
	merged := record.R{}

	// Start from a copy of the conditions already on this set.
	for k, cond := range set.filter {
		merged[k] = cond
	}

	// Layer the new conditions on top, combining on key collisions.
	for k, cond := range rec {
		existing, present := merged[k]
		if !present {
			merged[k] = cond
			continue
		}
		merged[k] = ops.And(cond, existing)
	}

	narrowed := set
	narrowed.filter = merged
	return narrowed
}
// Distinct returns a DistinctCursor over the set's storage, carrying the set's
// current filter and the given keys, with skip and limit both set to zero.
func (set Set[Item]) Distinct(keys ...string) record.Cursor[Item] {
	cursor := DistinctCursor[Item]{
		storage: set.storage,
		filter:  set.filter,
		keys:    keys,
		skip:    0,
		limit:   0,
	}
	return cursor
}
// Erase deletes every document in the storage collection that matches the
// set's filter, using a single DeleteMany call.
//
// It returns an error if makeRecordFilter cannot build the filter or if
// DeleteMany fails. In both cases the underlying error is wrapped with %w, so
// callers can inspect it with errors.Is / errors.As.
func (set Set[Item]) Erase(ctx context.Context) error {
	filter, err := makeRecordFilter(set.filter)
	if err != nil {
		return fmt.Errorf("failed to make a filter: %w", err)
	}

	if _, err = set.storage.collection.DeleteMany(ctx, filter); err != nil {
		// Wrap (not just format) the driver error to keep the error chain intact.
		return fmt.Errorf("failed to delete: %w", err)
	}
	return nil
}
// Update applies update to every document matching the set's filter via a
// single UpdateMany call.
//
// The update record is passed through unwrapUpdateRecord and record2bson and
// sent as the "$set" operator. When upsert is non-nil the operation runs with
// the upsert option enabled, and the fields of upsert that are not already
// written by "$set" are sent as "$setOnInsert".
//
// It returns an error if makeRecordFilter cannot build the filter or if
// UpdateMany fails; the underlying error is wrapped with %w so callers can
// inspect it with errors.Is / errors.As.
func (set Set[Item]) Update(ctx context.Context, update, upsert record.R) error {
	setter := record2bson(unwrapUpdateRecord(update))
	bsonUpdate := bson.M{"$set": setter}

	if upsert != nil {
		bsonUpsert := record2bson(upsert)
		// Drop keys that "$set" already writes, so no field appears under both
		// operators (MongoDB reports a path conflict in that case).
		for key := range setter {
			delete(bsonUpsert, key)
		}
		// MongoDB servers before 5.0 reject an empty operator document, so
		// "$setOnInsert" is only attached when it has something to set.
		if len(bsonUpsert) > 0 {
			bsonUpdate["$setOnInsert"] = bsonUpsert
		}
	}

	filter, err := makeRecordFilter(set.filter)
	if err != nil {
		return fmt.Errorf("failed to make a filter: %w", err)
	}

	opts := options.Update().SetUpsert(upsert != nil)
	if _, err = set.storage.collection.UpdateMany(ctx, filter, bsonUpdate, opts); err != nil {
		// Wrap (not just format) the driver error to keep the error chain intact.
		return fmt.Errorf("failed to update many: %w", err)
	}
	return nil
}
// Cursor returns a FilterCursor over the set's storage, carrying the set's
// current filter, with skip and limit both set to zero.
func (set Set[Item]) Cursor() record.Cursor[Item] {
	cursor := FilterCursor[Item]{
		storage: set.storage,
		filter:  set.filter,
		skip:    0,
		limit:   0,
	}
	return cursor
}