Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add replay #753

Merged
merged 54 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
f707d5c
feat: add replay entity and repository
arinda-arif Feb 21, 2023
b085b36
feat: add replay service
arinda-arif Feb 21, 2023
a47989a
feat: add replay handler
arinda-arif Feb 21, 2023
9495a27
feat: add replay manager and worker
arinda-arif Feb 23, 2023
085fed6
test: fix job run service and replay service test cases
arinda-arif Mar 1, 2023
fc8cfa2
test: fix scheduler status test case
arinda-arif Mar 1, 2023
4c9b941
refactor: add JobReplayRunService in ReplayWorker
arinda-arif Mar 1, 2023
cd87c8c
refactor: fix lint issues
arinda-arif Mar 1, 2023
e4d04c7
refactor: remove logical time from JobRunStatus
arinda-arif Mar 1, 2023
b7b91c0
feat: implement UpdateReplay in repository
arinda-arif Mar 2, 2023
b421d2a
refactor: fetch a single replay request to be processed
arinda-arif Mar 2, 2023
23a289b
refactor: change implementation of GetReplayToExecute
arinda-arif Mar 2, 2023
24eb969
refactor: update replay state to in progress once it is picked
arinda-arif Mar 2, 2023
7c2b42c
refactor: pass job cron value to replay validate
arinda-arif Mar 2, 2023
43c3b52
feat: implement replay validator
arinda-arif Mar 2, 2023
22a39b5
refactor: change replay entity structure
arinda-arif Mar 2, 2023
0e50878
refactor: validator to accept replay request
arinda-arif Mar 2, 2023
29baf33
feat: implement GetReplayRequestsByStatus
arinda-arif Mar 2, 2023
499f67c
fix: GetReplayRequestByStatus invalid query
arinda-arif Mar 3, 2023
a0aa774
feat: add replay timeout in server config
arinda-arif Mar 3, 2023
01daf4f
refactor: update replay and update replay status in repository
arinda-arif Mar 3, 2023
9df82a5
feat: add replay time out check
arinda-arif Mar 3, 2023
cf74be3
refactor: remove replay id from ReplayWithRun
arinda-arif Mar 3, 2023
d8ed5d9
feat: add replay create cli
arinda-arif Mar 3, 2023
47157cc
refactor: unexport replay entity fields
arinda-arif Mar 3, 2023
f80e12e
Merge branch 'main' into feature/replay
arinda-arif Mar 3, 2023
75a10b1
fix: resolve migration files conflict
arinda-arif Mar 3, 2023
a09bdb8
test: add replay entity unit tests
arinda-arif Mar 3, 2023
4f3d14c
feat: add replay loop timeout in server config
arinda-arif Mar 6, 2023
50f6427
fix: replay repository to return scheduled time in utc
arinda-arif Mar 6, 2023
ec3a46e
fix: unable to process sequential replay properly issue
arinda-arif Mar 6, 2023
d5832dd
fix: error log in replay service
arinda-arif Mar 6, 2023
b23ea41
feat: register replay to optimus server
arinda-arif Mar 6, 2023
368fe84
chore: add todos in replay worker
arinda-arif Mar 20, 2023
5694d8d
refactor: remove replay's worker and loop timeout configuration
arinda-arif Mar 21, 2023
97a2694
chore: reduce replay create client timeout from 1 hour to 1 minute
arinda-arif Mar 21, 2023
03a5b05
fix: log level, log message and replay message
arinda-arif Mar 21, 2023
a87b650
refactor: reword error message when unable to parse cron interval
arinda-arif Mar 21, 2023
3f6c025
fix: add missing description flag in replay create
arinda-arif Mar 21, 2023
de523c5
refactor: rename var and simplify processing new replay in replay worker
arinda-arif Mar 21, 2023
57beb08
refactor: remove start and end time index in replay request table
arinda-arif Mar 21, 2023
28b180c
chore: add note and todo in scheduler status and replay repository
arinda-arif Mar 21, 2023
c1a8502
Merge branch 'main' into feature/replay
arinda-arif Mar 24, 2023
c1d50f7
fix: update conflicted migration files number
arinda-arif Mar 24, 2023
63099dd
chore: fix lint issues
arinda-arif Mar 24, 2023
99c22d3
chore: remove unnecessary error not found catch in replay repository
arinda-arif Mar 27, 2023
96340f9
fix: issue when replay worker unable to get job details and add more …
arinda-arif Mar 27, 2023
d69e416
test: add more replay test cases
arinda-arif Mar 27, 2023
e37508c
chore: fix lint issue
arinda-arif Mar 27, 2023
ce7bfa3
test: add replay manager unit tests
arinda-arif Mar 27, 2023
b949e47
test: add scheduler BC status unit tests
arinda-arif Mar 27, 2023
301d1ab
feat: replay config overriding (#765)
deryrahman Mar 27, 2023
22f2cb6
chore: update proton commit
arinda-arif Mar 27, 2023
7a3303e
fix: issue on replay creation when job config is not set
arinda-arif Mar 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "fae8287656b163ae07a7f03edd3ea3f5df499dcb"
PROTON_COMMIT := "31ac9046d1a8c95a2f4645b87bf0620a3e6bb8bc"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down Expand Up @@ -66,4 +66,4 @@ install: ## install required dependencies
go install github.com/bufbuild/buf/cmd/buf@v1.5.0
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v2.5.0
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@v2.5.0
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@v2.5.0
2 changes: 2 additions & 0 deletions client/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/optimus/client/cmd/playground"
"github.com/odpf/optimus/client/cmd/plugin"
"github.com/odpf/optimus/client/cmd/project"
"github.com/odpf/optimus/client/cmd/replay"
"github.com/odpf/optimus/client/cmd/resource"
"github.com/odpf/optimus/client/cmd/scheduler"
"github.com/odpf/optimus/client/cmd/secret"
Expand Down Expand Up @@ -66,6 +67,7 @@ func New() *cli.Command {
version.NewVersionCommand(),
playground.NewPlaygroundCommand(),
scheduler.NewSchedulerCommand(),
replay.NewReplayCommand(),

// Will decide later, to add it server side or not
plugin.NewPluginCommand(),
Expand Down
159 changes: 159 additions & 0 deletions client/cmd/replay/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package replay

import (
"errors"
"fmt"
"time"

"github.com/odpf/salt/log"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/odpf/optimus/client/cmd/internal"
"github.com/odpf/optimus/client/cmd/internal/connectivity"
"github.com/odpf/optimus/client/cmd/internal/logger"
"github.com/odpf/optimus/config"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

const (
	// replayTimeout is the timeout handed to the connectivity layer when
	// sending a replay creation request.
	replayTimeout = time.Minute

	// ISOTimeLayout is the layout used to parse the start and end time
	// arguments given on the command line (RFC 3339).
	ISOTimeLayout = time.RFC3339
)

// createCommand carries the state of the `replay create` sub-command: its
// logger plus every value that is bound to a command-line flag.
type createCommand struct {
	logger         log.Logger
	configFilePath string

	// Replay options forwarded as-is in the replay request.
	parallel               bool
	description, jobConfig string

	// Target of the request; project name and host fall back to the client
	// configuration file when the corresponding flags are left empty.
	projectName, namespaceName, host string
}

// CreateCommand initializes command for creating a replay request.
//
// The returned command expects at least two positional arguments: the job
// name and the replay start time. An optional third argument is the replay
// end time. Flags are registered through injectFlags.
func CreateCommand() *cobra.Command {
	create := &createCommand{
		logger: logger.NewClientLogger(),
	}

	cmd := &cobra.Command{
		Use:   "create",
		Short: "Run replay operation on a dag based on provided start and end time range",
		Long: "This operation takes three arguments, first is DAG name[required]\nused in optimus specification, " +
			"second is start time[required] of\nreplay, third is end time[optional] of replay. \nDate ranges are inclusive.",
		// The timestamps shown here must be valid RFC 3339 values, since that
		// is the layout the arguments are parsed with; a "Z" suffix cannot be
		// followed by a numeric offset.
		Example: "optimus replay create <job_name> <2023-01-01T02:30:00Z> [2023-01-02T02:30:00Z]",
		Args: func(cmd *cobra.Command, args []string) error {
			if len(args) < 1 {
				return errors.New("job name is required")
			}
			if len(args) < 2 { //nolint: gomnd
				return errors.New("replay start time is required")
			}
			return nil
		},
		RunE:    create.RunE,
		PreRunE: create.PreRunE,
	}

	create.injectFlags(cmd)
	return cmd
}

// injectFlags binds the fields of the create command to flags on cmd.
func (r *createCommand) injectFlags(cmd *cobra.Command) {
	flags := cmd.Flags()

	// Client configuration file and target namespace.
	flags.StringVarP(&r.configFilePath, "config", "c", config.EmptyPath, "File path for client configuration")
	flags.StringVarP(&r.namespaceName, "namespace-name", "n", "", "Name of the optimus namespace")

	// Replay options; the ones without a shorthand use the plain *Var form.
	flags.BoolVar(&r.parallel, "parallel", false, "Backfill job runs in parallel")
	flags.StringVarP(&r.description, "description", "d", "", "Description of why backfill is needed")
	flags.StringVar(&r.jobConfig, "job-config", "", "additional job configurations")

	// These two become mandatory when no client configuration is available.
	flags.StringVarP(&r.projectName, "project-name", "p", "", "Name of the optimus project")
	flags.StringVar(&r.host, "host", "", "Optimus service endpoint url")
}

// PreRunE resolves the project name and host before the command runs.
//
// When no client configuration is loaded, the "project-name" and "host"
// flags are marked as required. Otherwise any of the two left empty on the
// command line is filled in from the loaded configuration.
func (r *createCommand) PreRunE(cmd *cobra.Command, _ []string) error {
	clientConfig, loadErr := internal.LoadOptionalConfig(r.configFilePath)
	switch {
	case loadErr != nil:
		return loadErr
	case clientConfig == nil:
		internal.MarkFlagsRequired(cmd, []string{"project-name", "host"})
		return nil
	}

	// Explicit flag values take precedence over the configuration file.
	if r.projectName == "" {
		r.projectName = clientConfig.Project.Name
	}
	if r.host == "" {
		r.host = clientConfig.Host
	}
	return nil
}

// RunE sends the replay request described by the positional arguments and
// logs the identifier of the created replay.
//
// args[0] is the job name and args[1] the start time; the end time is
// args[2] when present and otherwise the same value as the start time.
func (r *createCommand) RunE(_ *cobra.Command, args []string) error {
	jobName, startTime := args[0], args[1]

	endTime := startTime
	if len(args) > 2 { //nolint: gomnd
		endTime = args[2]
	}

	id, err := r.createReplayRequest(jobName, startTime, endTime, r.jobConfig)
	if err == nil {
		r.logger.Info("Replay request created with id %s", id)
	}
	return err
}

// createReplayRequest submits a replay request for jobName and returns the
// ID reported by the server.
//
// startTimeStr and endTimeStr are parsed with getTimeProto. Both are
// validated before any connection is opened, so malformed input fails fast
// without touching the network. jobConfig is forwarded to the server
// unchanged.
//
// An error is returned when either time cannot be parsed, when the connection
// cannot be established, or when the Replay call fails. In the last case the
// error is wrapped with "replay request failed".
func (r *createCommand) createReplayRequest(jobName, startTimeStr, endTimeStr, jobConfig string) (string, error) {
	startTime, err := getTimeProto(startTimeStr)
	if err != nil {
		return "", fmt.Errorf("invalid replay start time: %w", err)
	}
	endTime, err := getTimeProto(endTimeStr)
	if err != nil {
		return "", fmt.Errorf("invalid replay end time: %w", err)
	}

	conn, err := connectivity.NewConnectivity(r.host, replayTimeout)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	replayService := pb.NewReplayServiceClient(conn.GetConnection())
	resp, err := replayService.Replay(conn.GetContext(), &pb.ReplayRequest{
		ProjectName:   r.projectName,
		JobName:       jobName,
		NamespaceName: r.namespaceName,
		StartTime:     startTime,
		EndTime:       endTime,
		Parallel:      r.parallel,
		Description:   r.description,
		JobConfig:     jobConfig,
	})
	if err != nil {
		// Give the user an explicit hint when the request ran out of time.
		if errors.Is(err, context.DeadlineExceeded) {
			r.logger.Error("Replay creation took too long, timing out")
		}
		return "", fmt.Errorf("replay request failed: %w", err)
	}
	return resp.Id, nil
}

// getTimeProto parses timeStr using ISOTimeLayout and converts the result
// into a protobuf timestamp. The parse error is returned untouched when
// timeStr does not match the layout.
func getTimeProto(timeStr string) (*timestamppb.Timestamp, error) {
	parsed, parseErr := time.Parse(ISOTimeLayout, timeStr)
	if parseErr == nil {
		return timestamppb.New(parsed), nil
	}
	return nil, parseErr
}
21 changes: 21 additions & 0 deletions client/cmd/replay/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package replay

import (
"github.com/spf13/cobra"
)

// NewReplayCommand builds the parent `replay` command and attaches the
// `create` sub-command to it.
func NewReplayCommand() *cobra.Command {
	replayCmd := &cobra.Command{
		Use:         "replay",
		Short:       "replay related functions",
		Annotations: map[string]string{"group:core": "false"},
	}
	replayCmd.AddCommand(CreateCommand())
	return replayCmd
}
8 changes: 8 additions & 0 deletions config/config_server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

import "time"

type ServerConfig struct {
Version Version `mapstructure:"version"`
Log LogConfig `mapstructure:"log"`
Expand All @@ -8,6 +10,7 @@ type ServerConfig struct {
Telemetry TelemetryConfig `mapstructure:"telemetry"`
ResourceManagers []ResourceManager `mapstructure:"resource_managers"`
Plugin PluginConfig `mapstructure:"plugin"`
Replay ReplayConfig `mapstructure:"replay"`
}

type Serve struct {
Expand Down Expand Up @@ -47,3 +50,8 @@ type ResourceManagerConfigOptimus struct {
type PluginConfig struct {
Artifacts []string `mapstructure:"artifacts"`
}

// ReplayConfig holds the replay settings of the server configuration.
//
// TODO: add worker interval
type ReplayConfig struct {
	// ReplayTimeout is read from the "replay_timeout" key and defaults to 3h.
	// NOTE(review): presumably the maximum time a replay may stay in progress
	// before it is treated as timed out — confirm against the replay worker.
	ReplayTimeout time.Duration `mapstructure:"replay_timeout" default:"3h"`
}
3 changes: 3 additions & 0 deletions config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

saltConfig "github.com/odpf/salt/config"
"github.com/spf13/afero"
Expand Down Expand Up @@ -277,6 +278,8 @@ func (s *ConfigTestSuite) initExpectedServerConfig() {
},
}
s.expectedServerConfig.Plugin = config.PluginConfig{}

s.expectedServerConfig.Replay.ReplayTimeout = time.Hour * 3
}

func (*ConfigTestSuite) initServerConfigEnv() {
Expand Down
2 changes: 1 addition & 1 deletion core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (jh *JobHandler) ListJobSpecification(ctx context.Context, req *pb.ListJobS

func (*JobHandler) GetWindow(_ context.Context, req *pb.GetWindowRequest) (*pb.GetWindowResponse, error) {
// TODO: the default version to be deprecated & made mandatory in future releases
version := 1
version := 2
if err := req.GetScheduledAt().CheckValid(); err != nil {
return nil, fmt.Errorf("%w: failed to parse schedule time %s", err, req.GetScheduledAt())
}
Expand Down
83 changes: 83 additions & 0 deletions core/scheduler/handler/v1beta1/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package v1beta1

import (
"fmt"
"strings"

"github.com/google/uuid"
"github.com/odpf/salt/log"
"golang.org/x/net/context"

"github.com/odpf/optimus/core/scheduler"
"github.com/odpf/optimus/core/tenant"
"github.com/odpf/optimus/internal/errors"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

// ReplayService is the dependency ReplayHandler delegates replay creation to.
type ReplayService interface {
	// CreateReplay creates a replay for the given tenant and job using config
	// and returns the ID of the created replay.
	CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)
}

// ReplayHandler serves the replay gRPC API by translating requests into
// calls on a ReplayService.
type ReplayHandler struct {
	l       log.Logger
	service ReplayService

	// Embedded so that RPCs not implemented by this handler fall back to the
	// generated "unimplemented" server.
	pb.UnimplementedReplayServiceServer
}

// Replay validates a replay request, converts it into a scheduler replay
// configuration and asks the replay service to create the replay.
//
// start_time is mandatory and must be a valid timestamp. end_time is
// optional: when it is omitted the replay covers only start_time, which
// matches what the CLI sends when no end time argument is given. job_config,
// when set, is parsed by parseJobConfig.
//
// On success the response carries the string form of the created replay ID.
// Every failure is converted to a gRPC error through errors.GRPCErr.
func (h ReplayHandler) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) {
	errMsg := "unable to start replay for " + req.GetJobName()

	replayTenant, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	jobName, err := scheduler.JobNameFrom(req.GetJobName())
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	if err = req.GetStartTime().CheckValid(); err != nil {
		return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid start_time"), errMsg)
	}
	startTime := req.GetStartTime().AsTime()

	// Calling AsTime on a nil timestamp yields the Unix epoch, which would
	// put the end of the range before its start. Default to the start time
	// instead when end_time is not provided.
	endTime := startTime
	if req.GetEndTime() != nil {
		if err = req.GetEndTime().CheckValid(); err != nil {
			return nil, errors.GRPCErr(errors.InvalidArgument(scheduler.EntityJobRun, "invalid end_time"), errMsg)
		}
		endTime = req.GetEndTime().AsTime()
	}

	jobConfig := make(map[string]string)
	if req.GetJobConfig() != "" {
		jobConfig, err = parseJobConfig(req.GetJobConfig())
		if err != nil {
			return nil, errors.GRPCErr(err, "unable to parse replay job config for "+req.GetJobName())
		}
	}

	replayConfig := scheduler.NewReplayConfig(startTime, endTime, req.GetParallel(), jobConfig, req.GetDescription())
	replayID, err := h.service.CreateReplay(ctx, replayTenant, jobName, replayConfig)
	if err != nil {
		return nil, errors.GRPCErr(err, errMsg)
	}

	return &pb.ReplayResponse{Id: replayID.String()}, nil
}

// parseJobConfig converts a comma-separated list of KEY=VALUE pairs into a
// map.
//
// Keys are trimmed of surrounding whitespace and upper-cased. Values are
// stored exactly as given. Each pair is split on its first "=" only, so a
// value may itself contain "=" (for example base64 padding or a URL query).
//
// An error is returned for any pair that has no "=" or whose key is empty.
// This includes the empty segments produced by an empty input or a stray
// comma.
func parseJobConfig(jobConfig string) (map[string]string, error) {
	const keyValueParts = 2

	configs := map[string]string{}
	for _, pair := range strings.Split(jobConfig, ",") {
		keyValue := strings.SplitN(pair, "=", keyValueParts)
		if len(keyValue) != keyValueParts {
			return nil, fmt.Errorf("error on job config value, %s", pair)
		}

		key := strings.ToUpper(strings.TrimSpace(keyValue[0]))
		if key == "" {
			return nil, fmt.Errorf("error on job config value, %s: key is empty", pair)
		}
		configs[key] = keyValue[1]
	}
	return configs, nil
}

// NewReplayHandler returns a ReplayHandler that logs through l and delegates
// replay creation to service.
func NewReplayHandler(l log.Logger, service ReplayService) *ReplayHandler {
	handler := new(ReplayHandler)
	handler.l = l
	handler.service = service
	return handler
}
Loading