Skip to content

Commit

Permalink
Merge 3ae3b4b into 40b9702
Browse files Browse the repository at this point in the history
  • Loading branch information
threez committed Jan 31, 2020
2 parents 40b9702 + 3ae3b4b commit 7fd38f0
Show file tree
Hide file tree
Showing 320 changed files with 93,923 additions and 2,064 deletions.
19 changes: 8 additions & 11 deletions .travis.yml
@@ -1,23 +1,20 @@
language: go

before_install:
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover

after_success:
- $HOME/gopath/bin/goveralls -coverprofile=coverage.out -service=travis-ci

services:
- postgresql
- redis-server
- docker

env:
- GO111MODULE=on REDIS_HOSTS=localhost:6379 POSTGRES_HOST=localhost POSTGRES_PASSWORD=
- GO111MODULE=on

go:
- 1.13.x

after_success:
- GO111MODULE=off

script:
- make ci
- make coveralls-test
- $HOME/gopath/bin/goveralls -coverprofile=coverage.out -service=travis-ci
- docker-compose run testserver make ci
7 changes: 2 additions & 5 deletions Makefile
Expand Up @@ -43,8 +43,5 @@ integration:
testserver:
docker-compose up

coveralls-test:
go test -mod=vendor -count=1 -v -cover -covermode=count -coverprofile=coverage.out -short ./...


ci: test integration
ci:
go test -mod=vendor -count=1 -v -cover -race -covermode=atomic -coverprofile=coverage.out ./...
19 changes: 19 additions & 0 deletions backend/objstore/README.md
@@ -0,0 +1,19 @@

## Environment based configuration

* `S3_ENDPOINT` default: `"s3.amazonaws.com"`
* host:port address.
* `S3_ACCESS_KEY_ID`
* Access Key ID for the service.
* `S3_SECRET_ACCESS_KEY`
* Secret Access Key for the service.
* `S3_USE_SSL`
* Determine whether to use SSL or not.
* `S3_REGION` default: `"us-east-1"`
* S3 region for the bucket.
* `S3_HEALTH_CHECK_BUCKET_NAME` default: `health-check`
* Name of the bucket that is used for health check operations.
* `S3_HEALTH_CHECK_OBJECT_NAME` default: `"latest.log`
* Name of the object that is used for the health check operation.
* `S3_HEALTH_CHECK_RESULT_TTL` default: `10s`
* Amount of time to cache the last health check result.
76 changes: 76 additions & 0 deletions backend/objstore/health_objstore.go
@@ -0,0 +1,76 @@
package objstore

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"time"

"github.com/minio/minio-go/v6"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
)

// HealthCheck checks the state of the object storage client. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
state servicehealthcheck.ConnectionState
Client *minio.Client
}

// HealthCheck checks if the object storage client is healthy. If the last result is outdated,
// object storage is checked for upload and download,
// otherwise returns the old result
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
// the last health check is not outdated, an can be reused.
return h.state.GetState()
}

expContent := []byte(time.Now().Format(time.RFC3339))
expSize := int64(len(expContent))

_, err := h.Client.PutObjectWithContext(
ctx,
cfg.HealthCheckBucketName,
cfg.HealthCheckObjectName,
bytes.NewReader(expContent),
expSize,
minio.PutObjectOptions{
ContentType: "text/plain",
},
)
if err != nil {
h.state.SetErrorState(fmt.Errorf("failed to put object: %v", err))
return h.state.GetState()
}

// Try download
obj, err := h.Client.GetObjectWithContext(
ctx,
cfg.HealthCheckBucketName,
cfg.HealthCheckObjectName,
minio.GetObjectOptions{},
)
if err != nil {
h.state.SetErrorState(fmt.Errorf("failed to get object: %v", err))
return h.state.GetState()
}
defer obj.Close()

// Assert expectations
buf, err := ioutil.ReadAll(obj)
if err != nil {
h.state.SetErrorState(fmt.Errorf("failed to compare object: %v", err))
return h.state.GetState()
}

if bytes.Compare(buf, expContent) != 0 {
h.state.SetErrorState(fmt.Errorf("unexpected content: %q <-> %q", string(buf), string(expContent)))
return h.state.GetState()
}

// If uploading and downloading worked set the Health Check to healthy
h.state.SetHealthy()
return h.state.GetState()
}
41 changes: 41 additions & 0 deletions backend/objstore/health_objstore_test.go
@@ -0,0 +1,41 @@
package objstore

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

http2 "github.com/pace/bricks/http"
"github.com/pace/bricks/maintenance/log"
)

func setup() *http.Response {
r := http2.Router()
rec := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/health/check", nil)
r.ServeHTTP(rec, req)
resp := rec.Result()
defer resp.Body.Close()
return resp
}

// TestIntegrationHealthCheck tests if object storage health check ist working like expected
func TestIntegrationHealthCheck(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
resp := setup()
if resp.StatusCode != 200 {
t.Errorf("Expected /health/check to respond with 200, got: %d", resp.StatusCode)
}

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
if !strings.Contains(string(data), "objstore OK") {
t.Errorf("Expected /health/check to return OK, got: %q", string(data[:]))
}
}
93 changes: 93 additions & 0 deletions backend/objstore/metric_round_tripper.go
@@ -0,0 +1,93 @@
package objstore

import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
)

var (
paceObjStoreTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pace_objstore_req_total",
Help: "Collects stats about the number of object storage requests made",
},
[]string{"method", "bucket"},
)
paceObjStoreFailed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pace_objstore_req_failed",
Help: "Collects stats about the number of object storage requests counterFailed",
},
[]string{"method", "bucket"},
)
paceObjStoreDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "pace_objstore_req_duration_seconds",
Help: "Collect performance metrics for each method & bucket",
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 60},
},
[]string{"method", "bucket"},
)
)

func init() {
prometheus.MustRegister(paceObjStoreTotal)
prometheus.MustRegister(paceObjStoreFailed)
prometheus.MustRegister(paceObjStoreDurationSeconds)
}

type metricRoundTripper struct {
transport http.RoundTripper
endpoint string
}

func newMetricRoundTripper(endpoint string) *metricRoundTripper {
return &metricRoundTripper{
endpoint: endpoint,
}
}

func (m *metricRoundTripper) Transport() http.RoundTripper {
return m.transport
}

func (m *metricRoundTripper) SetTransport(rt http.RoundTripper) {
m.transport = rt
}

func (m *metricRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
labels := prometheus.Labels{
"method": req.Method,
"bucket": m.endpoint,
}

start := time.Now()
resp, err := m.Transport().RoundTrip(req)
dur := time.Since(start)

// total
paceObjStoreTotal.With(labels).Inc()

// duration
measurable := err != nil
if measurable {
// no need to measure timeouts or transport issues
paceObjStoreDurationSeconds.With(labels).Observe(dur.Seconds())
}

// failure
failed := err != nil || m.determineFailure(resp.StatusCode)
if failed {
// count transport issues and by status code
paceObjStoreFailed.With(labels).Inc()
}

return resp, err
}

// determineFailure determines whether the response code is considered failed or not.
func (m *metricRoundTripper) determineFailure(code int) bool {
return !(200 <= code && code < 400)
}
82 changes: 82 additions & 0 deletions backend/objstore/objstore.go
@@ -0,0 +1,82 @@
package objstore

import (
"net/http"
"time"

"github.com/caarlos0/env"
"github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/pace/bricks/http/transport"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/pace/bricks/maintenance/log"
)

type config struct {
Endpoint string `env:"S3_ENDPOINT" envDefault:"s3.amazonaws.com"`
AccessKeyID string `env:"S3_ACCESS_KEY_ID"`
Region string `env:"S3_REGION" envDefault:"us-east-1"`
SecretAccessKey string `env:"S3_SECRET_ACCESS_KEY"`
UseSSL bool `env:"S3_USE_SSL"`

HealthCheckBucketName string `env:"S3_HEALTH_CHECK_BUCKET_NAME" envDefault:"health-check"`
HealthCheckObjectName string `env:"S3_HEALTH_CHECK_OBJECT_NAME" envDefault:"latest.log"`
HealthCheckResultTTL time.Duration `env:"S3_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
}

var cfg config

func init() {
// parse log config
err := env.Parse(&cfg)
if err != nil {
log.Fatalf("Failed to parse object storage environment: %v", err)
}

client, err := Client()
if err != nil {
log.Fatalf("Failed to create object storage client: %v", err)
}
servicehealthcheck.RegisterHealthCheck(&HealthCheck{
Client: client,
}, "objstore")

ok, err := client.BucketExists(cfg.HealthCheckBucketName)
if err != nil {
log.Warnf("Failed to create check for bucket: %v", err)
}
if !ok {
err := client.MakeBucket(cfg.HealthCheckBucketName, cfg.Region)
if err != nil {
log.Warnf("Failed to create bucket: %v", err)
}
}
}

// Client with environment based configuration
func Client() (*minio.Client, error) {
return CustomClient(cfg.Endpoint, &minio.Options{
Secure: cfg.UseSSL,
Region: cfg.Region,
BucketLookup: minio.BucketLookupAuto,
Creds: credentials.NewStaticV4(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
})
}

// CustomClient with customized client
func CustomClient(endpoint string, opts *minio.Options) (*minio.Client, error) {
client, err := minio.NewWithOptions(endpoint, opts)
if err != nil {
return nil, err
}
log.Logger().Info().Str("endpoint", endpoint).
Str("region", opts.Region).
Bool("ssl", opts.Secure).
Msg("S3 connection created")
client.SetCustomTransport(newCustomTransport(endpoint))
return client, nil
}

func newCustomTransport(endpoint string) http.RoundTripper {
return transport.NewDefaultTransportChain().Use(newMetricRoundTripper(endpoint))
}
25 changes: 25 additions & 0 deletions backend/objstore/objstore_test.go
@@ -0,0 +1,25 @@
package objstore

import (
"testing"

"github.com/minio/minio-go/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClient(t *testing.T) {
client, err := Client()

require.NoError(t, err)
assert.NotNil(t, client)
}

func TestCustomClient(t *testing.T) {
client, err := CustomClient("s3.amazonaws.com", &minio.Options{
Region: "eu-central-1",
})

require.NoError(t, err)
assert.NotNil(t, client)
}

0 comments on commit 7fd38f0

Please sign in to comment.