Skip to content

Commit

Permalink
Refactor basystem (#641)
Browse files Browse the repository at this point in the history
* Signature verification

* Clean up agreement sender to have less errors

* overlay address in captnplanet

* Refactor bandwidth.proto to not use streams

* Make sure the send worked

* Handle connection to satellite

* Save renter public key inside of renter bandwidth allocations

* Default diag to sqlite. Make configurable

* Separate bw server and dbm; regenerate dbx files

* Make sure test uses protobufs

* Demonstrate creating bandwidth allocations
  • Loading branch information
aleitner committed Nov 15, 2018
1 parent c4b90f8 commit 5e93775
Show file tree
Hide file tree
Showing 18 changed files with 741 additions and 324 deletions.
3 changes: 3 additions & 0 deletions cmd/captplanet/run.go
Expand Up @@ -14,6 +14,7 @@ import (

"storj.io/storj/pkg/audit"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/datarepair/checker"
"storj.io/storj/pkg/datarepair/repairer"
Expand Down Expand Up @@ -44,6 +45,7 @@ type Satellite struct {
Repairer repairer.Config
Audit audit.Config
StatDB statdb.Config
BwAgreement bwagreement.Config
Web satelliteweb.Config
MockOverlay struct {
Enabled bool `default:"true" help:"if false, use real overlay"`
Expand Down Expand Up @@ -149,6 +151,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
// TODO(coyle): re-enable the checker after we determine why it is panicing
// runCfg.Satellite.Checker,
runCfg.Satellite.Repairer,
runCfg.Satellite.BwAgreement,
runCfg.Satellite.Web,
)
}()
Expand Down
28 changes: 15 additions & 13 deletions cmd/satellite/main.go
Expand Up @@ -6,18 +6,19 @@ package main
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sort"
"text/tabwriter"

"github.com/golang/protobuf/proto"
"github.com/spf13/cobra"

"go.uber.org/zap"
"github.com/zeebo/errs"

"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/bwagreement/database-manager"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
Expand Down Expand Up @@ -51,11 +52,9 @@ var (
}

runCfg struct {
Identity provider.IdentityConfig
Kademlia kademlia.Config
PointerDB pointerdb.Config
// Checker checker.Config
// Repairer repairer.Config
Identity provider.IdentityConfig
Kademlia kademlia.Config
PointerDB pointerdb.Config
Overlay overlay.Config
MockOverlay mockOverlay.Config
StatDB statdb.Config
Expand All @@ -72,11 +71,10 @@ var (
Overwrite bool `default:"false" help:"whether to overwrite pre-existing configuration files"`
}
diagCfg struct {
BasePath string `default:"$CONFDIR" help:"base path for setup"`
DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"`
}

defaultConfDir = "$HOME/.storj/satellite"
defaultDiagDir = "postgres://postgres@localhost/storj?sslmode=disable"
)

func init() {
Expand All @@ -85,7 +83,7 @@ func init() {
rootCmd.AddCommand(diagCmd)
cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir))
cfgstruct.Bind(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir))
cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultDiagDir))
cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultConfDir))
}

func cmdRun(cmd *cobra.Command, args []string) (err error) {
Expand Down Expand Up @@ -152,16 +150,20 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {

func cmdDiag(cmd *cobra.Command, args []string) (err error) {
// open the psql db
dbpath := diagCfg.BasePath
dbm, err := bwagreement.NewDBManager("postgres", dbpath, zap.NewNop())
u, err := url.Parse(diagCfg.DatabaseURL)
if err != nil {
return errs.New("Invalid Database URL: %+v", err)
}

dbm, err := dbmanager.NewDBManager(u.Scheme, u.Path)
if err != nil {
return err
}

//get all bandwidth aggrements rows already ordered
baRows, err := dbm.GetBandwidthAllocations(context.Background())
if err != nil {
fmt.Printf("error reading satellite database %v: %v\n", dbpath, err)
fmt.Printf("error reading satellite database %v: %v\n", u.Path, err)
return err
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/bwagreement/config.go
Expand Up @@ -5,11 +5,14 @@ package bwagreement

import (
"context"
"net/url"

"github.com/zeebo/errs"
"go.uber.org/zap"

monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/pkg/bwagreement/database-manager"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
)
Expand All @@ -21,18 +24,29 @@ var (
// Config is a configuration struct that is everything you need to start an
// agreement receiver responsibility
type Config struct {
DatabaseURL string `help:"the database connection string to use" default:"postgres://postgres@localhost/storj?sslmode=disable"`
DatabaseDriver string `help:"the database driver to use" default:"postgres"`
DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"`
}

// Run implements the provider.Responsibility interface
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
defer mon.Task()(&ctx)(&err)
k := server.Identity().Leaf.PublicKey

ns, err := NewServer(c.DatabaseDriver, c.DatabaseURL, zap.L(), k)
zap.S().Debug("Starting Bandwidth Agreement Receiver...")

u, err := url.Parse(c.DatabaseURL)
if err != nil {
return errs.New("Invalid Database URL: %+v", err)
}

dbm, err := dbmanager.NewDBManager(u.Scheme, u.Path)
if err != nil {
return errs.New("Error starting initializing database for Bandwidth Agreement server on satellite: %+v", err)
}

ns, err := NewServer(dbm, zap.L(), k)
if err != nil {
return err
return errs.New("Error starting Bandwidth Agreement server on satellite: %+v", err)
}

pb.RegisterBandwidthServer(server.GRPC(), ns)
Expand Down
77 changes: 77 additions & 0 deletions pkg/bwagreement/database-manager/database-manager.go
@@ -0,0 +1,77 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package dbmanager

import (
"context"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/internal/migrate"
dbx "storj.io/storj/pkg/bwagreement/database-manager/dbx"
"storj.io/storj/pkg/pb"
)

var (
mon = monkit.Package()
)

// DBManager is an implementation of the database access interface
type DBManager struct {
DB *dbx.DB
mu sync.Mutex
}

// NewDBManager creates a new instance of a DatabaseManager
func NewDBManager(driver, source string) (*DBManager, error) {
db, err := dbx.Open(driver, source)
if err != nil {
return nil, err
}

err = migrate.Create("bwagreement", db)
if err != nil {
return nil, err
}
return &DBManager{
DB: db,
}, nil
}

func (dbm *DBManager) locked() func() {
dbm.mu.Lock()
return dbm.mu.Unlock
}

// Create a db entry for the provided storagenode
func (dbm *DBManager) Create(ctx context.Context, createBwAgreement *pb.RenterBandwidthAllocation) (bwagreement *dbx.Bwagreement, err error) {
defer mon.Task()(&ctx)(&err)
defer dbm.locked()()

signature := createBwAgreement.GetSignature()
data := createBwAgreement.GetData()

bwagreement, err = dbm.DB.Create_Bwagreement(
ctx,
dbx.Bwagreement_Signature(signature),
dbx.Bwagreement_Data(data),
)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}

return bwagreement, nil
}

// GetBandwidthAllocations all bandwidth agreements and sorts by satellite
func (dbm *DBManager) GetBandwidthAllocations(ctx context.Context) (rows []*dbx.Bwagreement, err error) {
defer mon.Task()(&ctx)(&err)
defer dbm.locked()()
rows, err = dbm.DB.All_Bwagreement(ctx)
return rows, err
}
File renamed without changes.

0 comments on commit 5e93775

Please sign in to comment.