From d8c2707981b8ba8e31fc239b381b2be65059cdfd Mon Sep 17 00:00:00 2001 From: JT Olio Date: Sat, 21 Jul 2018 10:51:54 -0600 Subject: [PATCH 1/2] captplanet I kind of went overboard this weekend. The major goal of this changeset is to provide an environment for local development where all of the various services can be easily run together. Developing on Storj v3 should be as easy as running a setup command and a run command! To do this, this changeset introduces a new tool called captplanet, which combines the powers of the Overlay Cache, the PointerDB, the PieceStore, Kademlia, the Minio Gateway, etc. Running 40 farmers and a heavy client inside the same process forced a rethinking of the "services" that we had. To avoid confusion by reusing prior terms, this changeset introduces two new types: Providers and Responsibilities. I wanted to avoid as many merge conflicts as possible, so I left the existing Services and code for now, but if people like this route we can clean up the duplication. A Responsibility is a collection of gRPC methods and corresponding state. The following systems are examples of Responsibilities: * Kademlia * OverlayCache * PointerDB * StatDB * PieceStore * etc. A Provider is a collection of Responsibilities that share an Identity, such as: * The heavy client * The farmer * The gateway An Identity is a public/private key pair, a node id, etc. Farmers all need different Identities, so captplanet needs to support running multiple concurrent Providers with different Identities. Each Responsibility and Provider should allow for configuration of multiple copies on its own so creating Responsibilities and Providers use a new workflow. To make a Responsibility, one should create a "config" struct, such as: ``` type Config struct { RepairThreshold int `help:"If redundancy falls below this number of pieces, repair is triggered" default:"30"` SuccessThreshold int `help:"If redundancy is above this number then no additional uploads are needed" default:"40"` } ``` To use "config" structs, this changeset introduces another new library called 'cfgstruct', which allows for the configuration of arbitrary structs through flagsets, and thus through cobra and viper. cfgstruct relies on Go's "struct tags" feature to document help information and default values. Config structs can be configured via cfgstruct.Bind for binding the struct to a flagset. Because this configuration system makes setup and configuration easier *in general*, additional commands are provided that allow for easy standup of separate Providers. Please make sure to check out: * cmd/captplanet/farmer/main.go (a new farmer binary) * cmd/captplanet/hc/main.go (a new heavy client binary) * cmd/captplanet/gw/main.go (a new minio gateway binary) Usage: ``` $ go install -v storj.io/storj/cmd/captplanet $ captplanet setup $ captplanet run ``` Configuration is placed by default in `~/.storj/capt/` Other changes: * introduces new config structs for currently existing Responsibilities that conform to the new Responsibility interface. Please see the `pkg/*/config.go` files for examples. * integrates the PointerDB API key with other global configuration via flags, instead of through environment variables through viper like it's been doing. (ultimately this should also change to use the PointerDB config struct but this is an okay shortterm solution). * changes the Overlay cache to use a URL for database configuration instead of separate redis and bolt config settings. * stubs out some peer identity skeleton code (but not the meat). * Fixes the SegmentStore to use the overlay client and pointerdb clients instead of gRPC client code directly * Leaves a very clear spot where we need to tie the object to stream to segment store together. There's sort of a "golden spike" opportunity to connect all the train tracks together at the bottom of pkg/miniogw/config.go, labeled with a bunch of TODOs. Future stuff: * I now prefer this design over the original pkg/process.Service thing I had been pushing before (sorry!) * The experience of trying to have multiple farmers configurable concurrently led me to prefer config structs over global flags (I finally came around) or using viper directly. I think global flags are okay sometimes but in general going forward we should try and get all relevant config into config structs. * If you all like this direction, I think we can go delete my old Service interfaces and a bunch of flags and clean up a bunch of stuff. * If you don't like this direction, it's no sweat at all, and despite how much code there is here I'm not very tied to any of this! Considering a lot of this was written between midnight and 6 am, it might not be any good! --- cmd/captplanet/farmer/main.go | 44 +++++ cmd/captplanet/gw/main.go | 38 ++++ cmd/captplanet/hc/main.go | 46 +++++ cmd/captplanet/main.go | 27 +++ cmd/captplanet/run.go | 134 ++++++++++++++ cmd/captplanet/setup.go | 80 +++++++++ cmd/minio/main.go | 22 ++- examples/auth/main.go | 20 ++- examples/pointerdb-client/main.go | 2 +- pkg/cfgstruct/bind.go | 101 +++++++++++ pkg/cfgstruct/case.go | 39 +++++ pkg/cfgstruct/case_test.go | 30 ++++ pkg/cfgstruct/flags.go | 22 +++ pkg/kademlia/config.go | 91 ++++++++++ pkg/miniogw/config.go | 141 +++++++++++++++ pkg/miniogw/gateway-storj.go | 25 +-- pkg/objects/impl.go | 3 + pkg/overlay/cache.go | 2 +- pkg/overlay/client.go | 10 +- pkg/overlay/config.go | 112 ++++++++++++ pkg/overlay/overlay_test.go | 4 +- pkg/overlay/service.go | 4 +- pkg/overlay/service_test.go | 6 +- pkg/overlay/{test_utils.go => utils_test.go} | 0 pkg/peertls/io_util.go | 13 +- pkg/peertls/peertls.go | 5 +- pkg/piecestore/psservice/config.go | 58 ++++++ pkg/pointerdb/client.go | 7 +- pkg/pointerdb/config.go | 41 +++++ pkg/process/debug.go | 3 +- pkg/process/exec_conf.go | 175 +++++++++++++++++++ pkg/process/service.go | 8 +- pkg/provider/common.go | 16 ++ pkg/provider/identity.go | 123 +++++++++++++ pkg/provider/provider.go | 78 +++++++++ pkg/segment/segment.go | 119 +++++-------- pointerdb/auth/process_api_key.go | 36 +--- 37 files changed, 1520 insertions(+), 165 deletions(-) create mode 100644 cmd/captplanet/farmer/main.go create mode 100644 cmd/captplanet/gw/main.go create mode 100644 cmd/captplanet/hc/main.go create mode 100644 cmd/captplanet/main.go create mode 100644 cmd/captplanet/run.go create mode 100644 cmd/captplanet/setup.go create mode 100644 pkg/cfgstruct/bind.go create mode 100644 pkg/cfgstruct/case.go create mode 100644 pkg/cfgstruct/case_test.go create mode 100644 pkg/cfgstruct/flags.go create mode 100644 pkg/kademlia/config.go create mode 100644 pkg/miniogw/config.go create mode 100644 pkg/overlay/config.go rename pkg/overlay/{test_utils.go => utils_test.go} (100%) create mode 100644 pkg/piecestore/psservice/config.go create mode 100644 pkg/pointerdb/config.go create mode 100644 pkg/process/exec_conf.go create mode 100644 pkg/provider/common.go create mode 100644 pkg/provider/identity.go create mode 100644 pkg/provider/provider.go diff --git a/cmd/captplanet/farmer/main.go b/cmd/captplanet/farmer/main.go new file mode 100644 index 000000000000..db7b51876ec4 --- /dev/null +++ b/cmd/captplanet/farmer/main.go @@ -0,0 +1,44 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "github.com/spf13/cobra" + + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/piecestore/psservice" + "storj.io/storj/pkg/process" + "storj.io/storj/pkg/provider" +) + +var ( + rootCmd = &cobra.Command{ + Use: "farmer", + Short: "Farmer", + } + + cfg struct { + Identity provider.IdentityConfig + Kademlia kademlia.Config + Storage psservice.Config + } +) + +func init() { + rootCmd.AddCommand(&cobra.Command{ + Use: "run", + Short: "Run the farmer", + RunE: cmdRun, + }) + cfgstruct.Bind(rootCmd.PersistentFlags(), &cfg) +} + +func cmdRun(cmd *cobra.Command, args []string) (err error) { + return cfg.Identity.Run(process.Ctx(cmd), cfg.Kademlia, cfg.Storage) +} + +func main() { + process.ExecuteWithConfig(rootCmd, "$HOME/.storj/farmer/config.yaml") +} diff --git a/cmd/captplanet/gw/main.go b/cmd/captplanet/gw/main.go new file mode 100644 index 000000000000..359cdc6d9abc --- /dev/null +++ b/cmd/captplanet/gw/main.go @@ -0,0 +1,38 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "github.com/spf13/cobra" + + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/process" +) + +var ( + rootCmd = &cobra.Command{ + Use: "gw", + Short: "Gateway", + } + + cfg miniogw.Config +) + +func init() { + rootCmd.AddCommand(&cobra.Command{ + Use: "run", + Short: "Run the gateway", + RunE: cmdRun, + }) + cfgstruct.Bind(rootCmd.PersistentFlags(), &cfg) +} + +func cmdRun(cmd *cobra.Command, args []string) (err error) { + return cfg.Run(process.Ctx(cmd)) +} + +func main() { + process.ExecuteWithConfig(rootCmd, "$HOME/.storj/gw/config.yaml") +} diff --git a/cmd/captplanet/hc/main.go b/cmd/captplanet/hc/main.go new file mode 100644 index 000000000000..30cf23705c85 --- /dev/null +++ b/cmd/captplanet/hc/main.go @@ -0,0 +1,46 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "github.com/spf13/cobra" + + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/pointerdb" + "storj.io/storj/pkg/process" + "storj.io/storj/pkg/provider" +) + +var ( + rootCmd = &cobra.Command{ + Use: "hc", + Short: "Heavy client", + } + + cfg struct { + Identity provider.IdentityConfig + Kademlia kademlia.Config + PointerDB pointerdb.Config + Overlay overlay.Config + } +) + +func init() { + rootCmd.AddCommand(&cobra.Command{ + Use: "run", + Short: "Run the heavy client", + RunE: cmdRun, + }) + cfgstruct.Bind(rootCmd.PersistentFlags(), &cfg) +} + +func cmdRun(cmd *cobra.Command, args []string) (err error) { + return cfg.Identity.Run(process.Ctx(cmd), cfg.Kademlia, cfg.PointerDB, cfg.Overlay) +} + +func main() { + process.ExecuteWithConfig(rootCmd, "$HOME/.storj/hc/config.yaml") +} diff --git a/cmd/captplanet/main.go b/cmd/captplanet/main.go new file mode 100644 index 000000000000..e697bb2771a4 --- /dev/null +++ b/cmd/captplanet/main.go @@ -0,0 +1,27 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "flag" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/pkg/process" +) + +var ( + mon = monkit.Package() + + rootCmd = &cobra.Command{ + Use: "captplanet", + Short: "Captain Planet! With our powers combined!", + } +) + +func main() { + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + process.ExecuteWithConfig(rootCmd, "$HOME/.storj/capt/config.yaml") +} diff --git a/cmd/captplanet/run.go b/cmd/captplanet/run.go new file mode 100644 index 000000000000..1307737184a7 --- /dev/null +++ b/cmd/captplanet/run.go @@ -0,0 +1,134 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "fmt" + "net" + "path/filepath" + + "github.com/spf13/cobra" + + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/piecestore/psservice" + "storj.io/storj/pkg/pointerdb" + "storj.io/storj/pkg/process" + "storj.io/storj/pkg/provider" +) + +var ( + runCmd = &cobra.Command{ + Use: "run", + Short: "Run all providers", + RunE: cmdRun, + } + runCfg Config +) + +func init() { + rootCmd.AddCommand(runCmd) + cfgstruct.Bind(runCmd.Flags(), &runCfg) +} + +func cmdRun(cmd *cobra.Command, args []string) (err error) { + ctx := process.Ctx(cmd) + defer mon.Task()(&ctx)(&err) + + startingPort := runCfg.StartingPort + + errch := make(chan error, runCfg.FarmerCount+2) + + // define heavy client config programmatically + type HeavyClient struct { + Identity provider.IdentityConfig + Kademlia kademlia.Config + PointerDB pointerdb.Config + Overlay overlay.Config + } + + hc := HeavyClient{ + Identity: provider.IdentityConfig{ + CertPath: filepath.Join(runCfg.BasePath, "hc", "ident.leaf.cert"), + KeyPath: filepath.Join(runCfg.BasePath, "hc", "ident.leaf.key"), + Address: joinHostPort(runCfg.ListenHost, startingPort+1), + }, + Kademlia: kademlia.Config{ + TODOListenAddr: joinHostPort(runCfg.ListenHost, startingPort+2), + BootstrapAddr: joinHostPort(runCfg.ListenHost, startingPort+4), + }, + PointerDB: pointerdb.Config{ + DatabaseURL: "bolt://" + filepath.Join( + runCfg.BasePath, "hc", "pointerdb.db"), + }, + Overlay: overlay.Config{ + DatabaseURL: "bolt://" + filepath.Join( + runCfg.BasePath, "hc", "overlay.db"), + }, + } + + // start heavy client + go func() { + errch <- hc.Identity.Run(ctx, hc.Kademlia, hc.PointerDB, hc.Overlay) + }() + + // define and start a bunch of farmers programmatically + type Farmer struct { + Identity provider.IdentityConfig + Kademlia kademlia.Config + Storage psservice.Config + } + + for i := 0; i < runCfg.FarmerCount; i++ { + basepath := filepath.Join(runCfg.BasePath, fmt.Sprintf("f%d", i)) + farmer := Farmer{ + Identity: provider.IdentityConfig{ + CertPath: filepath.Join(basepath, "ident.leaf.cert"), + KeyPath: filepath.Join(basepath, "ident.leaf.key"), + Address: joinHostPort(runCfg.ListenHost, startingPort+i*2+3), + }, + Kademlia: kademlia.Config{ + TODOListenAddr: joinHostPort(runCfg.ListenHost, startingPort+i*2+4), + BootstrapAddr: joinHostPort(runCfg.ListenHost, startingPort+1), + }, + Storage: psservice.Config{ + Path: filepath.Join(basepath, "data"), + }, + } + go func() { + errch <- farmer.Identity.Run(ctx, farmer.Kademlia, farmer.Storage) + }() + } + + // start s3 gateway + gw := miniogw.Config{ + IdentityConfig: provider.IdentityConfig{ + CertPath: filepath.Join(runCfg.BasePath, "gw", "ident.leaf.cert"), + KeyPath: filepath.Join(runCfg.BasePath, "gw", "ident.leaf.key"), + Address: joinHostPort(runCfg.ListenHost, startingPort), + }, + MinioConfig: runCfg.MinioConfig, + ClientConfig: miniogw.ClientConfig{ + OverlayAddr: joinHostPort( + runCfg.ListenHost, startingPort+1), + PointerDBAddr: joinHostPort( + runCfg.ListenHost, startingPort+1), + }, + RSConfig: runCfg.RSConfig, + } + gw.MinioConfig.MinioDir = filepath.Join(runCfg.BasePath, "gw", "minio") + + // start s3 gateway + go func() { + errch <- gw.Run(ctx) + }() + + return <-errch +} + +func joinHostPort(host string, port int) string { + return net.JoinHostPort(host, fmt.Sprint(port)) +} diff --git a/cmd/captplanet/setup.go b/cmd/captplanet/setup.go new file mode 100644 index 000000000000..4c5837835540 --- /dev/null +++ b/cmd/captplanet/setup.go @@ -0,0 +1,80 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/peertls" + "storj.io/storj/pkg/process" +) + +// Config defines broad Captain Planet configuration +type Config struct { + FarmerCount int `help:"number of farmers to run" default:"20"` + BasePath string `help:"base path for captain planet storage" default:"$HOME/.storj/capt"` + ListenHost string `help:"the host for providers to listen on" default:"127.0.0.1"` + StartingPort int `help:"all providers will listen on ports consecutively starting with this one" default:"7777"` + miniogw.RSConfig + miniogw.MinioConfig +} + +var ( + setupCmd = &cobra.Command{ + Use: "setup", + Short: "Set up configurations", + RunE: cmdSetup, + } + setupCfg Config +) + +func init() { + rootCmd.AddCommand(setupCmd) + cfgstruct.Bind(setupCmd.Flags(), &setupCfg) +} + +func cmdSetup(cmd *cobra.Command, args []string) (err error) { + hcPath := filepath.Join(setupCfg.BasePath, "hc") + err = os.MkdirAll(hcPath, 0700) + if err != nil { + return err + } + identPath := filepath.Join(hcPath, "ident") + _, err = peertls.NewTLSFileOptions(identPath, identPath, true, false) + if err != nil { + return err + } + + for i := 0; i < setupCfg.FarmerCount; i++ { + farmerPath := filepath.Join(setupCfg.BasePath, fmt.Sprintf("f%d", i)) + err = os.MkdirAll(farmerPath, 0700) + if err != nil { + return err + } + identPath = filepath.Join(farmerPath, "ident") + _, err = peertls.NewTLSFileOptions(identPath, identPath, true, false) + if err != nil { + return err + } + } + + gwPath := filepath.Join(setupCfg.BasePath, "gw") + err = os.MkdirAll(gwPath, 0700) + if err != nil { + return err + } + identPath = filepath.Join(gwPath, "ident") + _, err = peertls.NewTLSFileOptions(identPath, identPath, true, false) + if err != nil { + return err + } + + return process.SaveConfig(cmd) +} diff --git a/cmd/minio/main.go b/cmd/minio/main.go index 4fd78139e7aa..361a7752da64 100644 --- a/cmd/minio/main.go +++ b/cmd/minio/main.go @@ -7,16 +7,34 @@ import ( "context" "os" + "github.com/minio/cli" "github.com/minio/minio/cmd" "github.com/spf13/cobra" - _ "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/objects" "storj.io/storj/pkg/process" ) -func main() { process.Must(process.Main(process.ConfigEnvironment, process.ServiceFunc(run))) } +func main() { + process.Must(process.Main( + process.ConfigEnvironment, process.ServiceFunc(run))) +} func run(ctx context.Context, _ *cobra.Command, args []string) error { + err := cmd.RegisterGatewayCommand(cli.Command{ + Name: "storj", + Usage: "Storj", + Action: storjGatewayMain, + HideHelpCommand: true, + }) + if err != nil { + return err + } cmd.Main(append([]string{os.Args[0]}, args...)) return nil } + +func storjGatewayMain(ctx *cli.Context) { + cmd.StartGateway(ctx, miniogw.NewStorjGateway(objects.NewObjectStore())) +} diff --git a/examples/auth/main.go b/examples/auth/main.go index 2903142701d6..8824afecfe7e 100644 --- a/examples/auth/main.go +++ b/examples/auth/main.go @@ -5,12 +5,13 @@ package main import ( "fmt" + "net/http" "os" - "storj.io/storj/pointerdb/auth" - "github.com/spf13/pflag" "github.com/spf13/viper" + + "storj.io/storj/pointerdb/auth" ) // example of how the auth package is working. @@ -25,9 +26,22 @@ func main() { os.Setenv("API_KEY", "12345") viper.AutomaticEnv() - httpRequestHeaders := auth.InitializeHeaders() + httpRequestHeaders := InitializeHeaders() xAPIKey := httpRequestHeaders.Get("X-Api-Key") isAuthorized := auth.ValidateAPIKey(xAPIKey) fmt.Println(isAuthorized) } + +// InitializeHeaders mocks HTTP headers to help test X-API-Key +func InitializeHeaders() *http.Header { + httpHeaders := http.Header{ + "Accept-Encoding": {"gzip, deflate"}, + "Accept-Language": {"en-US,en;q=0.9"}, + "X-Api-Key": {"12345"}, + "Cache-Control": {"max-age=0"}, + "Accept": {"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8"}, + "Connection": {"keep-alive"}, + } + return &httpHeaders +} diff --git a/examples/pointerdb-client/main.go b/examples/pointerdb-client/main.go index dbd70a90dd6d..5b335b3dafa5 100644 --- a/examples/pointerdb-client/main.go +++ b/examples/pointerdb-client/main.go @@ -7,8 +7,8 @@ import ( "context" "flag" "fmt" - "strings" "os" + "strings" "go.uber.org/zap" "google.golang.org/grpc/codes" diff --git a/pkg/cfgstruct/bind.go b/pkg/cfgstruct/bind.go new file mode 100644 index 000000000000..d86fb781adde --- /dev/null +++ b/pkg/cfgstruct/bind.go @@ -0,0 +1,101 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package cfgstruct + +import ( + "fmt" + "os" + "reflect" + "regexp" + "strconv" + "strings" + "time" +) + +// Bind sets flags on a FlagSet that match the configuration struct +// 'config'. This works by traversing the config struct using the 'reflect' +// package. +func Bind(flags FlagSet, config interface{}) { + ptrtype := reflect.TypeOf(config) + if ptrtype.Kind() != reflect.Ptr { + panic(fmt.Sprintf("invalid config type: %#v. "+ + "Expecting pointer to struct.", config)) + } + bindConfig(flags, "", reflect.ValueOf(config).Elem()) +} + +var ( + whitespace = regexp.MustCompile(`\s+`) +) + +func bindConfig(flags FlagSet, prefix string, val reflect.Value) { + if val.Kind() != reflect.Struct { + panic(fmt.Sprintf("invalid config type: %#v. Expecting struct.", + val.Interface())) + } + typ := val.Type() + + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + fieldval := val.Field(i) + flagname := prefix + hyphenate(snakeCase(field.Name)) + + switch field.Type.Kind() { + case reflect.Struct: + if field.Anonymous { + bindConfig(flags, prefix, fieldval) + } else { + bindConfig(flags, flagname+".", fieldval) + } + case reflect.Array, reflect.Slice: + digits := len(fmt.Sprint(fieldval.Len())) + for j := 0; j < fieldval.Len(); j++ { + padding := strings.Repeat("0", digits-len(fmt.Sprint(j))) + bindConfig(flags, fmt.Sprintf("%s.%s%d.", flagname, padding, j), + fieldval.Index(j)) + } + default: + tag := reflect.StructTag( + whitespace.ReplaceAllString(string(field.Tag), " ")) + help := tag.Get("help") + def := tag.Get("default") + fieldaddr := fieldval.Addr().Interface() + check := func(err error) { + if err != nil { + panic(fmt.Sprintf("invalid default value for %s: %#v", flagname, def)) + } + } + switch field.Type { + case reflect.TypeOf(int(0)): + val, err := strconv.ParseInt(def, 0, strconv.IntSize) + check(err) + flags.IntVar(fieldaddr.(*int), flagname, int(val), help) + case reflect.TypeOf(int64(0)): + val, err := strconv.ParseInt(def, 0, 64) + check(err) + flags.Int64Var(fieldaddr.(*int64), flagname, val, help) + case reflect.TypeOf(uint(0)): + val, err := strconv.ParseUint(def, 0, strconv.IntSize) + check(err) + flags.UintVar(fieldaddr.(*uint), flagname, uint(val), help) + case reflect.TypeOf(uint64(0)): + val, err := strconv.ParseUint(def, 0, 64) + check(err) + flags.Uint64Var(fieldaddr.(*uint64), flagname, val, help) + case reflect.TypeOf(time.Duration(0)): + val, err := time.ParseDuration(def) + check(err) + flags.DurationVar(fieldaddr.(*time.Duration), flagname, val, help) + case reflect.TypeOf(string("")): + flags.StringVar(fieldaddr.(*string), flagname, os.ExpandEnv(def), help) + case reflect.TypeOf(bool(false)): + val, err := strconv.ParseBool(def) + check(err) + flags.BoolVar(fieldaddr.(*bool), flagname, val, help) + default: + panic(fmt.Sprintf("invalid field type: %s", field.Type.String())) + } + } + } +} diff --git a/pkg/cfgstruct/case.go b/pkg/cfgstruct/case.go new file mode 100644 index 000000000000..b911e64f2c41 --- /dev/null +++ b/pkg/cfgstruct/case.go @@ -0,0 +1,39 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package cfgstruct + +import ( + "strings" + "unicode" +) + +func hyphenate(val string) string { + return strings.Replace(val, "_", "-", -1) +} + +func snakeCase(val string) string { + // don't you think this function should be in the standard library? + // seems useful + if len(val) <= 1 { + return strings.ToLower(val) + } + runes := []rune(val) + rv := make([]rune, 0, len(runes)) + for i := 0; i < len(runes); i++ { + rv = append(rv, unicode.ToLower(runes[i])) + if i < len(runes)-1 && + unicode.IsLower(runes[i]) && + unicode.IsUpper(runes[i+1]) { + // lower-to-uppercase case + rv = append(rv, '_') + } else if i < len(runes)-2 && + unicode.IsUpper(runes[i]) && + unicode.IsUpper(runes[i+1]) && + unicode.IsLower(runes[i+2]) { + // end-of-acronym case + rv = append(rv, '_') + } + } + return string(rv) +} diff --git a/pkg/cfgstruct/case_test.go b/pkg/cfgstruct/case_test.go new file mode 100644 index 000000000000..44ffef9a0616 --- /dev/null +++ b/pkg/cfgstruct/case_test.go @@ -0,0 +1,30 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package cfgstruct + +import ( + "testing" +) + +func TestSnakeCase(t *testing.T) { + for _, test := range []struct { + input, expected string + }{ + {"CoolBeans", "cool_beans"}, + {"coolBeans", "cool_beans"}, + {"JSONBeans", "json_beans"}, + {"JSONBeAns", "json_be_ans"}, + {"JSONBeANS", "json_be_ans"}, + {"coolbeans", "coolbeans"}, + {"COOLBEANS", "coolbeans"}, + {"CoolJSON", "cool_json"}, + {"CoolJSONBeans", "cool_json_beans"}, + } { + actual := snakeCase(test.input) + if actual != test.expected { + t.Logf("expected %#v but got %#v", test.expected, actual) + t.Fail() + } + } +} diff --git a/pkg/cfgstruct/flags.go b/pkg/cfgstruct/flags.go new file mode 100644 index 000000000000..1eb18b6e0a57 --- /dev/null +++ b/pkg/cfgstruct/flags.go @@ -0,0 +1,22 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package cfgstruct + +import ( + "flag" + "time" +) + +// FlagSet is an interface that matches both *flag.FlagSet and *pflag.FlagSet +type FlagSet interface { + BoolVar(p *bool, name string, value bool, usage string) + IntVar(p *int, name string, value int, usage string) + Int64Var(p *int64, name string, value int64, usage string) + UintVar(p *uint, name string, value uint, usage string) + Uint64Var(p *uint64, name string, value uint64, usage string) + DurationVar(p *time.Duration, name string, value time.Duration, usage string) + StringVar(p *string, name string, value string, usage string) +} + +var _ FlagSet = (*flag.FlagSet)(nil) diff --git a/pkg/kademlia/config.go b/pkg/kademlia/config.go new file mode 100644 index 000000000000..faf8efe9950e --- /dev/null +++ b/pkg/kademlia/config.go @@ -0,0 +1,91 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package kademlia + +import ( + "context" + "net" + + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/pkg/provider" + proto "storj.io/storj/protos/overlay" +) + +var ( + // Error defines a Kademlia error + Error = errs.Class("kademlia error") + mon = monkit.Package() +) + +type ctxKey int + +const ( + ctxKeyKad ctxKey = iota +) + +// Config defines all of the things that are needed to start up Kademlia +// server endpoints (and not necessarily client code). +type Config struct { + BootstrapAddr string `help:"the kademlia node to bootstrap against" default:"bootstrap-dev.storj.io:8080"` + // TODO(jt): remove this! kademlia should just use the grpc server + TODOListenAddr string `help:"the host/port for kademlia to listen on. TODO(jt): this should be removed!" default:"127.0.0.1:7776"` +} + +// Run implements provider.Responsibility +func (c Config) Run(ctx context.Context, server *provider.Provider) ( + err error) { + defer mon.Task()(&ctx)(&err) + + // TODO(jt): don't split the host/port + host, port, err := net.SplitHostPort(c.BootstrapAddr) + if err != nil { + return Error.Wrap(err) + } + // TODO(jt): an intro node shouldn't require an ID, and should only be an + // address + in, err := GetIntroNode("", host, port) + if err != nil { + return err + } + + // TODO(jt): don't split the host/port + host, port, err = net.SplitHostPort(c.TODOListenAddr) + if err != nil { + return Error.Wrap(err) + } + // TODO(jt): kademlia should register on server.GRPC() instead of listening + // itself + kad, err := NewKademlia(server.Identity().ID, []proto.Node{*in}, host, port) + if err != nil { + return err + } + defer func() { _ = kad.Disconnect() }() + + // TODO(jt): ListenAndServe should probably be blocking and we should kick + // it off in a goroutine here + err = kad.ListenAndServe() + if err != nil { + return err + } + + // TODO(jt): Bootstrap should probably be blocking and we should kick it off + // in a goroutine here + err = kad.Bootstrap(ctx) + if err != nil { + return err + } + + return server.Run(context.WithValue(ctx, ctxKeyKad, kad)) +} + +// LoadFromContext loads an existing Kademlia from the Provider context +// stack if one exists. +func LoadFromContext(ctx context.Context) *Kademlia { + if v, ok := ctx.Value(ctxKeyKad).(*Kademlia); ok { + return v + } + return nil +} diff --git a/pkg/miniogw/config.go b/pkg/miniogw/config.go new file mode 100644 index 000000000000..a1d415fde7cf --- /dev/null +++ b/pkg/miniogw/config.go @@ -0,0 +1,141 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package miniogw + +import ( + "context" + "os" + + "github.com/minio/cli" + minio "github.com/minio/minio/cmd" + "github.com/vivint/infectious" + + "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/objects" + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/pointerdb" + "storj.io/storj/pkg/provider" + "storj.io/storj/pkg/segment" + ecclient "storj.io/storj/pkg/storage/ec" + "storj.io/storj/pkg/transport" +) + +// RSConfig is a configuration struct that keeps details about default +// redundancy strategy information +type RSConfig struct { + MaxBufferMem int `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"0x400000"` + StripeSize int `help:"the size of each new stripe in bytes" default:"1024"` + MinThreshold int `help:"the minimum pieces required to recover a segment. k." default:"20"` + RepairThreshold int `help:"the minimum safe pieces before a repair is triggered. m." default:"30"` + SuccessThreshold int `help:"the desired total pieces for a segment. o." default:"40"` + MaxThreshold int `help:"the largest amount of pieces to encode to. n." default:"50"` +} + +// MinioConfig is a configuration struct that keeps details about starting +// Minio +type MinioConfig struct { + AccessKey string `help:"Minio Access Key to use" default:"insecure-dev-access-key"` + SecretKey string `help:"Minio Secret Key to use" default:"insecure-dev-secret-key"` + MinioDir string `help:"Minio generic server config path" default:"$HOME/.storj/miniogw"` +} + +// ClientConfig is a configuration struct for the miniogw that controls how +// the miniogw figures out how to talk to the rest of the network. +type ClientConfig struct { + // TODO(jt): these should probably be the same + OverlayAddr string `help:"Address to contact overlay server through"` + PointerDBAddr string `help:"Address to contact pointerdb server through"` +} + +// Config is a general miniogw configuration struct. This should be everything +// one needs to start a minio gateway. +type Config struct { + provider.IdentityConfig + MinioConfig + ClientConfig + RSConfig +} + +// Run starts a Minio Gateway given proper config +func (c Config) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + identity, err := c.LoadIdentity() + if err != nil { + return err + } + + err = minio.RegisterGatewayCommand(cli.Command{ + Name: "storj", + Usage: "Storj", + Action: func(cliCtx *cli.Context) error { + return c.action(ctx, cliCtx, identity) + }, + HideHelpCommand: true, + }) + if err != nil { + return err + } + + // TODO(jt): Surely there is a better way. This is so upsetting + err = os.Setenv("MINIO_ACCESS_KEY", c.AccessKey) + if err != nil { + return err + } + err = os.Setenv("MINIO_SECRET_KEY", c.SecretKey) + if err != nil { + return err + } + minio.Main([]string{"storj", "gateway", "storj", + "--address", c.Address, "--config-dir", c.MinioDir}) + return Error.New("unexpected minio exit") +} + +func (c Config) action(ctx context.Context, cliCtx *cli.Context, + identity *provider.FullIdentity) ( + err error) { + defer mon.Task()(&ctx)(&err) + + // TODO(jt): the transport client should use tls and should use the identity + // defined in this function. + t := transport.NewClient() + + // TODO(jt): overlay.NewClient should dial the overlay server with the + // transport client. probably should use the same connection as the + // pointerdb client + oc, err := overlay.NewOverlayClient(c.OverlayAddr) + if err != nil { + return err + } + + // TODO(jt): pointerdb.NewClient should dial the pointerdb server with the + // transport client. probably should use the same connection as the + // overlay client + pdb, err := pointerdb.NewClient(c.PointerDBAddr) + if err != nil { + return err + } + + ec := ecclient.NewClient(t, c.MaxBufferMem) + fc, err := infectious.NewFEC(c.MinThreshold, c.MaxThreshold) + if err != nil { + return err + } + rs, err := eestream.NewRedundancyStrategy( + eestream.NewRSScheme(fc, c.StripeSize/c.MaxThreshold), + c.RepairThreshold, c.SuccessThreshold) + if err != nil { + return err + } + segments := segment.NewSegmentStore(oc, ec, pdb, rs) + + // TODO(jt): wrap segments and turn segments into streams + // TODO(jt): hook streams into object store + // TODO(jt): this should work: + // NewStorjGateway(objects.NewStore(streams.NewStore(segments))) + _ = segments + + minio.StartGateway(cliCtx, NewStorjGateway(objects.NewObjectStore())) + return Error.New("unexpected minio exit") +} diff --git a/pkg/miniogw/gateway-storj.go b/pkg/miniogw/gateway-storj.go index 868cf5d6667f..0a8487ccd5c3 100644 --- a/pkg/miniogw/gateway-storj.go +++ b/pkg/miniogw/gateway-storj.go @@ -1,16 +1,14 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package storj +package miniogw import ( "context" "io" - "log" "time" "github.com/gogo/protobuf/proto" - "github.com/minio/cli" minio "github.com/minio/minio/cmd" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/hash" @@ -28,24 +26,9 @@ var ( Error = errs.Class("Storj Gateway error") ) -func init() { - if err := minio.RegisterGatewayCommand(cli.Command{ - Name: "storj", - Usage: "Storj", - Action: storjGatewayMain, - HideHelpCommand: true, - }); err != nil { - log.Fatal("Failed to register command storj") - } -} - -func storjGatewayMain(ctx *cli.Context) { - s := &Storj{os: mockObjectStore()} - minio.StartGateway(ctx, s) -} - -func mockObjectStore() objects.ObjectStore { - return &objects.Objects{} +// NewStorjGateway creates a *Storj object from an existing ObjectStore +func NewStorjGateway(os objects.ObjectStore) *Storj { + return &Storj{os: os} } // Storj is the implementation of a minio cmd.Gateway diff --git a/pkg/objects/impl.go b/pkg/objects/impl.go index bfc077086b20..9268dd5a879c 100644 --- a/pkg/objects/impl.go +++ b/pkg/objects/impl.go @@ -16,6 +16,9 @@ import ( type Objects struct { } +// NewObjectStore creates an *Objects struct +func NewObjectStore() *Objects { return &Objects{} } + //Meta structure type Meta struct { Modified time.Time diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index 784dd948cfbb..7eb9c65ec2bb 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -45,7 +45,7 @@ func NewRedisOverlayCache(address, password string, db int, DHT dht.DHT) (*Cache // NewBoltOverlayCache returns a pointer to a new Cache instance with an initalized connection to a Bolt db. func NewBoltOverlayCache(dbPath string, DHT dht.DHT) (*Cache, error) { - bc, err := boltdb.NewClient(nil, dbPath, boltdb.OverlayBucket) + bc, err := boltdb.NewClient(zap.L(), dbPath, boltdb.OverlayBucket) if err != nil { return nil, err } diff --git a/pkg/overlay/client.go b/pkg/overlay/client.go index 6e64ec1bb925..81493d9db881 100644 --- a/pkg/overlay/client.go +++ b/pkg/overlay/client.go @@ -9,7 +9,6 @@ import ( "google.golang.org/grpc" "storj.io/storj/pkg/dht" - "storj.io/storj/pkg/kademlia" proto "storj.io/storj/protos/overlay" ) @@ -21,8 +20,8 @@ import ( // // Lookup finds a Node with the provided identifier. type Client interface { - Choose(ctx context.Context, limit, space int64) ([]*dht.NodeID, error) - Lookup(ctx context.Context, nodeID kademlia.NodeID) (*proto.Node, error) + Choose(ctx context.Context, limit, space int64) ([]*proto.Node, error) + Lookup(ctx context.Context, nodeID dht.NodeID) (*proto.Node, error) } // Overlay is the overlay concrete implementation of the client interface @@ -32,7 +31,7 @@ type Overlay struct { // NewOverlayClient returns a new intialized Overlay Client func NewOverlayClient(address string) (*Overlay, error) { - c, err := NewClient(&address, grpc.WithInsecure()) + c, err := NewClient(address, grpc.WithInsecure()) if err != nil { return nil, err } @@ -41,6 +40,9 @@ func NewOverlayClient(address string) (*Overlay, error) { }, nil } +// a compiler trick to make sure *Overlay implements Client +var _ Client = (*Overlay)(nil) + // Choose implements the client.Choose interface func (o *Overlay) Choose(ctx context.Context, limit, space int64) ([]*proto.Node, error) { // TODO(coyle): We will also need to communicate with the reputation service here diff --git a/pkg/overlay/config.go b/pkg/overlay/config.go new file mode 100644 index 000000000000..eb0ff4b0c829 --- /dev/null +++ b/pkg/overlay/config.go @@ -0,0 +1,112 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package overlay + +import ( + "context" + "net/url" + "strconv" + + "github.com/zeebo/errs" + "go.uber.org/zap" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/provider" + proto "storj.io/storj/protos/overlay" +) + +var ( + mon = monkit.Package() + // Error represents an overlay error + Error = errs.Class("overlay error") +) + +// Config is a configuration struct for everything you need to start the +// Overlay cache responsibility. +type Config struct { + DatabaseURL string `help:"the database connection string to use" default:"bolt://$HOME/.storj/overlay.db"` +} + +// Run implements the provider.Responsibility interface. Run assumes a +// Kademlia responsibility has been started before this one. +func (c Config) Run(ctx context.Context, server *provider.Provider) ( + err error) { + defer mon.Task()(&ctx)(&err) + + kad := kademlia.LoadFromContext(ctx) + if kad == nil { + return Error.New("programmer error: kademlia responsibility unstarted") + } + + dburl, err := url.Parse(c.DatabaseURL) + if err != nil { + return Error.Wrap(err) + } + + var cache *Cache + switch dburl.Scheme { + case "bolt": + cache, err = NewBoltOverlayCache(dburl.Path, kad) + if err != nil { + return err + } + zap.S().Info("Starting overlay cache with BoltDB") + case "redis": + db, err := strconv.Atoi(dburl.Query().Get("db")) + if err != nil { + return Error.New("invalid db: %s", err) + } + cache, err = NewRedisOverlayCache(dburl.Host, urlPwd(dburl), db, kad) + if err != nil { + return err + } + zap.S().Info("Starting overlay cache with Redis") + default: + return Error.New("database scheme not supported: %s", dburl.Scheme) + } + + err = cache.Bootstrap(ctx) + if err != nil { + return err + } + + go func() { + // TODO(jt): should there be a for loop here? + err := cache.Refresh(ctx) + if err != nil { + zap.S().Fatal("cache refreshes stopped", zap.Error(err)) + } + }() + + proto.RegisterOverlayServer(server.GRPC(), &Server{ + dht: kad, + cache: cache, + + // TODO(jt): do something else + logger: zap.L(), + metrics: monkit.Default, + }) + + go func() { + // TODO(jt): should there be a for loop here? + // TODO(jt): how is this different from Refresh? + err := cache.Walk(ctx) + if err != nil { + zap.S().Fatal("cache walking stopped", zap.Error(err)) + } + }() + + return server.Run(ctx) +} + +func urlPwd(u *url.URL) string { + if u.User == nil { + return "" + } + if pw, ok := u.User.Password(); ok { + return pw + } + return "" +} diff --git a/pkg/overlay/overlay_test.go b/pkg/overlay/overlay_test.go index 096525c9cf0e..1fc07535aa6c 100644 --- a/pkg/overlay/overlay_test.go +++ b/pkg/overlay/overlay_test.go @@ -33,7 +33,7 @@ func TestFindStorageNodes(t *testing.T) { defer srv.Stop() address := lis.Addr().String() - c, err := NewClient(&address, grpc.WithInsecure()) + c, err := NewClient(address, grpc.WithInsecure()) assert.NoError(t, err) r, err := c.FindStorageNodes(context.Background(), &proto.FindStorageNodesRequest{}) @@ -56,7 +56,7 @@ func TestOverlayLookup(t *testing.T) { defer srv.Stop() address := lis.Addr().String() - c, err := NewClient(&address, grpc.WithInsecure()) + c, err := NewClient(address, grpc.WithInsecure()) assert.NoError(t, err) r, err := c.Lookup(context.Background(), &proto.LookupRequest{NodeID: id.String()}) diff --git a/pkg/overlay/service.go b/pkg/overlay/service.go index 3d68f65933a2..f57bc66ac55e 100644 --- a/pkg/overlay/service.go +++ b/pkg/overlay/service.go @@ -67,8 +67,8 @@ func NewServer(k *kademlia.Kademlia, cache *Cache, l *zap.Logger, m *monkit.Regi // NewClient connects to grpc server at the provided address with the provided options // returns a new instance of an overlay Client -func NewClient(serverAddr *string, opts ...grpc.DialOption) (proto.OverlayClient, error) { - conn, err := grpc.Dial(*serverAddr, opts...) +func NewClient(serverAddr string, opts ...grpc.DialOption) (proto.OverlayClient, error) { + conn, err := grpc.Dial(serverAddr, opts...) if err != nil { return nil, err } diff --git a/pkg/overlay/service_test.go b/pkg/overlay/service_test.go index 302f557b3d25..79de401e4f10 100644 --- a/pkg/overlay/service_test.go +++ b/pkg/overlay/service_test.go @@ -61,7 +61,7 @@ func TestNewClient_CreateTLS(t *testing.T) { defer srv.Stop() address := lis.Addr().String() - c, err := NewClient(&address, tlsOpts.DialOption()) + c, err := NewClient(address, tlsOpts.DialOption()) assert.NoError(t, err) r, err := c.Lookup(context.Background(), &proto.LookupRequest{}) @@ -95,7 +95,7 @@ func TestNewClient_LoadTLS(t *testing.T) { defer srv.Stop() address := lis.Addr().String() - c, err := NewClient(&address, tlsOpts.DialOption()) + c, err := NewClient(address, tlsOpts.DialOption()) assert.NoError(t, err) r, err := c.Lookup(context.Background(), &proto.LookupRequest{}) @@ -130,7 +130,7 @@ func TestNewClient_IndependentTLS(t *testing.T) { assert.NoError(t, err) address := lis.Addr().String() - c, err := NewClient(&address, clientTLSOps.DialOption()) + c, err := NewClient(address, clientTLSOps.DialOption()) assert.NoError(t, err) r, err := c.Lookup(context.Background(), &proto.LookupRequest{}) diff --git a/pkg/overlay/test_utils.go b/pkg/overlay/utils_test.go similarity index 100% rename from pkg/overlay/test_utils.go rename to pkg/overlay/utils_test.go diff --git a/pkg/peertls/io_util.go b/pkg/peertls/io_util.go index 77d32fd94839..6037152a0e9b 100644 --- a/pkg/peertls/io_util.go +++ b/pkg/peertls/io_util.go @@ -7,7 +7,6 @@ import ( "crypto/ecdsa" "encoding/pem" "io" - "log" "os" "github.com/zeebo/errs" @@ -28,11 +27,7 @@ func writeCerts(certs [][]byte, path string) error { return errs.New("unable to open file \"%s\" for writing", path, err) } - defer func() { - if err := file.Close(); err != nil { - log.Printf("Failed to close file: %s\n", err) - } - }() + defer func() { _ = file.Close() }() for _, cert := range certs { if err := writePem(newCertBlock(cert), file); err != nil { @@ -54,11 +49,7 @@ func writeKey(key *ecdsa.PrivateKey, path string) error { return errs.New("unable to open \"%s\" for writing", path, err) } - defer func() { - if err := file.Close(); err != nil { - log.Printf("Failed to close file: %s\n", err) - } - }() + defer func() { _ = file.Close() }() block, err := keyToBlock(key) if err != nil { diff --git a/pkg/peertls/peertls.go b/pkg/peertls/peertls.go index 2ada364f0081..83e1c79fc620 100644 --- a/pkg/peertls/peertls.go +++ b/pkg/peertls/peertls.go @@ -136,7 +136,10 @@ func verifyCertSignature(parentCert, childCert *x509.Certificate) (bool, error) } h := crypto.SHA256.New() - h.Write(childCert.RawTBSCertificate) + _, err := h.Write(childCert.RawTBSCertificate) + if err != nil { + return false, err + } digest := h.Sum(nil) isValid := ecdsa.Verify(pubkey, digest, signature.R, signature.S) diff --git a/pkg/piecestore/psservice/config.go b/pkg/piecestore/psservice/config.go new file mode 100644 index 000000000000..46d16fbdefda --- /dev/null +++ b/pkg/piecestore/psservice/config.go @@ -0,0 +1,58 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package psservice + +import ( + "context" + "log" + "path/filepath" + + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + psserver "storj.io/storj/pkg/piecestore/rpc/server" + "storj.io/storj/pkg/piecestore/rpc/server/ttl" + "storj.io/storj/pkg/provider" + pspb "storj.io/storj/protos/piecestore" +) + +var ( + // Error represents a farmer error + Error = errs.Class("farmer error") + mon = monkit.Package() +) + +// Config is a configuration struct that implements all the configuration +// needed for the piece store responsibility +type Config struct { + Path string `help:"path to store data in" default:"$HOME/.storj"` +} + +// Run implements provider.Responsibility +func (c Config) Run(ctx context.Context, server *provider.Provider) ( + err error) { + defer mon.Task()(&ctx)(&err) + + ttlPath := filepath.Join(c.Path, "ttl.db") + piecePath := filepath.Join(c.Path, "data") + + ttldb, err := ttl.NewTTL(ttlPath) + if err != nil { + return err + } + // TODO(jt): defer ttldb.Close() + + // TODO(jt): server.Server constructor + s := &psserver.Server{PieceStoreDir: piecePath, DB: ttldb} + // TODO(jt): defer s.Close() + + pspb.RegisterPieceStoreRoutesServer(server.GRPC(), s) + + go func() { + // TODO(jt): why isn't the piecestore server doing this? + log.Fatal(s.DB.DBCleanup(piecePath)) + }() + + return server.Run(ctx) +} diff --git a/pkg/pointerdb/client.go b/pkg/pointerdb/client.go index d0dbe6ed4a62..618049bdad52 100644 --- a/pkg/pointerdb/client.go +++ b/pkg/pointerdb/client.go @@ -27,8 +27,8 @@ type PointerDB struct { type Client interface { Put(ctx context.Context, path p.Path, pointer *pb.Pointer, APIKey []byte) error Get(ctx context.Context, path p.Path, APIKey []byte) (*pb.Pointer, error) - List(ctx context.Context, startingPathKey []byte, limit int64, APIKey []byte) ( - paths []byte, truncated bool, err error) + List(ctx context.Context, startingPathKey p.Path, limit int64, APIKey []byte) ( + paths [][]byte, truncated bool, err error) Delete(ctx context.Context, path p.Path, APIKey []byte) error } @@ -44,6 +44,9 @@ func NewClient(address string) (*PointerDB, error) { }, nil } +// a compiler trick to make sure *PointerDB implements Client +var _ Client = (*PointerDB)(nil) + // ClientConnection makes a server connection func clientConnection(serverAddr string, opts ...grpc.DialOption) (pb.PointerDBClient, error) { conn, err := grpc.Dial(serverAddr, opts...) diff --git a/pkg/pointerdb/config.go b/pkg/pointerdb/config.go new file mode 100644 index 000000000000..6d9cdef77b63 --- /dev/null +++ b/pkg/pointerdb/config.go @@ -0,0 +1,41 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package pointerdb + +import ( + "context" + "net/url" + + "go.uber.org/zap" + + "storj.io/storj/pkg/provider" + proto "storj.io/storj/protos/pointerdb" + "storj.io/storj/storage/boltdb" +) + +// Config is a configuration struct that is everything you need to start a +// PointerDB responsibility +type Config struct { + DatabaseURL string `help:"the database connection string to use" default:"bolt://$HOME/.storj/pointerdb.db"` +} + +// Run implements the provider.Responsibility interface +func (c Config) Run(ctx context.Context, server *provider.Provider) error { + dburl, err := url.Parse(c.DatabaseURL) + if err != nil { + return err + } + if dburl.Scheme != "bolt" { + return Error.New("unsupported db scheme: %s", dburl.Scheme) + } + bdb, err := boltdb.NewClient(zap.L(), dburl.Path, boltdb.PointerBucket) + if err != nil { + return err + } + defer func() { _ = bdb.Close() }() + + proto.RegisterPointerDBServer(server.GRPC(), NewServer(bdb, zap.L())) + + return server.Run(ctx) +} diff --git a/pkg/process/debug.go b/pkg/process/debug.go index c67bead8295f..d10176976058 100644 --- a/pkg/process/debug.go +++ b/pkg/process/debug.go @@ -4,7 +4,6 @@ package process import ( - "context" "net" "net/http" "net/http/pprof" @@ -20,7 +19,7 @@ func init() { *http.DefaultServeMux = http.ServeMux{} } -func initDebug(ctx context.Context, logger *zap.Logger, r *monkit.Registry) ( +func initDebug(logger *zap.Logger, r *monkit.Registry) ( err error) { var mux http.ServeMux mux.HandleFunc("/debug/pprof/", pprof.Index) diff --git a/pkg/process/exec_conf.go b/pkg/process/exec_conf.go new file mode 100644 index 000000000000..f83bc5325555 --- /dev/null +++ b/pkg/process/exec_conf.go @@ -0,0 +1,175 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package process + +import ( + "context" + "flag" + "log" + "os" + "strings" + "sync" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "go.uber.org/zap" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/pkg/telemetry" + "storj.io/storj/pkg/utils" +) + +// ExecuteWithConfig runs a Cobra command with the provided default config +func ExecuteWithConfig(cmd *cobra.Command, defaultConfig string) { + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + cleanup(cmd, defaultConfig) + _ = cmd.Execute() +} + +var ( + mon = monkit.Package() + + contextMtx sync.Mutex + contexts = map[*cobra.Command]context.Context{} +) + +// Ctx returns the appropriate context.Context for ExecuteWithConfig commands +func Ctx(cmd *cobra.Command) context.Context { + contextMtx.Lock() + defer contextMtx.Unlock() + ctx := contexts[cmd] + if ctx == nil { + return context.Background() + } + return ctx +} + +type ctxKey int + +const ( + ctxKeyVip ctxKey = iota + ctxKeyCfg +) + +// SaveConfig outputs the configuration to the configured (or default) config +// file given to ExecuteWithConfig +func SaveConfig(cmd *cobra.Command) error { + ctx := Ctx(cmd) + return getViper(ctx).WriteConfigAs(CfgPath(ctx)) +} + +// SaveConfigAs outputs the configuration to the provided path assuming the +// command was executed with ExecuteWithConfig +func SaveConfigAs(cmd *cobra.Command, path string) error { + return getViper(Ctx(cmd)).WriteConfigAs(path) +} + +func getViper(ctx context.Context) *viper.Viper { + if v, ok := ctx.Value(ctxKeyVip).(*viper.Viper); ok { + return v + } + return nil +} + +// CfgPath returns the configuration path used with ExecuteWithConfig +func CfgPath(ctx context.Context) string { + if v, ok := ctx.Value(ctxKeyCfg).(string); ok { + return v + } + return "" +} + +func cleanup(cmd *cobra.Command, defaultConfig string) { + for _, ccmd := range cmd.Commands() { + cleanup(ccmd, defaultConfig) + } + if cmd.Run != nil { + panic("Please use cobra's RunE instead of Run") + } + internalRun := cmd.RunE + if internalRun == nil { + return + } + cfgFile := cmd.Flags().String("config", os.ExpandEnv(defaultConfig), + "config file") + cmd.RunE = func(cmd *cobra.Command, args []string) (err error) { + ctx := context.Background() + defer mon.TaskNamed("root")(&ctx)(&err) + + vip := viper.New() + err = vip.BindPFlags(cmd.Flags()) + if err != nil { + return err + } + vip.SetEnvPrefix("storj") + vip.AutomaticEnv() + if *cfgFile != "" && fileExists(*cfgFile) { + vip.SetConfigFile(*cfgFile) + err = vip.ReadInConfig() + if err != nil { + return err + } + } + + // go back and propagate changed config values to appropriate flags + var brokenKeys []string + for _, key := range vip.AllKeys() { + if cmd.Flags().Lookup(key) == nil { + // flag couldn't be found + brokenKeys = append(brokenKeys, key) + } else { + err := cmd.Flags().Set(key, vip.GetString(key)) + if err != nil { + // flag couldn't be set + brokenKeys = append(brokenKeys, key) + } + } + } + + ctx = context.WithValue(ctx, ctxKeyVip, vip) + ctx = context.WithValue(ctx, ctxKeyCfg, *cfgFile) + + logger, err := utils.NewLogger(*logDisposition) + if err != nil { + return err + } + defer func() { _ = logger.Sync() }() + defer zap.ReplaceGlobals(logger)() + defer zap.RedirectStdLog(logger)() + + // okay now that logging is working, inform about the broken keys + // these keys are almost certainly broken because they have capital + // letters + logger.Sugar().Infof("TODO: these flags are not configurable via "+ + "config file, probably due to having uppercase letters: %s", + strings.Join(brokenKeys, ", ")) + + err = initMetrics(ctx, monkit.Default, + telemetry.DefaultInstanceID()) + if err != nil { + logger.Error("failed to configure telemetry", zap.Error(err)) + } + + err = initDebug(logger, monkit.Default) + if err != nil { + logger.Error("failed to start debug endpoints", zap.Error(err)) + } + + contextMtx.Lock() + contexts[cmd] = ctx + contextMtx.Unlock() + defer func() { + contextMtx.Lock() + delete(contexts, cmd) + contextMtx.Unlock() + }() + + err = internalRun(cmd, args) + if err != nil { + log.Fatalf("%+v", err) + } + return err + } +} diff --git a/pkg/process/service.go b/pkg/process/service.go index dd5add12297e..1085ee7167b6 100644 --- a/pkg/process/service.go +++ b/pkg/process/service.go @@ -102,11 +102,7 @@ func CtxService(s Service) func(cmd *cobra.Command, args []string) error { if err != nil { return err } - defer func() { - if err := logger.Sync(); err != nil { - logger.Error("failed to sync logger", zap.Error(err)) - } - }() + defer func() { _ = logger.Sync() }() defer zap.ReplaceGlobals(logger)() defer zap.RedirectStdLog(logger)() @@ -124,7 +120,7 @@ func CtxService(s Service) func(cmd *cobra.Command, args []string) error { logger.Error("failed to configure telemetry", zap.Error(err)) } - err = initDebug(ctx, logger, registry) + err = initDebug(logger, registry) if err != nil { logger.Error("failed to start debug endpoints", zap.Error(err)) } diff --git a/pkg/provider/common.go b/pkg/provider/common.go new file mode 100644 index 000000000000..953a620944df --- /dev/null +++ b/pkg/provider/common.go @@ -0,0 +1,16 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package provider + +import ( + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" +) + +var ( + mon = monkit.Package() + + // Error is a provider error + Error = errs.Class("provider error") +) diff --git a/pkg/provider/identity.go b/pkg/provider/identity.go new file mode 100644 index 000000000000..d99b6c2e6cc1 --- /dev/null +++ b/pkg/provider/identity.go @@ -0,0 +1,123 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package provider + +import ( + "context" + "crypto" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "net" + + base58 "github.com/jbenet/go-base58" + "go.uber.org/zap" + + "storj.io/storj/pkg/dht" + "storj.io/storj/pkg/peertls" +) + +// PeerIdentity represents another peer on the network. +type PeerIdentity struct { + // CA represents the peer's self-signed CA. The ID is taken from this cert. + CA *x509.Certificate + // Leaf represents the leaf they're currently using. The leaf should be + // signed by the CA. The leaf is what is used for communication. + Leaf *x509.Certificate + // The ID is calculated from the CA cert. + ID dht.NodeID +} + +// FullIdentity represents you on the network. In addition to a PeerIdentity, +// a FullIdentity also has a PrivateKey, which a PeerIdentity doesn't have. +// The PrivateKey should be for the PeerIdentity's Leaf certificate. +type FullIdentity struct { + PeerIdentity + PrivateKey crypto.PrivateKey + + todoCert *tls.Certificate // TODO(jt): get rid of this and only use the above +} + +// IdentityConfig allows you to run a set of Responsibilities with the given +// identity. You can also just load an Identity from disk. +type IdentityConfig struct { + CertPath string `help:"path to the certificate chain for this identity" default:"$HOME/.storj/identity.cert"` + KeyPath string `help:"path to the private key for this identity" default:"$HOME/.storj/identity.key"` + Address string `help:"address to listen on" default:":7777"` +} + +// LoadIdentity loads a FullIdentity from the given configuration +func (ic IdentityConfig) LoadIdentity() (*FullIdentity, error) { + pi, err := FullIdentityFromFiles(ic.CertPath, ic.KeyPath) + if err != nil { + return nil, Error.New("failed to load identity %#v, %#v: %v", + ic.CertPath, ic.KeyPath, err) + } + return pi, nil +} + +// Run will run the given responsibilities with the configured identity. +func (ic IdentityConfig) Run(ctx context.Context, + responsibilities ...Responsibility) ( + err error) { + defer mon.Task()(&ctx)(&err) + + pi, err := ic.LoadIdentity() + if err != nil { + return err + } + + lis, err := net.Listen("tcp", ic.Address) + if err != nil { + return err + } + defer func() { _ = lis.Close() }() + + s, err := NewProvider(pi, lis, responsibilities...) + if err != nil { + return err + } + defer func() { _ = s.Close() }() + + zap.S().Infof("Node %s started", s.Identity().ID) + + return s.Run(ctx) +} + +// PeerIdentityFromCertChain loads a PeerIdentity from a chain of certificates +func PeerIdentityFromCertChain(chain [][]byte) (*PeerIdentity, error) { + // TODO(jt): yeah, this totally does not do the right thing yet + // TODO(jt): fill this in correctly. + hash := sha256.Sum256(chain[0]) // TODO(jt): this is wrong + return &PeerIdentity{ + CA: nil, // TODO(jt) + Leaf: nil, // TODO(jt) + ID: nodeID(base58.Encode(hash[:])), // TODO(jt): this is wrong + }, nil +} + +// FullIdentityFromFiles loads a FullIdentity from a certificate chain and +// private key file +func FullIdentityFromFiles(certPath, keyPath string) (*FullIdentity, error) { + cert, err := peertls.LoadCert(certPath, keyPath) + if err != nil { + return nil, Error.Wrap(err) + } + + peer, err := PeerIdentityFromCertChain(cert.Certificate) + if err != nil { + return nil, Error.Wrap(err) + } + + return &FullIdentity{ + PeerIdentity: *peer, + PrivateKey: cert.PrivateKey, + todoCert: cert, + }, nil +} + +type nodeID string + +func (n nodeID) String() string { return string(n) } +func (n nodeID) Bytes() []byte { return []byte(n) } diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go new file mode 100644 index 000000000000..38a9bda77ef0 --- /dev/null +++ b/pkg/provider/provider.go @@ -0,0 +1,78 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package provider + +import ( + "context" + "crypto/tls" + "net" + + "google.golang.org/grpc" + + "storj.io/storj/pkg/peertls" +) + +// Responsibility represents a specific gRPC method collection to be registered +// on a shared gRPC server. PointerDB, OverlayCache, PieceStore, Kademlia, +// StatDB, etc. are all examples of Responsibilities. +type Responsibility interface { + Run(ctx context.Context, server *Provider) error +} + +// Provider represents a bundle of responsibilities defined by a specific ID. +// Examples of providers are the heavy client, the farmer, and the gateway. +type Provider struct { + lis net.Listener + g *grpc.Server + next []Responsibility + identity *FullIdentity +} + +// NewProvider creates a Provider out of an Identity, a net.Listener, and a set +// of responsibilities. +func NewProvider(identity *FullIdentity, lis net.Listener, + responsibilities ...Responsibility) (*Provider, error) { + + return &Provider{ + lis: lis, + g: grpc.NewServer(), + next: responsibilities, + identity: identity, + }, nil +} + +// Identity returns the provider's identity +func (p *Provider) Identity() *FullIdentity { return p.identity } + +// GRPC returns the provider's gRPC server for registration purposes +func (p *Provider) GRPC() *grpc.Server { return p.g } + +// Close shuts down the provider +func (p *Provider) Close() error { + p.g.GracefulStop() + return nil +} + +// Run will run the provider and all of its responsibilities +func (p *Provider) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + // are there any unstarted responsibilities? start those first. the + // responsibilities know to call Run again once they're ready. + if len(p.next) > 0 { + next := p.next[0] + p.next = p.next[1:] + return next.Run(ctx, p) + } + + return p.g.Serve(p.lis) +} + +// TLSConfig returns the provider's identity as a TLS Config +func (p *Provider) TLSConfig() *tls.Config { + // TODO(jt): get rid of tls.Certificate + return (&peertls.TLSFileOptions{ + LeafCertificate: p.identity.todoCert, + }).NewTLSConfig(nil) +} diff --git a/pkg/segment/segment.go b/pkg/segment/segment.go index 7d4edc08a7b7..88a9ef6a8fc7 100644 --- a/pkg/segment/segment.go +++ b/pkg/segment/segment.go @@ -8,15 +8,17 @@ import ( "io" "time" - "github.com/gogo/protobuf/proto" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/paths" "storj.io/storj/pkg/piecestore/rpc/client" + "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/ranger" "storj.io/storj/pkg/storage/ec" opb "storj.io/storj/protos/overlay" @@ -43,15 +45,15 @@ type Store interface { } type segmentStore struct { - oc opb.OverlayClient + oc overlay.Client ec ecclient.Client - pdb ppb.PointerDBClient + pdb pointerdb.Client rs eestream.RedundancyStrategy } // NewSegmentStore creates a new instance of segmentStore -func NewSegmentStore(oc opb.OverlayClient, ec ecclient.Client, pdb ppb.PointerDBClient, - rs eestream.RedundancyStrategy) Store { +func NewSegmentStore(oc overlay.Client, ec ecclient.Client, + pdb pointerdb.Client, rs eestream.RedundancyStrategy) Store { return &segmentStore{oc: oc, ec: ec, pdb: pdb, rs: rs} } @@ -61,7 +63,7 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, defer mon.Task()(&ctx)(&err) // uses overlay client to request a list of nodes - nodeRes, err := s.oc.FindStorageNodes(ctx, &opb.FindStorageNodesRequest{}) + nodeRes, err := s.oc.Choose(ctx, 0, 0) if err != nil { return Error.Wrap(err) } @@ -69,43 +71,39 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, pieceID := client.NewPieceID() // puts file to ecclient - err = s.ec.Put(ctx, nodeRes.GetNodes(), s.rs, pieceID, data, expiration) + err = s.ec.Put(ctx, nodeRes, s.rs, pieceID, data, expiration) if err != nil { zap.S().Error("Failed putting nodes to ecclient") return Error.Wrap(err) } var remotePieces []*ppb.RemotePiece - for i := range nodeRes.Nodes { + for i := range nodeRes { remotePieces = append(remotePieces, &ppb.RemotePiece{ PieceNum: int64(i), - NodeId: nodeRes.Nodes[i].Id, + NodeId: nodeRes[i].Id, }) } // creates pointer - pr := ppb.PutRequest{ - Path: []byte(path.String()), - Pointer: &ppb.Pointer{ - Type: ppb.Pointer_REMOTE, - Remote: &ppb.RemoteSegment{ - Redundancy: &ppb.RedundancyScheme{ - Type: ppb.RedundancyScheme_RS, - MinReq: int64(s.rs.RequiredCount()), - Total: int64(s.rs.TotalCount()), - RepairThreshold: int64(s.rs.Min), - SuccessThreshold: int64(s.rs.Opt), - }, - PieceId: string(pieceID), - RemotePieces: remotePieces, + pr := &ppb.Pointer{ + Type: ppb.Pointer_REMOTE, + Remote: &ppb.RemoteSegment{ + Redundancy: &ppb.RedundancyScheme{ + Type: ppb.RedundancyScheme_RS, + MinReq: int64(s.rs.RequiredCount()), + Total: int64(s.rs.TotalCount()), + RepairThreshold: int64(s.rs.Min), + SuccessThreshold: int64(s.rs.Opt), }, - Metadata: metadata, + PieceId: string(pieceID), + RemotePieces: remotePieces, }, - APIKey: nil, + Metadata: metadata, } // puts pointer to pointerDB - _, err = s.pdb.Put(ctx, &pr) + err = s.pdb.Put(ctx, path, pr, nil) if err != nil || status.Code(err) == codes.Internal { zap.L().Error("failed to put", zap.Error(err)) return Error.Wrap(err) @@ -114,22 +112,11 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, } // Get retrieves a file using erasure code, overlay, and pointerdb clients -func (s *segmentStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) { +func (s *segmentStore) Get(ctx context.Context, path paths.Path) ( + ranger.Ranger, Meta, error) { m := Meta{} - // TODO: remove this chunk after pointerdb client interface merged - gr := &ppb.GetRequest{ - Path: []byte(path.String()), - APIKey: nil, - } - - pdbRes, err := s.pdb.Get(ctx, gr) - if err != nil { - return nil, m, err - } - // TODO: remove this chunk after pointerdb client interface merged - pointer := &ppb.Pointer{} - err = proto.Unmarshal(pdbRes.Pointer, pointer) + pointer, err := s.pdb.Get(ctx, path, nil) if err != nil { return nil, m, err } @@ -155,23 +142,11 @@ func (s *segmentStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, return ecRes, m, nil } -// Delete tells piece stores to delete a segment and deletes pointer from pointerdb +// Delete tells piece stores to delete a segment and deletes pointer from +// pointerdb func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { - // TODO: remove this chunk after pointerdb client interface merged - gr := &ppb.GetRequest{ - Path: []byte(path.String()), - APIKey: nil, - } - // gets pointer from pointerdb - pdbRes, err := s.pdb.Get(ctx, gr) - if err != nil { - return err - } - - // TODO: remove this chunk after pointerdb client interface merged - pointer := &ppb.Pointer{} - err = proto.Unmarshal(pdbRes.Pointer, pointer) + pointer, err := s.pdb.Get(ctx, path, nil) if err != nil { return err } @@ -187,14 +162,8 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { return err } - // TODO: remove this chunk after pointerdb client interface merged - dr := &ppb.DeleteRequest{ - Path: []byte(path.String()), - APIKey: nil, - } - // deletes pointer from pointerdb - _, err = s.pdb.Delete(ctx, dr) + err = s.pdb.Delete(ctx, path, nil) if err != nil { return err } @@ -203,38 +172,34 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { } // overlayHelper calls Lookup to get node addresses from the overlay -func (s *segmentStore) overlayHelper(ctx context.Context, rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) { +func (s *segmentStore) overlayHelper(ctx context.Context, + rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) { for i := 0; i < len(rem.RemotePieces); i++ { - overlayRes, err := s.oc.Lookup(ctx, &opb.LookupRequest{NodeID: rem.RemotePieces[i].NodeId}) + overlayRes, err := s.oc.Lookup(ctx, + kademlia.StringToNodeID(rem.RemotePieces[i].NodeId)) if err != nil { return nil, err } - nodes = append(nodes, overlayRes.Node) + nodes = append(nodes, overlayRes) } return nodes, nil } // List lists paths stored in the pointerdb -func (s *segmentStore) List(ctx context.Context, startingPath, endingPath paths.Path) ( +func (s *segmentStore) List(ctx context.Context, + startingPath, endingPath paths.Path) ( listPaths []paths.Path, truncated bool, err error) { - // TODO: remove this chunk after pointerdb client interface merged - lr := &ppb.ListRequest{ - StartingPathKey: []byte(startingPath.String()), - // TODO: change limit to endingPath when supported - Limit: 1, - APIKey: nil, - } - - res, err := s.pdb.List(ctx, lr) + pathsResp, truncated, err := s.pdb.List( + ctx, startingPath, 0, nil) if err != nil { return nil, false, err } - for _, path := range res.Paths { + for _, path := range pathsResp { np := paths.New(string(path[:])) listPaths = append(listPaths, np) } - return listPaths, res.Truncated, nil + return listPaths, truncated, nil } diff --git a/pointerdb/auth/process_api_key.go b/pointerdb/auth/process_api_key.go index 05a69fee2517..e6c1a830b006 100644 --- a/pointerdb/auth/process_api_key.go +++ b/pointerdb/auth/process_api_key.go @@ -5,39 +5,21 @@ package auth import ( "crypto/subtle" - "net/http" - - "github.com/spf13/viper" + "flag" ) -// InitializeHeaders : mocks HTTP headers to preset X-API-Key -func InitializeHeaders() *http.Header { - - httpHeaders := http.Header{ - "Accept-Encoding": {"gzip, deflate"}, - "Accept-Language": {"en-US,en;q=0.9"}, - "X-Api-Key": {"12345"}, - "Cache-Control": {"max-age=0"}, - "Accept": {"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8"}, - "Connection": {"keep-alive"}, - } - return &httpHeaders -} +var ( + apiKey = flag.String("pointerdb.auth.api_key", "", "api key") +) // ValidateAPIKey : validates the X-API-Key header to an env/flag input func ValidateAPIKey(header string) bool { + var expected = []byte(*apiKey) + var actual = []byte(header) - var apiKeyByte = []byte(viper.GetString("key")) - var xAPIKeyByte = []byte(header) - - switch { - case len(apiKeyByte) == 0: + if len(expected) <= 0 { return false - case len(apiKeyByte) > 0: - result := subtle.ConstantTimeCompare(apiKeyByte, xAPIKeyByte) - if result == 1 { - return true - } } - return false + + return 1 == subtle.ConstantTimeCompare(expected, actual) } From 8388326ea2feeb7acdf52cd2413f5fc22ad05803 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Mon, 23 Jul 2018 12:46:25 -0600 Subject: [PATCH 2/2] Revert segment changes, squash an unneeded log line --- pkg/miniogw/config.go | 10 +++- pkg/process/exec_conf.go | 8 ++- pkg/segment/segment.go | 119 +++++++++++++++++++++++++-------------- 3 files changed, 89 insertions(+), 48 deletions(-) diff --git a/pkg/miniogw/config.go b/pkg/miniogw/config.go index a1d415fde7cf..5aee81989472 100644 --- a/pkg/miniogw/config.go +++ b/pkg/miniogw/config.go @@ -16,7 +16,6 @@ import ( "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/provider" - "storj.io/storj/pkg/segment" ecclient "storj.io/storj/pkg/storage/ec" "storj.io/storj/pkg/transport" ) @@ -128,13 +127,18 @@ func (c Config) action(ctx context.Context, cliCtx *cli.Context, if err != nil { return err } - segments := segment.NewSegmentStore(oc, ec, pdb, rs) + + // TODO(jt): make segment store + // segments := segment.NewSegmentStore(oc, ec, pdb, rs) + _ = oc + _ = ec + _ = pdb + _ = rs // TODO(jt): wrap segments and turn segments into streams // TODO(jt): hook streams into object store // TODO(jt): this should work: // NewStorjGateway(objects.NewStore(streams.NewStore(segments))) - _ = segments minio.StartGateway(cliCtx, NewStorjGateway(objects.NewObjectStore())) return Error.New("unexpected minio exit") diff --git a/pkg/process/exec_conf.go b/pkg/process/exec_conf.go index f83bc5325555..9433026e2ec0 100644 --- a/pkg/process/exec_conf.go +++ b/pkg/process/exec_conf.go @@ -142,9 +142,11 @@ func cleanup(cmd *cobra.Command, defaultConfig string) { // okay now that logging is working, inform about the broken keys // these keys are almost certainly broken because they have capital // letters - logger.Sugar().Infof("TODO: these flags are not configurable via "+ - "config file, probably due to having uppercase letters: %s", - strings.Join(brokenKeys, ", ")) + if len(brokenKeys) > 0 { + logger.Sugar().Infof("TODO: these flags are not configurable via "+ + "config file, probably due to having uppercase letters: %s", + strings.Join(brokenKeys, ", ")) + } err = initMetrics(ctx, monkit.Default, telemetry.DefaultInstanceID()) diff --git a/pkg/segment/segment.go b/pkg/segment/segment.go index 88a9ef6a8fc7..7d4edc08a7b7 100644 --- a/pkg/segment/segment.go +++ b/pkg/segment/segment.go @@ -8,17 +8,15 @@ import ( "io" "time" + "github.com/gogo/protobuf/proto" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/eestream" - "storj.io/storj/pkg/kademlia" - "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/paths" "storj.io/storj/pkg/piecestore/rpc/client" - "storj.io/storj/pkg/pointerdb" "storj.io/storj/pkg/ranger" "storj.io/storj/pkg/storage/ec" opb "storj.io/storj/protos/overlay" @@ -45,15 +43,15 @@ type Store interface { } type segmentStore struct { - oc overlay.Client + oc opb.OverlayClient ec ecclient.Client - pdb pointerdb.Client + pdb ppb.PointerDBClient rs eestream.RedundancyStrategy } // NewSegmentStore creates a new instance of segmentStore -func NewSegmentStore(oc overlay.Client, ec ecclient.Client, - pdb pointerdb.Client, rs eestream.RedundancyStrategy) Store { +func NewSegmentStore(oc opb.OverlayClient, ec ecclient.Client, pdb ppb.PointerDBClient, + rs eestream.RedundancyStrategy) Store { return &segmentStore{oc: oc, ec: ec, pdb: pdb, rs: rs} } @@ -63,7 +61,7 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, defer mon.Task()(&ctx)(&err) // uses overlay client to request a list of nodes - nodeRes, err := s.oc.Choose(ctx, 0, 0) + nodeRes, err := s.oc.FindStorageNodes(ctx, &opb.FindStorageNodesRequest{}) if err != nil { return Error.Wrap(err) } @@ -71,39 +69,43 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, pieceID := client.NewPieceID() // puts file to ecclient - err = s.ec.Put(ctx, nodeRes, s.rs, pieceID, data, expiration) + err = s.ec.Put(ctx, nodeRes.GetNodes(), s.rs, pieceID, data, expiration) if err != nil { zap.S().Error("Failed putting nodes to ecclient") return Error.Wrap(err) } var remotePieces []*ppb.RemotePiece - for i := range nodeRes { + for i := range nodeRes.Nodes { remotePieces = append(remotePieces, &ppb.RemotePiece{ PieceNum: int64(i), - NodeId: nodeRes[i].Id, + NodeId: nodeRes.Nodes[i].Id, }) } // creates pointer - pr := &ppb.Pointer{ - Type: ppb.Pointer_REMOTE, - Remote: &ppb.RemoteSegment{ - Redundancy: &ppb.RedundancyScheme{ - Type: ppb.RedundancyScheme_RS, - MinReq: int64(s.rs.RequiredCount()), - Total: int64(s.rs.TotalCount()), - RepairThreshold: int64(s.rs.Min), - SuccessThreshold: int64(s.rs.Opt), + pr := ppb.PutRequest{ + Path: []byte(path.String()), + Pointer: &ppb.Pointer{ + Type: ppb.Pointer_REMOTE, + Remote: &ppb.RemoteSegment{ + Redundancy: &ppb.RedundancyScheme{ + Type: ppb.RedundancyScheme_RS, + MinReq: int64(s.rs.RequiredCount()), + Total: int64(s.rs.TotalCount()), + RepairThreshold: int64(s.rs.Min), + SuccessThreshold: int64(s.rs.Opt), + }, + PieceId: string(pieceID), + RemotePieces: remotePieces, }, - PieceId: string(pieceID), - RemotePieces: remotePieces, + Metadata: metadata, }, - Metadata: metadata, + APIKey: nil, } // puts pointer to pointerDB - err = s.pdb.Put(ctx, path, pr, nil) + _, err = s.pdb.Put(ctx, &pr) if err != nil || status.Code(err) == codes.Internal { zap.L().Error("failed to put", zap.Error(err)) return Error.Wrap(err) @@ -112,11 +114,22 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, } // Get retrieves a file using erasure code, overlay, and pointerdb clients -func (s *segmentStore) Get(ctx context.Context, path paths.Path) ( - ranger.Ranger, Meta, error) { +func (s *segmentStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) { m := Meta{} + // TODO: remove this chunk after pointerdb client interface merged + gr := &ppb.GetRequest{ + Path: []byte(path.String()), + APIKey: nil, + } + + pdbRes, err := s.pdb.Get(ctx, gr) + if err != nil { + return nil, m, err + } - pointer, err := s.pdb.Get(ctx, path, nil) + // TODO: remove this chunk after pointerdb client interface merged + pointer := &ppb.Pointer{} + err = proto.Unmarshal(pdbRes.Pointer, pointer) if err != nil { return nil, m, err } @@ -142,11 +155,23 @@ func (s *segmentStore) Get(ctx context.Context, path paths.Path) ( return ecRes, m, nil } -// Delete tells piece stores to delete a segment and deletes pointer from -// pointerdb +// Delete tells piece stores to delete a segment and deletes pointer from pointerdb func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { + // TODO: remove this chunk after pointerdb client interface merged + gr := &ppb.GetRequest{ + Path: []byte(path.String()), + APIKey: nil, + } + // gets pointer from pointerdb - pointer, err := s.pdb.Get(ctx, path, nil) + pdbRes, err := s.pdb.Get(ctx, gr) + if err != nil { + return err + } + + // TODO: remove this chunk after pointerdb client interface merged + pointer := &ppb.Pointer{} + err = proto.Unmarshal(pdbRes.Pointer, pointer) if err != nil { return err } @@ -162,8 +187,14 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { return err } + // TODO: remove this chunk after pointerdb client interface merged + dr := &ppb.DeleteRequest{ + Path: []byte(path.String()), + APIKey: nil, + } + // deletes pointer from pointerdb - err = s.pdb.Delete(ctx, path, nil) + _, err = s.pdb.Delete(ctx, dr) if err != nil { return err } @@ -172,34 +203,38 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) error { } // overlayHelper calls Lookup to get node addresses from the overlay -func (s *segmentStore) overlayHelper(ctx context.Context, - rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) { +func (s *segmentStore) overlayHelper(ctx context.Context, rem *ppb.RemoteSegment) (nodes []*opb.Node, err error) { for i := 0; i < len(rem.RemotePieces); i++ { - overlayRes, err := s.oc.Lookup(ctx, - kademlia.StringToNodeID(rem.RemotePieces[i].NodeId)) + overlayRes, err := s.oc.Lookup(ctx, &opb.LookupRequest{NodeID: rem.RemotePieces[i].NodeId}) if err != nil { return nil, err } - nodes = append(nodes, overlayRes) + nodes = append(nodes, overlayRes.Node) } return nodes, nil } // List lists paths stored in the pointerdb -func (s *segmentStore) List(ctx context.Context, - startingPath, endingPath paths.Path) ( +func (s *segmentStore) List(ctx context.Context, startingPath, endingPath paths.Path) ( listPaths []paths.Path, truncated bool, err error) { - pathsResp, truncated, err := s.pdb.List( - ctx, startingPath, 0, nil) + // TODO: remove this chunk after pointerdb client interface merged + lr := &ppb.ListRequest{ + StartingPathKey: []byte(startingPath.String()), + // TODO: change limit to endingPath when supported + Limit: 1, + APIKey: nil, + } + + res, err := s.pdb.List(ctx, lr) if err != nil { return nil, false, err } - for _, path := range pathsResp { + for _, path := range res.Paths { np := paths.New(string(path[:])) listPaths = append(listPaths, np) } - return listPaths, truncated, nil + return listPaths, res.Truncated, nil }