diff --git a/.env b/.env index d4a2a4d1..aac9156f 100644 --- a/.env +++ b/.env @@ -4,3 +4,10 @@ S3_SECRET_KEY = "password" S3_BUCKET = "test-bucket" S3_REGION = "us-east-1" YDB_NAME = "local-ydb" +ENABLE_NEW_PATHS_FORMAT = true +# local-ydb image that was built from main +# Image: https://github.com/ydb-platform/ydb/pkgs/container/local-ydb/451750046 +# Built from revision e52872a0c51f24dc566d4368364ed9aa849947ca +# https://github.com/ydb-platform/ydb/tree/e52872a0c51f24dc566d4368364ed9aa849947ca +# Build time: 2025-07-02T03:30:35.406Z +YDB_IMAGE = "ghcr.io/ydb-platform/local-ydb@sha256:6a0b21ab6490365de6da266311c81ab1f476159c8b84da966a2bced7caf1f88c" diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 2bfd48f8..1479006e 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -52,6 +52,11 @@ jobs: integration-test: runs-on: ubuntu-latest + strategy: + matrix: + enable_new_paths_format: [ true, false ] + env: + ENABLE_NEW_PATHS_FORMAT: ${{ matrix.enable_new_paths_format }} steps: - uses: actions/checkout@v4 - name: supply with s3 access keys diff --git a/cmd/integration/make_backup/main.go b/cmd/integration/make_backup/main.go index 66632f8e..8b50da3c 100644 --- a/cmd/integration/make_backup/main.go +++ b/cmd/integration/make_backup/main.go @@ -541,6 +541,40 @@ func main() { if !done { log.Panicln("failed to complete a restore in 30 seconds") } + + partialRestoreOperation, err := client.MakeRestore( + context.Background(), &pb.MakeRestoreRequest{ + ContainerId: containerID, + BackupId: backupOperation.BackupId, + DatabaseName: databaseName, + DatabaseEndpoint: databaseEndpoint, + DestinationPrefix: "/partial_restore", + SourcePaths: []string{"kv_test"}, + }, + ) + if err != nil { + log.Panicf("failed to make partial restore: %v", err) + } + done = false + for range 30 { + op, err := opClient.GetOperation( + context.Background(), &pb.GetOperationRequest{ + Id: partialRestoreOperation.Id, + }, + ) + if err != nil { + log.Panicf("failed to get operation: %v", err) + } + if op.GetStatus().String() == types.OperationStateDone.String() { + done = true + break + } + time.Sleep(time.Second) + } + if !done { + log.Panicln("failed to complete a partial restore in 30 seconds") + } + deleteOperation, err := client.DeleteBackup( context.Background(), &pb.DeleteBackupRequest{ BackupId: backupOperation.BackupId, diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 947e804e..0906a537 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -143,10 +143,8 @@ func main() { backup.NewBackupService( dbConnector, clientConnector, - configInstance.S3, authProvider, - configInstance.ClientConnection.AllowedEndpointDomains, - configInstance.ClientConnection.AllowInsecureEndpoint, + *configInstance, ).Register(server) operation.NewOperationService(dbConnector, authProvider).Register(server) backup_schedule.NewBackupScheduleService( @@ -191,10 +189,9 @@ func main() { handlers.NewTBWROperationHandler( dbConnector, clientConnector, - configInstance.S3, - configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(), + *configInstance, ), ); err != nil { xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err)) diff --git a/docker-compose.yaml b/docker-compose.yaml index 45a088c1..290a3c0b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,11 +1,6 @@ services: ydb: - # local-ydb image that was built from main - # Image: https://github.com/ydb-platform/ydb/pkgs/container/local-ydb/451750046 - # Built from revision e52872a0c51f24dc566d4368364ed9aa849947ca - # https://github.com/ydb-platform/ydb/tree/e52872a0c51f24dc566d4368364ed9aa849947ca - # Build time: 2025-07-02T03:30:35.406Z - image: ghcr.io/ydb-platform/local-ydb@sha256:6a0b21ab6490365de6da266311c81ab1f476159c8b84da966a2bced7caf1f88c + image: ${YDB_IMAGE} platform: linux/amd64 hostname: ${YDB_NAME} container_name: ${YDB_NAME} diff --git a/go.mod b/go.mod index fda83377..15ea8804 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jonboulle/clockwork v0.5.0 github.com/prometheus/client_golang v1.20.4 github.com/stretchr/testify v1.10.0 - github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 + github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.1.2 github.com/ydb-platform/ydb-go-sdk/v3 v3.108.5 go.uber.org/automaxprocs v1.5.3 diff --git a/go.sum b/go.sum index 28efc765..3ef2fae7 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,10 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= -github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -50,6 +52,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -69,8 +73,6 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg= -github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60= github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= @@ -104,20 +106,24 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 h1:SKqSRP6/ocY2Z4twOqKEKxpmawVTHTvQiom7hrU6jt0= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.1.2 h1:/kDHhXMNGjsqy+SZ3Zn7gZ2ziZekUJLnPXqwy6vyAX8= github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.1.2/go.mod h1:fGsyzk5v4hqteuWNw8liz3iu3xQSRe+zBEdVzyGQy9s= -github.com/ydb-platform/ydb-go-sdk/v3 v3.99.3 h1:KxELBOo/THNL4S5UW708JvAueKGqK00PgI2fvn5T+00= -github.com/ydb-platform/ydb-go-sdk/v3 v3.99.3/go.mod h1:knXehPLqrF/uBrYY0EbDtAMR+Ve8sAwIm/pNsfvbs7E= -github.com/ydb-platform/ydb-go-sdk/v3 v3.107.1-0.20250417120650-061e5de8fb8a h1:1iX6jYOrU9tYrbOHgP4pF9tLKS5+Sqfk4Iajcjtm0aI= -github.com/ydb-platform/ydb-go-sdk/v3 v3.107.1-0.20250417120650-061e5de8fb8a/go.mod h1:l5sSv153E18VvYcsmr51hok9Sjc16tEC8AXGbwrk+ho= github.com/ydb-platform/ydb-go-sdk/v3 v3.108.5 h1:h6API3jJKooqBa5MNhBnilscwOMf2xjn+gFhoH56FHk= github.com/ydb-platform/ydb-go-sdk/v3 v3.108.5/go.mod h1:IMoR7zRpTwEwx+9iHtA13CAyEsSibUMgMXYkHVboAh8= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= @@ -143,8 +149,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -153,8 +157,6 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -163,14 +165,10 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -186,8 +184,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -197,8 +193,6 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -214,8 +208,6 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/backup_operations/make_backup.go b/internal/backup_operations/make_backup.go index b572a936..05d4c139 100644 --- a/internal/backup_operations/make_backup.go +++ b/internal/backup_operations/make_backup.go @@ -97,7 +97,12 @@ func IsAllowedEndpoint(e string, allowedEndpointDomains []string, allowInsecureE return false } -func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector) error { +func OpenConnAndValidateSourcePaths( + ctx context.Context, + req MakeBackupInternalRequest, + clientConn client.ClientConnector, + featureFlags config.FeatureFlagsConfig, +) error { clientConnectionParams := types.YdbConnectionParams{ Endpoint: req.DatabaseEndpoint, DatabaseName: req.DatabaseName, @@ -116,7 +121,7 @@ func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalR xlog.Error(ctx, "can't close client connection", zap.Error(err)) } }() - _, err = ValidateSourcePaths(ctx, req, clientConn, driver, dsn) + _, err = ValidateSourcePaths(ctx, req, clientConn, driver, dsn, featureFlags) var empty *EmptyDatabaseError if errors.As(err, &empty) { return nil @@ -125,7 +130,14 @@ func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalR } } -func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector, client *ydb.Driver, dsn string) ([]string, error) { +func ValidateSourcePaths( + ctx context.Context, + req MakeBackupInternalRequest, + clientConn client.ClientConnector, + client *ydb.Driver, + dsn string, + featureFlags config.FeatureFlagsConfig, +) ([]string, error) { if req.ScheduleID != nil { ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID)) } @@ -139,15 +151,22 @@ func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, cli sourcePaths = append(sourcePaths, fullPath) } - pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude) - if err != nil { - xlog.Error(ctx, "error preparing paths for export", zap.Error(err)) - return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn) - } + var pathsForExport []string + if featureFlags.EnableNewPathsFormat { + // We don't need to list directories, it'll be done on the server side. + pathsForExport = sourcePaths + } else { + var err error + pathsForExport, err = clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude) + if err != nil { + xlog.Error(ctx, "error preparing paths for export", zap.Error(err)) + return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn) + } - if len(pathsForExport) == 0 { - xlog.Error(ctx, "empty list of paths for export") - return nil, NewEmptyDatabaseError(codes.FailedPrecondition, "empty list of paths for export") + if len(pathsForExport) == 0 { + xlog.Error(ctx, "empty list of paths for export") + return nil, NewEmptyDatabaseError(codes.FailedPrecondition, "empty list of paths for export") + } } return pathsForExport, nil } @@ -264,6 +283,7 @@ func MakeBackup( req MakeBackupInternalRequest, subject string, clock clockwork.Clock, + featureFlags config.FeatureFlagsConfig, ) (*types.Backup, *types.TakeBackupOperation, error) { if req.ScheduleID != nil { ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID)) @@ -311,7 +331,7 @@ func MakeBackup( destinationPrefix := CreateS3DestinationPrefix(req.DatabaseName, s3, clock) ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix)) - pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn) + pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn, featureFlags) if err != nil { return nil, nil, err @@ -330,7 +350,7 @@ func MakeBackup( S3ForcePathStyle: s3.S3ForcePathStyle, } - clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings) + clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings, featureFlags) if err != nil { xlog.Error(ctx, "can't start export operation", zap.Error(err)) return nil, nil, status.Errorf(codes.Unknown, "can't start export operation, dsn %s", dsn) diff --git a/internal/config/config.go b/internal/config/config.go index e97c97e6..ad7d8a7a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -66,7 +66,8 @@ type MetricsServerConfig struct { } type FeatureFlagsConfig struct { - DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"` + DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"` + EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"` } type LogConfig struct { diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 6bf30d2b..1b8ac226 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -35,8 +35,8 @@ type ClientConnector interface { Close(ctx context.Context, clientDb *ydb.Driver) error PreparePathsForExport(ctx context.Context, clientDb *ydb.Driver, sourcePaths []string, sourcePathsToExclude []string) ([]string, error) - ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings) (string, error) - ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) + ExportToS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings, featureFlags config.FeatureFlagsConfig) (string, error) + ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings, featureFlags config.FeatureFlagsConfig) (string, error) GetOperationStatus( ctx context.Context, clientDb *ydb.Driver, operationId string, ) (*Ydb_Operations.GetOperationResponse, error) @@ -217,7 +217,7 @@ func (d *ClientYdbConnector) PreparePathsForExport( } func (d *ClientYdbConnector) ExportToS3( - ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings, + ctx context.Context, clientDb *ydb.Driver, s3Settings types.ExportSettings, featureFlags config.FeatureFlagsConfig, ) (string, error) { if clientDb == nil { return "", fmt.Errorf("unititialized client db driver") @@ -225,15 +225,18 @@ func (d *ClientYdbConnector) ExportToS3( items := make([]*Ydb_Export.ExportToS3Settings_Item, len(s3Settings.SourcePaths)) for i, source := range s3Settings.SourcePaths { - // Destination prefix format: s3_destination_prefix/rel_source_path - destinationPrefix := path.Join( - s3Settings.DestinationPrefix, - strings.TrimPrefix(source, clientDb.Name()+"/"), - ) - items[i] = &Ydb_Export.ExportToS3Settings_Item{ - SourcePath: source, - DestinationPrefix: destinationPrefix, + SourcePath: source, + } + + if !featureFlags.EnableNewPathsFormat { + // Destination prefix format: s3_destination_prefix/rel_source_path + destinationPrefix := path.Join( + s3Settings.DestinationPrefix, + strings.TrimPrefix(source, clientDb.Name()+"/"), + ) + + items[i].DestinationPrefix = destinationPrefix } } @@ -246,26 +249,30 @@ func (d *ClientYdbConnector) ExportToS3( zap.String("S3Description", s3Settings.Description), ) - response, err := exportClient.ExportToS3( - ctx, - &Ydb_Export.ExportToS3Request{ - OperationParams: &Ydb_Operations.OperationParams{ - OperationTimeout: durationpb.New(time.Second), - CancelAfter: durationpb.New(time.Second), - }, - Settings: &Ydb_Export.ExportToS3Settings{ - Endpoint: s3Settings.Endpoint, - Bucket: s3Settings.Bucket, - Region: s3Settings.Region, - AccessKey: s3Settings.AccessKey, - SecretKey: s3Settings.SecretKey, - Description: s3Settings.Description, - NumberOfRetries: s3Settings.NumberOfRetries, - Items: items, - DisableVirtualAddressing: s3Settings.S3ForcePathStyle, - }, + exportRequest := &Ydb_Export.ExportToS3Request{ + OperationParams: &Ydb_Operations.OperationParams{ + OperationTimeout: durationpb.New(time.Second), + CancelAfter: durationpb.New(time.Second), }, - ) + Settings: &Ydb_Export.ExportToS3Settings{ + Endpoint: s3Settings.Endpoint, + Bucket: s3Settings.Bucket, + Region: s3Settings.Region, + AccessKey: s3Settings.AccessKey, + SecretKey: s3Settings.SecretKey, + Description: s3Settings.Description, + NumberOfRetries: s3Settings.NumberOfRetries, + Items: items, + DisableVirtualAddressing: s3Settings.S3ForcePathStyle, + }, + } + + if featureFlags.EnableNewPathsFormat { + exportRequest.Settings.SourcePath = clientDb.Name() + exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix + } + + response, err := exportClient.ExportToS3(ctx, exportRequest) if err != nil { return "", fmt.Errorf("error exporting to S3: %w", err) @@ -322,7 +329,9 @@ func prepareItemsForImport(dbName string, s3Client S3API, s3Settings types.Impor *itemsPtr = append( *itemsPtr, &Ydb_Import.ImportFromS3Settings_Item{ - SourcePrefix: key, + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: key, + }, DestinationPath: path.Join( dbName, s3Settings.DestinationPrefix, @@ -341,31 +350,47 @@ func prepareItemsForImport(dbName string, s3Client S3API, s3Settings types.Impor if err != nil { return nil, err } + + if len(*itemsPtr) == 0 { + return nil, fmt.Errorf("empty list of items for import") + } + return *itemsPtr, nil } -func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) { +func (d *ClientYdbConnector) ImportFromS3( + ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings, featureFlags config.FeatureFlagsConfig, +) (string, error) { if clientDb == nil { return "", fmt.Errorf("unititialized client db driver") } - s := session.Must(session.NewSession()) - s3Client := s3.New(s, - &aws.Config{ - Region: &s3Settings.Region, - Credentials: credentials.NewStaticCredentials(s3Settings.AccessKey, s3Settings.SecretKey, ""), - Endpoint: &s3Settings.Endpoint, - S3ForcePathStyle: &s3Settings.S3ForcePathStyle, - }, - ) - - items, err := prepareItemsForImport(clientDb.Name(), s3Client, s3Settings) - if err != nil { - return "", fmt.Errorf("error preparing list of items for import: %s", err.Error()) - } + var items []*Ydb_Import.ImportFromS3Settings_Item + if featureFlags.EnableNewPathsFormat { + items = make([]*Ydb_Import.ImportFromS3Settings_Item, 0, len(s3Settings.SourcePaths)) + for sourcePath, _ := range s3Settings.SourcePaths { + items = append(items, &Ydb_Import.ImportFromS3Settings_Item{ + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePath{ + SourcePath: sourcePath[len(s3Settings.BucketDbRoot):], + }, + }) + } + } else { + s := session.Must(session.NewSession()) + s3Client := s3.New(s, + &aws.Config{ + Region: &s3Settings.Region, + Credentials: credentials.NewStaticCredentials(s3Settings.AccessKey, s3Settings.SecretKey, ""), + Endpoint: &s3Settings.Endpoint, + S3ForcePathStyle: &s3Settings.S3ForcePathStyle, + }, + ) - if len(items) == 0 { - return "", fmt.Errorf("empty list of items for import") + var err error + items, err = prepareItemsForImport(clientDb.Name(), s3Client, s3Settings) + if err != nil { + return "", fmt.Errorf("error preparing list of items for import: %s", err.Error()) + } } importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb)) @@ -377,26 +402,30 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri zap.String("S3Description", s3Settings.Description), ) - response, err := importClient.ImportFromS3( - ctx, - &Ydb_Import.ImportFromS3Request{ - OperationParams: &Ydb_Operations.OperationParams{ - OperationTimeout: durationpb.New(time.Second), - CancelAfter: durationpb.New(time.Second), - }, - Settings: &Ydb_Import.ImportFromS3Settings{ - Endpoint: s3Settings.Endpoint, - Bucket: s3Settings.Bucket, - Region: s3Settings.Region, - AccessKey: s3Settings.AccessKey, - SecretKey: s3Settings.SecretKey, - Description: s3Settings.Description, - NumberOfRetries: s3Settings.NumberOfRetries, - Items: items, - DisableVirtualAddressing: s3Settings.S3ForcePathStyle, - }, + importRequest := &Ydb_Import.ImportFromS3Request{ + OperationParams: &Ydb_Operations.OperationParams{ + OperationTimeout: durationpb.New(time.Second), + CancelAfter: durationpb.New(time.Second), }, - ) + Settings: &Ydb_Import.ImportFromS3Settings{ + Endpoint: s3Settings.Endpoint, + Bucket: s3Settings.Bucket, + Region: s3Settings.Region, + AccessKey: s3Settings.AccessKey, + SecretKey: s3Settings.SecretKey, + Description: s3Settings.Description, + NumberOfRetries: s3Settings.NumberOfRetries, + Items: items, + DisableVirtualAddressing: s3Settings.S3ForcePathStyle, + }, + } + + if featureFlags.EnableNewPathsFormat { + importRequest.Settings.SourcePrefix = s3Settings.BucketDbRoot + importRequest.Settings.DestinationPath = path.Join(clientDb.Name(), s3Settings.DestinationPrefix) + } + + response, err := importClient.ImportFromS3(ctx, importRequest) if err != nil { return "", fmt.Errorf("error importing from s3: %w", err) diff --git a/internal/connectors/client/mock.go b/internal/connectors/client/mock.go index 7de3a451..5da51a20 100644 --- a/internal/connectors/client/mock.go +++ b/internal/connectors/client/mock.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "strings" + "ydbcp/internal/config" "ydbcp/internal/types" @@ -77,7 +78,7 @@ func (m *MockClientConnector) PreparePathsForExport( return sourcePaths, nil } -func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings) (string, error) { +func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Settings types.ExportSettings, _ config.FeatureFlagsConfig) (string, error) { objects := make([]ObjectPath, 0) for _, source := range s3Settings.SourcePaths { objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: path.Join(s3Settings.DestinationPrefix, source)} @@ -102,7 +103,7 @@ func (m *MockClientConnector) ExportToS3(_ context.Context, _ *ydb.Driver, s3Set return newOp.Id, nil } -func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3Settings types.ImportSettings) (string, error) { +func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3Settings types.ImportSettings, _ config.FeatureFlagsConfig) (string, error) { for source := range s3Settings.SourcePaths { objectPath := ObjectPath{Bucket: s3Settings.Bucket, KeyPrefix: source} if !m.storage[objectPath] { diff --git a/internal/connectors/client/prepare_items_test.go b/internal/connectors/client/prepare_items_test.go index 2c904e68..da10a230 100644 --- a/internal/connectors/client/prepare_items_test.go +++ b/internal/connectors/client/prepare_items_test.go @@ -44,15 +44,21 @@ func TestPrepareItemsForImport(t *testing.T) { assert.NoError(t, err) expected := []Ydb_Import.ImportFromS3Settings_Item{ { - SourcePrefix: "local/table_1/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/table_1/", + }, DestinationPath: "/cluster/local/table_1", }, { - SourcePrefix: "local/table_2/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/table_2/", + }, DestinationPath: "/cluster/local/table_2", }, { - SourcePrefix: "local/folder/table_3/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/folder/table_3/", + }, DestinationPath: "/cluster/local/folder/table_3", }, } @@ -64,15 +70,21 @@ func TestPrepareItemsForImport(t *testing.T) { expected = []Ydb_Import.ImportFromS3Settings_Item{ { - SourcePrefix: "local/table_1/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/table_1/", + }, DestinationPath: "/cluster/local/prefix/table_1", }, { - SourcePrefix: "local/table_2/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/table_2/", + }, DestinationPath: "/cluster/local/prefix/table_2", }, { - SourcePrefix: "local/folder/table_3/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/folder/table_3/", + }, DestinationPath: "/cluster/local/prefix/folder/table_3", }, } @@ -84,11 +96,15 @@ func TestPrepareItemsForImport(t *testing.T) { expected = []Ydb_Import.ImportFromS3Settings_Item{ { - SourcePrefix: "local/table_1/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/table_1/", + }, DestinationPath: "/cluster/local/prefix/table_1", }, { - SourcePrefix: "local/folder/table_3/", + Source: &Ydb_Import.ImportFromS3Settings_Item_SourcePrefix{ + SourcePrefix: "local/folder/table_3/", + }, DestinationPath: "/cluster/local/prefix/folder/table_3", }, } diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index 1522854c..7e340871 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -25,13 +25,14 @@ import ( func NewTBWROperationHandler( db db.DBConnector, client client.ClientConnector, - s3 config.S3Config, - clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + config config.Config, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - err := TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock) + err := TBWROperationHandler( + ctx, op, db, client, config.S3, config.ClientConnection, queryBuilderFactory, clock, config.FeatureFlags, + ) if err == nil { metrics.GlobalMetricsRegistry.ReportOperationMetrics(op) } @@ -207,6 +208,7 @@ func TBWROperationHandler( clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + featureFlags config.FeatureFlagsConfig, ) error { ctx = xlog.With(ctx, zap.String("OperationID", operation.GetID())) @@ -299,6 +301,7 @@ func TBWROperationHandler( backup_operations.FromTBWROperation(tbwr), types.OperationCreatorName, clock, + featureFlags, ) tbwr.IncRetries() diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index bb7d2bc7..76418bb4 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -353,10 +353,9 @@ func TestTBWRHandlerSuccess(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{}, - config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clock, + config.Config{}, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -418,10 +417,9 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{}, - config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clock, + config.Config{}, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -484,10 +482,9 @@ func TestTBWRHandlerSkipError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{}, - config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t3.AsTime()), + config.Config{}, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -542,10 +539,9 @@ func TestTBWRHandlerError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{}, - config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clock, + config.Config{}, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -597,15 +593,17 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -669,15 +667,17 @@ func TestTBWRHandlerEmptyDatabase(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -742,15 +742,17 @@ func TestTBWRHandlerInvalidEndpointRetry(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -800,15 +802,17 @@ func TestTBWRHandlerInvalidEndpointError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -863,15 +867,17 @@ func TestTBWRHandlerStartCancel(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -940,15 +946,17 @@ func TestTBWRHandlerFullCancel(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, - config.S3Config{ - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, + config.Config{ + S3: config.S3Config{ + IsMock: true, + }, + ClientConnection: config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + }, ) err := handler(ctx, &tbwr) assert.Empty(t, err) diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 7d79553d..b97550bf 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -37,6 +37,7 @@ type BackupService struct { allowedEndpointDomains []string allowInsecureEndpoint bool clock clockwork.Clock + featureFlags config.FeatureFlagsConfig } func (s *BackupService) IncApiCallsCounter(methodName string, code codes.Code) { @@ -140,7 +141,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques tbwr.Ttl = &d } - err = backup_operations.OpenConnAndValidateSourcePaths(ctx, backup_operations.FromTBWROperation(tbwr), s.clientConn) + err = backup_operations.OpenConnAndValidateSourcePaths(ctx, backup_operations.FromTBWROperation(tbwr), s.clientConn, s.featureFlags) if err != nil { grpcError := backup_operations.ErrToStatus(err) s.IncApiCallsCounter(methodName, status.Code(grpcError)) @@ -319,7 +320,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ return nil, status.Errorf(codes.FailedPrecondition, "backup is not available, status %s", backup.Status) } - if backup.SourcePaths == nil || len(backup.SourcePaths) == 0 { + if backup.Empty() { xlog.Info(ctx, "called restore for empty backup") s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Errorf(codes.FailedPrecondition, "backup is empty, status %s", backup.Status) @@ -382,7 +383,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ DestinationPrefix: req.GetDestinationPrefix(), } - clientOperationID, err := s.clientConn.ImportFromS3(ctx, clientDriver, s3Settings) + clientOperationID, err := s.clientConn.ImportFromS3(ctx, clientDriver, s3Settings, s.featureFlags) if err != nil { xlog.Error(ctx, "can't start import operation", zap.Error(err)) s.IncApiCallsCounter(methodName, codes.Unknown) @@ -595,18 +596,17 @@ func (s *BackupService) Register(server server.Server) { func NewBackupService( driver db.DBConnector, clientConn client.ClientConnector, - s3 config.S3Config, auth ap.AuthProvider, - allowedEndpointDomains []string, - allowInsecureEndpoint bool, + config config.Config, ) *BackupService { return &BackupService{ driver: driver, clientConn: clientConn, - s3: s3, + s3: config.S3, auth: auth, - allowedEndpointDomains: allowedEndpointDomains, - allowInsecureEndpoint: allowInsecureEndpoint, + allowedEndpointDomains: config.ClientConnection.AllowedEndpointDomains, + allowInsecureEndpoint: config.ClientConnection.AllowInsecureEndpoint, clock: clockwork.NewRealClock(), + featureFlags: config.FeatureFlags, } } diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index 6dbe6015..3ffd00f2 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -159,7 +159,7 @@ func (s *BackupScheduleService) CreateBackupSchedule( } err = backup_operations.OpenConnAndValidateSourcePaths( - ctx, backup_operations.FromBackupSchedule(&schedule), s.clientConn, + ctx, backup_operations.FromBackupSchedule(&schedule), s.clientConn, s.config.FeatureFlags, ) if err != nil { return nil, err @@ -278,7 +278,7 @@ func (s *BackupScheduleService) UpdateBackupSchedule( } err = backup_operations.OpenConnAndValidateSourcePaths( - ctx, backup_operations.FromBackupSchedule(schedule), s.clientConn, + ctx, backup_operations.FromBackupSchedule(schedule), s.clientConn, s.config.FeatureFlags, ) if err != nil { return nil, err diff --git a/internal/types/backup.go b/internal/types/backup.go index 0c12f0be..4b775f51 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -106,3 +106,7 @@ func (o *Backup) SetCompletedAt(completedAt *timestamppb.Timestamp) { o.AuditInfo = &pb.AuditInfo{CompletedAt: completedAt} } } + +func (o *Backup) Empty() bool { + return len(o.S3Endpoint) == 0 && len(o.S3Region) == 0 && len(o.S3Bucket) == 0 && len(o.S3PathPrefix) == 0 +} diff --git a/local_config.yaml b/local_config.yaml index ac38db52..f79b8c2c 100644 --- a/local_config.yaml +++ b/local_config.yaml @@ -32,4 +32,7 @@ metrics_server: schedules_limit_per_db: 1 processor_interval_seconds: 2 -duplicate_log_to_file: /var/log/main.log \ No newline at end of file +duplicate_log_to_file: /var/log/main.log + +feature_flags: + enable_new_paths_format: ${ENABLE_NEW_PATHS_FORMAT} \ No newline at end of file