Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Панов Никита committed Jul 22, 2021
0 parents commit 0b254e1
Show file tree
Hide file tree
Showing 14 changed files with 1,179 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go.sum
.DS_Store
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# workers

Simple waiting workers implementation. Here are:

SimpleWorker - one thread worker, that sleeps until time elapsed or it is not forced
AsyncWorker - multi-thread worker, that sleeps until time elapsed or it is not forced

AsyncWorker runs N goroutines, defined by runnerFunc.

OnStart and OnFinish funcs are not necessary. runnerFunc is necessary - if not defined workers panics on job starting.
104 changes: 104 additions & 0 deletions async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package workers

import (
"errors"
"sync"
"time"

uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
)

// AsyncOptions - async worker options
type AsyncOptions struct {
ParallelWorkersCount int
SleepTimeout time.Duration
RunOnLoad bool
}

type asyncWorker struct {
*commonWaitingWorker
options *AsyncOptions

onStartFunc OnStartFunc
onFinishedFunc OnFinishedFunc
runnerFunc RunnerFunc
}

func NewAsync(
logger *logrus.Logger,
onStartFunc OnStartFunc,
onFinishedFunc OnFinishedFunc,
runnerFunc RunnerFunc,
o *AsyncOptions,
) WaitingWorker {
return newAsync(logger, onStartFunc, onFinishedFunc, runnerFunc, o)
}

func newAsync(logger *logrus.Logger,
onStartFunc OnStartFunc,
onFinishedFunc OnFinishedFunc,
runnerFunc RunnerFunc,
o *AsyncOptions,
) *asyncWorker {
aw := &asyncWorker{
options: o,
commonWaitingWorker: newCommon(
logger,
o.RunOnLoad,
o.SleepTimeout,
),

onStartFunc: onStartFunc,
onFinishedFunc: onFinishedFunc,
runnerFunc: runnerFunc,
}

if aw.onStartFunc == nil {
aw.onStartFunc = dummyOnStart
}
if aw.onFinishedFunc == nil {
aw.onFinishedFunc = dummyOnFinished
}

aw.commonWaitingWorker.workerFunc = aw.startWork

return aw
}

// startWork - starts async worker
func (aw *asyncWorker) startWork() {
if aw.runnerFunc == nil {
panic(errors.New("runner func is not defined"))
}

err := aw.onStartFunc()
if err != nil {
aw.logger.WithError(err).Error("starting worker was aborted by error")
return
}

wg := &sync.WaitGroup{}
for i := 0; i < aw.options.ParallelWorkersCount; i++ {
go aw.runner(wg)
wg.Add(1)
}
aw.logger.Warn("workers started")

wg.Wait()

aw.onFinishedFunc()

aw.logger.Warn("workers finished")
}

// runner - async worker subworker
func (aw *asyncWorker) runner(wg *sync.WaitGroup) {
workerLogger := aw.logger.WithField("logger_id", uuid.NewV4().String())
workerLogger.Warn("worker started")
defer wg.Done()

aw.runnerFunc(workerLogger)

workerLogger.Warn("worker stopped")
}
75 changes: 75 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package workers

import (
"errors"
"testing"
"time"

"github.com/sirupsen/logrus"
)

type asyncTestConfig struct {
errOnStart error
opts *AsyncOptions
mustBeForced bool
}

func TestAsyncWorkerByTimeout(t *testing.T) {
test := &asyncTestConfig{
opts: &AsyncOptions{
ParallelWorkersCount: 10,
SleepTimeout: 2 * time.Second,
},
}

testAsyncWorker(test, t)
}

func TestAsyncWorkerOnLoad(t *testing.T) {
test := &asyncTestConfig{
opts: &AsyncOptions{
ParallelWorkersCount: 20,
SleepTimeout: 2 * time.Second,
RunOnLoad: true,
},
}

testAsyncWorker(test, t)
}

func TestAsyncWorkerForced(t *testing.T) {
test := &asyncTestConfig{
opts: &AsyncOptions{
ParallelWorkersCount: 5,
SleepTimeout: 2 * time.Hour,
RunOnLoad: false,
},
mustBeForced: true,
}

testAsyncWorker(test, t)
}

func TestAsyncWorkerStartError(t *testing.T) {
test := &asyncTestConfig{
opts: &AsyncOptions{
ParallelWorkersCount: 5,
SleepTimeout: 1 * time.Second,
RunOnLoad: false,
},
errOnStart: errors.New("some start error"),
}

testAsyncWorker(test, t)
}

func testAsyncWorker(test *asyncTestConfig, t *testing.T) {
buf := test.opts.ParallelWorkersCount + 2 //старт и стоп
ast := newWorkerTester(buf, test.errOnStart)

logger := logrus.StandardLogger()
logger.SetLevel(logrus.DebugLevel)
w := newAsync(logger, ast.onStart, ast.onFinish, ast.Run, test.opts)

testWorker(t, w, ast.events(), test.opts.ParallelWorkersCount, test.mustBeForced, test.opts.RunOnLoad, test.opts.SleepTimeout, test.errOnStart)
}
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/jr2df69/workers

go 1.16

require (
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.3.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
88 changes: 88 additions & 0 deletions mocks/JobWorker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions mocks/WaitingWorker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0b254e1

Please sign in to comment.