forked from influxdata/influxdb
/
shard_mapper.go
207 lines (173 loc) · 4.72 KB
/
shard_mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package cluster
import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"time"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
"gopkg.in/fatih/pool.v2"
)
// ShardMapper is responsible for providing mappers for requested shards. It is
// responsible for creating those mappers from the local store, or reaching
// out to another node on the cluster.
type ShardMapper struct {
	ForceRemoteMapping bool // All shards treated as remote. Useful for testing.

	// MetaStore supplies this node's ID and per-node info. NodeID is used by
	// CreateMapper to decide local vs. remote mapping; Node is used when
	// dialing peers (via connFactory in dial).
	MetaStore interface {
		NodeID() uint64
		Node(id uint64) (ni *meta.NodeInfo, err error)
	}

	// TSDBStore creates mappers for shards held in the local store.
	TSDBStore interface {
		CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
	}

	// timeout bounds remote mapping connections (used as the conn deadline
	// in CreateMapper and passed to the connection factory in dial).
	timeout time.Duration
	// pool caches client connection pools keyed by remote node ID.
	pool *clientPool
}
// NewShardMapper returns a mapper of local and remote shards, using the
// given timeout for remote shard connections.
func NewShardMapper(timeout time.Duration) *ShardMapper {
	sm := &ShardMapper{}
	sm.pool = newClientPool()
	sm.timeout = timeout
	return sm
}
// CreateMapper returns a Mapper for the given shard ID. Shards owned by this
// node are mapped through the local TSDB store; all other shards (or every
// shard when ForceRemoteMapping is set) are mapped over a pooled connection
// to one of the shard's owners, chosen pseudo-randomly.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
	if sh.OwnedBy(s.MetaStore.NodeID()) && !s.ForceRemoteMapping {
		return s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
	}

	// Guard against shards with no owners: rand.Intn panics when n <= 0.
	if len(sh.OwnerIDs) == 0 {
		return nil, fmt.Errorf("shard %d has no owners", sh.ID)
	}

	// Pick a node in a pseudo-random manner.
	conn, err := s.dial(sh.OwnerIDs[rand.Intn(len(sh.OwnerIDs))])
	if err != nil {
		return nil, err
	}

	// Bound the entire remote mapping by the configured timeout. This error
	// was previously ignored; a deadline that can't be set must not leave an
	// unbounded connection in use.
	if err := conn.SetDeadline(time.Now().Add(s.timeout)); err != nil {
		conn.Close()
		return nil, err
	}

	return NewRemoteMapper(conn.(*pool.PoolConn), sh.ID, stmt, chunkSize), nil
}
// dial returns a pooled connection to the node with the given ID, lazily
// creating a bounded connection pool (1-3 conns) for that node on first use.
func (s *ShardMapper) dial(nodeID uint64) (net.Conn, error) {
	if _, ok := s.pool.getPool(nodeID); !ok {
		// No pool for this node yet — build one backed by a factory that
		// resolves the node's address through the meta store.
		f := &connFactory{nodeID: nodeID, clientPool: s.pool, timeout: s.timeout}
		f.metaStore = s.MetaStore
		p, err := pool.NewChannelPool(1, 3, f.dial)
		if err != nil {
			return nil, err
		}
		s.pool.setPool(nodeID, p)
	}
	return s.pool.conn(nodeID)
}
// remoteShardConn is the connection a RemoteMapper reads from and writes to.
// MarkUnusable lets the mapper flag a connection whose protocol stream may be
// out of sync (failed write/read) so it is not returned to a pool for reuse.
type remoteShardConn interface {
	io.ReadWriter
	Close() error
	MarkUnusable()
}
// RemoteMapper implements the tsdb.Mapper interface. It connects to a remote node,
// sends a query, and interprets the stream of data that comes back.
type RemoteMapper struct {
	shardID   uint64 // shard to map on the remote node
	stmt      string // query statement sent in the Map request
	chunkSize int    // chunk size requested from the remote node

	tagsets []string // tag sets from the first response; set by Open
	// NOTE(review): fields is never assigned anywhere in this file, so
	// Fields() always returns nil — confirm whether decoding was omitted.
	fields []string

	conn             remoteShardConn
	bufferedResponse *MapShardResponse // first response, buffered by Open for NextChunk
}
// NewRemoteMapper returns a new remote mapper that will issue the given
// statement against the given shard over connection c.
func NewRemoteMapper(c remoteShardConn, shardID uint64, stmt string, chunkSize int) *RemoteMapper {
	rm := &RemoteMapper{}
	rm.conn = c
	rm.shardID = shardID
	rm.stmt = stmt
	rm.chunkSize = chunkSize
	return rm
}
// Open sends the Map request to the remote node and reads the first response,
// buffering it so TagSets is available immediately and NextChunk can consume
// it later.
func (r *RemoteMapper) Open() (err error) {
	// Any error below tears down the connection; keyed off the named return.
	defer func() {
		if err != nil {
			r.conn.Close()
		}
	}()

	// Build the Map request and marshal it into protocol buffers.
	var req MapShardRequest
	req.SetShardID(r.shardID)
	req.SetQuery(r.stmt)
	req.SetChunkSize(int32(r.chunkSize))
	b, err := req.MarshalBinary()
	if err != nil {
		return err
	}

	// Send the request. A failed write leaves the stream in an unknown
	// state, so mark the connection unusable for its pool.
	if err := WriteTLV(r.conn, mapShardRequestMessage, b); err != nil {
		r.conn.MarkUnusable()
		return err
	}

	// Read the first response; a failed read also poisons the connection.
	_, b, err = ReadTLV(r.conn)
	if err != nil {
		r.conn.MarkUnusable()
		return err
	}

	// Decode and buffer the response for the first NextChunk call.
	r.bufferedResponse = &MapShardResponse{}
	if err := r.bufferedResponse.UnmarshalBinary(b); err != nil {
		return err
	}
	if r.bufferedResponse.Code() != 0 {
		return fmt.Errorf("error code %d: %s", r.bufferedResponse.Code(), r.bufferedResponse.Message())
	}

	// The first response carries the tag sets for the query.
	r.tagsets = r.bufferedResponse.TagSets()

	return nil
}
// TagSets returns the tag sets decoded from the first remote response.
// Only meaningful after a successful Open.
func (r *RemoteMapper) TagSets() []string {
	return r.tagsets
}
// Fields returns the fields for the mapped query.
// NOTE(review): r.fields is never assigned anywhere in this file, so this
// always returns nil — confirm whether fields decoding was omitted upstream.
func (r *RemoteMapper) Fields() []string {
	return r.fields
}
// NextChunk returns the next chunk read from the remote node. The response
// buffered by Open is consumed first; subsequent calls read from the wire.
// A nil chunk with a nil error signals that the stream is exhausted.
func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
	response := r.bufferedResponse
	if response != nil {
		// Hand back the response Open buffered, exactly once.
		r.bufferedResponse = nil
	} else {
		// Read the next response off the connection. A failed read may
		// desync the stream, so mark the connection unusable.
		_, buf, err := ReadTLV(r.conn)
		if err != nil {
			r.conn.MarkUnusable()
			return nil, err
		}

		// Decode it and surface any remote-side error code.
		response = &MapShardResponse{}
		if err := response.UnmarshalBinary(buf); err != nil {
			return nil, err
		}
		if response.Code() != 0 {
			return nil, fmt.Errorf("error code %d: %s", response.Code(), response.Message())
		}
	}

	// A nil payload means the remote mapper has no more data.
	if response.Data() == nil {
		return nil, nil
	}

	output := &tsdb.MapperOutput{}
	err = json.Unmarshal(response.Data(), output)
	return output, err
}
// Close the Mapper
func (r *RemoteMapper) Close() {
	// Close's error is discarded: this method's signature (per the
	// tsdb.Mapper interface it implements) returns nothing.
	r.conn.Close()
}