Skip to content

Commit

Permalink
Merge pull request #69 from profefe/storage-s3
Browse files Browse the repository at this point in the history
Implement storage on top of s3
  • Loading branch information
narqo committed Mar 2, 2020
2 parents 97e9fbc + a39cf45 commit 55e1a3e
Show file tree
Hide file tree
Showing 234 changed files with 71,578 additions and 12 deletions.
7 changes: 3 additions & 4 deletions DESIGN.md
Expand Up @@ -15,10 +15,9 @@ i.e. `pb.gz`, or [Go runtime traces](https://golang.org/pkg/runtime/trace/)).
Collector persists the data in a plugable storage.
It also provides an API for querying and retrieving stored profiles.

The only storage implemented currently uses [Badger](https://github.com/dgraph-io/badger).

*Note: it's tempting to split collector into two separate applications (collector and querier).
That might increase the service's scalability, but that is the subject of future research.*
Currently implemented storages:
- `storage/badger`, keeps data in [Badger DB](https://github.com/dgraph-io/badger);
- `storage/s3`, stores data in s3-compatible object storage service.

### Agent

Expand Down
69 changes: 69 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Expand Up @@ -3,7 +3,7 @@ required = [
]

ignored = [
"golang.org/x/lint",
"golang.org/x/lint*",
]

[prune]
Expand Down Expand Up @@ -41,3 +41,7 @@ ignored = [
[[constraint]]
name = "honnef.co/go/tools"
version = "2020.1.2"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "v1.26.5"
4 changes: 3 additions & 1 deletion README.md
Expand Up @@ -28,11 +28,13 @@ To build and start profefe collector, run:

```
> make
> ./BUILD/profefe -addr :10100 -log.level debug -badger.dir /tmp/profefe
> ./BUILD/profefe -addr :10100 -log.level debug -badger.dir /tmp/profefe-data
2019-06-06T00:07:58.499+0200 info profefe/main.go:86 server is running {"addr": ":10100"}
```

`./BUILD/profefe -help` will print the list of available options.

---

You can build a docker image with the collector running the command:
Expand Down
49 changes: 44 additions & 5 deletions cmd/profefe/main.go
Expand Up @@ -13,12 +13,17 @@ import (
"syscall"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/dgraph-io/badger"
"github.com/profefe/profefe/pkg/config"
"github.com/profefe/profefe/pkg/log"
"github.com/profefe/profefe/pkg/middleware"
"github.com/profefe/profefe/pkg/profefe"
"github.com/profefe/profefe/pkg/storage"
storageBadger "github.com/profefe/profefe/pkg/storage/badger"
storageS3 "github.com/profefe/profefe/pkg/storage/s3"
"github.com/profefe/profefe/version"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -50,15 +55,30 @@ func main() {
}

func run(logger *log.Logger, conf config.Config, stdout io.Writer) error {
st, closer, err := initBadgerStorage(logger, conf)
if err != nil {
return err
var (
sr storage.Reader
sw storage.Writer
)
if conf.Badger.Dir != "" {
st, closer, err := initBadgerStorage(logger, conf)
if err != nil {
return err
}
defer closer.Close()
sr, sw = st, st
} else if conf.S3.Bucket != "" {
st, err := initS3Storage(logger, conf)
if err != nil {
return err
}
sr, sw = st, st
} else {
return fmt.Errorf("storage configuration required")
}
defer closer.Close()

mux := http.NewServeMux()

profefe.SetupRoutes(mux, logger, prometheus.DefaultRegisterer, st, st)
profefe.SetupRoutes(mux, logger, prometheus.DefaultRegisterer, sr, sw)

setupDebugRoutes(mux)

Expand Down Expand Up @@ -121,6 +141,25 @@ func initBadgerStorage(logger *log.Logger, conf config.Config) (*storageBadger.S
return st, db, nil
}

// initS3Storage creates an s3-backed storage from the s3.* configuration.
func initS3Storage(logger *log.Logger, conf config.Config) (*storageS3.Storage, error) {
	// A custom endpoint (e.g. Minio) requires path-style bucket addressing
	// instead of AWS's default virtual-hosted style.
	forcePathStyle := conf.S3.EndpointURL != ""

	awsConf := &aws.Config{
		Endpoint:         aws.String(conf.S3.EndpointURL),
		DisableSSL:       aws.Bool(conf.S3.DisableSSL),
		Region:           aws.String(conf.S3.Region),
		MaxRetries:       aws.Int(conf.S3.MaxRetries),
		S3ForcePathStyle: aws.Bool(forcePathStyle),
	}
	sess, err := session.NewSession(awsConf)
	if err != nil {
		return nil, xerrors.Errorf("could not create s3 session: %w", err)
	}
	return storageS3.New(logger, s3.New(sess), conf.S3.Bucket), nil
}

func setupDebugRoutes(mux *http.ServeMux) {
// pprof handlers, see https://github.com/golang/go/blob/release-branch.go1.13/src/net/http/pprof/pprof.go
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down
20 changes: 19 additions & 1 deletion pkg/config/config.go
Expand Up @@ -21,6 +21,7 @@ type Config struct {
ExitTimeout time.Duration
Logger log.Config
Badger BadgerConfig
S3 S3Config
}

func (conf *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -29,6 +30,7 @@ func (conf *Config) RegisterFlags(f *flag.FlagSet) {

conf.Logger.RegisterFlags(f)
conf.Badger.RegisterFlags(f)
conf.S3.RegisterFlags(f)
}

type BadgerConfig struct {
Expand All @@ -39,8 +41,24 @@ type BadgerConfig struct {
}

func (conf *BadgerConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&conf.Dir, "badger.dir", "data", "badger data dir")
f.StringVar(&conf.Dir, "badger.dir", "", "badger data dir")
f.DurationVar(&conf.ProfileTTL, "badger.profile-ttl", defaultRetentionPeriod, "badger profile data ttl")
f.DurationVar(&conf.GCInterval, "badger.gc-interval", defaultGCInternal, "interval in which the badger garbage collector is run")
f.Float64Var(&conf.GCDiscardRatio, "badger.gc-discard-ratio", defaultGCDiscardRatio, "a badger file is rewritten if this ratio of the file can be discarded")
}

type S3Config struct {
EndpointURL string
DisableSSL bool
Region string
Bucket string
MaxRetries int
}

func (conf *S3Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&conf.EndpointURL, "s3.endpoint-url", "", "override default URL to s3 service")
f.BoolVar(&conf.DisableSSL, "s3.disable-ssl", false, "disable SSL verification")
f.StringVar(&conf.Region, "s3.region", "us-east-1", "object storage region")
f.StringVar(&conf.Bucket, "s3.bucket", "", "s3 bucket profile destination")
f.IntVar(&conf.MaxRetries, "s3.max-retries", 3, "s3 request maximum number of retries")
}
28 changes: 28 additions & 0 deletions pkg/profile/labels.go
Expand Up @@ -63,6 +63,34 @@ func (labels Labels) Equal(labels2 Labels) bool {
return true
}

// Include reports whether every label of labels2 is also present in labels,
// matching on both key and value. An empty labels2 is included in anything;
// a non-empty labels2 is never included in empty labels.
// Duplicate keys are supported: each (key, value) pair is matched exactly,
// so Labels{{"k", "v1"}, {"k", "v2"}} includes Labels{{"k", "v2"}}.
func (labels Labels) Include(labels2 Labels) bool {
	if len(labels2) == 0 {
		return true
	}

	if len(labels) == 0 {
		return false
	}

	// Index labels by (key, value) pair rather than by key alone, so that
	// multiple values of the same key are all retained (a map[key]value
	// would keep only the last value per key).
	type kv struct {
		key, value string
	}
	set := make(map[kv]struct{}, len(labels))
	for _, label := range labels {
		set[kv{label.Key, label.Value}] = struct{}{}
	}

	for _, label := range labels2 {
		if _, ok := set[kv{label.Key, label.Value}]; !ok {
			return false
		}
	}

	return true
}

func (labels Labels) Add(labels2 Labels) Labels {
if labels == nil {
return labels2
Expand Down
45 changes: 45 additions & 0 deletions pkg/profile/labels_test.go
Expand Up @@ -92,6 +92,51 @@ func TestLabels_Add(t *testing.T) {
}
}

func TestLabels_Include(t *testing.T) {
	// Table of (base, candidate subset, expected) triples; each case runs
	// as its own subtest.
	for _, tc := range []struct {
		name   string
		base   Labels
		subset Labels
		want   bool
	}{
		{
			name:   "empty labels1 include empty labels2",
			base:   Labels{},
			subset: Labels{},
			want:   true,
		},
		{
			name:   "labels1 includes empty labels2",
			base:   Labels{{"k1", "v1"}},
			subset: Labels{},
			want:   true,
		},
		{
			name:   "labels1 includes labels2",
			base:   Labels{{"k1", "v1"}, {Key: "k2", Value: "v2"}},
			subset: Labels{{"k1", "v1"}},
			want:   true,
		},
		{
			name:   "labels1 does NOT include all of labels2",
			base:   Labels{{"k1", "v1"}},
			subset: Labels{{"k1", "v1"}, {"k2", "v2"}},
			want:   false,
		},
		{
			name:   "labels include same key but different value",
			base:   Labels{{"k1", "v1"}},
			subset: Labels{{"k1", "v2"}},
			want:   false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, tc.base.Include(tc.subset))
		})
	}
}

func TestLabels_FromString(t *testing.T) {
cases := []struct {
in string
Expand Down

0 comments on commit 55e1a3e

Please sign in to comment.