Skip to content

Commit

Permalink
Changed on start on finish actions source
Browse files Browse the repository at this point in the history
  • Loading branch information
Панов Никита committed Jul 22, 2021
1 parent 0b254e1 commit 8f19613
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 25 deletions.
32 changes: 10 additions & 22 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ type AsyncJobWorkerOptions struct {
}

type Job interface {
OnStart() error
Run(*logrus.Entry)
OnFinish()
}

type JobWorker interface {
Expand All @@ -26,27 +28,16 @@ type JobWorker interface {
}

func NewAsyncJobWorker(onStart OnStartFunc, onFinished OnFinishedFunc, logger *logrus.Logger, opts *AsyncJobWorkerOptions) JobWorker {
return newAsyncJobWorker(onStart, onFinished, logger, opts)
return newAsyncJobWorker(logger, opts)
}

func newAsyncJobWorker(onStart OnStartFunc, onFinished OnFinishedFunc, logger *logrus.Logger, opts *AsyncJobWorkerOptions) *asyncJobWorker {
func newAsyncJobWorker(logger *logrus.Logger, opts *AsyncJobWorkerOptions) *asyncJobWorker {
ajw := &asyncJobWorker{
onStart: onStart,
onFinished: onFinished,

logger: logger,

opts: opts,
}

if ajw.onStart == nil {
ajw.onStart = dummyOnStart
}

if ajw.onFinished == nil {
ajw.onFinished = dummyOnFinished
}

if ajw.logger == nil {
logger = logrus.StandardLogger()
}
Expand All @@ -58,9 +49,6 @@ type asyncJobWorker struct {
mutex sync.Mutex
currentJob Job

onStart OnStartFunc
onFinished OnFinishedFunc

logger *logrus.Logger

opts *AsyncJobWorkerOptions
Expand Down Expand Up @@ -113,13 +101,13 @@ func (ajw *asyncJobWorker) RunWith(job Job) error {

ajw.mutex.Unlock()

go ajw.waitAndFinish(wg)
go ajw.waitAndFinish(wg, job)

return nil
}

func (ajw *asyncJobWorker) setRunning(job Job) error {
err := ajw.onStart()
err := job.OnStart()
if err != nil {
return err
}
Expand All @@ -131,17 +119,17 @@ func (ajw *asyncJobWorker) setRunning(job Job) error {
return nil
}

func (ajw *asyncJobWorker) setFinished() {
func (ajw *asyncJobWorker) setFinished(job Job) {
ajw.currentJob = nil
ajw.running = false
ajw.finishedAt = time.Now()
ajw.onFinished()
job.OnFinish()
}

func (ajw *asyncJobWorker) waitAndFinish(wg *sync.WaitGroup) {
func (ajw *asyncJobWorker) waitAndFinish(wg *sync.WaitGroup, job Job) {
wg.Wait()
ajw.mutex.Lock()
ajw.setFinished()
ajw.setFinished(job)
ajw.mutex.Unlock()
}

Expand Down
6 changes: 3 additions & 3 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (tj *testJob) Run(_ *logrus.Entry) {
tj.jobChan <- eventRunner
}

func (tj *testJob) onStart() error {
func (tj *testJob) OnStart() error {
if tj.onStartError == nil {
tj.jobChan <- eventOnStart
return nil
Expand All @@ -32,7 +32,7 @@ func (tj *testJob) onStart() error {
return tj.onStartError
}

func (tj *testJob) onFinish() {
func (tj *testJob) OnFinish() {
tj.jobChan <- eventOnFinish
close(tj.jobChan)
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestJobWorkOnStartError(t *testing.T) {

func testAsyncJobWorker(test *jobWorkerTest, t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
worker := newAsyncJobWorker(test.testJob.onStart, test.testJob.onFinish, logrus.StandardLogger(), test.workerOpts)
worker := newAsyncJobWorker(logrus.StandardLogger(), test.workerOpts)

err := worker.RunWith(test.testJob)
if err != test.testJob.onStartError {
Expand Down

0 comments on commit 8f19613

Please sign in to comment.