Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ydbcp/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ db_connection:
connection_string: "grpcs://localhost:2135/domain/database"
insecure: true
discovery: false
enable_sdk_metrics: false

client_connection:
insecure: true
Expand Down
12 changes: 6 additions & 6 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func main() {
var wg sync.WaitGroup

var logger *xlog.LogConfig
if configInstance.DuplicateLogToFile != "" {
logger, err = xlog.SetupLoggingWithFile(configInstance.GRPCServer.LogLevel, configInstance.DuplicateLogToFile)
if configInstance.GetDuplicateLogToFile() != "" {
logger, err = xlog.SetupLoggingWithFile(configInstance.GetLogLevel(), configInstance.GetDuplicateLogToFile())
} else {
logger, err = xlog.SetupLogging(configInstance.GRPCServer.LogLevel)
logger, err = xlog.SetupLogging(configInstance.GetLogLevel())
}
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -116,7 +116,7 @@ func main() {
xlog.Info(ctx, "Initialized AuthProvider")
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
xlog.Info(ctx, "Initialized metrics registry")
audit.EventsDestination = configInstance.AuditEventsDestination
audit.EventsDestination = configInstance.GetAuditEventsDestination()
server, err := server.NewServer(&configInstance.GRPCServer, authProvider)
if err != nil {
xlog.Error(ctx, "failed to initialize GRPC server", zap.Error(err))
Expand Down Expand Up @@ -201,10 +201,10 @@ func main() {
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, configInstance.ProcessorIntervalSeconds, dbConnector, handlersRegistry)
processor.NewOperationProcessor(ctx, &wg, configInstance.GetProcessorIntervalSeconds(), dbConnector, handlersRegistry)
xlog.Info(ctx, "Initialized OperationProcessor")

if configInstance.DisableTTLDeletion {
if configInstance.GetDisableTTLDeletion() {
xlog.Info(ctx, "TtlWatcher is disabled, old backups won't be deleted")
} else {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)
Expand Down
122 changes: 104 additions & 18 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@ type S3Config struct {
PathPrefix string `yaml:"path_prefix"`
AccessKeyIDPath string `yaml:"access_key_id_path"`
SecretAccessKeyPath string `yaml:"secret_access_key_path"`
S3ForcePathStyle bool `yaml:"s3_force_path_style"`
S3ForcePathStyle bool `yaml:"s3_force_path_style" default:"false"`
IsMock bool
}

type YDBConnectionConfig struct {
ConnectionString string `yaml:"connection_string"`
Insecure bool `yaml:"insecure"`
Insecure bool `yaml:"insecure" default:"false"`
Discovery bool `yaml:"discovery" default:"true"`
DialTimeoutSeconds uint32 `yaml:"dial_timeout_seconds" default:"5"`
OAuth2KeyFile string `yaml:"oauth2_key_file"`
EnableSDKMetrics bool `yaml:"enable_sdk_metrics"`
EnableSDKMetrics bool `yaml:"enable_sdk_metrics" default:"true"`
}

type ClientConnectionConfig struct {
Insecure bool `yaml:"insecure"`
Insecure bool `yaml:"insecure" default:"false"`
Discovery bool `yaml:"discovery" default:"true"`
DialTimeoutSeconds uint32 `yaml:"dial_timeout_seconds" default:"5"`
OAuth2KeyFile string `yaml:"oauth2_key_file"`
AllowedEndpointDomains []string `yaml:"allowed_endpoint_domains"`
AllowInsecureEndpoint bool `yaml:"allow_insecure_endpoint"`
AllowInsecureEndpoint bool `yaml:"allow_insecure_endpoint" default:"false"`
}

type AuthConfig struct {
Expand All @@ -55,7 +55,7 @@ type GRPCServerConfig struct {
BindPort uint16 `yaml:"bind_port" default:"2135"`
TLSCertificatePath string `yaml:"tls_certificate_path"`
TLSKeyPath string `yaml:"tls_key_path"`
LogLevel string `yaml:"log_level"`
LogLevel string `yaml:"log_level" default:"DEBUG"`
}

type MetricsServerConfig struct {
Expand All @@ -65,23 +65,109 @@ type MetricsServerConfig struct {
TLSKeyPath string `yaml:"tls_key_path"`
}

type FeatureFlagsConfig struct {
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
}

type LogConfig struct {
DuplicateToFile string `yaml:"duplicate_to_file"`
Level string `yaml:"level" default:"DEBUG"`
}

type OperationProcessorConfig struct {
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds" default:"86400"`
ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"`
}

type AuditConfig struct {
EventsDestination string `yaml:"events_destination"`
}

type QuotaConfig struct {
SchedulesPerDB int `yaml:"schedules_per_db" default:"10"`
}

type Validatable interface {
Validate() error
}

type Config struct {
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"`
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
AuditEventsDestination string `yaml:"audit_events_destination"`
DuplicateLogToFile string `yaml:"duplicate_log_to_file"`
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`
Audit AuditConfig `yaml:"audit"`
Log LogConfig `yaml:"log"`
Quota QuotaConfig `yaml:"quota"`
FeatureFlags FeatureFlagsConfig `yaml:"feature_flags"`

// TODO: remove these fields and their getters after migration to the new config format
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds" default:"86400"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"`
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
AuditEventsDestination string `yaml:"audit_events_destination"`
DuplicateLogToFile string `yaml:"duplicate_log_to_file"`
}

func (c Config) GetOperationTtlSeconds() int64 {
if c.OperationTtlSeconds == 0 {
return c.OperationProcessor.OperationTtlSeconds
}

return c.OperationTtlSeconds
}

func (c *Config) SetOperationTtlSeconds(val int64) {
c.OperationTtlSeconds = val
c.OperationProcessor.OperationTtlSeconds = val
}

func (c Config) GetSchedulesLimitPerDB() int {
if c.SchedulesLimitPerDB == 0 {
return c.Quota.SchedulesPerDB
}

return c.SchedulesLimitPerDB
}

func (c Config) GetProcessorIntervalSeconds() int64 {
if c.ProcessorIntervalSeconds == 0 {
return c.OperationProcessor.ProcessorIntervalSeconds
}

return c.ProcessorIntervalSeconds
}

func (c Config) GetDisableTTLDeletion() bool {
return c.DisableTTLDeletion || c.FeatureFlags.DisableTTLDeletion
}

func (c Config) GetAuditEventsDestination() string {
if len(c.AuditEventsDestination) == 0 {
return c.Audit.EventsDestination
}

return c.AuditEventsDestination
}

func (c Config) GetDuplicateLogToFile() string {
if len(c.DuplicateLogToFile) == 0 {
return c.Log.DuplicateToFile
}

return c.DuplicateLogToFile
}

func (c Config) GetLogLevel() string {
if len(c.GRPCServer.LogLevel) == 0 {
return c.Log.Level
}

return c.GRPCServer.LogLevel
}

type ClusterConnectionConfig struct {
Expand Down
31 changes: 15 additions & 16 deletions internal/handlers/delete_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) {

s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

config := config.Config{}
config.SetOperationTtlSeconds(0)
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 0,
}, queries.NewWriteTableQueryMock,
dbConnector, s3Connector, config, queries.NewWriteTableQueryMock,
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -123,10 +123,10 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

metrics.InitializeMockMetricsRegistry()
config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
dbConnector, s3Connector, config, queries.NewWriteTableQueryMock,
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -203,10 +203,10 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

metrics.InitializeMockMetricsRegistry()
config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
dbConnector, s3Connector, config, queries.NewWriteTableQueryMock,
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -280,11 +280,10 @@ func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) {
)

s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
dbConnector, s3Connector, config, queries.NewWriteTableQueryMock,
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -351,10 +350,10 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) {
s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

metrics.InitializeMockMetricsRegistry()
config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
dbConnector, s3Connector, config, queries.NewWriteTableQueryMock,
)

err := handler(ctx, &dbOp)
Expand Down
28 changes: 21 additions & 7 deletions internal/handlers/restore_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ func TestRBOperationHandlerRunningOperationInProgress(t *testing.T) {
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

// try to handle pending rb operation with ttl
config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -184,10 +186,12 @@ func TestRBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -235,10 +239,12 @@ func TestRBOperationHandlerRunningOperationCancelled(t *testing.T) {
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(10)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -337,10 +343,12 @@ func TestRBOperationHandlerCancellingOperationInProgress(t *testing.T) {
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(1000)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -389,10 +397,12 @@ func TestRBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(10)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -441,10 +451,12 @@ func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) {
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(10)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -493,10 +505,12 @@ func TestRBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) {
clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap))
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

config := config.Config{}
config.SetOperationTtlSeconds(10)
handler := NewRBOperationHandler(
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
config,
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down
Loading
Loading