This repository has been archived by the owner on Feb 16, 2018. It is now read-only.
forked from tpjg/goriakpbc
/
mapreduce.go
115 lines (103 loc) · 2.52 KB
/
mapreduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package riak
import (
"encoding/json"
"github.com/cupcake/go-riak/pb"
)
// An object to build a MapReduce job similar to how the Ruby client can
// build it by adding different stages.
type MapReduce struct {
client *Client
inputs [][]string
phases []string
request string
}
func (c *Client) MapReduce() *MapReduce {
return &MapReduce{client: c, inputs: make([][]string, 0), phases: make([]string, 0), request: ""}
}
func (mr *MapReduce) Add(bucket string, key string) {
mr.inputs = append(mr.inputs, []string{bucket, key})
}
func (mr *MapReduce) LinkBucket(name string, keep bool) {
link := `{"link":{"bucket":"` + name + `", "tag":"_","keep":`
if keep {
link = link + "true}}"
} else {
link = link + "false}}"
}
mr.phases = append(mr.phases, link)
}
func (mr *MapReduce) Map(fun string, keep bool) {
m := `{"map":{"language":"javascript","keep":`
if keep {
m = m + "true,"
} else {
m = m + "false,"
}
m = m + `"source":"` + fun + `"}}`
mr.phases = append(mr.phases, m)
}
func (mr *MapReduce) MapErlang(module string, fun string, keep bool) {
m := `{"map":{"language":"erlang","module":"` + module + `","function":"` + fun + `","keep":`
if keep {
m = m + `true`
} else {
m = m + `false`
}
m = m + `}}`
mr.phases = append(mr.phases, m)
}
func (mr *MapReduce) MapObjectValue(keep bool) {
//{"map":{"language":"erlang","module":"riak_kv_mapreduce","function":"map_object_value"}}
mr.MapErlang("riak_kv_mapreduce", "map_object_value", keep)
}
// Generate the Query string
func (mr *MapReduce) Query() (query []byte, err error) {
inputs, err := json.Marshal(mr.inputs)
if err != nil {
return nil, err
}
q := `{"inputs":` + string(inputs) + `, "query":[`
for i, s := range mr.phases {
if i > 0 {
q = q + ","
}
q = q + s
}
q = q + "]}"
return []byte(q), nil
}
func (mr *MapReduce) Run() (resp [][]byte, err error) {
query, err := mr.Query()
if err != nil {
return nil, err
}
req := &pb.RpbMapRedReq{
Request: query,
ContentType: []byte("application/json"),
}
err, conn := mr.client.request(req, rpbMapRedReq)
if err != nil {
return nil, err
}
resp, err = mr.client.mr_response(conn)
if err != nil {
return nil, err
}
return
}
// Run a MapReduce query
func (c *Client) RunMapReduce(query string) (resp [][]byte, err error) {
req := &pb.RpbMapRedReq{
Request: []byte(query),
ContentType: []byte("application/json"),
}
err, conn := c.request(req, rpbMapRedReq)
if err != nil {
return nil, err
}
resp, err = c.mr_response(conn)
if err != nil {
return nil, err
}
return
}