forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard_mapper.go
152 lines (126 loc) · 3.18 KB
/
shard_mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package cluster
import (
"encoding/json"
"io"
"net/http"
"net/url"
"strconv"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
const (
// MAX_MAP_RESPONSE_SIZE is the byte limit passed to io.LimitReader around a
// remote mapper's HTTP response body in RemoteMapper.Open
// (1024 * 1024 * 1024 bytes, i.e. 1 GiB). Because the same limited reader
// backs every later decode, the limit covers the whole response stream rather
// than a single chunk.
MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
)
// ShardMapper is responsible for providing mappers for requested shards. It is
// responsible for creating those mappers from the local store, or reaching
// out to another node on the cluster.
//
// NewShardMapper returns a zero-valued ShardMapper, so MetaStore and TSDBStore
// must be assigned before CreateMapper is called.
type ShardMapper struct {
// MetaStore supplies the node ID that CreateMapper checks against a shard's
// owners, and resolves a node ID to its node info. It is also handed to any
// RemoteMapper that CreateMapper builds.
MetaStore interface {
NodeID() uint64
Node(id uint64) (ni *meta.NodeInfo, err error)
}
// TSDBStore creates the mapper when the shard is owned by the node ID
// reported by MetaStore.
TSDBStore interface {
CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}
}
// NewShardMapper returns a mapper of local and remote shards.
//
// The returned ShardMapper is zero-valued: neither MetaStore nor TSDBStore is
// set by this constructor.
func NewShardMapper() *ShardMapper {
	return new(ShardMapper)
}
// shardOwnerError reports that a shard lists no owning node, so there is no
// node a remote mapping request could be sent to.
type shardOwnerError struct {
	shardID uint64
}

// Error implements the error interface.
func (e shardOwnerError) Error() string {
	return "shard " + strconv.FormatUint(e.shardID, 10) + " has no owners"
}

// CreateMapper returns a Mapper for the given shard ID.
//
// If the shard is owned by the node ID reported by MetaStore, the mapper is
// created through TSDBStore and any error from it is returned with a nil
// mapper. Otherwise a RemoteMapper targeting the first entry of sh.OwnerIDs is
// returned, sharing this ShardMapper's MetaStore; the remote mapper is not
// opened here.
//
// A shard that is not owned by this node and lists no owners yields an error
// instead of panicking on the OwnerIDs index.
func (r *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt string, chunkSize int) (tsdb.Mapper, error) {
	if sh.OwnedBy(r.MetaStore.NodeID()) {
		m, err := r.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
		if err != nil {
			return nil, err
		}
		return m, nil
	}

	// Guard the index below: an empty owner list would otherwise panic.
	if len(sh.OwnerIDs) == 0 {
		return nil, shardOwnerError{shardID: sh.ID}
	}

	rm := NewRemoteMaper(sh.OwnerIDs[0], sh.ID, stmt, chunkSize)
	rm.MetaStore = r.MetaStore
	return rm, nil
}
// RemoteMapper implements the tsdb.Mapper interface. It connects to a remote node,
// sends a query, and interprets the stream of data that comes back.
type RemoteMapper struct {
// MetaStore resolves nodeID to the node info whose Host is contacted by Open.
// NewRemoteMaper does not set it; it must be assigned before Open is called.
MetaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}
// nodeID identifies the remote node to query.
nodeID uint64
// shardID is sent as the "shard" query parameter.
shardID uint64
// stmt is sent as the "q" query parameter.
stmt string
// chunkSize is sent as the "chunksize" query parameter when non-zero.
chunkSize int
// tagsets holds the TagSets of the first response decoded by Open.
tagsets []string
// resp is the HTTP response obtained by Open; Close closes its body.
resp *http.Response
// decoder reads JSON responses from the size-limited response body.
decoder *json.Decoder
// buffer is the first response decoded by Open, kept so that its Data can be
// returned by the first NextChunk call.
buffer tsdb.MapperResponse
// bufferSent is true once NextChunk has returned buffer.Data.
bufferSent bool
}
// NewRemoteMaper returns a new remote mapper.
//
// The mapper records the target node, the shard, the query statement and the
// chunk size. MetaStore is left unset and no connection is made until Open is
// called.
func NewRemoteMaper(nodeID, shardID uint64, stmt string, chunkSize int) *RemoteMapper {
	rm := new(RemoteMapper)
	rm.nodeID = nodeID
	rm.shardID = shardID
	rm.stmt = stmt
	rm.chunkSize = chunkSize
	return rm
}
// Open connects to the remote node and starts receiving data.
//
// It resolves the node's host through MetaStore, issues an HTTP GET to
// /shard_mapping with the shard ID ("shard"), the statement ("q") and, when
// non-zero, the chunk size ("chunksize") as query parameters, then decodes the
// first JSON response. That first response is kept in r.buffer so its TagSets
// are available through TagSets and its Data through the first NextChunk call.
//
// An error is returned if the node lookup, the HTTP request or the initial
// decode fails. When the decode fails the response body is closed here so the
// connection is not leaked if the caller does not call Close.
func (r *RemoteMapper) Open() error {
	node, err := r.MetaStore.Node(r.nodeID)
	if err != nil {
		return err
	}

	v := url.Values{}
	// The shard ID must be sent as decimal text. string(uint64) would instead
	// produce the UTF-8 encoding of that code point (e.g. 65 -> "A").
	v.Set("shard", strconv.FormatUint(r.shardID, 10))
	v.Set("q", r.stmt)
	if r.chunkSize != 0 {
		v.Set("chunksize", strconv.Itoa(r.chunkSize))
	}

	u := url.URL{
		Scheme:   "http",
		Host:     node.Host,
		RawQuery: v.Encode(),
		Path:     "/shard_mapping",
	}

	resp, err := http.Get(u.String())
	if err != nil {
		return err
	}
	r.resp = resp

	// Set up the decoder. The limit applies to the whole response stream, since
	// the same decoder is reused by NextChunk.
	lr := io.LimitReader(r.resp.Body, MAX_MAP_RESPONSE_SIZE)
	r.decoder = json.NewDecoder(lr)

	// Decode the first response to get the TagSets.
	if err := r.decoder.Decode(&r.buffer); err != nil {
		// Best-effort cleanup; the decode error is the one worth reporting.
		r.resp.Body.Close()
		return err
	}
	r.tagsets = r.buffer.TagSets

	return nil
}
// TagSets returns the tag sets taken from the first response decoded by Open.
// Before a successful Open the result is nil.
func (r *RemoteMapper) TagSets() []string {
	sets := r.tagsets
	return sets
}
// NextChunk returns the next chunk read from the remote node to the client.
//
// The first call hands back the Data of the response that Open already
// decoded. Every later call decodes one more response from the stream and
// returns its Data, or a nil chunk together with the decoder's error.
func (r *RemoteMapper) NextChunk() (interface{}, error) {
	if r.bufferSent {
		var next tsdb.MapperResponse
		if err := r.decoder.Decode(&next); err != nil {
			return nil, err
		}
		return next.Data, nil
	}

	// First call: serve the response buffered by Open.
	r.bufferSent = true
	return r.buffer.Data, nil
}
// Close closes the response body, if there is one. It does nothing when no
// response has been stored or the response has no body.
func (r *RemoteMapper) Close() {
	if r.resp == nil || r.resp.Body == nil {
		return
	}
	r.resp.Body.Close()
}