From 2173f6e0f144dc23cbfa9745cd655cb46d6bb45a Mon Sep 17 00:00:00 2001
From: minguyen <136291932+minguyen-jumptrading@users.noreply.github.com>
Date: Thu, 28 Sep 2023 11:57:42 +0800
Subject: [PATCH] Feature/add gcs client pool (#1)

* add connection pool to GCS and use a different context for upload, in case the context gets cancelled by another thread

* save

* keep ctx

* keep ctx

* use v2

* change to GCS_CLIENT_POOL_SIZE

* pin zookeeper to version 3.8.2 to resolve incompatibility between clickhouse and zookeeper 3.9.0,
  see details in https://github.com/apache/zookeeper/pull/1837#issuecomment-1066095370;
  return the `:latest` default value after https://github.com/ClickHouse/ClickHouse/issues/53749 is resolved

* Revert "add more precise disk re-balancing for not exists disks, during download, partial fix https://github.com/Altinity/clickhouse-backup/issues/561"

This reverts commit 20e250c5a5bbdc9a82845088ee522ef65a262a16.

* fix S3 head object Server Side Encryption parameters, fix https://github.com/Altinity/clickhouse-backup/issues/709

* change test timeout to 60m, TODO: make tests parallel

---------

Co-authored-by: Slach
---
 .github/workflows/build.yaml                  |   2 +-
 .gitignore                                    |   3 +-
 ChangeLog.md                                  |   8 +-
 go.mod                                        |   1 -
 go.sum                                        |   2 -
 pkg/backup/download.go                        |  54 +--------
 pkg/backup/list.go                            |   3 +-
 pkg/backup/upload.go                          |  11 +-
 pkg/clickhouse/clickhouse.go                  |  27 ++---
 pkg/clickhouse/structs.go                     |   9 +-
 pkg/config/config.go                          |   7 +-
 pkg/storage/gcs.go                            | 113 ++++++++++++++++--
 pkg/storage/s3.go                             |  29 ++++-
 test/integration/docker-compose.yml           |   3 +-
 .../docker-compose/docker-compose.yml         |   3 +-
 15 files changed, 164 insertions(+), 111 deletions(-)

diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index b22e5133..76dd5dfc 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -297,7 +297,7 @@ jobs:
           export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race"
           docker-compose -f test/integration/${COMPOSE_FILE} up -d || ( docker-compose -f test/integration/${COMPOSE_FILE} ps -a && docker-compose -f test/integration/${COMPOSE_FILE} logs clickhouse && exit 1 )
           docker-compose -f test/integration/${COMPOSE_FILE} ps -a
-          go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go
+          go test -timeout 60m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go
       - name: Format integration coverage
         run: |
           sudo chmod -Rv a+rw test/integration/_coverage_/
diff --git a/.gitignore b/.gitignore
index 646104a8..5d635a69 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,5 @@ build/
 _instances/
 _coverage_/
 __pycache__/
-*.py[cod]
\ No newline at end of file
+*.py[cod]
+vendor/
\ No newline at end of file
diff --git a/ChangeLog.md b/ChangeLog.md
index 665e9b6a..db336c9b 100644
--- a/ChangeLog.md
+++ b/ChangeLog.md
@@ -4,12 +4,11 @@ IMPROVEMENTS
 - Implementation blacklist for table engines during backup / download / upload / restore [537](https://github.com/Altinity/clickhouse-backup/issues/537)
 - restore RBAC / configs, refactoring restart clickhouse-server via `sql:SYSTEM SHUTDOWN` or `exec:systemctl restart clickhouse-server`, add `--rbac-only` and `--configs-only` options to `create`, `upload`, `download`, `restore` command. fix [706](https://github.com/Altinity/clickhouse-backup/issues/706)
 - Backup/Restore RBAC related objects from Zookeeper via direct connection to zookeeper/keeper, fix [604](https://github.com/Altinity/clickhouse-backup/issues/604)
-- add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
-- add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)
-- add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614)
+- Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
+- Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)
+- Add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614)
 - restore functions via `CREATE OR REPLACE` for more atomic behavior
 - prepare to Make ./tests/integration/ test parallel fix [721](https://github.com/Altinity/clickhouse-backup/issues/721)
-- add more precise disk re-balancing for not exists disks, during download, partial fix [561](https://github.com/Altinity/clickhouse-backup/issues/561)
 
 BUG FIXES
 - fix possible create backup failures during UNFREEZE not exists tables, affected 2.2.7+ version, fix [704](https://github.com/Altinity/clickhouse-backup/issues/704)
@@ -17,6 +16,7 @@ BUG FIXES
 - fix `--rbac` behavior when /var/lib/clickhouse/access not exists
 - fix `skip_database_engines` behavior
 - fix `skip_databases` behavior during restore for corner case `db.prefix*` and corner cases when conflict with `--tables="*pattern.*"`, fix [663](https://github.com/Altinity/clickhouse-backup/issues/663)
+- fix S3 head object Server Side Encryption parameters, fix [709](https://github.com/Altinity/clickhouse-backup/issues/709)
 
 # v2.3.2
 BUG FIXES
diff --git a/go.mod b/go.mod
index 2c2c7c93..2d4dcbad 100644
--- a/go.mod
+++ b/go.mod
@@ -34,7 +34,6 @@
 	github.com/pkg/errors v0.9.1
 	github.com/pkg/sftp v1.13.5
 	github.com/prometheus/client_golang v1.15.1
-	github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285
 	github.com/stretchr/testify v1.8.4
 	github.com/tencentyun/cos-go-sdk-v5 v0.7.41
 	github.com/urfave/cli v1.22.14
diff --git a/go.sum b/go.sum
index 6850d4ae..6c66b79e 100644
--- a/go.sum
+++ b/go.sum
@@ -338,8 +338,6 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
 github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
 github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
 github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
-github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285 h1:d54EL9l+XteliUfUCGsEwwuk65dmmxX85VXF+9T6+50=
-github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285/go.mod h1:fxIDly1xtudczrZeOOlfaUvd2OPb2qZAPuWdU2BsBTk=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
 github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
diff --git a/pkg/backup/download.go b/pkg/backup/download.go
index dca2fda3..ca6ff65c 100644
--- a/pkg/backup/download.go
+++ b/pkg/backup/download.go
@@ -15,7 +15,6 @@ import (
 	"github.com/eapache/go-resiliency/retrier"
 	"io"
 	"io/fs"
-	"math/rand"
 	"os"
 	"path"
 	"path/filepath"
@@ -220,16 +219,9 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
 	if !schemaOnly {
 		for _, t := range tableMetadataAfterDownload {
 			for disk := range t.Parts {
-				// https://github.com/Altinity/clickhouse-backup/issues/561, @todo think about calculate size of each part and rebalancing t.Parts
 				if _, diskExists := b.DiskToPathMap[disk]; !diskExists && disk != b.cfg.ClickHouse.EmbeddedBackupDisk {
-					var diskPath string
-					var diskPathErr error
-					diskPath, disks, diskPathErr = b.getDiskPathForNonExistsDisk(disk, disks, remoteBackup, t)
-					if diskPathErr != nil {
-						return diskPathErr
-					}
-					b.DiskToPathMap[disk] = diskPath
-					log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %s", t.Database, t.Table, disk, b.DiskToPathMap[disk])
+					b.DiskToPathMap[disk] = b.DiskToPathMap["default"]
+					log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %s", t.Database, t.Table, disk, b.DiskToPathMap["default"])
 				}
 			}
 		}
@@ -972,45 +964,3 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
 	}
 	return nil
 }
-
-// getDiskPathForNonExistsDisk - https://github.com/Altinity/clickhouse-backup/issues/561
-// allow to Restore to new server with different storage policy, different disk count,
-// implements `least_used` for normal disk and `round_robin` for Object disks
-func (b *Backuper) getDiskPathForNonExistsDisk(disk string, disks []clickhouse.Disk, remoteBackup storage.Backup, t metadata.TableMetadata) (string, []clickhouse.Disk, error) {
-	diskType, ok := remoteBackup.DiskTypes[disk]
-	if !ok {
-		return "", disks, fmt.Errorf("diskType for %s not found in %#v in %s/metadata.json", disk, remoteBackup.DiskTypes, remoteBackup.BackupName)
-	}
-	var filteredDisks []clickhouse.Disk
-	for _, d := range disks {
-		if !d.IsBackup && d.Type == diskType {
-			filteredDisks = append(filteredDisks, d)
-		}
-	}
-	if len(filteredDisks) == 0 {
-		return "", disks, fmt.Errorf("diskType: %s not found in system.disks", diskType)
-	}
-	// round_robin for non-local disks
-	if diskType != "local" {
-		roundRobinIdx := rand.Intn(len(filteredDisks))
-		return filteredDisks[roundRobinIdx].Path, disks, nil
-	}
-	// least_used for local
-	freeSpace := uint64(0)
-	leastUsedIdx := -1
-	for idx, d := range filteredDisks {
-		if filteredDisks[idx].FreeSpace > freeSpace {
-			freeSpace = d.FreeSpace
-			leastUsedIdx = idx
-		}
-	}
-	if leastUsedIdx < 0 {
-		return "", disks, fmt.Errorf("%s free space, not found in system.disks with `local` type", utils.FormatBytes(t.TotalBytes))
-	}
-	for idx := range disks {
-		if disks[idx].Name == filteredDisks[leastUsedIdx].Name {
-			disks[idx].FreeSpace -= t.TotalBytes
-		}
-	}
-	return filteredDisks[leastUsedIdx].Path, disks, nil
-}
diff --git a/pkg/backup/list.go b/pkg/backup/list.go
index 1435589a..25952416 100644
--- a/pkg/backup/list.go
+++ b/pkg/backup/list.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/ricochet2200/go-disk-usage/du"
 	"io"
 	"os"
 	"path"
@@ -174,7 +173,7 @@ func (b *Backuper) GetLocalBackups(ctx context.Context, disks []clickhouse.Disk)
 	}
 	if disks == nil {
 		disks = []clickhouse.Disk{
-			{Name: "default", Path: "/var/lib/clickhouse", Type: "local", FreeSpace: du.NewDiskUsage("/var/lib/clickhouse").Free()},
+			{Name: "default", Path: "/var/lib/clickhouse"},
 		}
 	}
 	defaultDataPath, err := b.ch.GetDefaultPath(disks)
diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go
index 9901b237..f58ee1d7 100644
--- a/pkg/backup/upload.go
+++ b/pkg/backup/upload.go
@@ -5,11 +5,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
-	"github.com/Altinity/clickhouse-backup/pkg/custom"
-	"github.com/Altinity/clickhouse-backup/pkg/resumable"
-	"github.com/Altinity/clickhouse-backup/pkg/status"
-	"github.com/eapache/go-resiliency/retrier"
 	"io"
 	"os"
 	"path"
@@ -20,6 +15,12 @@
 	"sync/atomic"
 	"time"
 
+	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
+	"github.com/Altinity/clickhouse-backup/pkg/custom"
+	"github.com/Altinity/clickhouse-backup/pkg/resumable"
+	"github.com/Altinity/clickhouse-backup/pkg/status"
+	"github.com/eapache/go-resiliency/retrier"
+
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sync/semaphore"
diff --git a/pkg/clickhouse/clickhouse.go b/pkg/clickhouse/clickhouse.go
index 5b36ec33..f1b809ca 100644
--- a/pkg/clickhouse/clickhouse.go
+++ b/pkg/clickhouse/clickhouse.go
@@ -10,7 +10,6 @@ import (
 	"github.com/Altinity/clickhouse-backup/pkg/common"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"github.com/antchfx/xmlquery"
-	"github.com/ricochet2200/go-disk-usage/du"
 	"os"
 	"path"
 	"path/filepath"
@@ -30,6 +29,7 @@ type ClickHouse struct {
 	Config *config.ClickHouseConfig
 	Log    *apexLog.Entry
 	conn   driver.Conn
+	disks  []Disk
 	version int
 	isPartsColumnPresent int8
 	IsOpen bool
@@ -218,10 +218,9 @@ func (ch *ClickHouse) getDisksFromSystemSettings(ctx context.Context) ([]Disk, e
 		dataPathArray := strings.Split(metadataPath, "/")
 		clickhouseData := path.Join(dataPathArray[:len(dataPathArray)-1]...)
 		return []Disk{{
-			Name:      "default",
-			Path:      path.Join("/", clickhouseData),
-			Type:      "local",
-			FreeSpace: du.NewDiskUsage(path.Join("/", clickhouseData)).Free(),
+			Name: "default",
+			Path: path.Join("/", clickhouseData),
+			Type: "local",
 		}}, nil
 	}
 }
@@ -252,24 +251,18 @@ func (ch *ClickHouse) getDisksFromSystemDisks(ctx context.Context) ([]Disk, erro
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	default:
-		type DiskFields struct {
-			DiskTypePresent  uint64 `ch:"is_disk_type_present"`
-			FreeSpacePresent uint64 `ch:"is_free_space_present"`
-		}
-		diskFields := make([]DiskFields, 0)
-		if err := ch.SelectContext(ctx, &diskFields, "SELECT countIf(name='type') AS is_disk_type_present, countIf(name='free_space') AS is_free_space_present FROM system.columns WHERE database='system' AND table='disks'"); err != nil {
+		isDiskType := make([]struct {
+			Present uint64 `ch:"is_disk_type_present"`
+		}, 0)
+		if err := ch.SelectContext(ctx, &isDiskType, "SELECT count() is_disk_type_present FROM system.columns WHERE database='system' AND table='disks' AND name='type'"); err != nil {
 			return nil, err
 		}
 		diskTypeSQL := "'local'"
-		if len(diskFields) > 0 && diskFields[0].DiskTypePresent > 0 {
+		if len(isDiskType) > 0 && isDiskType[0].Present > 0 {
 			diskTypeSQL = "any(type)"
 		}
-		diskFreeSpaceSQL := "0"
-		if len(diskFields) > 0 && diskFields[0].FreeSpacePresent > 0 {
-			diskFreeSpaceSQL = "min(free_space)"
-		}
 		var result []Disk
-		query := fmt.Sprintf("SELECT path, any(name) AS name, %s AS type, %s AS free_space FROM system.disks GROUP BY path", diskTypeSQL, diskFreeSpaceSQL)
+		query := fmt.Sprintf("SELECT path, any(name) AS name, %s AS type FROM system.disks GROUP BY path", diskTypeSQL)
 		err := ch.SelectContext(ctx, &result, query)
 		return result, err
 	}
diff --git a/pkg/clickhouse/structs.go b/pkg/clickhouse/structs.go
index 42c46cb0..dcbc7904 100644
--- a/pkg/clickhouse/structs.go
+++ b/pkg/clickhouse/structs.go
@@ -38,11 +38,10 @@ type IsSystemTablesFieldPresent struct {
 }
 
 type Disk struct {
-	Name      string `ch:"name"`
-	Path      string `ch:"path"`
-	Type      string `ch:"type"`
-	FreeSpace uint64 `ch:"free_space"`
-	IsBackup  bool
+	Name     string `ch:"name"`
+	Path     string `ch:"path"`
+	Type     string `ch:"type"`
+	IsBackup bool
 }
 
 // Database - Clickhouse system.databases struct
diff --git a/pkg/config/config.go b/pkg/config/config.go
index ae600df5..9273ed58 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -3,13 +3,14 @@ package config
 import (
 	"crypto/tls"
 	"fmt"
-	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"math"
 	"os"
 	"runtime"
 	"strings"
 	"time"
 
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+
 	"github.com/apex/log"
 	"github.com/kelseyhightower/envconfig"
 	"github.com/urfave/cli"
@@ -76,6 +77,9 @@ type GCSConfig struct {
 	StorageClass          string            `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
 	ObjectLabels          map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
 	CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"`
+	// NOTE: ClientPoolSize should be at least 2 times bigger than
+	// UploadConcurrency or DownloadConcurrency in each upload and download case
+	ClientPoolSize int `yaml:"client_pool_size" envconfig:"GCS_CLIENT_POOL_SIZE"`
 }
 
 // AzureBlobConfig - Azure Blob settings section
@@ -544,6 +548,7 @@ func DefaultConfig() *Config {
 			CompressionLevel:  1,
 			CompressionFormat: "tar",
 			StorageClass:      "STANDARD",
+			ClientPoolSize:    500,
 		},
 		COS: COSConfig{
 			RowURL: "",
diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index 8801742c..93c71f2c 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -5,14 +5,16 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
-	"google.golang.org/api/iterator"
 	"io"
 	"net/http"
 	"path"
 	"strings"
 	"time"
 
+	"google.golang.org/api/iterator"
+
 	"github.com/Altinity/clickhouse-backup/pkg/config"
+	pool "github.com/jolestar/go-commons-pool/v2"
 	"google.golang.org/api/option/internaloption"
 
 	"cloud.google.com/go/storage"
@@ -23,14 +25,19 @@
 
 // GCS - presents methods for manipulate data on GCS
 type GCS struct {
-	client *storage.Client
-	Config *config.GCSConfig
+	client     *storage.Client
+	Config     *config.GCSConfig
+	clientPool *pool.ObjectPool
 }
 
 type debugGCSTransport struct {
 	base http.RoundTripper
 }
 
+type clientObject struct {
+	Client *storage.Client
+}
+
 func (w debugGCSTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 	logMsg := fmt.Sprintf(">>> [GCS_REQUEST] >>> %v %v\n", r.Method, r.URL.String())
 	for h, values := range r.Header {
@@ -96,6 +103,30 @@ func (gcs *GCS) Connect(ctx context.Context) error {
 		clientOptions = append(clientOptions, option.WithHTTPClient(debugClient))
 	}
 
+	factory := pool.NewPooledObjectFactory(
+		func(context.Context) (interface{}, error) {
+			sClient, err := storage.NewClient(ctx, clientOptions...)
+			if err != nil {
+				return nil, err
+			}
+			return &clientObject{
+				Client: sClient,
+			},
+				nil
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// destroy
+			return object.Object.(*clientObject).Client.Close()
+		}, func(ctx context.Context, object *pool.PooledObject) bool {
+			return true
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// activate do nothing
+			return nil
+		}, func(ctx context.Context, object *pool.PooledObject) error {
+			// passivate do nothing
+			return nil
+		})
+	gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
+	gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
 	gcs.client, err = storage.NewClient(ctx, clientOptions...)
 	return err
 }
@@ -105,6 +136,13 @@ func (gcs *GCS) Close(ctx context.Context) error {
 }
 
 func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
+
 	rootPath := path.Join(gcs.Config.Path, gcsPath)
 	prefix := rootPath + "/"
 	if rootPath == "/" {
@@ -114,22 +152,25 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
 	if !recursive {
 		delimiter = "/"
 	}
-	it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
+	it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
 		Prefix:    prefix,
 		Delimiter: delimiter,
 	})
 	for {
 		object, err := it.Next()
 		if errors.Is(err, iterator.Done) {
+			gcs.clientPool.ReturnObject(ctx, pClientObj)
 			return nil
 		}
 		if err != nil {
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
 			return err
 		}
 		if object.Prefix != "" {
 			if err := process(ctx, &gcsFile{
 				name: strings.TrimPrefix(object.Prefix, rootPath),
 			}); err != nil {
+				gcs.clientPool.InvalidateObject(ctx, pClientObj)
 				return err
 			}
 			continue
@@ -139,17 +180,26 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
 			lastModified: object.Updated,
 			name:         strings.TrimPrefix(object.Name, rootPath),
 		}); err != nil {
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
 			return err
 		}
 	}
 }
 
 func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) {
-	obj := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return nil, err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
 	reader, err := obj.NewReader(ctx)
 	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return nil, err
 	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return reader, nil
 }
 
@@ -158,8 +208,15 @@ func (gcs *GCS) GetFileReaderWithLocalPath(ctx context.Context, key, _ string) (
 }
 
 func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
 	key = path.Join(gcs.Config.Path, key)
-	obj := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
+	obj := pClient.Bucket(gcs.Config.Bucket).Object(key)
+
 	writer := obj.NewWriter(ctx)
 	writer.StorageClass = gcs.Config.StorageClass
 	if len(gcs.Config.ObjectLabels) > 0 {
@@ -168,21 +225,32 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
 	defer func() {
 		if err := writer.Close(); err != nil {
 			log.Warnf("can't close writer: %+v", err)
+			gcs.clientPool.InvalidateObject(ctx, pClientObj)
+			return
 		}
+		gcs.clientPool.ReturnObject(ctx, pClientObj)
 	}()
 	buffer := make([]byte, 512*1024)
-	_, err := io.CopyBuffer(writer, r, buffer)
+	_, err = io.CopyBuffer(writer, r, buffer)
 	return err
 }
 
 func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
-	objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return nil, err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
 	if err != nil {
 		if errors.Is(err, storage.ErrObjectNotExist) {
 			return nil, ErrNotFound
 		}
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return nil, err
 	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return &gcsFile{
 		size:         objAttr.Size,
 		lastModified: objAttr.Updated,
@@ -191,8 +259,20 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
 }
 
 func (gcs *GCS) deleteKey(ctx context.Context, key string) error {
-	object := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
-	return object.Delete(ctx)
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return err
+	}
+	pClient := pClientObj.(*clientObject).Client
+	object := pClient.Bucket(gcs.Config.Bucket).Object(key)
+	err = object.Delete(ctx)
+	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
+		return err
+	}
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
+	return nil
 }
 
 func (gcs *GCS) DeleteFile(ctx context.Context, key string) error {
@@ -206,17 +286,26 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string) 
 }
 
 func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
+	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
+	if err != nil {
+		log.Errorf("can't get client connection from pool: %+v", err)
+		return 0, err
+	}
+	pClient := pClientObj.(*clientObject).Client
 	dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
-	src := gcs.client.Bucket(srcBucket).Object(srcKey)
-	dst := gcs.client.Bucket(gcs.Config.Bucket).Object(dstKey)
+	src := pClient.Bucket(srcBucket).Object(srcKey)
+	dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey)
 	attrs, err := src.Attrs(ctx)
 	if err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return 0, err
 	}
 	if _, err = dst.CopierFrom(src).Run(ctx); err != nil {
+		gcs.clientPool.InvalidateObject(ctx, pClientObj)
 		return 0, err
 	}
 	log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
+	gcs.clientPool.ReturnObject(ctx, pClientObj)
 	return attrs.Size, nil
 }
 
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 0918dc10..58e93c39 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -343,10 +343,12 @@ func (s *S3) getObjectVersion(ctx context.Context, key string) (*string, error)
 }
 
 func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) {
-	head, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
+	params := &s3.HeadObjectInput{
 		Bucket: aws.String(s.Config.Bucket),
 		Key:    aws.String(path.Join(s.Config.Path, key)),
-	})
+	}
+	s.enrichHeadParamsWithSSE(params)
+	head, err := s.client.HeadObject(ctx, params)
 	if err != nil {
 		var opError *smithy.OperationError
 		if errors.As(err, &opError) {
@@ -463,10 +465,12 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 	if err != nil {
 		return 0, err
 	}
-	dstObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
+	dstHeadParams := &s3.HeadObjectInput{
 		Bucket: aws.String(s.Config.Bucket),
 		Key:    aws.String(dstKey),
-	})
+	}
+	s.enrichHeadParamsWithSSE(dstHeadParams)
+	dstObjResp, err := s.client.HeadObject(ctx, dstHeadParams)
 	if err != nil {
 		return 0, err
 	}
@@ -620,11 +624,12 @@ func (s *S3) restoreObject(ctx context.Context, key string) error {
 	}
 	i := 0
 	for {
-		headObjectRequest := &s3.HeadObjectInput{
+		restoreHeadParams := &s3.HeadObjectInput{
 			Bucket: aws.String(s.Config.Bucket),
 			Key:    aws.String(path.Join(s.Config.Path, key)),
 		}
-		res, err := s.client.HeadObject(ctx, headObjectRequest)
+		s.enrichHeadParamsWithSSE(restoreHeadParams)
+		res, err := s.client.HeadObject(ctx, restoreHeadParams)
 		if err != nil {
 			return fmt.Errorf("restoreObject: failed to head %s object metadata, %v", path.Join(s.Config.Path, key), err)
 		}
@@ -639,6 +644,18 @@
 	}
 }
 
+func (s *S3) enrichHeadParamsWithSSE(headParams *s3.HeadObjectInput) {
+	if s.Config.SSECustomerAlgorithm != "" {
+		headParams.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
+	}
+	if s.Config.SSECustomerKey != "" {
+		headParams.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
+	}
+	if s.Config.SSECustomerKeyMD5 != "" {
+		headParams.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
+	}
+}
+
 type s3File struct {
 	size         int64
 	lastModified time.Time
diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml
index a5f9d012..b7c99a05 100644
--- a/test/integration/docker-compose.yml
+++ b/test/integration/docker-compose.yml
@@ -84,7 +84,8 @@ services:
 #      - clickhouse-backup
 
   zookeeper:
-    image: docker.io/zookeeper:${ZOOKEEPER_VERSION:-latest}
+    # @TODO return :latest default value after resolving https://github.com/ClickHouse/ClickHouse/issues/53749
+    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.2}
     hostname: zookeeper
     environment:
       ZOO_4LW_COMMANDS_WHITELIST: "*"
diff --git a/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml b/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml
index 56d16951..15f4d470 100644
--- a/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml
+++ b/test/testflows/clickhouse_backup/docker-compose/docker-compose.yml
@@ -2,7 +2,8 @@ version: '2.4'
 
 services:
   zookeeper:
-    image: zookeeper:latest
+    # @TODO return :latest default value after resolving https://github.com/ClickHouse/ClickHouse/issues/53749
+    image: ${ZOOKEEPER_IMAGE:-docker.io/zookeeper}:${ZOOKEEPER_VERSION:-3.8.2}
     expose:
      - "2181"
    environment:
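
Note on the GCS changes: every GCS operation touched by this patch (Walk, GetFileReader, PutFile, StatFile, deleteKey, CopyObject) repeats the same discipline against the go-commons-pool object pool: borrow a client, return it on success, invalidate it on error so the pool's destroy callback closes the broken connection. The following is a minimal, self-contained sketch of that pattern; the `withGCSClient` helper is hypothetical and not part of the patch, and the `clientObject` type only mirrors the wrapper the patch adds.

// Hypothetical example package illustrating the borrow/return/invalidate
// pattern used by the pooled GCS calls in pkg/storage/gcs.go.
package gcspool

import (
	"context"

	"cloud.google.com/go/storage"
	pool "github.com/jolestar/go-commons-pool/v2"
)

// clientObject mirrors the wrapper type the patch adds around *storage.Client.
type clientObject struct {
	Client *storage.Client
}

// withGCSClient borrows a pooled client, hands it to fn, returns it to the pool
// when fn succeeds, and invalidates it when fn fails so that the pool's destroy
// callback (Client.Close in the patch) disposes of the broken connection.
func withGCSClient(ctx context.Context, p *pool.ObjectPool, fn func(client *storage.Client) error) error {
	pooled, err := p.BorrowObject(ctx)
	if err != nil {
		return err
	}
	if err := fn(pooled.(*clientObject).Client); err != nil {
		_ = p.InvalidateObject(ctx, pooled)
		return err
	}
	return p.ReturnObject(ctx, pooled)
}

One design note: returning an object to the pool does not close it (only invalidation triggers the destroy callback), which is why PutFile can defer the return/invalidate decision until the writer is closed and GetFileReader can hand back the client while the caller is still reading.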
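
The new `client_pool_size` option defaults to 500, and the comment added to `GCSConfig` says it should be at least twice the upload/download concurrency so concurrent workers never starve the pool. A startup guard along these lines could make that rule explicit; this is only a sketch, and the `cfg.General.UploadConcurrency` / `cfg.General.DownloadConcurrency` field paths are assumptions about the existing config struct, not something this patch adds.

package main

import (
	"github.com/Altinity/clickhouse-backup/pkg/config"
	"github.com/apex/log"
)

// checkGCSPoolSize is a hypothetical guard, not part of this patch: it warns when
// gcs.client_pool_size is smaller than twice the configured concurrency, which is
// the minimum the GCSConfig comment recommends.
func checkGCSPoolSize(cfg *config.Config) {
	workers := int(cfg.General.UploadConcurrency)
	if int(cfg.General.DownloadConcurrency) > workers {
		workers = int(cfg.General.DownloadConcurrency)
	}
	if cfg.GCS.ClientPoolSize < 2*workers {
		log.Warnf("gcs.client_pool_size=%d is below 2x the configured concurrency (%d); GCS calls may block waiting for a pooled client",
			cfg.GCS.ClientPoolSize, workers)
	}
}

func main() {
	checkGCSPoolSize(config.DefaultConfig())
}

With the default of 500 there is plenty of headroom for typical concurrency settings, so the warning would only matter for heavily tuned configurations.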
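
On the S3 side, the patch routes every HeadObject call (StatFile, CopyObject, restoreObject) through the new `enrichHeadParamsWithSSE`, because a HEAD request against an SSE-C encrypted object must carry the same customer-provided key headers as the PUT/GET that wrote it, otherwise S3 rejects the request. Below is a standalone sketch of the same idea using the aws-sdk-go-v2 client directly; the bucket, key and key material are placeholders, not values from the patch.

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()
	cfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	params := &s3.HeadObjectInput{
		Bucket: aws.String("example-bucket"),              // placeholder
		Key:    aws.String("backup/shard0/metadata.json"), // placeholder
	}
	// For SSE-C objects, HeadObject needs the same customer-provided key headers
	// as GetObject/PutObject; this mirrors what enrichHeadParamsWithSSE fills in
	// from the clickhouse-backup S3 config.
	params.SSECustomerAlgorithm = aws.String("AES256")
	params.SSECustomerKey = aws.String("base64-encoded-256-bit-key")       // placeholder
	params.SSECustomerKeyMD5 = aws.String("base64-encoded-md5-of-the-key") // placeholder

	head, err := client.HeadObject(ctx, params)
	if err != nil {
		log.Fatalf("HeadObject failed: %v", err)
	}
	log.Printf("HeadObject OK, ETag=%s", aws.ToString(head.ETag))
}

This covers only the SSE-C case, which is what the `SSECustomerAlgorithm`/`SSECustomerKey`/`SSECustomerKeyMD5` settings configure; SSE-S3 and SSE-KMS objects do not need extra headers on HeadObject.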