Skip to content

Commit

Permalink
Merge branch 'master' of github.com:derekchiang/hypergo
Browse files Browse the repository at this point in the history
  • Loading branch information
derekchiang committed Nov 13, 2013
2 parents 882dc1d + 52cb08e commit a8d7239
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 14 deletions.
136 changes: 136 additions & 0 deletions bindings/go/admin_api.go
@@ -0,0 +1,136 @@
package hypergo

/*
#cgo LDFLAGS: -lhyperdex-admin
#include "hyperdex/admin.h"
*/
import "C"

import (
"fmt"
"strings"
)

// Admin APIs
func (admin *Admin) DumpConfig() string {
return <-admin.AsyncDumpConfig()
}

func (admin *Admin) AsyncDumpConfig() chan string {
return admin.AsyncDumpConfigOrListSpaces("dump_config")
}

func (admin *Admin) ListSpaces() []string {
c := admin.AsyncListSpaces()
strs := make([]string, 0)
for {
s, ok := <-c
if ok {
if len(s) > 0 {
strs = append(strs, s)
}
} else {
break
}
}

return strs
}

func (admin *Admin) AsyncListSpaces() chan string {
return admin.AsyncDumpConfigOrListSpaces("list_spaces")
}

func (admin *Admin) AsyncDumpConfigOrListSpaces(funcName string) chan string {
c := make(chan string, CHANNEL_BUFFER_SIZE)
var C_return_code C.enum_hyperdex_admin_returncode
var C_string *C.char
var req_id int64
var success func()

switch funcName {
case "dump_config":
req_id = int64(C.hyperdex_admin_dump_config(admin.ptr, &C_return_code, &C_string))
success = func() {
c <- C.GoString(C_string)
}
case "list_spaces":
req_id = int64(C.hyperdex_admin_list_spaces(admin.ptr, &C_return_code, &C_string))
success = func() {
spaces := C.GoString(C_string)
for _, s := range strings.Split(spaces, "\n") {
c <- s
}
close(c)
}
}

if req_id < 0 {
panic(fmt.Sprintf("%s failed", funcName))
}

req := adminRequest{
id: req_id,
status: &C_return_code,
success: success,
}

admin.requests = append(admin.requests, req)
return c
}

func (admin *Admin) ValidateSpace(description string) bool {
var C_return_code C.enum_hyperdex_admin_returncode

valid := C.hyperdex_admin_validate_space(admin.ptr,
C.CString(description), &C_return_code)

if valid < 0 {
return false
} else {
return true
}
}

func (admin *Admin) AddSpace(description string) error {
return <-admin.AsyncAddSpace(description)
}

func (admin *Admin) AsyncAddSpace(description string) ErrorChannel {
return admin.AsyncAddOrRemoveSpace(description, "add")
}

func (admin *Admin) RemoveSpace(description string) error {
return <-admin.AsyncRemoveSpace(description)
}

func (admin *Admin) AsyncRemoveSpace(description string) ErrorChannel {
return admin.AsyncAddOrRemoveSpace(description, "remove")
}

func (admin *Admin) AsyncAddOrRemoveSpace(description string, funcName string) ErrorChannel {
var C_return_code C.enum_hyperdex_admin_returncode
errCh := make(chan error, CHANNEL_BUFFER_SIZE)
var req_id int64
switch funcName {
case "add":
req_id = int64(C.hyperdex_admin_add_space(admin.ptr,
C.CString(description), &C_return_code))
case "remove":
req_id = int64(C.hyperdex_admin_rm_space(admin.ptr,
C.CString(description), &C_return_code))
}

req := adminRequest{
id: req_id,
status: &C_return_code,
success: func() {
errCh <- nil
},
failure: errChannelFailureCallbackForAdmin(errCh),
}

admin.requests = append(admin.requests, req)

return errCh
}
77 changes: 77 additions & 0 deletions bindings/go/admin_test.go
@@ -0,0 +1,77 @@
package hypergo

import "testing"

func TestDumpConfig(t *testing.T) {
admin, err := NewAdmin("127.0.0.1", 1982)
if err != nil {
t.Fatal(err)
}
defer admin.Destroy()

println(admin.DumpConfig())
}

func TestValidateSpace(t *testing.T) {
admin, err := NewAdmin("127.0.0.1", 1982)
if err != nil {
t.Fatal(err)
}
defer admin.Destroy()

validSpace := `
space profiles
key username
attributes
string name,
float height,
int profile_views,
list(string) pending_requests,
list(float) ratings,
set(string) hobbies,
set(int) ages,
map(string, string) unread_messages,
map(string, int) upvotes
subspace name
subspace height
subspace profile_views
`
if !(admin.ValidateSpace(validSpace) == true) {
t.Fatalf("The following space schema is valid, "+
"but ValidateSpace says otherwise: %s", validSpace)
}

// Note that the fourth line starts with "strings", which is invalid
invalidSpace := `
space profiles
key username
attributes
strings name,
float height,
int profile_views,
list(string) pending_requests,
list(float) ratings,
set(string) hobbies,
set(int) ages,
map(string, string) unread_messages,
map(string, int) upvotes
subspace name
subspace height
subspace profile_views
`
if !(admin.ValidateSpace(invalidSpace) == false) {
t.Fatalf("The following space schema is invalid, "+
"but ValidateSpace says otherwise: %s", validSpace)
}
}

func TestListSpaces(t *testing.T) {
admin, err := NewAdmin("127.0.0.1", 1982)
if err != nil {
t.Fatal(err)
}
defer admin.Destroy()
for _, space := range admin.ListSpaces() {
println(space)
}
}
2 changes: 1 addition & 1 deletion bindings/go/api.go → bindings/go/client_api.go
@@ -1,6 +1,6 @@
package hypergo

// User-facing APIs
// Data-manipulating APIs

/*
#cgo LDFLAGS: -lhyperdex-client
Expand Down
98 changes: 88 additions & 10 deletions bindings/go/hypergo.go
Expand Up @@ -2,9 +2,10 @@
package hypergo

/*
#cgo LDFLAGS: -lhyperdex-client
#cgo LDFLAGS: -lhyperdex-client -lhyperdex-admin
#include <netinet/in.h>
#include "hyperdex/client.h"
#include "hyperdex/admin.h"
#include "hyperdex/datastructures.h"
*/
import "C"
Expand Down Expand Up @@ -43,7 +44,14 @@ type Client struct {
ptr *C.struct_hyperdex_client
arena *C.struct_hyperdex_ds_arena
requests []request
closeChan chan struct{}
closeChan chan bool
}

// Admin is a priviledged client used to do meta operations to hyperdex
type Admin struct {
ptr *C.struct_hyperdex_admin
requests []adminRequest
closeChan chan bool
}

// Attributes represents a map of key-value attribute pairs.
Expand Down Expand Up @@ -122,6 +130,13 @@ type request struct {
status *C.enum_hyperdex_client_returncode
}

type adminRequest struct {
id int64
success func()
failure func(C.enum_hyperdex_admin_returncode, string)
status *C.enum_hyperdex_admin_returncode
}

// A custom error type that allows for examining HyperDex error code.
type HyperError struct {
returnCode C.enum_hyperdex_client_returncode
Expand Down Expand Up @@ -161,7 +176,7 @@ func NewClient(ip string, port int) (*Client, error) {
C_client,
C_arena,
make([]request, 0, 8), // No reallocation within 8 concurrent requests to hyperdex_client_loop
make(chan struct{}, 1),
make(chan bool, 1),
}

go func() {
Expand All @@ -184,13 +199,9 @@ func NewClient(ip string, port int) (*Client, error) {
// find processed request among pending requests
for i, req := range client.requests {
if req.id == ret {
log.Printf("Processing request %v\n", req.id)
log.Printf("Loop status: %v\n", status)
log.Printf("Request status: %v\n", *req.status)
if status == C.HYPERDEX_CLIENT_SUCCESS {
switch *req.status {
case C.HYPERDEX_CLIENT_SUCCESS:
log.Println("Request success")
if req.success != nil {
req.success()
}
Expand All @@ -204,18 +215,15 @@ func NewClient(ip string, port int) (*Client, error) {
req.complete()
}
case C.HYPERDEX_CLIENT_SEARCHDONE:
log.Println("Request search done")
if req.complete != nil {
req.complete()
}
case C.HYPERDEX_CLIENT_CMPFAIL:
log.Println("Comparison failure")
if req.failure != nil {
req.failure(*req.status,
C.GoString(C.hyperdex_client_error_message(client.ptr)))
}
default:
log.Println("Request failure")
if req.failure != nil {
req.failure(*req.status,
C.GoString(C.hyperdex_client_error_message(client.ptr)))
Expand Down Expand Up @@ -249,3 +257,73 @@ func (client *Client) Destroy() {
C.hyperdex_client_destroy(client.ptr)
//log.Printf("hyperdex_client_destroy(%X)\n", unsafe.Pointer(client.ptr))
}

func NewAdmin(ip string, port int) (*Admin, error) {
C_admin := C.hyperdex_admin_create(C.CString(ip), C.uint16_t(port))

if C_admin == nil {
return nil, fmt.Errorf("Could not create hyperdex_admin (ip=%s, port=%d)", ip, port)
}

admin := &Admin{
C_admin,
make([]adminRequest, 0, 8),
make(chan bool, 1),
}

go func() {
for {
select {
// quit goroutine when client is destroyed
case <-admin.closeChan:
return
default:
// check if there are pending requests
// and only if there are, call hyperdex_client_loop
if len(admin.requests) > 0 {
var status C.enum_hyperdex_admin_returncode
ret := int64(C.hyperdex_admin_loop(admin.ptr, C.int(TIMEOUT), &status))
//log.Printf("hyperdex_client_loop(%X, %d, %X) -> %d\n", unsafe.Pointer(client.ptr), hyperdex_client_loop_timeout, unsafe.Pointer(&status), ret)
if ret < 0 {
panic(newInternalError(status, "Admin error"))
}
// find processed request among pending requests
for i, req := range admin.requests {
if req.id == ret {
if status == C.HYPERDEX_ADMIN_SUCCESS {
switch *req.status {
case C.HYPERDEX_ADMIN_SUCCESS:
if req.success != nil {
req.success()
}
default:
if req.failure != nil {
req.failure(*req.status,
C.GoString(C.hyperdex_admin_error_message(admin.ptr)))
}
}
} else if req.failure != nil {
req.failure(status,
C.GoString(C.hyperdex_admin_error_message(admin.ptr)))
}
admin.requests = append(admin.requests[:i], admin.requests[i+1:]...)
}
}
}
// prevent other goroutines from starving
runtime.Gosched()
}
}
panic("Should not be reached: end of infinite loop")
}()

return admin, nil
}

// Destroy closes the connection between the Admin and hyperdex. It has to be used on a admin that is not used anymore.
//
// For every call to NewAdmin, there must be a call to Destroy.
func (admin *Admin) Destroy() {
close(admin.closeChan)
C.hyperdex_admin_destroy(admin.ptr)
}

0 comments on commit a8d7239

Please sign in to comment.