Skip to content

Commit

Permalink
Mutex/nsclient- WIP (#104)
Browse files Browse the repository at this point in the history
* working on put request for nsclient

* working on put request for nsclient

* netstate put

* netstate put

* wip testing client

* wip - testing client
 and working through some errors

* wip - testing client
 and working through some errors

* put request works

* put request works for client

* get request working

* get request working

* get request working-minor edit

* get request working-minor edit

* list request works

* list request works

* working through delete error

* working through delete error

* fixed exp client, still working through delete error

* fixed exp client, still working through delete error

* delete works; fixed formatting issues

* delete works; fixed formatting issues

* deleted comment

* deleted comment

* resolving merge conflicts

* resolving merge conflict

* fixing merge conflict

* implemented and modified kayloyans paths file

* working on testing

* added test for path_test.go

* fixed string, read through netstate test

* deleted env variables

* initial commit for mocking out grpc client- got it working

* mocked grpc client

* mock put passed test

* 2 tests pass for PUT with mock

* put requests test pass, wip- want mini review

* get tests pass mock

* list test working

* initial commit for list test

* all list req. working, starting on delete tests

* delete tests passed

* cleaned up tests

* resolved merge conflicts

* resolved merge conflicts

* fixed linter errors

* fixed error found in travis

* initial commit for fixes from PR comments

* fixed pr comments and linting

* added error handling for api creds, and rebased

* fixes from dennis comments

* fixed pr with dennis suggestioon

* added copyrights to files

* fixed casing per dennis great comment

* fixed travis complaint on sprintf
  • Loading branch information
nfarah86 committed Jul 19, 2018
1 parent bda13f5 commit e228d09
Show file tree
Hide file tree
Showing 15 changed files with 943 additions and 624 deletions.
4 changes: 4 additions & 0 deletions examples/paths/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,9 @@ func Main() error {
return err
}
fmt.Println("decrypted path: ", decryptedPath)

// implement Bytes() function
var pathBytes = path.Bytes()
fmt.Println("path in Bytes is: ", pathBytes)
return nil
}
145 changes: 49 additions & 96 deletions examples/pointerdb-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ import (
"flag"
"fmt"
"strings"
"os"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

p "storj.io/storj/pkg/paths"
client "storj.io/storj/pkg/pointerdb"
proto "storj.io/storj/protos/pointerdb"
)

var (
port string
apiKey = []byte("abc123")
pointerdbClientPort string
)

func initializeFlags() {
flag.StringVar(&port, "port", ":8080", "port")
flag.StringVar(&pointerdbClientPort, "pointerdbPort", ":8080", "this is your port")
flag.Parse()
}

Expand All @@ -33,123 +34,75 @@ func main() {
logger, _ := zap.NewDevelopment()
defer logger.Sync()

conn, err := grpc.Dial(port, grpc.WithInsecure())
pdbclient, err := client.NewClient(pointerdbClientPort)

if err != nil {
logger.Error("Failed to dial: ", zap.Error(err))
os.Exit(1)
}

client := proto.NewPointerDBClient(conn)

logger.Debug(fmt.Sprintf("client dialed port %s", port))

logger.Debug(fmt.Sprintf("client dialed port %s", pointerdbClientPort))
ctx := context.Background()

// Example pointer paths to put
pr1 := proto.PutRequest{
Path: []byte("test/path/1"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("inline1"),
},
APIKey: apiKey,
}
pr2 := proto.PutRequest{
Path: []byte("test/path/2"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("inline2"),
},
APIKey: apiKey,
}
pr3 := proto.PutRequest{
Path: []byte("test/path/3"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("inline3"),
},
APIKey: apiKey,
}
// rps is an example slice of RemotePieces, which is passed into
// this example Pointer of type REMOTE.
var rps []*proto.RemotePiece
rps = append(rps, &proto.RemotePiece{
PieceNum: int64(1),
NodeId: "testId",
})
pr4 := proto.PutRequest{
Path: []byte("test/path/4"),
Pointer: &proto.Pointer{
Type: proto.Pointer_REMOTE,
Remote: &proto.RemoteSegment{
Redundancy: &proto.RedundancyScheme{
Type: proto.RedundancyScheme_RS,
MinReq: int64(1),
Total: int64(3),
RepairThreshold: int64(2),
SuccessThreshold: int64(3),
},
PieceId: "testId",
RemotePieces: rps,
},
},
APIKey: apiKey,
// Example parameters to pass into API calls
var path = p.New("fold1/fold2/fold3/file.txt")
pointer := &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("popcorn"),
}
APIKey := []byte("abc123")

// Example Put1
err = pdbclient.Put(ctx, path, pointer, APIKey)

// Example Puts
_, err = client.Put(ctx, &pr1)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
}
_, err = client.Put(ctx, &pr2)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
}
_, err = client.Put(ctx, &pr3)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
logger.Error("couldn't put pointer in db", zap.Error(err))
} else {
logger.Debug("Success: put pointer in db")
}
_, err = client.Put(ctx, &pr4)

// Example Put2
err = pdbclient.Put(ctx, p.New("fold1/fold2"), pointer, APIKey)

if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
logger.Error("couldn't put pointer in db", zap.Error(err))
} else {
logger.Debug("Success: put pointer in db")
}

// Example Get
getReq := proto.GetRequest{
Path: []byte("test/path/1"),
APIKey: apiKey,
}
getRes, err := client.Get(ctx, &getReq)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to get", zap.Error(err))
getRes, err := pdbclient.Get(ctx, path, APIKey)

if err != nil {
logger.Error("couldn't GET pointer from db", zap.Error(err))
} else {
pointer := string(getRes.Pointer)
logger.Debug("get response: " + pointer)
logger.Info("Success: got Pointer from db",
zap.String("pointer", getRes.String()),
)
}

// Example List
listReq := proto.ListRequest{
StartingPathKey: []byte("test/path/2"),
Limit: 5,
APIKey: apiKey,
}
listRes, err := client.List(ctx, &listReq)
// Example List with pagination
startingPathKey := p.New("fold1/")
var limit int64 = 1

paths, trunc, err := pdbclient.List(ctx, startingPathKey, limit, APIKey)

if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to list file paths")
logger.Error("failed to list file paths", zap.Error(err))
} else {
var stringList []string
for _, pathByte := range listRes.Paths {
for _, pathByte := range paths {
stringList = append(stringList, string(pathByte))
}
logger.Debug("listed paths: " + strings.Join(stringList, ", ") + "; truncated: " + fmt.Sprintf("%t", listRes.Truncated))
logger.Debug("Success: listed paths: " + strings.Join(stringList, ", ") + "; truncated: " + fmt.Sprintf("%t", trunc))
}

// Example Delete
delReq := proto.DeleteRequest{
Path: []byte("test/path/1"),
APIKey: apiKey,
}
_, err = client.Delete(ctx, &delReq)
err = pdbclient.Delete(ctx, path, APIKey)

if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to delete: " + string(delReq.Path))
logger.Error("Error in deleteing file from db", zap.Error(err))
} else {
logger.Debug("Success: file is deleted from db")
}
}
3 changes: 3 additions & 0 deletions internal/test/utils.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package test

import (
Expand Down
3 changes: 3 additions & 0 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package node

import (
Expand Down
3 changes: 3 additions & 0 deletions pkg/node/server.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package node

import (
Expand Down
5 changes: 5 additions & 0 deletions pkg/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (p Path) String() string {
return path.Join([]string(p)...)
}

// Bytes serializes the current path to []byte
func (p Path) Bytes() []byte {
return []byte(p.String())
}

// Prepend creates new Path from the current path with the given segments prepended
func (p Path) Prepend(segs ...string) Path {
return New(append(segs, []string(p)...)...)
Expand Down
19 changes: 18 additions & 1 deletion pkg/paths/paths_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestNew(t *testing.T) {
assert.Equal(t, tt.expected, p, errTag)
}
}

func TestNewWithSegments(t *testing.T) {
for i, tt := range []struct {
segs []string
Expand Down Expand Up @@ -68,6 +69,22 @@ func TestString(t *testing.T) {
}
}

func TestBytes(t *testing.T) {
for i, tt := range []struct {
path Path
expected []byte
}{
{[]string{""}, []byte{}},
{[]string{"a/b"}, []byte{97, 47, 98}},
{[]string{"a/b/c"}, []byte{97, 47, 98, 47, 99}},
{[]string{"a/b/c/d/e/f"}, []byte{97, 47, 98, 47, 99, 47, 100, 47, 101, 47, 102}},
}{
errTag := fmt.Sprintf("Test case #%d", i)
b := tt.path.Bytes()
assert.Equal(t, tt.expected, b, errTag)
}
}

func TestPrepend(t *testing.T) {
for i, tt := range []struct {
prefix string
Expand All @@ -79,7 +96,7 @@ func TestPrepend(t *testing.T) {
{"", "my/path", []string{"my", "path"}},
{"prefix", "my/path", []string{"prefix", "my", "path"}},
{"p1/p2/p3", "my/path", []string{"p1", "p2", "p3", "my", "path"}},
} {
}{
errTag := fmt.Sprintf("Test case #%d", i)
p := New(tt.path).Prepend(tt.prefix)
assert.Equal(t, tt.expected, p, errTag)
Expand Down
100 changes: 100 additions & 0 deletions pkg/pointerdb/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package pointerdb

import (
"context"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
monkit "gopkg.in/spacemonkeygo/monkit.v2"

p "storj.io/storj/pkg/paths"
pb "storj.io/storj/protos/pointerdb"
)

var (
mon = monkit.Package()
)

// PointerDB creates a grpcClient
type PointerDB struct {
grpcClient pb.PointerDBClient
}

// Client services offerred for the interface
type Client interface {
Put(ctx context.Context, path p.Path, pointer *pb.Pointer, APIKey []byte) error
Get(ctx context.Context, path p.Path, APIKey []byte) (*pb.Pointer, error)
List(ctx context.Context, startingPathKey []byte, limit int64, APIKey []byte) (
paths []byte, truncated bool, err error)
Delete(ctx context.Context, path p.Path, APIKey []byte) error
}

// NewClient initializes a new pointerdb client
func NewClient(address string) (*PointerDB, error) {
c, err := clientConnection(address, grpc.WithInsecure())

if err != nil {
return nil, err
}
return &PointerDB{
grpcClient: c,
}, nil
}

// ClientConnection makes a server connection
func clientConnection(serverAddr string, opts ...grpc.DialOption) (pb.PointerDBClient, error) {
conn, err := grpc.Dial(serverAddr, opts...)

if err != nil {
return nil, err
}
return pb.NewPointerDBClient(conn), nil
}

// Put is the interface to make a PUT request, needs Pointer and APIKey
func (pdb *PointerDB) Put(ctx context.Context, path p.Path, pointer *pb.Pointer, APIKey []byte) (err error) {
defer mon.Task()(&ctx)(&err)

_, err = pdb.grpcClient.Put(ctx, &pb.PutRequest{Path: path.Bytes(), Pointer: pointer, APIKey: APIKey})

return err
}

// Get is the interface to make a GET request, needs PATH and APIKey
func (pdb *PointerDB) Get(ctx context.Context, path p.Path, APIKey []byte) (pointer *pb.Pointer, err error) {
defer mon.Task()(&ctx)(&err)

res, err := pdb.grpcClient.Get(ctx, &pb.GetRequest{Path: path.Bytes(), APIKey: APIKey})
if err != nil {
return nil, err
}

pointer = &pb.Pointer{}
err = proto.Unmarshal(res.GetPointer(), pointer)

return pointer, nil
}

// List is the interface to make a LIST request, needs StartingPathKey, Limit, and APIKey
func (pdb *PointerDB) List(ctx context.Context, startingPathKey p.Path, limit int64, APIKey []byte) (paths [][]byte, truncated bool, err error) {
defer mon.Task()(&ctx)(&err)
res, err := pdb.grpcClient.List(ctx, &pb.ListRequest{StartingPathKey: startingPathKey.Bytes(), Limit: limit, APIKey: APIKey})

if err != nil {
return nil, false, err
}

return res.Paths, res.Truncated, nil
}

// Delete is the interface to make a Delete request, needs Path and APIKey
func (pdb *PointerDB) Delete(ctx context.Context, path p.Path, APIKey []byte) (err error) {
defer mon.Task()(&ctx)(&err)

_, err = pdb.grpcClient.Delete(ctx, &pb.DeleteRequest{Path: path.Bytes(), APIKey: APIKey})

return err
}
3 changes: 1 addition & 2 deletions pkg/pointerdb/pointerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (s *Server) validateAuth(APIKeyBytes []byte) error {
func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutResponse, error) {
s.logger.Debug("entering pointerdb put")

APIKeyBytes := []byte(putReq.APIKey)
if err := s.validateAuth(APIKeyBytes); err != nil {
if err := s.validateAuth(putReq.APIKey); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit e228d09

Please sign in to comment.