Skip to content

Commit

Permalink
Revert segment changes, squash an unneeded log line
Browse files Browse the repository at this point in the history
  • Loading branch information
jtolio committed Jul 23, 2018
1 parent d8c2707 commit 8388326
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 48 deletions.
10 changes: 7 additions & 3 deletions pkg/miniogw/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pointerdb"
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/segment"
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/transport"
)
Expand Down Expand Up @@ -128,13 +127,18 @@ func (c Config) action(ctx context.Context, cliCtx *cli.Context,
if err != nil {
return err
}
segments := segment.NewSegmentStore(oc, ec, pdb, rs)

// TODO(jt): make segment store
// segments := segment.NewSegmentStore(oc, ec, pdb, rs)
_ = oc
_ = ec
_ = pdb
_ = rs

// TODO(jt): wrap segments and turn segments into streams
// TODO(jt): hook streams into object store
// TODO(jt): this should work:
// NewStorjGateway(objects.NewStore(streams.NewStore(segments)))
_ = segments

minio.StartGateway(cliCtx, NewStorjGateway(objects.NewObjectStore()))
return Error.New("unexpected minio exit")
Expand Down
8 changes: 5 additions & 3 deletions pkg/process/exec_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ func cleanup(cmd *cobra.Command, defaultConfig string) {
// okay now that logging is working, inform about the broken keys
// these keys are almost certainly broken because they have capital
// letters
logger.Sugar().Infof("TODO: these flags are not configurable via "+
"config file, probably due to having uppercase letters: %s",
strings.Join(brokenKeys, ", "))
if len(brokenKeys) > 0 {
logger.Sugar().Infof("TODO: these flags are not configurable via "+
"config file, probably due to having uppercase letters: %s",
strings.Join(brokenKeys, ", "))
}

err = initMetrics(ctx, monkit.Default,
telemetry.DefaultInstanceID())
Expand Down
119 changes: 77 additions & 42 deletions pkg/segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
"io"
"time"

"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/paths"
"storj.io/storj/pkg/piecestore/rpc/client"
"storj.io/storj/pkg/pointerdb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storage/ec"
opb "storj.io/storj/protos/overlay"
Expand All @@ -45,15 +43,15 @@ type Store interface {
}

type segmentStore struct {
oc overlay.Client
oc opb.OverlayClient
ec ecclient.Client
pdb pointerdb.Client
pdb ppb.PointerDBClient
rs eestream.RedundancyStrategy
}

// NewSegmentStore creates a new instance of segmentStore
func NewSegmentStore(oc overlay.Client, ec ecclient.Client,
pdb pointerdb.Client, rs eestream.RedundancyStrategy) Store {
func NewSegmentStore(oc opb.OverlayClient, ec ecclient.Client, pdb ppb.PointerDBClient,
rs eestream.RedundancyStrategy) Store {
return &segmentStore{oc: oc, ec: ec, pdb: pdb, rs: rs}
}

Expand All @@ -63,47 +61,51 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
defer mon.Task()(&ctx)(&err)

// uses overlay client to request a list of nodes
nodeRes, err := s.oc.Choose(ctx, 0, 0)
nodeRes, err := s.oc.FindStorageNodes(ctx, &opb.FindStorageNodesRequest{})
if err != nil {
return Error.Wrap(err)
}

pieceID := client.NewPieceID()

// puts file to ecclient
err = s.ec.Put(ctx, nodeRes, s.rs, pieceID, data, expiration)
err = s.ec.Put(ctx, nodeRes.GetNodes(), s.rs, pieceID, data, expiration)
if err != nil {
zap.S().Error("Failed putting nodes to ecclient")
return Error.Wrap(err)
}

var remotePieces []*ppb.RemotePiece
for i := range nodeRes {
for i := range nodeRes.Nodes {
remotePieces = append(remotePieces, &ppb.RemotePiece{
PieceNum: int64(i),
NodeId: nodeRes[i].Id,
NodeId: nodeRes.Nodes[i].Id,
})
}

// creates pointer
pr := &ppb.Pointer{
Type: ppb.Pointer_REMOTE,
Remote: &ppb.RemoteSegment{
Redundancy: &ppb.RedundancyScheme{
Type: ppb.RedundancyScheme_RS,
MinReq: int64(s.rs.RequiredCount()),
Total: int64(s.rs.TotalCount()),
RepairThreshold: int64(s.rs.Min),
SuccessThreshold: int64(s.rs.Opt),
pr := ppb.PutRequest{
Path: []byte(path.String()),
Pointer: &ppb.Pointer{
Type: ppb.Pointer_REMOTE,
Remote: &ppb.RemoteSegment{
Redundancy: &ppb.RedundancyScheme{
Type: ppb.RedundancyScheme_RS,
MinReq: int64(s.rs.RequiredCount()),
Total: int64(s.rs.TotalCount()),
RepairThreshold: int64(s.rs.Min),
SuccessThreshold: int64(s.rs.Opt),
},
PieceId: string(pieceID),
RemotePieces: remotePieces,
},
PieceId: string(pieceID),
RemotePieces: remotePieces,
Metadata: metadata,
},
Metadata: metadata,
APIKey: nil,
}

// puts pointer to pointerDB
err = s.pdb.Put(ctx, path, pr, nil)
_, err = s.pdb.Put(ctx, &pr)
if err != nil || status.Code(err) == codes.Internal {
zap.L().Error("failed to put", zap.Error(err))
return Error.Wrap(err)
Expand All @@ -112,11 +114,22 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
}

// Get retrieves a file using erasure code, overlay, and pointerdb clients
func (s *segmentStore) Get(ctx context.Context, path paths.Path) (
ranger.Ranger, Meta, error) {
func (s *segmentStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) {
m := Meta{}
// TODO: remove this chunk after pointerdb client interface merged
gr := &ppb.GetRequest{
Path: []byte(path.String()),
APIKey: nil,
}

pdbRes, err := s.pdb.Get(ctx, gr)
if err != nil {
return nil, m, err
}

pointer, err := s.pdb.Get(ctx, path, nil)
// TODO: remove this chunk after pointerdb client interface merged
pointer := &ppb.Pointer{}
err = proto.Unmarshal(pdbRes.Pointer, pointer)
if err != nil {
return nil, m, err
}
Expand All @@ -142,11 +155,23 @@ func (s *segmentStore) Get(ctx context.Context, path paths.Path) (
return ecRes, m, nil
}

// Delete tells piece stores to delete a segment and deletes pointer from
// pointerdb
// Delete tells piece stores to delete a segment and deletes pointer from pointerdb
func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error {
// TODO: remove this chunk after pointerdb client interface merged
gr := &ppb.GetRequest{
Path: []byte(path.String()),
APIKey: nil,
}

// gets pointer from pointerdb
pointer, err := s.pdb.Get(ctx, path, nil)
pdbRes, err := s.pdb.Get(ctx, gr)
if err != nil {
return err
}

// TODO: remove this chunk after pointerdb client interface merged
pointer := &ppb.Pointer{}
err = proto.Unmarshal(pdbRes.Pointer, pointer)
if err != nil {
return err
}
Expand All @@ -162,8 +187,14 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error {
return err
}

// TODO: remove this chunk after pointerdb client interface merged
dr := &ppb.DeleteRequest{
Path: []byte(path.String()),
APIKey: nil,
}

// deletes pointer from pointerdb
err = s.pdb.Delete(ctx, path, nil)
_, err = s.pdb.Delete(ctx, dr)
if err != nil {
return err
}
Expand All @@ -172,34 +203,38 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error {
}

// overlayHelper calls Lookup to get node addresses from the overlay
func (s *segmentStore) overlayHelper(ctx context.Context,
rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) {
func (s *segmentStore) overlayHelper(ctx context.Context, rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) {
for i := 0; i < len(rem.RemotePieces); i++ {
overlayRes, err := s.oc.Lookup(ctx,
kademlia.StringToNodeID(rem.RemotePieces[i].NodeId))
overlayRes, err := s.oc.Lookup(ctx, &opb.LookupRequest{NodeID: rem.RemotePieces[i].NodeId})
if err != nil {
return nil, err
}
nodes = append(nodes, overlayRes)
nodes = append(nodes, overlayRes.Node)
}
return nodes, nil
}

// List lists paths stored in the pointerdb
func (s *segmentStore) List(ctx context.Context,
startingPath, endingPath paths.Path) (
func (s *segmentStore) List(ctx context.Context, startingPath, endingPath paths.Path) (
listPaths []paths.Path, truncated bool, err error) {

pathsResp, truncated, err := s.pdb.List(
ctx, startingPath, 0, nil)
// TODO: remove this chunk after pointerdb client interface merged
lr := &ppb.ListRequest{
StartingPathKey: []byte(startingPath.String()),
// TODO: change limit to endingPath when supported
Limit: 1,
APIKey: nil,
}

res, err := s.pdb.List(ctx, lr)
if err != nil {
return nil, false, err
}

for _, path := range pathsResp {
for _, path := range res.Paths {
np := paths.New(string(path[:]))
listPaths = append(listPaths, np)
}

return listPaths, truncated, nil
return listPaths, res.Truncated, nil
}

0 comments on commit 8388326

Please sign in to comment.