From 5e9377524f44e5c401845ef5f58314b8030e8c66 Mon Sep 17 00:00:00 2001 From: Alexander Leitner Date: Thu, 15 Nov 2018 14:06:09 -0500 Subject: [PATCH] Refactor basystem (#641) * Signature verification * Clean up agreement sender to have less errors * overlay address in captnplanet * Refactor bandwidth.proto to not use streams * Make sure the send worked * Handle connection to satellite * Save renter public key inside of renter bandwidth allocations * Default diag to sqlite. Make configurable * Separate bw server and dbm; regenerate dbx files * Make sure test uses protobufs * Demonstrate creating bandwidth allocations --- cmd/captplanet/run.go | 3 + cmd/satellite/main.go | 28 +- pkg/bwagreement/config.go | 22 +- .../database-manager/database-manager.go | 77 +++++ .../dbx/bwagreement.dbx | 0 .../dbx/bwagreement.dbx.go | 296 ++++++++++++++++++ .../dbx/bwagreement.dbx.postgres.sql | 0 .../dbx/bwagreement.dbx.sqlite3.sql | 8 + pkg/bwagreement/database-manager/dbx/gen.go | 7 + pkg/bwagreement/dbx/gen.go | 7 - pkg/bwagreement/server.go | 149 +++------ pkg/bwagreement/server_test.go | 106 +++++-- pkg/pb/bandwidth.pb.go | 145 ++++----- pkg/pb/bandwidth.proto | 9 +- pkg/pb/piecestore.pb.go | 164 +++++----- pkg/pb/piecestore.proto | 1 + pkg/piecestore/psclient/readerwriter.go | 27 ++ .../agreementsender/agreementsender.go | 16 +- 18 files changed, 741 insertions(+), 324 deletions(-) create mode 100644 pkg/bwagreement/database-manager/database-manager.go rename pkg/bwagreement/{ => database-manager}/dbx/bwagreement.dbx (100%) rename pkg/bwagreement/{ => database-manager}/dbx/bwagreement.dbx.go (71%) rename pkg/bwagreement/{ => database-manager}/dbx/bwagreement.dbx.postgres.sql (100%) create mode 100644 pkg/bwagreement/database-manager/dbx/bwagreement.dbx.sqlite3.sql create mode 100644 pkg/bwagreement/database-manager/dbx/gen.go delete mode 100644 pkg/bwagreement/dbx/gen.go diff --git a/cmd/captplanet/run.go b/cmd/captplanet/run.go index 05bcea90c17e..cd9e49754431 100644 --- a/cmd/captplanet/run.go +++ b/cmd/captplanet/run.go @@ -14,6 +14,7 @@ import ( "storj.io/storj/pkg/audit" "storj.io/storj/pkg/auth/grpcauth" + "storj.io/storj/pkg/bwagreement" "storj.io/storj/pkg/cfgstruct" "storj.io/storj/pkg/datarepair/checker" "storj.io/storj/pkg/datarepair/repairer" @@ -44,6 +45,7 @@ type Satellite struct { Repairer repairer.Config Audit audit.Config StatDB statdb.Config + BwAgreement bwagreement.Config Web satelliteweb.Config MockOverlay struct { Enabled bool `default:"true" help:"if false, use real overlay"` @@ -149,6 +151,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { // TODO(coyle): re-enable the checker after we determine why it is panicing // runCfg.Satellite.Checker, runCfg.Satellite.Repairer, + runCfg.Satellite.BwAgreement, runCfg.Satellite.Web, ) }() diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index c30fff9d581a..cf051ba32f4f 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -6,6 +6,7 @@ package main import ( "context" "fmt" + "net/url" "os" "path/filepath" "sort" @@ -13,11 +14,11 @@ import ( "github.com/golang/protobuf/proto" "github.com/spf13/cobra" - - "go.uber.org/zap" + "github.com/zeebo/errs" "storj.io/storj/pkg/auth/grpcauth" "storj.io/storj/pkg/bwagreement" + "storj.io/storj/pkg/bwagreement/database-manager" "storj.io/storj/pkg/cfgstruct" "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/overlay" @@ -51,11 +52,9 @@ var ( } runCfg struct { - Identity provider.IdentityConfig - Kademlia kademlia.Config - PointerDB pointerdb.Config - // Checker checker.Config - // Repairer repairer.Config + Identity provider.IdentityConfig + Kademlia kademlia.Config + PointerDB pointerdb.Config Overlay overlay.Config MockOverlay mockOverlay.Config StatDB statdb.Config @@ -72,11 +71,10 @@ var ( Overwrite bool `default:"false" help:"whether to overwrite pre-existing configuration files"` } diagCfg struct { - BasePath string `default:"$CONFDIR" help:"base path for setup"` + DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"` } defaultConfDir = "$HOME/.storj/satellite" - defaultDiagDir = "postgres://postgres@localhost/storj?sslmode=disable" ) func init() { @@ -85,7 +83,7 @@ func init() { rootCmd.AddCommand(diagCmd) cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir)) cfgstruct.Bind(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir)) - cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultDiagDir)) + cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultConfDir)) } func cmdRun(cmd *cobra.Command, args []string) (err error) { @@ -152,8 +150,12 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { func cmdDiag(cmd *cobra.Command, args []string) (err error) { // open the psql db - dbpath := diagCfg.BasePath - dbm, err := bwagreement.NewDBManager("postgres", dbpath, zap.NewNop()) + u, err := url.Parse(diagCfg.DatabaseURL) + if err != nil { + return errs.New("Invalid Database URL: %+v", err) + } + + dbm, err := dbmanager.NewDBManager(u.Scheme, u.Path) if err != nil { return err } @@ -161,7 +163,7 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) { //get all bandwidth aggrements rows already ordered baRows, err := dbm.GetBandwidthAllocations(context.Background()) if err != nil { - fmt.Printf("error reading satellite database %v: %v\n", dbpath, err) + fmt.Printf("error reading satellite database %v: %v\n", u.Path, err) return err } diff --git a/pkg/bwagreement/config.go b/pkg/bwagreement/config.go index 0695d6e08138..d1c808c5fb90 100644 --- a/pkg/bwagreement/config.go +++ b/pkg/bwagreement/config.go @@ -5,11 +5,14 @@ package bwagreement import ( "context" + "net/url" + "github.com/zeebo/errs" "go.uber.org/zap" monkit "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/pkg/bwagreement/database-manager" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/provider" ) @@ -21,8 +24,7 @@ var ( // Config is a configuration struct that is everything you need to start an // agreement receiver responsibility type Config struct { - DatabaseURL string `help:"the database connection string to use" default:"postgres://postgres@localhost/storj?sslmode=disable"` - DatabaseDriver string `help:"the database driver to use" default:"postgres"` + DatabaseURL string `help:"the database connection string to use" default:"sqlite3://$CONFDIR/bw.db"` } // Run implements the provider.Responsibility interface @@ -30,9 +32,21 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) defer mon.Task()(&ctx)(&err) k := server.Identity().Leaf.PublicKey - ns, err := NewServer(c.DatabaseDriver, c.DatabaseURL, zap.L(), k) + zap.S().Debug("Starting Bandwidth Agreement Receiver...") + + u, err := url.Parse(c.DatabaseURL) + if err != nil { + return errs.New("Invalid Database URL: %+v", err) + } + + dbm, err := dbmanager.NewDBManager(u.Scheme, u.Path) + if err != nil { + return errs.New("Error starting initializing database for Bandwidth Agreement server on satellite: %+v", err) + } + + ns, err := NewServer(dbm, zap.L(), k) if err != nil { - return err + return errs.New("Error starting Bandwidth Agreement server on satellite: %+v", err) } pb.RegisterBandwidthServer(server.GRPC(), ns) diff --git a/pkg/bwagreement/database-manager/database-manager.go b/pkg/bwagreement/database-manager/database-manager.go new file mode 100644 index 000000000000..bc67afdb1742 --- /dev/null +++ b/pkg/bwagreement/database-manager/database-manager.go @@ -0,0 +1,77 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package dbmanager + +import ( + "context" + "sync" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/internal/migrate" + dbx "storj.io/storj/pkg/bwagreement/database-manager/dbx" + "storj.io/storj/pkg/pb" +) + +var ( + mon = monkit.Package() +) + +// DBManager is an implementation of the database access interface +type DBManager struct { + DB *dbx.DB + mu sync.Mutex +} + +// NewDBManager creates a new instance of a DatabaseManager +func NewDBManager(driver, source string) (*DBManager, error) { + db, err := dbx.Open(driver, source) + if err != nil { + return nil, err + } + + err = migrate.Create("bwagreement", db) + if err != nil { + return nil, err + } + return &DBManager{ + DB: db, + }, nil +} + +func (dbm *DBManager) locked() func() { + dbm.mu.Lock() + return dbm.mu.Unlock +} + +// Create a db entry for the provided storagenode +func (dbm *DBManager) Create(ctx context.Context, createBwAgreement *pb.RenterBandwidthAllocation) (bwagreement *dbx.Bwagreement, err error) { + defer mon.Task()(&ctx)(&err) + defer dbm.locked()() + + signature := createBwAgreement.GetSignature() + data := createBwAgreement.GetData() + + bwagreement, err = dbm.DB.Create_Bwagreement( + ctx, + dbx.Bwagreement_Signature(signature), + dbx.Bwagreement_Data(data), + ) + if err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + + return bwagreement, nil +} + +// GetBandwidthAllocations all bandwidth agreements and sorts by satellite +func (dbm *DBManager) GetBandwidthAllocations(ctx context.Context) (rows []*dbx.Bwagreement, err error) { + defer mon.Task()(&ctx)(&err) + defer dbm.locked()() + rows, err = dbm.DB.All_Bwagreement(ctx) + return rows, err +} diff --git a/pkg/bwagreement/dbx/bwagreement.dbx b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx similarity index 100% rename from pkg/bwagreement/dbx/bwagreement.dbx rename to pkg/bwagreement/database-manager/dbx/bwagreement.dbx diff --git a/pkg/bwagreement/dbx/bwagreement.dbx.go b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.go similarity index 71% rename from pkg/bwagreement/dbx/bwagreement.dbx.go rename to pkg/bwagreement/database-manager/dbx/bwagreement.dbx.go index 19abf1fca057..6e51f6a7a095 100644 --- a/pkg/bwagreement/dbx/bwagreement.dbx.go +++ b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.go @@ -18,6 +18,8 @@ import ( "unicode" "github.com/lib/pq" + + "github.com/mattn/go-sqlite3" ) // Prevent conditional imports from causing build failures @@ -142,6 +144,8 @@ func Open(driver, source string) (db *DB, err error) { switch driver { case "postgres": sql_db, err = openpostgres(source) + case "sqlite3": + sql_db, err = opensqlite3(source) default: return nil, unsupportedDriver(driver) } @@ -166,6 +170,8 @@ func Open(driver, source string) (db *DB, err error) { switch driver { case "postgres": db.dbMethods = newpostgres(db) + case "sqlite3": + db.dbMethods = newsqlite3(db) default: return nil, unsupportedDriver(driver) } @@ -297,6 +303,75 @@ func postgresLogStmt(stmt string, args ...interface{}) { } } +type sqlite3Impl struct { + db *DB + dialect __sqlbundle_sqlite3 + driver driver +} + +func (obj *sqlite3Impl) Rebind(s string) string { + return obj.dialect.Rebind(s) +} + +func (obj *sqlite3Impl) logStmt(stmt string, args ...interface{}) { + sqlite3LogStmt(stmt, args...) +} + +func (obj *sqlite3Impl) makeErr(err error) error { + constraint, ok := obj.isConstraintError(err) + if ok { + return constraintViolation(err, constraint) + } + return makeErr(err) +} + +type sqlite3DB struct { + db *DB + *sqlite3Impl +} + +func newsqlite3(db *DB) *sqlite3DB { + return &sqlite3DB{ + db: db, + sqlite3Impl: &sqlite3Impl{ + db: db, + driver: db.DB, + }, + } +} + +func (obj *sqlite3DB) Schema() string { + return `CREATE TABLE bwagreements ( + signature BLOB NOT NULL, + data BLOB NOT NULL, + created_at TIMESTAMP NOT NULL, + PRIMARY KEY ( signature ) +);` +} + +func (obj *sqlite3DB) wrapTx(tx *sql.Tx) txMethods { + return &sqlite3Tx{ + dialectTx: dialectTx{tx: tx}, + sqlite3Impl: &sqlite3Impl{ + db: obj.db, + driver: tx, + }, + } +} + +type sqlite3Tx struct { + dialectTx + *sqlite3Impl +} + +func sqlite3LogStmt(stmt string, args ...interface{}) { + // TODO: render placeholders + if Logger != nil { + out := fmt.Sprintf("stmt: %s\nargs: %v\n", stmt, pretty(args)) + Logger(out) + } +} + type pretty []interface{} func (p pretty) Format(f fmt.State, c rune) { @@ -733,6 +808,198 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error) } +func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context, + bwagreement_signature Bwagreement_Signature_Field, + bwagreement_data Bwagreement_Data_Field) ( + bwagreement *Bwagreement, err error) { + + __now := obj.db.Hooks.Now().UTC() + __signature_val := bwagreement_signature.value() + __data_val := bwagreement_data.value() + __created_at_val := __now + + var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( signature, data, created_at ) VALUES ( ?, ?, ? )") + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __signature_val, __data_val, __created_at_val) + + __res, err := obj.driver.Exec(__stmt, __signature_val, __data_val, __created_at_val) + if err != nil { + return nil, obj.makeErr(err) + } + __pk, err := __res.LastInsertId() + if err != nil { + return nil, obj.makeErr(err) + } + return obj.getLastBwagreement(ctx, __pk) + +} + +func (obj *sqlite3Impl) Get_Bwagreement_By_Signature(ctx context.Context, + bwagreement_signature Bwagreement_Signature_Field) ( + bwagreement *Bwagreement, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.signature, bwagreements.data, bwagreements.created_at FROM bwagreements WHERE bwagreements.signature = ?") + + var __values []interface{} + __values = append(__values, bwagreement_signature.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + bwagreement = &Bwagreement{} + err = obj.driver.QueryRow(__stmt, __values...).Scan(&bwagreement.Signature, &bwagreement.Data, &bwagreement.CreatedAt) + if err != nil { + return nil, obj.makeErr(err) + } + return bwagreement, nil + +} + +func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context, + limit int, offset int64) ( + rows []*Bwagreement, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.signature, bwagreements.data, bwagreements.created_at FROM bwagreements LIMIT ? OFFSET ?") + + var __values []interface{} + __values = append(__values) + + __values = append(__values, limit, offset) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + bwagreement := &Bwagreement{} + err = __rows.Scan(&bwagreement.Signature, &bwagreement.Data, &bwagreement.CreatedAt) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, bwagreement) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + +func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) ( + rows []*Bwagreement, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.signature, bwagreements.data, bwagreements.created_at FROM bwagreements") + + var __values []interface{} + __values = append(__values) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + bwagreement := &Bwagreement{} + err = __rows.Scan(&bwagreement.Signature, &bwagreement.Data, &bwagreement.CreatedAt) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, bwagreement) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + +func (obj *sqlite3Impl) Delete_Bwagreement_By_Signature(ctx context.Context, + bwagreement_signature Bwagreement_Signature_Field) ( + deleted bool, err error) { + + var __embed_stmt = __sqlbundle_Literal("DELETE FROM bwagreements WHERE bwagreements.signature = ?") + + var __values []interface{} + __values = append(__values, bwagreement_signature.value()) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __res, err := obj.driver.Exec(__stmt, __values...) + if err != nil { + return false, obj.makeErr(err) + } + + __count, err := __res.RowsAffected() + if err != nil { + return false, obj.makeErr(err) + } + + return __count > 0, nil + +} + +func (obj *sqlite3Impl) getLastBwagreement(ctx context.Context, + pk int64) ( + bwagreement *Bwagreement, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.signature, bwagreements.data, bwagreements.created_at FROM bwagreements WHERE _rowid_ = ?") + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, pk) + + bwagreement = &Bwagreement{} + err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Signature, &bwagreement.Data, &bwagreement.CreatedAt) + if err != nil { + return nil, obj.makeErr(err) + } + return bwagreement, nil + +} + +func (impl sqlite3Impl) isConstraintError(err error) ( + constraint string, ok bool) { + if e, ok := err.(sqlite3.Error); ok { + if e.Code == sqlite3.ErrConstraint { + msg := err.Error() + colon := strings.LastIndex(msg, ":") + if colon != -1 { + return strings.TrimSpace(msg[colon:]), true + } + return "", true + } + } + return "", false +} + +func (obj *sqlite3Impl) deleteAll(ctx context.Context) (count int64, err error) { + var __res sql.Result + var __count int64 + __res, err = obj.driver.Exec("DELETE FROM bwagreements;") + if err != nil { + return 0, obj.makeErr(err) + } + + __count, err = __res.RowsAffected() + if err != nil { + return 0, obj.makeErr(err) + } + count += __count + + return count, nil + +} + type Rx struct { db *DB tx *Tx @@ -880,3 +1147,32 @@ type dbMethods interface { func openpostgres(source string) (*sql.DB, error) { return sql.Open("postgres", source) } + +var sqlite3DriverName = "sqlite3_" + fmt.Sprint(time.Now().UnixNano()) + +func init() { + sql.Register(sqlite3DriverName, &sqlite3.SQLiteDriver{ + ConnectHook: sqlite3SetupConn, + }) +} + +// SQLite3JournalMode controls the journal_mode pragma for all new connections. +// Since it is read without a mutex, it must be changed to the value you want +// before any Open calls. +var SQLite3JournalMode = "WAL" + +func sqlite3SetupConn(conn *sqlite3.SQLiteConn) (err error) { + _, err = conn.Exec("PRAGMA foreign_keys = ON", nil) + if err != nil { + return makeErr(err) + } + _, err = conn.Exec("PRAGMA journal_mode = "+SQLite3JournalMode, nil) + if err != nil { + return makeErr(err) + } + return nil +} + +func opensqlite3(source string) (*sql.DB, error) { + return sql.Open(sqlite3DriverName, source) +} diff --git a/pkg/bwagreement/dbx/bwagreement.dbx.postgres.sql b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.postgres.sql similarity index 100% rename from pkg/bwagreement/dbx/bwagreement.dbx.postgres.sql rename to pkg/bwagreement/database-manager/dbx/bwagreement.dbx.postgres.sql diff --git a/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.sqlite3.sql b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.sqlite3.sql new file mode 100644 index 000000000000..b70244da0fbe --- /dev/null +++ b/pkg/bwagreement/database-manager/dbx/bwagreement.dbx.sqlite3.sql @@ -0,0 +1,8 @@ +-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1 +-- DO NOT EDIT +CREATE TABLE bwagreements ( + signature BLOB NOT NULL, + data BLOB NOT NULL, + created_at TIMESTAMP NOT NULL, + PRIMARY KEY ( signature ) +); diff --git a/pkg/bwagreement/database-manager/dbx/gen.go b/pkg/bwagreement/database-manager/dbx/gen.go new file mode 100644 index 000000000000..9445d64adc03 --- /dev/null +++ b/pkg/bwagreement/database-manager/dbx/gen.go @@ -0,0 +1,7 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package bwagreement + +//go:generate dbx.v1 golang -d postgres -d sqlite3 bwagreement.dbx . +//go:generate dbx.v1 schema -d postgres -d sqlite3 bwagreement.dbx . diff --git a/pkg/bwagreement/dbx/gen.go b/pkg/bwagreement/dbx/gen.go deleted file mode 100644 index 3e2c8a78e336..000000000000 --- a/pkg/bwagreement/dbx/gen.go +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package bwagreement - -//go:generate dbx.v1 golang -d postgres bwagreement.dbx . -//go:generate dbx.v1 schema -d postgres bwagreement.dbx . diff --git a/pkg/bwagreement/server.go b/pkg/bwagreement/server.go index a296b6a77105..c68348266c28 100644 --- a/pkg/bwagreement/server.go +++ b/pkg/bwagreement/server.go @@ -7,68 +7,32 @@ import ( "context" "crypto" "crypto/ecdsa" - "sync" + "crypto/x509" "github.com/golang/protobuf/proto" "github.com/gtank/cryptopasta" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "storj.io/storj/internal/migrate" - dbx "storj.io/storj/pkg/bwagreement/dbx" + "storj.io/storj/pkg/bwagreement/database-manager" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls" - "storj.io/storj/pkg/provider" ) -// OK - Success! -const OK = "OK" - // Server is an implementation of the pb.BandwidthServer interface type Server struct { - dbm *DBManager + dbm *dbmanager.DBManager pkey crypto.PublicKey logger *zap.Logger } -// DBManager is an implementation of the database access interface -type DBManager struct { - DB *dbx.DB - mu sync.Mutex - logger *zap.Logger -} - // Agreement is a struct that contains a bandwidth agreement and the associated signature type Agreement struct { Agreement []byte Signature []byte } -// NewDBManager creates a new instance of a DatabaseManager -func NewDBManager(driver, source string, logger *zap.Logger) (*DBManager, error) { - db, err := dbx.Open(driver, source) - if err != nil { - return nil, err - } - - err = migrate.Create("bwagreement", db) - if err != nil { - return nil, err - } - return &DBManager{ - DB: db, - logger: logger, - }, nil -} - // NewServer creates instance of Server -func NewServer(driver, source string, logger *zap.Logger, pkey crypto.PublicKey) (*Server, error) { - dbm, err := NewDBManager(driver, source, logger) - if err != nil { - return nil, err - } - +func NewServer(dbm *dbmanager.DBManager, logger *zap.Logger, pkey crypto.PublicKey) (*Server, error) { return &Server{ dbm: dbm, logger: logger, @@ -76,89 +40,52 @@ func NewServer(driver, source string, logger *zap.Logger, pkey crypto.PublicKey) }, nil } -func (dbm *DBManager) locked() func() { - dbm.mu.Lock() - return dbm.mu.Unlock -} - -// Create a db entry for the provided storagenode -func (dbm *DBManager) Create(ctx context.Context, createBwAgreement *pb.RenterBandwidthAllocation) (bwagreement *dbx.Bwagreement, err error) { +// BandwidthAgreements receives and stores bandwidth agreements from storage nodes +func (s *Server) BandwidthAgreements(ctx context.Context, agreement *pb.RenterBandwidthAllocation) (reply *pb.AgreementsSummary, err error) { defer mon.Task()(&ctx)(&err) - dbm.logger.Debug("entering bwagreement Create") - signature := createBwAgreement.GetSignature() - data := createBwAgreement.GetData() + s.logger.Debug("Received Agreement...") - bwagreement, err = dbm.DB.Create_Bwagreement( - ctx, - dbx.Bwagreement_Signature(signature), - dbx.Bwagreement_Data(data), - ) - if err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) + reply = &pb.AgreementsSummary{ + Status: pb.AgreementsSummary_FAIL, } - return bwagreement, nil -} - -// GetBandwidthAllocations all bandwidth agreements and sorts by satellite -func (dbm *DBManager) GetBandwidthAllocations(ctx context.Context) (rows []*dbx.Bwagreement, err error) { - defer mon.Task()(&ctx)(&err) - defer dbm.locked()() - rows, err = dbm.DB.All_Bwagreement(ctx) - return rows, err -} + if err = s.verifySignature(ctx, agreement); err != nil { + return reply, err + } -// BandwidthAgreements receives and stores bandwidth agreements from storage nodes -func (s *Server) BandwidthAgreements(stream pb.Bandwidth_BandwidthAgreementsServer) (err error) { - ctx := stream.Context() - defer mon.Task()(&ctx)(&err) - defer s.dbm.locked()() - - ch := make(chan *pb.RenterBandwidthAllocation, 1) - errch := make(chan error, 1) - go func() { - for { - msg, err := stream.Recv() - if err != nil { - s.logger.Error("Grpc Receive Error", zap.Error(err)) - errch <- err - return - } - ch <- msg - } - }() - - for { - select { - case err := <-errch: - return err - case <-ctx.Done(): - return nil - case agreement := <-ch: - if err = s.verifySignature(ctx, agreement); err != nil { - return err - } - _, err = s.dbm.Create(ctx, agreement) - if err != nil { - s.logger.Error("DB entry creation Error", zap.Error(err)) - return err - } - } + _, err = s.dbm.Create(ctx, agreement) + if err != nil { + return reply, err } + reply.Status = pb.AgreementsSummary_OK + + s.logger.Debug("Stored Agreement...") + + return reply, nil } func (s *Server) verifySignature(ctx context.Context, ba *pb.RenterBandwidthAllocation) error { // TODO(security): detect replay attacks - pi, err := provider.PeerIdentityFromContext(ctx) + + //Deserealize RenterBandwidthAllocation.GetData() so we can get public key + rbad := &pb.RenterBandwidthAllocation_Data{} + if err := proto.Unmarshal(ba.GetData(), rbad); err != nil { + return BwAgreementError.New("Failed to unmarshal RenterBandwidthAllocation: %+v", err) + } + + // Extract renter's public key from RenterBandwidthAllocation_Data + // TODO: Look this public key up in a database + pubkey, err := x509.ParsePKIXPublicKey(rbad.GetPubKey()) if err != nil { - return err + return BwAgreementError.New("Failed to extract Public Key from RenterBandwidthAllocation: %+v", err) } - k, ok := pi.Leaf.PublicKey.(*ecdsa.PublicKey) + // Typecast public key + k, ok := pubkey.(*ecdsa.PublicKey) if !ok { - return peertls.ErrUnsupportedKey.New("%T", pi.Leaf.PublicKey) + return peertls.ErrUnsupportedKey.New("%T", pubkey) } // verify Renter's (uplink) signature @@ -166,19 +93,13 @@ func (s *Server) verifySignature(ctx context.Context, ba *pb.RenterBandwidthAllo return BwAgreementError.New("Failed to verify Renter's Signature") } - // deserializing pbad you get satelliteID, uplinkID, max size, exp, serial# & action - pba := &pb.PayerBandwidthAllocation{} - if err := proto.Unmarshal(ba.GetData(), pba); err != nil { - return err - } - k, ok = s.pkey.(*ecdsa.PublicKey) if !ok { return peertls.ErrUnsupportedKey.New("%T", s.pkey) } // verify Payer's (satellite) signature - if ok := cryptopasta.Verify(pba.GetData(), pba.GetSignature(), k); !ok { + if ok := cryptopasta.Verify(rbad.GetPayerAllocation().GetData(), rbad.GetPayerAllocation().GetSignature(), k); !ok { return BwAgreementError.New("Failed to verify Payer's Signature") } return nil diff --git a/pkg/bwagreement/server_test.go b/pkg/bwagreement/server_test.go index 5fcf805752f2..a3733db84642 100644 --- a/pkg/bwagreement/server_test.go +++ b/pkg/bwagreement/server_test.go @@ -7,20 +7,24 @@ import ( "context" "crypto" "crypto/ecdsa" + "crypto/x509" "flag" "log" "net" "os" "testing" + "time" + "github.com/gogo/protobuf/proto" "github.com/gtank/cryptopasta" + "github.com/stretchr/testify/assert" + "github.com/zeebo/errs" + "go.uber.org/zap" "google.golang.org/grpc" - "github.com/stretchr/testify/assert" - + "storj.io/storj/pkg/bwagreement/database-manager" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/peertls" "storj.io/storj/pkg/provider" ) @@ -32,25 +36,15 @@ func TestBandwidthAgreements(t *testing.T) { TS := NewTestServer(t) defer TS.Stop() - signature := []byte("iamthedummysignatureoftypebyteslice") - data := []byte("iamthedummydataoftypebyteslice") - - msg := &pb.RenterBandwidthAllocation{ - Signature: signature, - Data: data, - } + pba, err := generatePayerBandwidthAllocation(pb.PayerBandwidthAllocation_GET, TS.k) + assert.NoError(t, err) - s, err := cryptopasta.Sign(msg.Data, TS.k.(*ecdsa.PrivateKey)) + rba, err := generateRenterBandwidthAllocation(pba, TS.k) assert.NoError(t, err) - msg.Signature = s /* emulate sending the bwagreement stream from piecestore node */ - stream, err := TS.c.BandwidthAgreements(ctx) + _, err = TS.c.BandwidthAgreements(ctx, rba) assert.NoError(t, err) - err = stream.Send(msg) - assert.NoError(t, err) - - _, _ = stream.CloseAndRecv() } type TestServer struct { @@ -82,7 +76,7 @@ func NewTestServer(t *testing.T) *TestServer { co, err := fiC.DialOption("") check(err) - s := newTestServerStruct(t) + s := newTestServerStruct(t, fiC.Key) grpcs := grpc.NewServer(so) k, ok := fiC.Key.(*ecdsa.PrivateKey) @@ -104,16 +98,18 @@ var ( testPostgres = flag.String("postgres-test-db", os.Getenv("STORJ_POSTGRES_TEST"), "PostgreSQL test database connection string") ) -func newTestServerStruct(t *testing.T) *Server { +func newTestServerStruct(t *testing.T, k crypto.PrivateKey) *Server { if *testPostgres == "" { t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn) } - k, err := peertls.NewKey() - assert.NoError(t, err) + dbm, err := dbmanager.NewDBManager("postgres", *testPostgres) + if err != nil { + t.Fatalf("Failed to initialize dbmanager when creating test server: %+v", err) + } p, _ := k.(*ecdsa.PrivateKey) - server, err := NewServer("postgres", *testPostgres, zap.NewNop(), &p.PublicKey) + server, err := NewServer(dbm, zap.NewNop(), &p.PublicKey) if err != nil { t.Fatal(err) } @@ -146,6 +142,72 @@ func connect(addr string, o ...grpc.DialOption) (pb.BandwidthClient, *grpc.Clien return c, conn } +func generatePayerBandwidthAllocation(action pb.PayerBandwidthAllocation_Action, satelliteKey crypto.PrivateKey) (*pb.PayerBandwidthAllocation, error) { + satelliteKeyEcdsa, ok := satelliteKey.(*ecdsa.PrivateKey) + if !ok { + return nil, errs.New("Satellite Private Key is not a valid *ecdsa.PrivateKey") + } + + // Generate PayerBandwidthAllocation_Data + data, _ := proto.Marshal( + &pb.PayerBandwidthAllocation_Data{ + SatelliteId: []byte("SatelliteID"), + UplinkId: []byte("UplinkID"), + ExpirationUnixSec: time.Now().Add(time.Hour * 24 * 10).Unix(), + SerialNumber: "SerialNumber", + Action: action, + CreatedUnixSec: time.Now().Unix(), + }, + ) + + // Sign the PayerBandwidthAllocation_Data with the "Satellite" Private Key + s, err := cryptopasta.Sign(data, satelliteKeyEcdsa) + if err != nil { + return nil, errs.New("Failed to sign PayerBandwidthAllocation_Data with satellite Private Key: %+v", err) + } + + // Combine Signature and Data for PayerBandwidthAllocation + return &pb.PayerBandwidthAllocation{ + Data: data, + Signature: s, + }, nil +} + +func generateRenterBandwidthAllocation(pba *pb.PayerBandwidthAllocation, uplinkKey crypto.PrivateKey) (*pb.RenterBandwidthAllocation, error) { + // get "Uplink" Public Key + uplinkKeyEcdsa, ok := uplinkKey.(*ecdsa.PrivateKey) + if !ok { + return nil, errs.New("Uplink Private Key is not a valid *ecdsa.PrivateKey") + } + + pubbytes, err := x509.MarshalPKIXPublicKey(&uplinkKeyEcdsa.PublicKey) + if err != nil { + return nil, errs.New("Could not generate byte array from Uplink Public key: %+v", err) + } + + // Generate RenterBandwidthAllocation_Data + data, _ := proto.Marshal( + &pb.RenterBandwidthAllocation_Data{ + PayerAllocation: pba, + PubKey: pubbytes, // TODO: Take this out. It will be kept in a database on the satellite + StorageNodeId: []byte("StorageNodeID"), + Total: int64(666), + }, + ) + + // Sign the PayerBandwidthAllocation_Data with the "Uplink" Private Key + s, err := cryptopasta.Sign(data, uplinkKeyEcdsa) + if err != nil { + return nil, errs.New("Failed to sign RenterBandwidthAllocation_Data with uplink Private Key: %+v", err) + } + + // Combine Signature and Data for RenterBandwidthAllocation + return &pb.RenterBandwidthAllocation{ + Signature: s, + Data: data, + }, nil +} + func (TS *TestServer) Stop() { if err := TS.conn.Close(); err != nil { panic(err) diff --git a/pkg/pb/bandwidth.pb.go b/pkg/pb/bandwidth.pb.go index 36617409a2f4..110f56f4b451 100644 --- a/pkg/pb/bandwidth.pb.go +++ b/pkg/pb/bandwidth.pb.go @@ -23,17 +23,41 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +type AgreementsSummary_Status int32 + +const ( + AgreementsSummary_FAIL AgreementsSummary_Status = 0 + AgreementsSummary_OK AgreementsSummary_Status = 1 +) + +var AgreementsSummary_Status_name = map[int32]string{ + 0: "FAIL", + 1: "OK", +} +var AgreementsSummary_Status_value = map[string]int32{ + "FAIL": 0, + "OK": 1, +} + +func (x AgreementsSummary_Status) String() string { + return proto.EnumName(AgreementsSummary_Status_name, int32(x)) +} +func (AgreementsSummary_Status) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_bandwidth_01db992f91c47bae, []int{0, 0} +} + type AgreementsSummary struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Status AgreementsSummary_Status `protobuf:"varint,1,opt,name=status,proto3,enum=bandwidth.AgreementsSummary_Status" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *AgreementsSummary) Reset() { *m = AgreementsSummary{} } func (m *AgreementsSummary) String() string { return proto.CompactTextString(m) } func (*AgreementsSummary) ProtoMessage() {} func (*AgreementsSummary) Descriptor() ([]byte, []int) { - return fileDescriptor_bandwidth_99a1cbdb0d6e51ae, []int{0} + return fileDescriptor_bandwidth_01db992f91c47bae, []int{0} } func (m *AgreementsSummary) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AgreementsSummary.Unmarshal(m, b) @@ -53,8 +77,16 @@ func (m *AgreementsSummary) XXX_DiscardUnknown() { var xxx_messageInfo_AgreementsSummary proto.InternalMessageInfo +func (m *AgreementsSummary) GetStatus() AgreementsSummary_Status { + if m != nil { + return m.Status + } + return AgreementsSummary_FAIL +} + func init() { proto.RegisterType((*AgreementsSummary)(nil), "bandwidth.AgreementsSummary") + proto.RegisterEnum("bandwidth.AgreementsSummary_Status", AgreementsSummary_Status_name, AgreementsSummary_Status_value) } // Reference imports to suppress errors if they are not otherwise used. @@ -69,7 +101,7 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type BandwidthClient interface { - BandwidthAgreements(ctx context.Context, opts ...grpc.CallOption) (Bandwidth_BandwidthAgreementsClient, error) + BandwidthAgreements(ctx context.Context, in *RenterBandwidthAllocation, opts ...grpc.CallOption) (*AgreementsSummary, error) } type bandwidthClient struct { @@ -80,101 +112,70 @@ func NewBandwidthClient(cc *grpc.ClientConn) BandwidthClient { return &bandwidthClient{cc} } -func (c *bandwidthClient) BandwidthAgreements(ctx context.Context, opts ...grpc.CallOption) (Bandwidth_BandwidthAgreementsClient, error) { - stream, err := c.cc.NewStream(ctx, &_Bandwidth_serviceDesc.Streams[0], "/bandwidth.Bandwidth/BandwidthAgreements", opts...) +func (c *bandwidthClient) BandwidthAgreements(ctx context.Context, in *RenterBandwidthAllocation, opts ...grpc.CallOption) (*AgreementsSummary, error) { + out := new(AgreementsSummary) + err := c.cc.Invoke(ctx, "/bandwidth.Bandwidth/BandwidthAgreements", in, out, opts...) if err != nil { return nil, err } - x := &bandwidthBandwidthAgreementsClient{stream} - return x, nil -} - -type Bandwidth_BandwidthAgreementsClient interface { - Send(*RenterBandwidthAllocation) error - CloseAndRecv() (*AgreementsSummary, error) - grpc.ClientStream -} - -type bandwidthBandwidthAgreementsClient struct { - grpc.ClientStream -} - -func (x *bandwidthBandwidthAgreementsClient) Send(m *RenterBandwidthAllocation) error { - return x.ClientStream.SendMsg(m) -} - -func (x *bandwidthBandwidthAgreementsClient) CloseAndRecv() (*AgreementsSummary, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(AgreementsSummary) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil + return out, nil } // BandwidthServer is the server API for Bandwidth service. type BandwidthServer interface { - BandwidthAgreements(Bandwidth_BandwidthAgreementsServer) error + BandwidthAgreements(context.Context, *RenterBandwidthAllocation) (*AgreementsSummary, error) } func RegisterBandwidthServer(s *grpc.Server, srv BandwidthServer) { s.RegisterService(&_Bandwidth_serviceDesc, srv) } -func _Bandwidth_BandwidthAgreements_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(BandwidthServer).BandwidthAgreements(&bandwidthBandwidthAgreementsServer{stream}) -} - -type Bandwidth_BandwidthAgreementsServer interface { - SendAndClose(*AgreementsSummary) error - Recv() (*RenterBandwidthAllocation, error) - grpc.ServerStream -} - -type bandwidthBandwidthAgreementsServer struct { - grpc.ServerStream -} - -func (x *bandwidthBandwidthAgreementsServer) SendAndClose(m *AgreementsSummary) error { - return x.ServerStream.SendMsg(m) -} - -func (x *bandwidthBandwidthAgreementsServer) Recv() (*RenterBandwidthAllocation, error) { - m := new(RenterBandwidthAllocation) - if err := x.ServerStream.RecvMsg(m); err != nil { +func _Bandwidth_BandwidthAgreements_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RenterBandwidthAllocation) + if err := dec(in); err != nil { return nil, err } - return m, nil + if interceptor == nil { + return srv.(BandwidthServer).BandwidthAgreements(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/bandwidth.Bandwidth/BandwidthAgreements", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BandwidthServer).BandwidthAgreements(ctx, req.(*RenterBandwidthAllocation)) + } + return interceptor(ctx, in, info, handler) } var _Bandwidth_serviceDesc = grpc.ServiceDesc{ ServiceName: "bandwidth.Bandwidth", HandlerType: (*BandwidthServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ + Methods: []grpc.MethodDesc{ { - StreamName: "BandwidthAgreements", - Handler: _Bandwidth_BandwidthAgreements_Handler, - ClientStreams: true, + MethodName: "BandwidthAgreements", + Handler: _Bandwidth_BandwidthAgreements_Handler, }, }, + Streams: []grpc.StreamDesc{}, Metadata: "bandwidth.proto", } -func init() { proto.RegisterFile("bandwidth.proto", fileDescriptor_bandwidth_99a1cbdb0d6e51ae) } +func init() { proto.RegisterFile("bandwidth.proto", fileDescriptor_bandwidth_01db992f91c47bae) } -var fileDescriptor_bandwidth_99a1cbdb0d6e51ae = []byte{ - // 146 bytes of a gzipped FileDescriptorProto +var fileDescriptor_bandwidth_01db992f91c47bae = []byte{ + // 196 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4f, 0x4a, 0xcc, 0x4b, 0x29, 0xcf, 0x4c, 0x29, 0xc9, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x48, - 0x09, 0x14, 0x64, 0xa6, 0x26, 0xa7, 0x16, 0x97, 0xe4, 0x17, 0xa5, 0x42, 0x24, 0x95, 0x84, 0xb9, + 0x09, 0x14, 0x64, 0xa6, 0x26, 0xa7, 0x16, 0x97, 0xe4, 0x17, 0xa5, 0x42, 0x24, 0x95, 0x72, 0xb8, 0x04, 0x1d, 0xd3, 0x8b, 0x52, 0x53, 0x73, 0x53, 0xf3, 0x4a, 0x8a, 0x83, 0x4b, 0x73, 0x73, 0x13, - 0x8b, 0x2a, 0x8d, 0x0a, 0xb9, 0x38, 0x9d, 0x60, 0x7a, 0x84, 0x52, 0xb8, 0x84, 0xe1, 0x1c, 0x84, - 0x52, 0x21, 0x6d, 0x3d, 0x84, 0x59, 0x45, 0xf9, 0xa5, 0x25, 0xa9, 0xc5, 0x7a, 0x41, 0xa9, 0x79, - 0x25, 0xa9, 0x45, 0x08, 0xc5, 0x39, 0x39, 0xf9, 0xc9, 0x89, 0x25, 0x99, 0xf9, 0x79, 0x52, 0x32, - 0x7a, 0x08, 0x47, 0x61, 0x58, 0xa7, 0xc4, 0xa0, 0xc1, 0xe8, 0xc4, 0x12, 0xc5, 0x54, 0x90, 0x94, - 0xc4, 0x06, 0x76, 0x94, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x47, 0x9f, 0xcd, 0x7b, 0xc4, 0x00, - 0x00, 0x00, + 0x8b, 0x2a, 0x85, 0xac, 0xb9, 0xd8, 0x8a, 0x4b, 0x12, 0x4b, 0x4a, 0x8b, 0x25, 0x18, 0x15, 0x18, + 0x35, 0xf8, 0x8c, 0x94, 0xf5, 0x10, 0x66, 0x62, 0xa8, 0xd6, 0x0b, 0x06, 0x2b, 0x0d, 0x82, 0x6a, + 0x51, 0x92, 0xe2, 0x62, 0x83, 0x88, 0x08, 0x71, 0x70, 0xb1, 0xb8, 0x39, 0x7a, 0xfa, 0x08, 0x30, + 0x08, 0xb1, 0x71, 0x31, 0xf9, 0x7b, 0x0b, 0x30, 0x1a, 0xe5, 0x73, 0x71, 0x3a, 0xc1, 0x4c, 0x12, + 0x4a, 0xe2, 0x12, 0x86, 0x73, 0x10, 0xa6, 0x0a, 0x69, 0xeb, 0x21, 0x1c, 0x59, 0x94, 0x5f, 0x5a, + 0x92, 0x5a, 0xac, 0x17, 0x94, 0x9a, 0x57, 0x92, 0x5a, 0x84, 0x50, 0x9c, 0x93, 0x93, 0x9f, 0x9c, + 0x58, 0x92, 0x99, 0x9f, 0x27, 0x25, 0x83, 0xcf, 0x65, 0x4a, 0x0c, 0x4e, 0x2c, 0x51, 0x4c, 0x05, + 0x49, 0x49, 0x6c, 0x60, 0xbf, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xdc, 0x38, 0x8e, 0x59, + 0x1b, 0x01, 0x00, 0x00, } diff --git a/pkg/pb/bandwidth.proto b/pkg/pb/bandwidth.proto index d3820062b323..4997fc402ec1 100644 --- a/pkg/pb/bandwidth.proto +++ b/pkg/pb/bandwidth.proto @@ -9,9 +9,14 @@ package bandwidth; import "piecestore.proto"; service Bandwidth { - rpc BandwidthAgreements(stream piecestoreroutes.RenterBandwidthAllocation) returns (AgreementsSummary) {} + rpc BandwidthAgreements(piecestoreroutes.RenterBandwidthAllocation) returns (AgreementsSummary) {} } - message AgreementsSummary { + enum Status { + FAIL = 0; + OK = 1; + } + + Status status = 1; } \ No newline at end of file diff --git a/pkg/pb/piecestore.pb.go b/pkg/pb/piecestore.pb.go index 787ad18120bb..aa15ed333e9d 100644 --- a/pkg/pb/piecestore.pb.go +++ b/pkg/pb/piecestore.pb.go @@ -43,7 +43,7 @@ func (x PayerBandwidthAllocation_Action) String() string { return proto.EnumName(PayerBandwidthAllocation_Action_name, int32(x)) } func (PayerBandwidthAllocation_Action) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{0, 0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{0, 0} } type PayerBandwidthAllocation struct { @@ -58,7 +58,7 @@ func (m *PayerBandwidthAllocation) Reset() { *m = PayerBandwidthAllocati func (m *PayerBandwidthAllocation) String() string { return proto.CompactTextString(m) } func (*PayerBandwidthAllocation) ProtoMessage() {} func (*PayerBandwidthAllocation) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{0} } func (m *PayerBandwidthAllocation) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PayerBandwidthAllocation.Unmarshal(m, b) @@ -109,7 +109,7 @@ func (m *PayerBandwidthAllocation_Data) Reset() { *m = PayerBandwidthAll func (m *PayerBandwidthAllocation_Data) String() string { return proto.CompactTextString(m) } func (*PayerBandwidthAllocation_Data) ProtoMessage() {} func (*PayerBandwidthAllocation_Data) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{0, 0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{0, 0} } func (m *PayerBandwidthAllocation_Data) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PayerBandwidthAllocation_Data.Unmarshal(m, b) @@ -190,7 +190,7 @@ func (m *RenterBandwidthAllocation) Reset() { *m = RenterBandwidthAlloca func (m *RenterBandwidthAllocation) String() string { return proto.CompactTextString(m) } func (*RenterBandwidthAllocation) ProtoMessage() {} func (*RenterBandwidthAllocation) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{1} + return fileDescriptor_piecestore_51f85e261f621d08, []int{1} } func (m *RenterBandwidthAllocation) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RenterBandwidthAllocation.Unmarshal(m, b) @@ -228,6 +228,7 @@ type RenterBandwidthAllocation_Data struct { PayerAllocation *PayerBandwidthAllocation `protobuf:"bytes,1,opt,name=payer_allocation,json=payerAllocation,proto3" json:"payer_allocation,omitempty"` Total int64 `protobuf:"varint,2,opt,name=total,proto3" json:"total,omitempty"` StorageNodeId []byte `protobuf:"bytes,3,opt,name=storage_node_id,json=storageNodeId,proto3" json:"storage_node_id,omitempty"` + PubKey []byte `protobuf:"bytes,4,opt,name=pub_key,json=pubKey,proto3" json:"pub_key,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -237,7 +238,7 @@ func (m *RenterBandwidthAllocation_Data) Reset() { *m = RenterBandwidthA func (m *RenterBandwidthAllocation_Data) String() string { return proto.CompactTextString(m) } func (*RenterBandwidthAllocation_Data) ProtoMessage() {} func (*RenterBandwidthAllocation_Data) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{1, 0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{1, 0} } func (m *RenterBandwidthAllocation_Data) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RenterBandwidthAllocation_Data.Unmarshal(m, b) @@ -278,6 +279,13 @@ func (m *RenterBandwidthAllocation_Data) GetStorageNodeId() []byte { return nil } +func (m *RenterBandwidthAllocation_Data) GetPubKey() []byte { + if m != nil { + return m.PubKey + } + return nil +} + type PieceStore struct { Bandwidthallocation *RenterBandwidthAllocation `protobuf:"bytes,1,opt,name=bandwidthallocation,proto3" json:"bandwidthallocation,omitempty"` Piecedata *PieceStore_PieceData `protobuf:"bytes,2,opt,name=piecedata,proto3" json:"piecedata,omitempty"` @@ -291,7 +299,7 @@ func (m *PieceStore) Reset() { *m = PieceStore{} } func (m *PieceStore) String() string { return proto.CompactTextString(m) } func (*PieceStore) ProtoMessage() {} func (*PieceStore) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{2} + return fileDescriptor_piecestore_51f85e261f621d08, []int{2} } func (m *PieceStore) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceStore.Unmarshal(m, b) @@ -345,7 +353,7 @@ func (m *PieceStore_PieceData) Reset() { *m = PieceStore_PieceData{} } func (m *PieceStore_PieceData) String() string { return proto.CompactTextString(m) } func (*PieceStore_PieceData) ProtoMessage() {} func (*PieceStore_PieceData) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{2, 0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{2, 0} } func (m *PieceStore_PieceData) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceStore_PieceData.Unmarshal(m, b) @@ -398,7 +406,7 @@ func (m *PieceId) Reset() { *m = PieceId{} } func (m *PieceId) String() string { return proto.CompactTextString(m) } func (*PieceId) ProtoMessage() {} func (*PieceId) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{3} + return fileDescriptor_piecestore_51f85e261f621d08, []int{3} } func (m *PieceId) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceId.Unmarshal(m, b) @@ -445,7 +453,7 @@ func (m *PieceSummary) Reset() { *m = PieceSummary{} } func (m *PieceSummary) String() string { return proto.CompactTextString(m) } func (*PieceSummary) ProtoMessage() {} func (*PieceSummary) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{4} + return fileDescriptor_piecestore_51f85e261f621d08, []int{4} } func (m *PieceSummary) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceSummary.Unmarshal(m, b) @@ -499,7 +507,7 @@ func (m *PieceRetrieval) Reset() { *m = PieceRetrieval{} } func (m *PieceRetrieval) String() string { return proto.CompactTextString(m) } func (*PieceRetrieval) ProtoMessage() {} func (*PieceRetrieval) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{5} + return fileDescriptor_piecestore_51f85e261f621d08, []int{5} } func (m *PieceRetrieval) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceRetrieval.Unmarshal(m, b) @@ -553,7 +561,7 @@ func (m *PieceRetrieval_PieceData) Reset() { *m = PieceRetrieval_PieceDa func (m *PieceRetrieval_PieceData) String() string { return proto.CompactTextString(m) } func (*PieceRetrieval_PieceData) ProtoMessage() {} func (*PieceRetrieval_PieceData) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{5, 0} + return fileDescriptor_piecestore_51f85e261f621d08, []int{5, 0} } func (m *PieceRetrieval_PieceData) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceRetrieval_PieceData.Unmarshal(m, b) @@ -606,7 +614,7 @@ func (m *PieceRetrievalStream) Reset() { *m = PieceRetrievalStream{} } func (m *PieceRetrievalStream) String() string { return proto.CompactTextString(m) } func (*PieceRetrievalStream) ProtoMessage() {} func (*PieceRetrievalStream) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{6} + return fileDescriptor_piecestore_51f85e261f621d08, []int{6} } func (m *PieceRetrievalStream) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceRetrievalStream.Unmarshal(m, b) @@ -652,7 +660,7 @@ func (m *PieceDelete) Reset() { *m = PieceDelete{} } func (m *PieceDelete) String() string { return proto.CompactTextString(m) } func (*PieceDelete) ProtoMessage() {} func (*PieceDelete) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{7} + return fileDescriptor_piecestore_51f85e261f621d08, []int{7} } func (m *PieceDelete) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceDelete.Unmarshal(m, b) @@ -697,7 +705,7 @@ func (m *PieceDeleteSummary) Reset() { *m = PieceDeleteSummary{} } func (m *PieceDeleteSummary) String() string { return proto.CompactTextString(m) } func (*PieceDeleteSummary) ProtoMessage() {} func (*PieceDeleteSummary) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{8} + return fileDescriptor_piecestore_51f85e261f621d08, []int{8} } func (m *PieceDeleteSummary) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceDeleteSummary.Unmarshal(m, b) @@ -736,7 +744,7 @@ func (m *PieceStoreSummary) Reset() { *m = PieceStoreSummary{} } func (m *PieceStoreSummary) String() string { return proto.CompactTextString(m) } func (*PieceStoreSummary) ProtoMessage() {} func (*PieceStoreSummary) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{9} + return fileDescriptor_piecestore_51f85e261f621d08, []int{9} } func (m *PieceStoreSummary) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceStoreSummary.Unmarshal(m, b) @@ -780,7 +788,7 @@ func (m *StatsReq) Reset() { *m = StatsReq{} } func (m *StatsReq) String() string { return proto.CompactTextString(m) } func (*StatsReq) ProtoMessage() {} func (*StatsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{10} + return fileDescriptor_piecestore_51f85e261f621d08, []int{10} } func (m *StatsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StatsReq.Unmarshal(m, b) @@ -814,7 +822,7 @@ func (m *StatSummary) Reset() { *m = StatSummary{} } func (m *StatSummary) String() string { return proto.CompactTextString(m) } func (*StatSummary) ProtoMessage() {} func (*StatSummary) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{11} + return fileDescriptor_piecestore_51f85e261f621d08, []int{11} } func (m *StatSummary) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StatSummary.Unmarshal(m, b) @@ -875,7 +883,7 @@ func (m *SignedMessage) Reset() { *m = SignedMessage{} } func (m *SignedMessage) String() string { return proto.CompactTextString(m) } func (*SignedMessage) ProtoMessage() {} func (*SignedMessage) Descriptor() ([]byte, []int) { - return fileDescriptor_piecestore_229fd615c638d46f, []int{12} + return fileDescriptor_piecestore_51f85e261f621d08, []int{12} } func (m *SignedMessage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SignedMessage.Unmarshal(m, b) @@ -1208,63 +1216,65 @@ var _PieceStoreRoutes_serviceDesc = grpc.ServiceDesc{ Metadata: "piecestore.proto", } -func init() { proto.RegisterFile("piecestore.proto", fileDescriptor_piecestore_229fd615c638d46f) } - -var fileDescriptor_piecestore_229fd615c638d46f = []byte{ - // 879 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xe1, 0x6e, 0xdb, 0x36, - 0x10, 0x8e, 0x24, 0xc7, 0x8e, 0xcf, 0xb1, 0xeb, 0xb2, 0xc5, 0xa0, 0x68, 0xe9, 0xe6, 0xa9, 0x45, - 0x60, 0x74, 0x80, 0xb1, 0x65, 0x4f, 0xd0, 0xc2, 0x45, 0x67, 0x0c, 0xcb, 0x02, 0xb9, 0xf9, 0x53, - 0x60, 0xd0, 0x68, 0xf1, 0x9a, 0x12, 0x93, 0x25, 0x4d, 0xa2, 0x32, 0x27, 0x0f, 0xb2, 0x3f, 0x7d, - 0x82, 0x61, 0x4f, 0xb2, 0xa7, 0xd8, 0x03, 0xec, 0x25, 0x06, 0x91, 0xb4, 0x64, 0xc7, 0x56, 0x3c, - 0x14, 0xe9, 0x3f, 0xf1, 0x8e, 0xf7, 0xdd, 0x77, 0xc7, 0xef, 0x48, 0x41, 0x3f, 0xe1, 0x18, 0x60, - 0x26, 0xe2, 0x14, 0x47, 0x49, 0x1a, 0x8b, 0x98, 0xac, 0x58, 0xd2, 0x38, 0x17, 0x98, 0xb9, 0x7f, - 0x58, 0x60, 0x9f, 0xd3, 0x6b, 0x4c, 0x5f, 0xd2, 0x88, 0xfd, 0xce, 0x99, 0x78, 0xff, 0x22, 0x0c, - 0xe3, 0x80, 0x0a, 0x1e, 0x47, 0xe4, 0x18, 0xda, 0x19, 0xbf, 0x8c, 0xa8, 0xc8, 0x53, 0xb4, 0x8d, - 0x81, 0x31, 0x3c, 0xf4, 0x2a, 0x03, 0x21, 0xd0, 0x60, 0x54, 0x50, 0xdb, 0x94, 0x0e, 0xf9, 0xed, - 0xfc, 0x65, 0x42, 0x63, 0x4c, 0x05, 0x25, 0x5f, 0xc1, 0x61, 0x46, 0x05, 0x86, 0x21, 0x17, 0xe8, - 0x73, 0xa6, 0xa3, 0x3b, 0xa5, 0x6d, 0xc2, 0xc8, 0xe7, 0xd0, 0xce, 0x93, 0x90, 0x47, 0xbf, 0x16, - 0x7e, 0x05, 0x72, 0xa0, 0x0c, 0x13, 0x46, 0x8e, 0xe0, 0x60, 0x4e, 0x17, 0x7e, 0xc6, 0x6f, 0xd0, - 0xb6, 0x06, 0xc6, 0xd0, 0xf2, 0x5a, 0x73, 0xba, 0x98, 0xf2, 0x1b, 0x24, 0x23, 0x78, 0x84, 0x8b, - 0x84, 0xa7, 0x92, 0xa3, 0x9f, 0x47, 0x7c, 0xe1, 0x67, 0x18, 0xd8, 0x0d, 0xb9, 0xeb, 0x61, 0xe5, - 0xba, 0x88, 0xf8, 0x62, 0x8a, 0x01, 0x79, 0x0a, 0xdd, 0x0c, 0x53, 0x4e, 0x43, 0x3f, 0xca, 0xe7, - 0x33, 0x4c, 0xed, 0xfd, 0x81, 0x31, 0x6c, 0x7b, 0x87, 0xca, 0x78, 0x26, 0x6d, 0x64, 0x02, 0x4d, - 0x1a, 0x14, 0x51, 0x76, 0x73, 0x60, 0x0c, 0x7b, 0xa7, 0xdf, 0x8e, 0x6e, 0xb7, 0x6a, 0x54, 0xd7, - 0xa6, 0xd1, 0x0b, 0x19, 0xe8, 0x69, 0x00, 0x32, 0x84, 0x7e, 0x90, 0x22, 0x15, 0xc8, 0x2a, 0x72, - 0x2d, 0x49, 0xae, 0xa7, 0xed, 0x9a, 0x99, 0xeb, 0x40, 0x53, 0xc5, 0x92, 0x16, 0x58, 0xe7, 0x17, - 0x6f, 0xfa, 0x7b, 0xc5, 0xc7, 0xeb, 0x57, 0x6f, 0xfa, 0x86, 0xfb, 0xaf, 0x01, 0x47, 0x1e, 0x46, - 0xe2, 0xbe, 0x4e, 0xe6, 0x83, 0xa1, 0x4f, 0xe6, 0x02, 0xfa, 0x49, 0x51, 0x89, 0x4f, 0x4b, 0x38, - 0x89, 0xd0, 0x39, 0x7d, 0xfe, 0xff, 0x6b, 0xf6, 0x1e, 0x48, 0x8c, 0x15, 0x46, 0x8f, 0x61, 0x5f, - 0xc4, 0x82, 0x86, 0x32, 0xa9, 0xe5, 0xa9, 0x05, 0x39, 0x81, 0x07, 0x05, 0x1c, 0xbd, 0x44, 0x3f, - 0x8a, 0x99, 0x54, 0x82, 0x25, 0x49, 0x75, 0xb5, 0xf9, 0x2c, 0x66, 0x38, 0x61, 0xee, 0x3f, 0x26, - 0xc0, 0x79, 0x91, 0x7c, 0x5a, 0x24, 0x27, 0x3f, 0xc3, 0xa3, 0xd9, 0x32, 0xe9, 0x06, 0xcd, 0xaf, - 0x37, 0x69, 0xd6, 0x36, 0xca, 0xdb, 0x86, 0x43, 0xc6, 0xd0, 0x96, 0x10, 0x65, 0x93, 0x3a, 0xa7, - 0x27, 0x5b, 0x6a, 0x2f, 0xf9, 0xa8, 0xcf, 0xa2, 0x7b, 0x5e, 0x15, 0x48, 0x5e, 0x41, 0x97, 0xe6, - 0xe2, 0x7d, 0x9c, 0xf2, 0x1b, 0x45, 0xcf, 0x92, 0x48, 0x5f, 0x6e, 0x22, 0x4d, 0xf9, 0x65, 0x84, - 0xec, 0x47, 0xcc, 0x32, 0x7a, 0x89, 0xde, 0x7a, 0x94, 0x83, 0xd0, 0x2e, 0xe1, 0x49, 0x0f, 0x4c, - 0x3d, 0x2c, 0x6d, 0xcf, 0xe4, 0xac, 0x4e, 0xeb, 0x66, 0x9d, 0xd6, 0x6d, 0x68, 0x05, 0x71, 0x24, - 0x30, 0x12, 0xba, 0xcf, 0xcb, 0xa5, 0xfb, 0x0b, 0xb4, 0x64, 0x9a, 0x09, 0xdb, 0x48, 0xb2, 0x51, - 0x88, 0xf9, 0x31, 0x85, 0xb8, 0x33, 0x38, 0x54, 0x2d, 0xcb, 0xe7, 0x73, 0x9a, 0x5e, 0x6f, 0xa4, - 0x21, 0xd0, 0x90, 0xe3, 0xac, 0xc8, 0xcb, 0xef, 0xba, 0xfa, 0xac, 0x9a, 0xfa, 0xdc, 0xbf, 0x4d, - 0xe8, 0xc9, 0x24, 0x1e, 0x8a, 0x94, 0xe3, 0x15, 0x0d, 0x3f, 0xb5, 0x56, 0xbe, 0xd7, 0x5a, 0x19, - 0x57, 0x5a, 0x79, 0x5e, 0xa3, 0x95, 0x92, 0xd3, 0x86, 0x5e, 0xc6, 0xf7, 0xa8, 0x97, 0xd7, 0x77, - 0xe9, 0x65, 0x5b, 0x8f, 0x3f, 0x83, 0x66, 0xfc, 0xee, 0x5d, 0x86, 0x42, 0xb7, 0x55, 0xaf, 0xdc, - 0x31, 0x3c, 0x5e, 0xa7, 0x3d, 0x15, 0x29, 0xd2, 0x79, 0x89, 0x61, 0xac, 0x60, 0xac, 0xe8, 0xca, - 0x5c, 0xd7, 0x15, 0x83, 0x8e, 0xa2, 0x83, 0x21, 0x0a, 0xdc, 0xad, 0xad, 0x8f, 0x2a, 0xda, 0x1d, - 0x01, 0x59, 0xc9, 0xb2, 0x54, 0x98, 0x0d, 0xad, 0xb9, 0xda, 0xaf, 0x33, 0x2e, 0x97, 0xee, 0x14, - 0x1e, 0x56, 0xe3, 0xbb, 0x73, 0x3b, 0x79, 0x06, 0x5d, 0x79, 0x5f, 0x79, 0x18, 0x20, 0xbf, 0x42, - 0xa6, 0xfb, 0xb7, 0x6e, 0x74, 0x01, 0x0e, 0xa6, 0x82, 0x8a, 0xcc, 0xc3, 0xdf, 0xdc, 0x3f, 0x0d, - 0xe8, 0x14, 0x8b, 0x25, 0xf6, 0x31, 0xb4, 0xf3, 0x0c, 0xd9, 0x34, 0xa1, 0xc1, 0xb2, 0x73, 0x95, - 0x81, 0x9c, 0x40, 0x8f, 0x5e, 0x51, 0x1e, 0xd2, 0x59, 0x88, 0x6a, 0x8b, 0x4a, 0x70, 0xcb, 0x5a, - 0xf0, 0x28, 0x82, 0x4a, 0x71, 0xea, 0x13, 0x5b, 0x37, 0x92, 0x11, 0x90, 0x32, 0xae, 0xda, 0xaa, - 0xde, 0xbf, 0x2d, 0x1e, 0xd7, 0x87, 0xee, 0x5a, 0x73, 0xcb, 0xf7, 0xc1, 0xa8, 0xde, 0x87, 0xf5, - 0x17, 0xc5, 0xbc, 0xfd, 0xa2, 0x1c, 0x43, 0x3b, 0xc9, 0x67, 0x21, 0x0f, 0x7e, 0xc0, 0x6b, 0x7d, - 0xb3, 0x54, 0x86, 0xd3, 0x0f, 0x16, 0xf4, 0xab, 0x76, 0x7b, 0xf2, 0x3c, 0xc9, 0x18, 0xf6, 0xa5, - 0x8d, 0x1c, 0xd5, 0x8c, 0xcb, 0x84, 0x39, 0x5f, 0xd4, 0xdd, 0xba, 0xaa, 0xab, 0xee, 0x1e, 0x79, - 0x0b, 0x07, 0x5a, 0x9f, 0x48, 0x06, 0xbb, 0xe6, 0xce, 0x39, 0xd9, 0xb5, 0x43, 0x49, 0xdc, 0xdd, - 0x1b, 0x1a, 0xdf, 0x18, 0xe4, 0x0c, 0xf6, 0xd5, 0x73, 0x73, 0x7c, 0xd7, 0xe5, 0xef, 0x3c, 0xbd, - 0xcb, 0x5b, 0x32, 0x1d, 0x1a, 0xe4, 0x27, 0x68, 0xea, 0x29, 0x78, 0x52, 0x13, 0xa2, 0xdc, 0xce, - 0xb3, 0x3b, 0xdd, 0x55, 0xf1, 0xe3, 0x82, 0x20, 0x15, 0x19, 0x71, 0xb6, 0x8c, 0x8b, 0x56, 0xa2, - 0xf3, 0x64, 0xbb, 0xaf, 0x44, 0x79, 0xd9, 0x78, 0x6b, 0x26, 0xb3, 0x59, 0x53, 0xfe, 0x01, 0x7e, - 0xf7, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0e, 0xa7, 0xcf, 0x0c, 0x15, 0x0a, 0x00, 0x00, +func init() { proto.RegisterFile("piecestore.proto", fileDescriptor_piecestore_51f85e261f621d08) } + +var fileDescriptor_piecestore_51f85e261f621d08 = []byte{ + // 898 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46, + 0x10, 0x36, 0x49, 0x59, 0xb2, 0x46, 0x3f, 0x51, 0x36, 0x41, 0x4b, 0xb3, 0x4e, 0xab, 0x32, 0x81, + 0x21, 0xa4, 0x80, 0xd0, 0xba, 0x27, 0x48, 0xa0, 0x20, 0x15, 0x82, 0xba, 0x06, 0x15, 0xbf, 0x04, + 0x28, 0xd8, 0x25, 0x39, 0x71, 0x16, 0xa1, 0x48, 0x96, 0x5c, 0xba, 0x92, 0x9f, 0x7b, 0x86, 0xbe, + 0xf4, 0x04, 0x45, 0x5f, 0x7b, 0x89, 0x9e, 0xa2, 0x57, 0x29, 0xb8, 0xbb, 0x22, 0x25, 0x4b, 0xb4, + 0x8a, 0x20, 0x7d, 0xe3, 0xce, 0xec, 0x7c, 0xf3, 0xcd, 0xec, 0x37, 0xbb, 0x84, 0x41, 0xc2, 0xd0, + 0xc7, 0x8c, 0xc7, 0x29, 0x8e, 0x93, 0x34, 0xe6, 0x31, 0x59, 0xb3, 0xa4, 0x71, 0xce, 0x31, 0xb3, + 0x7f, 0x33, 0xc0, 0xbc, 0xa0, 0x4b, 0x4c, 0x9f, 0xd3, 0x28, 0xf8, 0x85, 0x05, 0xfc, 0xdd, 0xb3, + 0x30, 0x8c, 0x7d, 0xca, 0x59, 0x1c, 0x91, 0x13, 0x68, 0x67, 0xec, 0x2a, 0xa2, 0x3c, 0x4f, 0xd1, + 0xd4, 0x86, 0xda, 0xa8, 0xeb, 0x54, 0x06, 0x42, 0xa0, 0x11, 0x50, 0x4e, 0x4d, 0x5d, 0x38, 0xc4, + 0xb7, 0xf5, 0xa7, 0x0e, 0x8d, 0x09, 0xe5, 0x94, 0x7c, 0x09, 0xdd, 0x8c, 0x72, 0x0c, 0x43, 0xc6, + 0xd1, 0x65, 0x81, 0x8a, 0xee, 0x94, 0xb6, 0x69, 0x40, 0x3e, 0x83, 0x76, 0x9e, 0x84, 0x2c, 0x7a, + 0x5f, 0xf8, 0x25, 0xc8, 0x91, 0x34, 0x4c, 0x03, 0x72, 0x0c, 0x47, 0x73, 0xba, 0x70, 0x33, 0x76, + 0x83, 0xa6, 0x31, 0xd4, 0x46, 0x86, 0xd3, 0x9a, 0xd3, 0xc5, 0x8c, 0xdd, 0x20, 0x19, 0xc3, 0x03, + 0x5c, 0x24, 0x2c, 0x15, 0x1c, 0xdd, 0x3c, 0x62, 0x0b, 0x37, 0x43, 0xdf, 0x6c, 0x88, 0x5d, 0xf7, + 0x2b, 0xd7, 0x65, 0xc4, 0x16, 0x33, 0xf4, 0xc9, 0x63, 0xe8, 0x65, 0x98, 0x32, 0x1a, 0xba, 0x51, + 0x3e, 0xf7, 0x30, 0x35, 0x0f, 0x87, 0xda, 0xa8, 0xed, 0x74, 0xa5, 0xf1, 0x5c, 0xd8, 0xc8, 0x14, + 0x9a, 0xd4, 0x2f, 0xa2, 0xcc, 0xe6, 0x50, 0x1b, 0xf5, 0xcf, 0xbe, 0x19, 0xdf, 0x6e, 0xd5, 0xb8, + 0xae, 0x4d, 0xe3, 0x67, 0x22, 0xd0, 0x51, 0x00, 0x64, 0x04, 0x03, 0x3f, 0x45, 0xca, 0x31, 0xa8, + 0xc8, 0xb5, 0x04, 0xb9, 0xbe, 0xb2, 0x2b, 0x66, 0xb6, 0x05, 0x4d, 0x19, 0x4b, 0x5a, 0x60, 0x5c, + 0x5c, 0xbe, 0x1e, 0x1c, 0x14, 0x1f, 0x2f, 0x5f, 0xbc, 0x1e, 0x68, 0xf6, 0xaf, 0x3a, 0x1c, 0x3b, + 0x18, 0xf1, 0x8f, 0x75, 0x32, 0x7f, 0x69, 0xea, 0x64, 0x2e, 0x61, 0x90, 0x14, 0x95, 0xb8, 0xb4, + 0x84, 0x13, 0x08, 0x9d, 0xb3, 0xa7, 0xff, 0xbd, 0x66, 0xe7, 0x9e, 0xc0, 0x58, 0x63, 0xf4, 0x10, + 0x0e, 0x79, 0xcc, 0x69, 0x28, 0x92, 0x1a, 0x8e, 0x5c, 0x90, 0x53, 0xb8, 0x57, 0xc0, 0xd1, 0x2b, + 0x74, 0xa3, 0x38, 0x10, 0x4a, 0x30, 0x04, 0xa9, 0x9e, 0x32, 0x9f, 0xc7, 0x41, 0xa1, 0x85, 0x4f, + 0xa1, 0x95, 0xe4, 0x9e, 0xfb, 0x1e, 0x97, 0xe2, 0x1c, 0xbb, 0x4e, 0x33, 0xc9, 0xbd, 0x57, 0xb8, + 0xb4, 0xff, 0xd1, 0x01, 0x2e, 0x0a, 0x56, 0xb3, 0x82, 0x15, 0xf9, 0x11, 0x1e, 0x78, 0x2b, 0x36, + 0x5b, 0xfc, 0xbf, 0xda, 0xe6, 0x5f, 0xdb, 0x41, 0x67, 0x17, 0x0e, 0x99, 0x40, 0x5b, 0x40, 0x94, + 0xdd, 0xeb, 0x9c, 0x9d, 0xee, 0x68, 0x4a, 0xc9, 0x47, 0x7e, 0x16, 0x6d, 0x75, 0xaa, 0x40, 0xf2, + 0x02, 0x7a, 0x34, 0xe7, 0xef, 0xe2, 0x94, 0xdd, 0x48, 0x7a, 0x86, 0x40, 0xfa, 0x62, 0x1b, 0x69, + 0xc6, 0xae, 0x22, 0x0c, 0xbe, 0xc7, 0x2c, 0xa3, 0x57, 0xe8, 0x6c, 0x46, 0x59, 0x08, 0xed, 0x12, + 0x9e, 0xf4, 0x41, 0x57, 0x53, 0xd4, 0x76, 0x74, 0x16, 0xd4, 0x0d, 0x81, 0x5e, 0x37, 0x04, 0x26, + 0xb4, 0xfc, 0x38, 0xe2, 0x18, 0x71, 0x75, 0x00, 0xab, 0xa5, 0xfd, 0x13, 0xb4, 0x44, 0x9a, 0x69, + 0xb0, 0x95, 0x64, 0xab, 0x10, 0xfd, 0x43, 0x0a, 0xb1, 0x3d, 0xe8, 0xca, 0x96, 0xe5, 0xf3, 0x39, + 0x4d, 0x97, 0x5b, 0x69, 0x08, 0x34, 0xc4, 0x9c, 0x4b, 0xf2, 0xe2, 0xbb, 0xae, 0x3e, 0xa3, 0xa6, + 0x3e, 0xfb, 0x6f, 0x1d, 0xfa, 0x22, 0x89, 0x83, 0x3c, 0x65, 0x78, 0x4d, 0xc3, 0xff, 0x5b, 0x2b, + 0xdf, 0x29, 0xad, 0x4c, 0x2a, 0xad, 0x3c, 0xad, 0xd1, 0x4a, 0xc9, 0x69, 0x4b, 0x2f, 0x93, 0x8f, + 0xa8, 0x97, 0x97, 0x77, 0xe9, 0x65, 0x57, 0x8f, 0x3f, 0x81, 0x66, 0xfc, 0xf6, 0x6d, 0x86, 0x5c, + 0xb5, 0x55, 0xad, 0xec, 0x09, 0x3c, 0xdc, 0xa4, 0x3d, 0xe3, 0x29, 0xd2, 0x79, 0x89, 0xa1, 0xad, + 0x61, 0xac, 0xe9, 0x4a, 0xdf, 0xd4, 0x55, 0x00, 0x1d, 0x49, 0x07, 0x43, 0xe4, 0xb8, 0x5f, 0x5b, + 0x1f, 0x54, 0xb4, 0x3d, 0x06, 0xb2, 0x96, 0x65, 0xa5, 0x30, 0x13, 0x5a, 0x73, 0xb9, 0x5f, 0x65, + 0x5c, 0x2d, 0xed, 0x19, 0xdc, 0xaf, 0xc6, 0x77, 0xef, 0x76, 0xf2, 0x04, 0x7a, 0xe2, 0x22, 0x73, + 0xd0, 0x47, 0x76, 0x8d, 0x81, 0xea, 0xdf, 0xa6, 0xd1, 0x06, 0x38, 0x9a, 0x71, 0xca, 0x33, 0x07, + 0x7f, 0xb6, 0xff, 0xd0, 0xa0, 0x53, 0x2c, 0x56, 0xd8, 0x27, 0xd0, 0xce, 0x33, 0x0c, 0x66, 0x09, + 0xf5, 0x57, 0x9d, 0xab, 0x0c, 0xe4, 0x14, 0xfa, 0xf4, 0x9a, 0xb2, 0x90, 0x7a, 0x21, 0xca, 0x2d, + 0x32, 0xc1, 0x2d, 0x6b, 0xc1, 0xa3, 0x08, 0x2a, 0xc5, 0xa9, 0x4e, 0x6c, 0xd3, 0x48, 0xc6, 0x40, + 0xca, 0xb8, 0x6a, 0xab, 0x7c, 0x18, 0x77, 0x78, 0x6c, 0x17, 0x7a, 0x1b, 0xcd, 0x2d, 0x1f, 0x0e, + 0xad, 0x7a, 0x38, 0x36, 0x9f, 0x1a, 0xfd, 0xf6, 0x53, 0x73, 0x02, 0xed, 0x24, 0xf7, 0x42, 0xe6, + 0xbf, 0xc2, 0xa5, 0xba, 0x59, 0x2a, 0xc3, 0xd9, 0xef, 0x06, 0x0c, 0xaa, 0x76, 0x3b, 0xe2, 0x3c, + 0xc9, 0x04, 0x0e, 0x85, 0x8d, 0x1c, 0xd7, 0x8c, 0xcb, 0x34, 0xb0, 0x3e, 0xaf, 0xbb, 0x75, 0x65, + 0x57, 0xed, 0x03, 0xf2, 0x06, 0x8e, 0x94, 0x3e, 0x91, 0x0c, 0xf7, 0xcd, 0x9d, 0x75, 0xba, 0x6f, + 0x87, 0x94, 0xb8, 0x7d, 0x30, 0xd2, 0xbe, 0xd6, 0xc8, 0x39, 0x1c, 0xca, 0xe7, 0xe6, 0xe4, 0xae, + 0xcb, 0xdf, 0x7a, 0x7c, 0x97, 0xb7, 0x64, 0x3a, 0xd2, 0xc8, 0x0f, 0xd0, 0x54, 0x53, 0xf0, 0xa8, + 0x26, 0x44, 0xba, 0xad, 0x27, 0x77, 0xba, 0xab, 0xe2, 0x27, 0x05, 0x41, 0xca, 0x33, 0x62, 0xed, + 0x18, 0x17, 0xa5, 0x44, 0xeb, 0xd1, 0x6e, 0x5f, 0x89, 0xf2, 0xbc, 0xf1, 0x46, 0x4f, 0x3c, 0xaf, + 0x29, 0x7e, 0x0d, 0xbf, 0xfd, 0x37, 0x00, 0x00, 0xff, 0xff, 0xd1, 0x5b, 0xd5, 0xb9, 0x2e, 0x0a, + 0x00, 0x00, } diff --git a/pkg/pb/piecestore.proto b/pkg/pb/piecestore.proto index 24ca7e770e2f..d3acade5fac5 100644 --- a/pkg/pb/piecestore.proto +++ b/pkg/pb/piecestore.proto @@ -44,6 +44,7 @@ message RenterBandwidthAllocation { // Renter refers to uplink PayerBandwidthAllocation payer_allocation = 1; // Bandwidth Allocation from Satellite int64 total = 2; // Total Bytes Stored bytes storage_node_id = 3; // Storage Node Identity + bytes pub_key = 4; // Renter Public Key // TODO: Take this out. It will be kept in a database on the satellite } bytes signature = 1; // Seralized Data signed by Uplink diff --git a/pkg/piecestore/psclient/readerwriter.go b/pkg/piecestore/psclient/readerwriter.go index 931a7a1c6c40..c745eb3fe05b 100644 --- a/pkg/piecestore/psclient/readerwriter.go +++ b/pkg/piecestore/psclient/readerwriter.go @@ -4,9 +4,12 @@ package psclient import ( + "crypto/ecdsa" + "crypto/x509" "fmt" "github.com/gogo/protobuf/proto" + "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/storj/internal/sync2" @@ -24,11 +27,22 @@ type StreamWriter struct { // Write Piece data to a piece store server upload stream func (s *StreamWriter) Write(b []byte) (int, error) { + prikey, ok := s.signer.prikey.(*ecdsa.PrivateKey) + if !ok { + return 0, errs.New("Invalid Private Key. Can't create RenterBandwidthAllocation") + } + + pubbytes, err := x509.MarshalPKIXPublicKey(&prikey.PublicKey) + if err != nil { + return 0, errs.New("Can't Marshal Public Key for RenterBandwidthAllocation: %+v", err) + } + updatedAllocation := s.totalWritten + int64(len(b)) allocationData := &pb.RenterBandwidthAllocation_Data{ PayerAllocation: s.pba, Total: updatedAllocation, StorageNodeId: s.signer.nodeID.Bytes(), + PubKey: pubbytes, // TODO: Take this out. It will be kept in a database on the satellite } serializedAllocation, err := proto.Marshal(allocationData) @@ -106,10 +120,23 @@ func NewStreamReader(client *PieceStore, stream pb.PieceStoreRoutes_RetrieveClie allocate = size - sr.allocated } + prikey, ok := sr.client.prikey.(*ecdsa.PrivateKey) + if !ok { + sr.pendingAllocs.Fail(errs.New("Invalid Private Key. Can't create RenterBandwidthAllocation")) + return + } + + pubbytes, err := x509.MarshalPKIXPublicKey(&prikey.PublicKey) + if err != nil { + sr.pendingAllocs.Fail(errs.New("Can't Marshal Public Key for RenterBandwidthAllocation: %+v", err)) + return + } + allocationData := &pb.RenterBandwidthAllocation_Data{ PayerAllocation: pba, Total: sr.allocated + allocate, StorageNodeId: sr.client.nodeID.Bytes(), + PubKey: pubbytes, // TODO: Take this out. It will be kept in a database on the satellite } serializedAllocation, err := proto.Marshal(allocationData) diff --git a/pkg/piecestore/psserver/agreementsender/agreementsender.go b/pkg/piecestore/psserver/agreementsender/agreementsender.go index 1124e49b92b2..44d333a61afb 100644 --- a/pkg/piecestore/psserver/agreementsender/agreementsender.go +++ b/pkg/piecestore/psserver/agreementsender/agreementsender.go @@ -103,17 +103,6 @@ func (as *AgreementSender) Run(ctx context.Context) error { } client := pb.NewBandwidthClient(conn) - stream, err := client.BandwidthAgreements(ctx) - if err != nil { - zap.S().Error(err) - return - } - - defer func() { - if _, closeErr := stream.CloseAndRecv(); closeErr != nil { - zap.S().Errorf("error closing stream %s :: %v.Send() = %v", closeErr, stream, closeErr) - } - }() for _, agreement := range agreementGroup.agreements { @@ -123,8 +112,9 @@ func (as *AgreementSender) Run(ctx context.Context) error { } // Send agreement to satellite - if err = stream.Send(msg); err != nil { - zap.S().Error(err) + r, err := client.BandwidthAgreements(ctx, msg) + if err != nil || r.GetStatus() != pb.AgreementsSummary_OK { + zap.S().Errorf("Failed to send agreement to satellite: %+v", err) return }