Skip to content

Commit

Permalink
test: add replay manager unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arinda-arif committed Mar 27, 2023
1 parent e37508c commit ce7bfa3
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions core/scheduler/service/replay_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package service_test

import (
"errors"
"testing"
"time"

"github.com/google/uuid"
"github.com/odpf/salt/log"
"golang.org/x/net/context"

"github.com/odpf/optimus/config"
"github.com/odpf/optimus/core/scheduler"
"github.com/odpf/optimus/core/scheduler/service"
"github.com/odpf/optimus/core/tenant"
)

func TestReplayManager(t *testing.T) {
ctx := context.Background()
logger := log.NewNoop()
currentTime := func() time.Time { return time.Now() }
conf := config.ReplayConfig{ReplayTimeout: time.Hour * 3}
replaysToCheck := []scheduler.ReplayState{
scheduler.ReplayStateCreated, scheduler.ReplayStateInProgress,
scheduler.ReplayStatePartialReplayed, scheduler.ReplayStateReplayed,
}
replayID := uuid.New()
jobName := scheduler.JobName("sample_select")
replayStartTimeStr := "2023-01-03T12:00:00Z"
replayStartTime, _ := time.Parse(scheduler.ISODateFormat, replayStartTimeStr)
replayEndTime := replayStartTime.Add(24 * time.Hour)
replayDescription := "for backfill"
replayReqConf := scheduler.NewReplayConfig(replayStartTime, replayEndTime, false, replayDescription)
projName := tenant.ProjectName("proj")
namespaceName := tenant.ProjectName("ns1")
tnnt, _ := tenant.NewTenant(projName.String(), namespaceName.String())

t.Run("StartReplayLoop", func(t *testing.T) {
t.Run("should not proceed on the timeout process if unable to get replay requests by status", func(t *testing.T) {
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)

err := errors.New("internal error")
replayRepository.On("GetReplayRequestsByStatus", ctx, replaysToCheck).Return(nil, err)
replayRepository.On("GetReplayToExecute", ctx).Return(nil, err)

replayManager := service.NewReplayManager(logger, replayRepository, nil, currentTime, conf)
replayManager.StartReplayLoop()
})
t.Run("should mark replay request as failed if it is timed out", func(t *testing.T) {
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)

replayCreatedTime1 := time.Now().Add(-24 * time.Hour)
replayCreatedTime2 := time.Now().Add(-1 * time.Hour)

replayReq1 := scheduler.NewReplay(replayID, jobName, tnnt, replayReqConf, scheduler.ReplayStateInProgress, replayCreatedTime1)
replayReq2 := scheduler.NewReplay(uuid.New(), "other_job", tnnt, replayReqConf, scheduler.ReplayStateInProgress, replayCreatedTime2)

replayRepository.On("GetReplayRequestsByStatus", ctx, replaysToCheck).Return([]*scheduler.Replay{replayReq1, replayReq2}, nil)
replayRepository.On("UpdateReplayStatus", ctx, replayID, scheduler.ReplayStateFailed, "replay timed out").Return(nil).Once()

err := errors.New("internal error")
replayRepository.On("GetReplayToExecute", ctx).Return(nil, err)

replayManager := service.NewReplayManager(logger, replayRepository, nil, currentTime, conf)
replayManager.StartReplayLoop()
})
})
}

0 comments on commit ce7bfa3

Please sign in to comment.