-
Notifications
You must be signed in to change notification settings - Fork 7
/
query.go
142 lines (116 loc) · 2.64 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package view
import (
"errors"
"fmt"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-libp2p/core/peer"
api "github.com/wetware/pkg/api/cluster"
"github.com/wetware/pkg/cluster/routing"
)
/*
Selectors
*/
func All() Selector {
return func(s SelectorStruct) error {
s.SetAll()
return nil
}
}
func Match(index routing.Index) Selector {
return func(s SelectorStruct) error {
return bindIndex(s.NewMatch, index)
}
}
func From(index routing.Index) Selector {
return func(s SelectorStruct) error {
return bindIndex(s.NewFrom, index)
}
}
/*
Helpers
*/
func bindIndex(fn func() (api.View_Index, error), index routing.Index) error {
target, err := fn()
if err != nil {
return err
}
target.SetPrefix(index.Prefix())
switch index.String() {
case "id", "peer":
return bindPeer(target, index)
case "server":
return bindServer(target, index)
case "host":
return bindHost(target, index)
case "meta":
return bindMeta(target, index)
}
return fmt.Errorf("invalid index: %s", index)
}
func bindPeer(target api.View_Index, index routing.Index) error {
switch ix := index.(type) {
case routing.PeerIndex:
b, err := ix.PeerBytes()
if err == nil {
return target.SetPeer(string(b)) // TODO: unsafe.Pointer
}
return err
case interface{ Peer() peer.ID }:
return target.SetPeer(ix.Peer().String())
}
return errors.New("not a peer index")
}
func bindServer(target api.View_Index, index routing.Index) error {
switch ix := index.(type) {
case routing.ServerIndex:
b, err := ix.ServerBytes()
if err == nil {
return target.SetServer(b)
}
return err
case interface{ Server() routing.ID }:
index, err := ix.Server().MarshalText()
if err == nil {
defer pool.Put(index)
err = target.SetServer(index) // copies index
}
return err
}
return errors.New("not a peer index")
}
func bindHost(target api.View_Index, index routing.Index) error {
switch ix := index.(type) {
case routing.HostIndex:
b, err := ix.HostBytes()
if err == nil {
return target.SetHost(string(b)) // TODO: unsafe.Pointer
}
return err
case interface{ Host() (string, error) }:
id, err := ix.Host()
if err == nil {
err = target.SetHost(id)
}
return err
}
return errors.New("not a peer index")
}
func bindMeta(target api.View_Index, index routing.Index) error {
switch ix := index.(type) {
case routing.MetaIndex:
b, err := ix.MetaBytes()
if err == nil {
err = target.SetMeta(string(b))
}
return err
case interface {
MetaField() (routing.MetaField, error)
}:
f, err := ix.MetaField()
if err == nil {
err = target.SetMeta(f.String())
}
return err
}
return errors.New("not a metadata index")
}