Skip to content

Commit

Permalink
Merge 7e4647b into c59fde3
Browse files Browse the repository at this point in the history
  • Loading branch information
Teelevision committed Mar 2, 2020
2 parents c59fde3 + 7e4647b commit e23d2d4
Show file tree
Hide file tree
Showing 39 changed files with 1,859 additions and 489 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -39,6 +39,7 @@ test:

integration:
go test -mod=vendor -count=1 -v -cover -race -run TestIntegration ./...
go test -mod=vendor -count=1 -v -cover -race -run Example_clusterBackgroundTask ./pkg/routine

testserver:
docker-compose up
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Expand Up @@ -5,14 +5,15 @@ go 1.13
require (
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/bsm/redislock v0.4.3
github.com/caarlos0/env v3.3.0+incompatible
github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/dave/jennifer v1.0.2
github.com/getkin/kin-openapi v0.0.0-20180813063848-e1956e8013e5
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-pg/pg v6.14.5+incompatible
github.com/go-redis/redis v6.14.1+incompatible
github.com/go-redis/redis v6.15.6+incompatible
github.com/google/jsonapi v0.0.0-20181016150055-d0428f63eb51
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
Expand All @@ -21,8 +22,6 @@ require (
github.com/mattn/go-isatty v0.0.8
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/minio-go/v6 v6.0.44
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Expand Up @@ -2,6 +2,8 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf h1:eg0MeVzs
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bsm/redislock v0.4.3 h1:TJ0RzHeSujLSuy4b33OWDknxAzKCdLdit0Hs9kOjElg=
github.com/bsm/redislock v0.4.3/go.mod h1:mcygIsJknQThqWrlOgiPJ97CGmu3aAdQabg1ZIxT1BA=
github.com/caarlos0/env v3.3.0+incompatible h1:jCfY0ilpzC2FFViyZyDKCxKybDESTwaR+ebh8zm6AOE=
github.com/caarlos0/env v3.3.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y=
github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 h1:6/yVvBsKeAw05IUj4AzvrxaCnDjN4nUqKjW9+w5wixg=
Expand All @@ -24,8 +26,8 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-pg/pg v6.14.5+incompatible h1:Tc74MTCCIVd8sAJshYHqutcHhO64/EBHBTydzCGt3Js=
github.com/go-pg/pg v6.14.5+incompatible/go.mod h1:a2oXow+aFOrvwcKs3eIA0lNFmMilrxK2sOkB5NWe0vA=
github.com/go-redis/redis v6.14.1+incompatible h1:kSJohAREGMr344uMa8PzuIg5OU6ylCbyDkWkkNOfEik=
github.com/go-redis/redis v6.14.1+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.6+incompatible h1:H9evprGPLI8+ci7fxQx6WNZHJSb7be8FqJQRhdQZ5Sg=
github.com/go-redis/redis v6.15.6+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/jsonapi v0.0.0-20181016150055-d0428f63eb51 h1:k+U8IQj6kj659R+Ahq6YsK03GdUo8qQdTsq5HBzfQwM=
Expand Down Expand Up @@ -58,8 +60,8 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
156 changes: 156 additions & 0 deletions pkg/routine/cluster_background_task_test.go
@@ -0,0 +1,156 @@
// Copyright © 2020 by PACE Telematics GmbH. All rights reserved.
// Created at 2020/02/26 by Marius Neugebauer

package routine_test

import (
"bytes"
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

"github.com/pace/bricks/pkg/routine"
"github.com/stretchr/testify/assert"
)

func Example_clusterBackgroundTask() {
// This example is an integration test because it requires redis to run. If
// we are running the short tests just print the output that is expected to
// circumvent the test runner. Because there is no way to skip an example.
if testing.Short() {
fmt.Println("task run 0\ntask run 1\ntask run 2")
return
}

out := make(chan string)

// start the routine in the background
cancel := routine.RunNamed(context.Background(), "task",
func(ctx context.Context) {
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
default:
}
out <- fmt.Sprintf("task run %d", i)
time.Sleep(100 * time.Millisecond)
}
},
// KeepRunningOneInstance will cause the routine to be restarted if it
// finishes. It also will use the default redis database to synchronize
// with other instances running this routine so that in all instances
// exactly one routine is running at all time.
routine.KeepRunningOneInstance(),
)

// Cancel after 3 results. Cancel will only cancel the routine in this
// instance. It will not cancel the synchronized routines of other
// instances.
for i := 0; i < 3; i++ {
println(<-out)
}
cancel()

// Output:
// task run 0
// task run 1
// task run 2
}

func TestIntegrationRunNamed_clusterBackgroundTask(t *testing.T) {
t.Skip("test not working properly in docker, skipping")

if testing.Short() {
t.SkipNow()
}

// buffer that allows writing simultaneously
var buf subprocessOutputBuffer

// Run 2 processes in the "cluster", that will both try to start the
// background task in the example function above. The way this task is
// configured only one process at a time will run the task. But the
// processes are programmed to exit after 3 iterations of the task. This
// tests that the second process will take over the execution of the task
// only after the first process exits.
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
spawnProcess(&buf)
wg.Done()
}()
}
wg.Wait() // until both processes are done

exp := `task run 0
task run 1
task run 2
task run 0
task run 1
task run 2
`
assert.Equal(t, exp, buf.String())
}

func spawnProcess(w io.Writer) {
cmd := exec.Command(os.Args[0],
"-test.timeout=2s",
"-test.run=Example_clusterBackgroundTask",
)
cmd.Env = append(os.Environ(),
"TEST_SUBPROCESS=1",
"ROUTINE_REDIS_LOCK_TTL=200ms",
)
cmd.Stdout = w
cmd.Stderr = w
err := cmd.Run()
if err != nil {
_, _ = w.Write([]byte("error starting subprocess: " + err.Error()))
}
}

type subprocessOutputBuffer struct {
mx sync.Mutex
buf bytes.Buffer
}

func (b *subprocessOutputBuffer) Write(p []byte) (int, error) {
b.mx.Lock()
defer b.mx.Unlock()
// ignore test runner output and some other log lines
switch s := string(p); {
case strings.HasPrefix(s, "=== RUN"),
strings.HasPrefix(s, "--- PASS"),
strings.HasPrefix(s, "PASS"),
strings.HasPrefix(s, "coverage: "),
strings.Contains(s, "Redis connection pool created"):
return len(p), nil
}
return b.buf.Write(p)
}

func (b *subprocessOutputBuffer) String() string {
b.mx.Lock()
defer b.mx.Unlock()
return b.buf.String()
}

// Prints the string normally so that it can be consumed by the test runner.
// Additionally go around the test runner in case of a integration test that
// wants examine the output of another test.
func println(s string) {
if os.Getenv("TEST_SUBPROCESS") == "1" {
// go around the test runner
_, _ = log.Writer().Write([]byte(s + "\n"))
}
fmt.Println(s)
}
22 changes: 22 additions & 0 deletions pkg/routine/config.go
@@ -0,0 +1,22 @@
// Copyright © 2020 by PACE Telematics GmbH. All rights reserved.
// Created at 2020/02/27 by Marius Neugebauer

package routine

import (
"time"

"github.com/caarlos0/env"
)

type config struct {
RedisLockTTL time.Duration `env:"ROUTINE_REDIS_LOCK_TTL" envDefault:"5s"`
}

var cfg config

func init() {
if err := env.Parse(&cfg); err != nil {
panic(err)
}
}
101 changes: 101 additions & 0 deletions pkg/routine/redis_lock.go
@@ -0,0 +1,101 @@
// Copyright © 2020 by PACE Telematics GmbH. All rights reserved.
// Created at 2020/02/27 by Marius Neugebauer

package routine

import (
"context"
"sync"
"time"

"github.com/bsm/redislock"
"github.com/go-redis/redis"
redisbackend "github.com/pace/bricks/backend/redis"
)

func routineThatKeepsRunningOneInstance(name string, routine func(context.Context)) func(context.Context) {
return func(ctx context.Context) {
locker := redislock.New(getDefaultRedisClient())
var tryAgainIn time.Duration // zero on first run
for {
select {
case <-ctx.Done():
return
case <-time.After(tryAgainIn):
}
lockCtx := obtainLock(ctx, locker, "routine:lock:"+name, cfg.RedisLockTTL)
if lockCtx != nil {
routine(lockCtx)
}
tryAgainIn = cfg.RedisLockTTL / 5
}
}
}

var (
initRedisOnce sync.Once
redisClient *redis.Client
)

func getDefaultRedisClient() *redis.Client {
initRedisOnce.Do(func() { redisClient = redisbackend.Client() })
return redisClient
}

// Try to obtain a lock. Return a sub-context of ctx that is canceled once the
// lock is lost or ctx is done.
func obtainLock(ctx context.Context, locker *redislock.Client, key string, ttl time.Duration) context.Context {
// obtain lock
lock, err := locker.Obtain(key, ttl, nil)
if err == redislock.ErrNotObtained {
return nil
} else if err != nil {
panic(err)
}

// keep up lock, cancel lockCtx otherwise
lockCtx, cancel := context.WithCancel(ctx)
go func() {
keepUpLock(ctx, lock, ttl)
cancel()
err := lock.Release()
if err != nil && err != redislock.ErrLockNotHeld {
panic(err)
}
}()

return lockCtx
}

// Try to keep up a lock for as long as the context is valid. Return once the
// lock is lost or the context is done.
func keepUpLock(ctx context.Context, lock *redislock.Lock, refreshTTL time.Duration) {
refreshInterval := refreshTTL / 5
lockRunsOutIn := refreshTTL // initial value must be > refresh interval
for {
select {
case <-ctx.Done():
return

// Return if the lock runs out and was not refreshed. lockRunsOutIn is
// always greater than refreshInterval, except the last refresh failed.
case <-time.After(lockRunsOutIn):
return

// Try to refresh lock.
case <-time.After(refreshInterval):
}
if err := lock.Refresh(refreshTTL, nil); err == redislock.ErrNotObtained {
// Don't return just yet. Get the TTL of the lock and try to
// refresh for as long as the TTL is not over.
if lockRunsOutIn, err = lock.TTL(); err != nil {
panic(err)
}
continue
} else if err != nil {
panic(err)
}
// reset, because the lock was refreshed
lockRunsOutIn = refreshTTL
}
}

0 comments on commit e23d2d4

Please sign in to comment.