Skip to content

Commit

Permalink
Merge 604e005 into 906643f
Browse files Browse the repository at this point in the history
  • Loading branch information
aleitner committed Aug 17, 2018
2 parents 906643f + 604e005 commit a23d6f1
Show file tree
Hide file tree
Showing 23 changed files with 2,212 additions and 730 deletions.
4 changes: 2 additions & 2 deletions cmd/captplanet/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/miniogw"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/piecestore/psservice"
psserver "storj.io/storj/pkg/piecestore/rpc/server"
"storj.io/storj/pkg/pointerdb"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/provider"
Expand All @@ -33,7 +33,7 @@ type HeavyClient struct {
type Farmer struct {
Identity provider.IdentityConfig
Kademlia kademlia.Config
Storage psservice.Config
Storage psserver.Config
}

var (
Expand Down
4 changes: 2 additions & 2 deletions cmd/farmer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/piecestore/psservice"
psserver "storj.io/storj/pkg/piecestore/rpc/server"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/provider"
)
Expand All @@ -35,7 +35,7 @@ var (
runCfg struct {
Identity provider.IdentityConfig
Kademlia kademlia.Config
Storage psservice.Config
Storage psserver.Config
}
setupCfg struct {
BasePath string `default:"$CONFDIR" help:"base path for setup"`
Expand Down
10 changes: 7 additions & 3 deletions examples/piecestore-client/rpc/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc"

"storj.io/storj/pkg/piecestore/rpc/client"
pb "storj.io/storj/protos/piecestore"
)

var argError = errs.Class("argError")
Expand All @@ -32,7 +33,10 @@ func main() {
log.Fatalf("did not connect: %s", err)
}
defer conn.Close()
psClient := client.NewPSClient(conn)
psClient, err := client.NewPSClient(conn, 1024*32)
if err != nil {
log.Fatalf("could not initialize PSClient: %s", err)
}

app.Commands = []cli.Command{
{
Expand Down Expand Up @@ -69,7 +73,7 @@ func main() {

id := client.NewPieceID()

if err := psClient.Put(context.Background(), id, dataSection, ttl); err != nil {
if err := psClient.Put(context.Background(), id, dataSection, ttl, &pb.PayerBandwidthAllocation{}); err != nil {
fmt.Printf("Failed to Store data of id: %s\n", id)
return err
}
Expand Down Expand Up @@ -124,7 +128,7 @@ func main() {
}

ctx := context.Background()
rr, err := psClient.Get(ctx, client.PieceID(c.Args().Get(id)), pieceInfo.Size)
rr, err := psClient.Get(ctx, client.PieceID(c.Args().Get(id)), pieceInfo.Size, &pb.PayerBandwidthAllocation{})
if err != nil {
fmt.Printf("Failed to retrieve file of id: %s\n", c.Args().Get(id))
os.Remove(c.Args().Get(outputDir))
Expand Down
58 changes: 0 additions & 58 deletions pkg/piecestore/psservice/config.go

This file was deleted.

83 changes: 67 additions & 16 deletions pkg/piecestore/rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package client

import (
"bufio"
"flag"
"fmt"
"io"
"log"
"time"

"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -18,32 +20,69 @@ import (
pb "storj.io/storj/protos/piecestore"
)

// PSClient interface defines the set of methods to interact with a Piecestore Client
// ClientError is any error returned by the client
var ClientError = errs.Class("PSClient error")

var (
defaultBandwidthMsgSize = flag.Int(
"piecestore.rpc.client.default_bandwidth_msg_size", 32*1024,
"default bandwidth message size in kilobytes")
maxBandwidthMsgSize = flag.Int(
"piecestore.rpc.client.max_bandwidth_msg_size", 64*1024,
"max bandwidth message size in kilobytes")
)

// PSClient is an interface describing the functions for interacting with piecestore nodes
type PSClient interface {
Meta(ctx context.Context, id PieceID) (*pb.PieceSummary, error)
Put(ctx context.Context, id PieceID, data io.Reader, ttl time.Time) error
Get(ctx context.Context, id PieceID, size int64) (ranger.RangeCloser, error)
Put(ctx context.Context, id PieceID, data io.Reader, ttl time.Time, ba *pb.PayerBandwidthAllocation) error
Get(ctx context.Context, id PieceID, size int64, ba *pb.PayerBandwidthAllocation) (ranger.RangeCloser, error)
Delete(ctx context.Context, pieceID PieceID) error
CloseConn() error
}

// Client -- Struct Info needed for protobuf api calls
type Client struct {
route pb.PieceStoreRoutesClient
conn *grpc.ClientConn
route pb.PieceStoreRoutesClient
conn *grpc.ClientConn
pkey []byte
bandwidthMsgSize int
}

// NewPSClient initilizes a PSClient
func NewPSClient(conn *grpc.ClientConn) PSClient {
return &Client{conn: conn, route: pb.NewPieceStoreRoutesClient(conn)}
func NewPSClient(conn *grpc.ClientConn, bandwidthMsgSize int) (PSClient, error) {
if bandwidthMsgSize < 0 || bandwidthMsgSize > *maxBandwidthMsgSize {
return nil, ClientError.New(fmt.Sprintf("Invalid Bandwidth Message Size: %v", bandwidthMsgSize))
}

if bandwidthMsgSize == 0 {
bandwidthMsgSize = *defaultBandwidthMsgSize
}

return &Client{
conn: conn,
route: pb.NewPieceStoreRoutesClient(conn),
bandwidthMsgSize: bandwidthMsgSize,
}, nil
}

// NewCustomRoute creates new Client with custom route interface
func NewCustomRoute(route pb.PieceStoreRoutesClient) *Client {
return &Client{route: route}
func NewCustomRoute(route pb.PieceStoreRoutesClient, bandwidthMsgSize int) (*Client, error) {
if bandwidthMsgSize < 0 || bandwidthMsgSize > *maxBandwidthMsgSize {
return nil, ClientError.New(fmt.Sprintf("Invalid Bandwidth Message Size: %v", bandwidthMsgSize))
}

if bandwidthMsgSize == 0 {
bandwidthMsgSize = *defaultBandwidthMsgSize
}

return &Client{
route: route,
bandwidthMsgSize: bandwidthMsgSize,
}, nil
}

// CloseConn closes the connection stored on the Client struct
// CloseConn closes the connection with piecestore
func (client *Client) CloseConn() error {
return client.conn.Close()
}
Expand All @@ -54,22 +93,22 @@ func (client *Client) Meta(ctx context.Context, id PieceID) (*pb.PieceSummary, e
}

// Put uploads a Piece to a piece store Server
func (client *Client) Put(ctx context.Context, id PieceID, data io.Reader, ttl time.Time) error {
func (client *Client) Put(ctx context.Context, id PieceID, data io.Reader, ttl time.Time, ba *pb.PayerBandwidthAllocation) error {
stream, err := client.route.Store(ctx)
if err != nil {
return err
}

// Send preliminary data
if err := stream.Send(&pb.PieceStore{Id: id.String(), Ttl: ttl.Unix()}); err != nil {
msg := &pb.PieceStore{Piecedata: &pb.PieceStore_PieceData{Id: id.String(), ExpirationUnixSec: ttl.Unix()}}
if err = stream.Send(msg); err != nil {
if _, closeErr := stream.CloseAndRecv(); closeErr != nil {
zap.S().Errorf("error closing stream %s :: %v.Send() = %v", closeErr, stream, closeErr)
}

return fmt.Errorf("%v.Send() = %v", stream, err)
}

writer := &StreamWriter{stream: stream}
writer := &StreamWriter{signer: client, stream: stream, pba: ba}

defer func() {
if err := writer.Close(); err != nil && err != io.EOF {
Expand All @@ -93,8 +132,13 @@ func (client *Client) Put(ctx context.Context, id PieceID, data io.Reader, ttl t
}

// Get begins downloading a Piece from a piece store Server
func (client *Client) Get(ctx context.Context, id PieceID, size int64) (ranger.RangeCloser, error) {
return PieceRangerSize(client, id, size), nil
func (client *Client) Get(ctx context.Context, id PieceID, size int64, ba *pb.PayerBandwidthAllocation) (ranger.RangeCloser, error) {
stream, err := client.route.Retrieve(ctx)
if err != nil {
return nil, err
}

return PieceRangerSize(client, stream, id, size, ba), nil
}

// Delete a Piece from a piece store Server
Expand All @@ -106,3 +150,10 @@ func (client *Client) Delete(ctx context.Context, id PieceID) error {
log.Printf("Route summary : %v", reply)
return nil
}

// sign a message using the clients private key
func (client *Client) sign(msg []byte) (signature []byte, err error) {
// use c.pkey to sign msg

return signature, err
}
23 changes: 13 additions & 10 deletions pkg/piecestore/rpc/client/pieceranger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@ import (
var Error = errs.Class("pieceRanger error")

type pieceRanger struct {
c *Client
id PieceID
size int64
c *Client
id PieceID
size int64
stream pb.PieceStoreRoutes_RetrieveClient
pba *pb.PayerBandwidthAllocation
}

// PieceRanger PieceRanger returns a RangeCloser from a PieceID.
func PieceRanger(ctx context.Context, c *Client, id PieceID) (ranger.RangeCloser, error) {
func PieceRanger(ctx context.Context, c *Client, stream pb.PieceStoreRoutes_RetrieveClient, id PieceID, pba *pb.PayerBandwidthAllocation) (ranger.RangeCloser, error) {
piece, err := c.Meta(ctx, PieceID(id))
if err != nil {
return nil, err
}
return &pieceRanger{c: c, id: id, size: piece.Size}, nil
return &pieceRanger{c: c, id: id, size: piece.Size, stream: stream, pba: pba}, nil
}

// PieceRangerSize creates a PieceRanger with known size.
// Use it if you know the piece size. This will safe the extra request for
// retrieving the piece size from the piece storage.
func PieceRangerSize(c *Client, id PieceID, size int64) ranger.RangeCloser {
return &pieceRanger{c: c, id: id, size: size}
func PieceRangerSize(c *Client, stream pb.PieceStoreRoutes_RetrieveClient, id PieceID, size int64, pba *pb.PayerBandwidthAllocation) ranger.RangeCloser {
return &pieceRanger{c: c, id: id, size: size, stream: stream, pba: pba}
}

// Size implements Ranger.Size
Expand All @@ -64,10 +66,11 @@ func (r *pieceRanger) Range(ctx context.Context, offset, length int64) (io.ReadC
if length == 0 {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
stream, err := r.c.route.Retrieve(ctx, &pb.PieceRetrieval{Id: r.id.String(), Size: length, Offset: offset})
if err != nil {

// send piece data
if err := r.stream.Send(&pb.PieceRetrieval{PieceData: &pb.PieceRetrieval_PieceData{Id: r.id.String(), Size: length, Offset: offset}}); err != nil {
return nil, err
}

return NewStreamReader(stream), nil
return NewStreamReader(r.c, r.stream, r.pba), nil
}

0 comments on commit a23d6f1

Please sign in to comment.