Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Move protobuf code into a separate package

  • Loading branch information...
commit f5efee47808984ad1a0a9809aa8b80bf34ec4043 1 parent 144a0c5
Jonathan Rudenberg titanous authored
36 bucket.go
View
@@ -1,5 +1,9 @@
package riak
+import (
+ "github.com/tpjg/goriakpbc/pb"
+)
+
// Implements access to a bucket and its properties
type Bucket struct {
name string
@@ -10,7 +14,7 @@ type Bucket struct {
// Return a bucket
func (c *Client) Bucket(name string) (*Bucket, error) {
- req := &RpbGetBucketReq{
+ req := &pb.RpbGetBucketReq{
Bucket: []byte(name),
}
err, conn := c.request(req, "RpbGetBucketReq")
@@ -18,7 +22,7 @@ func (c *Client) Bucket(name string) (*Bucket, error) {
if err != nil {
return nil, err
}
- resp := &RpbGetBucketResp{}
+ resp := &pb.RpbGetBucketResp{}
err = c.response(conn, resp)
if err != nil {
@@ -40,8 +44,8 @@ func (b *Bucket) AllowMult() bool {
// Set the nval property of a bucket
func (b *Bucket) SetNVal(nval uint32) (err error) {
- props := &RpbBucketProps{NVal: &nval, AllowMult: &b.allowMult}
- req := &RpbSetBucketReq{Bucket: []byte(b.name), Props: props}
+ props := &pb.RpbBucketProps{NVal: &nval, AllowMult: &b.allowMult}
+ req := &pb.RpbSetBucketReq{Bucket: []byte(b.name), Props: props}
err, conn := b.client.request(req, "RpbSetBucketReq")
if err != nil {
return err
@@ -56,8 +60,8 @@ func (b *Bucket) SetNVal(nval uint32) (err error) {
// Set the allowMult property of a bucket
func (b *Bucket) SetAllowMult(allowMult bool) (err error) {
- props := &RpbBucketProps{NVal: &b.nval, AllowMult: &allowMult}
- req := &RpbSetBucketReq{Bucket: []byte(b.name), Props: props}
+ props := &pb.RpbBucketProps{NVal: &b.nval, AllowMult: &allowMult}
+ req := &pb.RpbSetBucketReq{Bucket: []byte(b.name), Props: props}
err, conn := b.client.request(req, "RpbSetBucketReq")
if err != nil {
return err
@@ -72,7 +76,7 @@ func (b *Bucket) SetAllowMult(allowMult bool) (err error) {
// Delete a key/value from the bucket
func (b *Bucket) Delete(key string, options ...map[string]uint32) (err error) {
- req := &RpbDelReq{Bucket: []byte(b.name), Key: []byte(key)}
+ req := &pb.RpbDelReq{Bucket: []byte(b.name), Key: []byte(key)}
for _, omap := range options {
for k, v := range omap {
switch k {
@@ -115,7 +119,7 @@ func (b *Bucket) New(key string, options ...map[string]uint32) *RObject {
// Test if an object exists
func (b *Bucket) Exists(key string, options ...map[string]uint32) (exists bool, err error) {
t := true
- req := &RpbGetReq{
+ req := &pb.RpbGetReq{
Bucket: []byte(b.name),
Key: []byte(key),
NotfoundOk: &t,
@@ -135,7 +139,7 @@ func (b *Bucket) Exists(key string, options ...map[string]uint32) (exists bool,
if err != nil {
return false, err
}
- resp := &RpbGetResp{}
+ resp := &pb.RpbGetResp{}
err = b.client.response(conn, resp)
if err != nil {
return false, err
@@ -145,13 +149,13 @@ func (b *Bucket) Exists(key string, options ...map[string]uint32) (exists bool,
// Return a list of keys using the index for a single key
func (b *Bucket) IndexQuery(index string, key string) (keys []string, err error) {
- req := &RpbIndexReq{Bucket: []byte(b.name), Index: []byte(index),
- Qtype: RpbIndexReq_eq.Enum(), Key: []byte(key)}
+ req := &pb.RpbIndexReq{Bucket: []byte(b.name), Index: []byte(index),
+ Qtype: pb.RpbIndexReq_eq.Enum(), Key: []byte(key)}
err, conn := b.client.request(req, "RpbIndexReq")
if err != nil {
return nil, err
}
- resp := &RpbIndexResp{}
+ resp := &pb.RpbIndexResp{}
err = b.client.response(conn, resp)
if err != nil {
return nil, err
@@ -165,14 +169,14 @@ func (b *Bucket) IndexQuery(index string, key string) (keys []string, err error)
// Return a list of keys using the index range query
func (b *Bucket) IndexQueryRange(index string, min string, max string) (keys []string, err error) {
- req := &RpbIndexReq{Bucket: []byte(b.name), Index: []byte(index),
- Qtype: RpbIndexReq_range.Enum(),
+ req := &pb.RpbIndexReq{Bucket: []byte(b.name), Index: []byte(index),
+ Qtype: pb.RpbIndexReq_range.Enum(),
RangeMin: []byte(min), RangeMax: []byte(max)}
err, conn := b.client.request(req, "RpbIndexReq")
if err != nil {
return nil, err
}
- resp := &RpbIndexResp{}
+ resp := &pb.RpbIndexResp{}
err = b.client.response(conn, resp)
if err != nil {
return nil, err
@@ -186,7 +190,7 @@ func (b *Bucket) IndexQueryRange(index string, min string, max string) (keys []s
// List all keys from bucket
func (b *Bucket) ListKeys() (response [][]byte, err error) {
- req := &RpbListKeysReq{Bucket: []byte(b.name)}
+ req := &pb.RpbListKeysReq{Bucket: []byte(b.name)}
err, conn := b.client.request(req, "RpbListKeysReq")
if err != nil {
26 client.go
View
@@ -5,13 +5,15 @@ It implements a connection to Riak using protobuf.
package riak
import (
- "code.google.com/p/goprotobuf/proto"
"errors"
"fmt"
"io"
"net"
"syscall"
"time"
+
+ "code.google.com/p/goprotobuf/proto"
+ "github.com/tpjg/goriakpbc/pb"
)
/*
@@ -210,7 +212,7 @@ func (c *Client) response(conn *net.TCPConn, response proto.Message) (err error)
msgcode := msgbuf[4]
switch msgcode {
case messageCodes["RpbErrorResp"]:
- errResp := &RpbErrorResp{}
+ errResp := &pb.RpbErrorResp{}
err = proto.Unmarshal(pbmsg, errResp)
if err == nil {
err = errors.New(string(errResp.Errmsg))
@@ -247,7 +249,7 @@ func (c *Client) mr_response(conn *net.TCPConn) (response [][]byte, err error) {
// Deserialize, by default the calling method should provide the expected RbpXXXResp
msgcode := msgbuf[4]
if msgcode == messageCodes["RpbMapRedResp"] {
- partial := &RpbMapRedResp{}
+ partial := &pb.RpbMapRedResp{}
err = proto.Unmarshal(pbmsg, partial)
if err != nil {
return nil, err
@@ -260,7 +262,7 @@ func (c *Client) mr_response(conn *net.TCPConn) (response [][]byte, err error) {
}
for done == nil {
- partial = &RpbMapRedResp{}
+ partial = &pb.RpbMapRedResp{}
// Read another response
msgbuf, err = c.read(conn, 5)
if err != nil {
@@ -288,7 +290,7 @@ func (c *Client) mr_response(conn *net.TCPConn) (response [][]byte, err error) {
response = resp
return
} else if msgcode == messageCodes["RpbErrorResp"] {
- errResp := &RpbErrorResp{}
+ errResp := &pb.RpbErrorResp{}
err = proto.Unmarshal(pbmsg, errResp)
if err == nil {
err = errors.New(string(errResp.Errmsg))
@@ -303,11 +305,11 @@ func (c *Client) mr_response(conn *net.TCPConn) (response [][]byte, err error) {
}
// Deserializes the data from possibly multiple packets,
-// currently only for RpbListKeysResp.
+// currently only for pb.RpbListKeysResp.
func (c *Client) mp_response(conn *net.TCPConn) (response [][]byte, err error) {
defer c.releaseConn(conn)
var (
- partial *RpbListKeysResp
+ partial *pb.RpbListKeysResp
msgcode byte
)
@@ -332,7 +334,7 @@ func (c *Client) mp_response(conn *net.TCPConn) (response [][]byte, err error) {
msgcode = msgbuf[4]
if msgcode == messageCodes["RpbListKeysResp"] {
- partial = &RpbListKeysResp{}
+ partial = &pb.RpbListKeysResp{}
err = proto.Unmarshal(pbmsg, partial)
if err != nil {
return nil, err
@@ -344,7 +346,7 @@ func (c *Client) mp_response(conn *net.TCPConn) (response [][]byte, err error) {
break
}
} else if msgcode == messageCodes["RpbErrorResp"] {
- errResp := &RpbErrorResp{}
+ errResp := &pb.RpbErrorResp{}
err = proto.Unmarshal(pbmsg, errResp)
if err == nil {
err = errors.New(string(errResp.Errmsg))
@@ -384,7 +386,7 @@ func (c *Client) Id() (id string, err error) {
return id, err
}
c.write(conn, msg)
- resp := &RpbGetClientIdResp{}
+ resp := &pb.RpbGetClientIdResp{}
err = c.response(conn, resp)
if err == nil {
id = string(resp.ClientId)
@@ -394,7 +396,7 @@ func (c *Client) Id() (id string, err error) {
// Set the client Id
func (c *Client) SetId(id string) (err error) {
- req := &RpbSetClientIdReq{ClientId: []byte(id)}
+ req := &pb.RpbSetClientIdReq{ClientId: []byte(id)}
err, conn := c.request(req, "RpbSetClientIdReq")
if err != nil {
return err
@@ -411,7 +413,7 @@ func (c *Client) ServerVersion() (node string, version string, err error) {
return node, version, err
}
c.write(conn, msg)
- resp := &RpbGetServerInfoResp{}
+ resp := &pb.RpbGetServerInfoResp{}
err = c.response(conn, resp)
if err == nil {
node = string(resp.Node)
6 mapreduce.go
View
@@ -2,6 +2,8 @@ package riak
import (
"encoding/json"
+
+ "github.com/tpjg/goriakpbc/pb"
)
// An object to build a MapReduce job similar to how the Ruby client can
@@ -80,7 +82,7 @@ func (mr *MapReduce) Run() (resp [][]byte, err error) {
if err != nil {
return nil, err
}
- req := &RpbMapRedReq{
+ req := &pb.RpbMapRedReq{
Request: query,
ContentType: []byte("application/json"),
}
@@ -97,7 +99,7 @@ func (mr *MapReduce) Run() (resp [][]byte, err error) {
// Run a MapReduce query
func (c *Client) RunMapReduce(query string) (resp [][]byte, err error) {
- req := &RpbMapRedReq{
+ req := &pb.RpbMapRedReq{
Request: []byte(query),
ContentType: []byte("application/json"),
}
2  riak.pb.go → pb/riak.pb.go
View
@@ -2,7 +2,7 @@
// source: riak.proto
// DO NOT EDIT!
-package riak
+package pb
import proto "code.google.com/p/goprotobuf/proto"
import json "encoding/json"
0  riak.proto → pb/riak.proto
View
File renamed without changes
36 robject.go
View
@@ -2,6 +2,8 @@ package riak
import (
"errors"
+
+ "github.com/tpjg/goriakpbc/pb"
)
// A Riak link
@@ -42,11 +44,11 @@ var (
// Store an RObject
func (obj *RObject) Store() (err error) {
- // Create base RpbPutReq
+ // Create base pb.RpbPutReq
t := true
- req := &RpbPutReq{
+ req := &pb.RpbPutReq{
Bucket: []byte(obj.Bucket.name),
- Content: &RpbContent{
+ Content: &pb.RpbContent{
Value: []byte(obj.Data),
ContentType: []byte(obj.ContentType),
},
@@ -59,24 +61,24 @@ func (obj *RObject) Store() (err error) {
req.Vclock = obj.Vclock
}
// Add the links
- req.Content.Links = make([]*RpbLink, len(obj.Links))
+ req.Content.Links = make([]*pb.RpbLink, len(obj.Links))
for i, v := range obj.Links {
- req.Content.Links[i] = &RpbLink{Bucket: []byte(v.Bucket),
+ req.Content.Links[i] = &pb.RpbLink{Bucket: []byte(v.Bucket),
Key: []byte(v.Key),
Tag: []byte(v.Tag)}
}
// Add the user metadata
- req.Content.Usermeta = make([]*RpbPair, len(obj.Meta))
+ req.Content.Usermeta = make([]*pb.RpbPair, len(obj.Meta))
i := 0
for k, v := range obj.Meta {
- req.Content.Usermeta[i] = &RpbPair{Key: []byte(k), Value: []byte(v)}
+ req.Content.Usermeta[i] = &pb.RpbPair{Key: []byte(k), Value: []byte(v)}
i += 1
}
// Add the indexes
- req.Content.Indexes = make([]*RpbPair, len(obj.Indexes))
+ req.Content.Indexes = make([]*pb.RpbPair, len(obj.Indexes))
i = 0
for k, v := range obj.Indexes {
- req.Content.Indexes[i] = &RpbPair{Key: []byte(k), Value: []byte(v)}
+ req.Content.Indexes[i] = &pb.RpbPair{Key: []byte(k), Value: []byte(v)}
i += 1
}
// Add the options
@@ -99,7 +101,7 @@ func (obj *RObject) Store() (err error) {
return err
}
// Get response, ReturnHead is true, so we can store the vclock
- resp := &RpbPutResp{}
+ resp := &pb.RpbPutResp{}
err = obj.Bucket.client.response(conn, resp)
if err != nil {
return err
@@ -115,7 +117,7 @@ func (obj *RObject) Store() (err error) {
// Delete the object from Riak
func (obj *RObject) Destroy() (err error) {
- req := &RpbDelReq{Bucket: []byte(obj.Bucket.name), Key: []byte(obj.Key), Vclock: obj.Vclock}
+ req := &pb.RpbDelReq{Bucket: []byte(obj.Bucket.name), Key: []byte(obj.Key), Vclock: obj.Vclock}
for _, omap := range obj.Options {
for k, v := range omap {
switch k {
@@ -151,8 +153,8 @@ func (obj *RObject) Conflict() bool {
return obj.conflict
}
-// Sets the values that returned from a RpbGetResp in the RObject
-func (obj *RObject) setContent(resp *RpbGetResp) {
+// Sets the values that returned from a pb.RpbGetResp in the RObject
+func (obj *RObject) setContent(resp *pb.RpbGetResp) {
// Check if there are siblings
if len(resp.Content) > 1 {
// Mark as conflict, set fields
@@ -218,7 +220,7 @@ func (obj *RObject) AddLink(link Link) bool {
// Get an object
func (b *Bucket) Get(key string, options ...map[string]uint32) (obj *RObject, err error) {
- req := &RpbGetReq{
+ req := &pb.RpbGetReq{
Bucket: []byte(b.name),
Key: []byte(key),
}
@@ -236,7 +238,7 @@ func (b *Bucket) Get(key string, options ...map[string]uint32) (obj *RObject, er
if err != nil {
return nil, err
}
- resp := &RpbGetResp{}
+ resp := &pb.RpbGetResp{}
err = b.client.response(conn, resp)
if err != nil {
return nil, err
@@ -254,7 +256,7 @@ func (b *Bucket) Get(key string, options ...map[string]uint32) (obj *RObject, er
// Reload an object if it has changed (new Vclock)
func (obj *RObject) Reload() (err error) {
- req := &RpbGetReq{
+ req := &pb.RpbGetReq{
Bucket: []byte(obj.Bucket.name),
Key: []byte(obj.Key),
IfModified: obj.Vclock}
@@ -272,7 +274,7 @@ func (obj *RObject) Reload() (err error) {
if err != nil {
return err
}
- resp := &RpbGetResp{}
+ resp := &pb.RpbGetResp{}
err = obj.Bucket.client.response(conn, resp)
if err != nil {
return err
Please sign in to comment.
Something went wrong with that request. Please try again.