Feature/add gcs client pool (#1)
* add connection pool for GCS and use a separate context for upload, in case the shared context gets cancelled by another thread

* save

* keep ctx

* keep ctx

* use v2

* change to GCS_CLIENT_POOL_SIZE

* pin zookeeper to version 3.8.2 to resolve an incompatibility between clickhouse and zookeeper 3.9.0, see details in apache/zookeeper#1837 (comment); return the `:latest` default value after ClickHouse/ClickHouse#53749 is resolved

* Revert "add more precise disk re-balancing for not exists disks, during download, partial fix Altinity#561"

This reverts commit 20e250c.

* fix S3 head object Server Side Encryption parameters, fix Altinity#709

* change timeout to 60m, TODO: make tests parallel

---------

Co-authored-by: Slach <bloodjazman@gmail.com>
minguyen9988 and Slach committed Sep 28, 2023
1 parent 20e250c commit 2173f6e
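The first bullet in the message refers to detaching an upload's lifetime from the caller's context, so that one goroutine cancelling a shared context cannot abort another thread's in-flight transfer. The GCS backend code that implements this is not among the hunks shown below; the following is a minimal sketch of the pattern under that assumption, with a hypothetical helper name and an assumed timeout:

```go
package gcsupload

import (
    "context"
    "time"
)

// uploadDetached is an illustrative helper, not the commit's actual code.
// It honors a cancellation that happened before the upload started, but
// runs the upload itself under an independent context with its own
// deadline, so the parent's cancellation cannot tear it down mid-write.
func uploadDetached(parent context.Context, upload func(ctx context.Context) error) error {
    if err := parent.Err(); err != nil {
        return err // caller already cancelled; don't start
    }
    // assumed timeout; the real backend may size this differently
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()
    return upload(ctx)
}
```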
Showing 15 changed files with 164 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
@@ -297,7 +297,7 @@ jobs:
           export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race"
           docker-compose -f test/integration/${COMPOSE_FILE} up -d || ( docker-compose -f test/integration/${COMPOSE_FILE} ps -a && docker-compose -f test/integration/${COMPOSE_FILE} logs clickhouse && exit 1 )
           docker-compose -f test/integration/${COMPOSE_FILE} ps -a
-          go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go
+          go test -timeout 60m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go
       - name: Format integration coverage
         run: |
           sudo chmod -Rv a+rw test/integration/_coverage_/
3 changes: 2 additions & 1 deletion .gitignore
@@ -12,4 +12,5 @@ build/
 _instances/
 _coverage_/
 __pycache__/
-*.py[cod]
\ No newline at end of file
+*.py[cod]
+vendor/
8 changes: 4 additions & 4 deletions ChangeLog.md
@@ -4,19 +4,19 @@ IMPROVEMENTS
 - Implementation blacklist for table engines during backup / download / upload / restore [537](https://github.com/Altinity/clickhouse-backup/issues/537)
 - restore RBAC / configs, refactoring restart clickhouse-server via `sql:SYSTEM SHUTDOWN` or `exec:systemctl restart clickhouse-server`, add `--rbac-only` and `--configs-only` options to `create`, `upload`, `download`, `restore` command. fix [706](https://github.com/Altinity/clickhouse-backup/issues/706)
 - Backup/Restore RBAC related objects from Zookeeper via direct connection to zookeeper/keeper, fix [604](https://github.com/Altinity/clickhouse-backup/issues/604)
-- add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
-- add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)
-- add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614)
+- Add `SHARDED_OPERATION_MODE` option, to easy create backup for sharded cluster, available values `none` (no sharding), `table` (table granularity), `database` (database granularity), `first-replica` (on the lexicographically sorted first active replica), thanks @mskwon, fix [639](https://github.com/Altinity/clickhouse-backup/issues/639), fix [648](https://github.com/Altinity/clickhouse-backup/pull/648)
+- Add support for `compression_format: none` for upload and download backups created with `--rbac` / `--rbac-only` or `--configs` / `--configs-only` options, fix [713](https://github.com/Altinity/clickhouse-backup/issues/713)
+- Add support for s3 `GLACIER` storage class, when GET return error, then, it requires 5 minutes per key and restore could be slow. Use `GLACIER_IR`, it looks more robust, fix [614](https://github.com/Altinity/clickhouse-backup/issues/614)
 - restore functions via `CREATE OR REPLACE` for more atomic behavior
 - prepare to Make ./tests/integration/ test parallel fix [721](https://github.com/Altinity/clickhouse-backup/issues/721)
-- add more precise disk re-balancing for not exists disks, during download, partial fix [561](https://github.com/Altinity/clickhouse-backup/issues/561)
 
 BUG FIXES
 - fix possible create backup failures during UNFREEZE not exists tables, affected 2.2.7+ version, fix [704](https://github.com/Altinity/clickhouse-backup/issues/704)
 - fix too strict `system.parts_columns` check when backup create, exclude Enum and Tuple (JSON) and Nullable(Type) vs Type corner cases, fix [685](https://github.com/Altinity/clickhouse-backup/issues/685), fix [699](https://github.com/Altinity/clickhouse-backup/issues/699)
 - fix `--rbac` behavior when /var/lib/clickhouse/access not exists
 - fix `skip_database_engines` behavior
 - fix `skip_databases` behavior during restore for corner case `db.prefix*` and corner cases when conflict with `--tables="*pattern.*"`, fix [663](https://github.com/Altinity/clickhouse-backup/issues/663)
+- fix S3 head object Server Side Encryption parameters, fix [709](https://github.com/Altinity/clickhouse-backup/issues/709)
 
 # v2.3.2
 BUG FIXES
1 change: 0 additions & 1 deletion go.mod
@@ -34,7 +34,6 @@ require (
     github.com/pkg/errors v0.9.1
     github.com/pkg/sftp v1.13.5
     github.com/prometheus/client_golang v1.15.1
-    github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285
     github.com/stretchr/testify v1.8.4
     github.com/tencentyun/cos-go-sdk-v5 v0.7.41
     github.com/urfave/cli v1.22.14
2 changes: 0 additions & 2 deletions go.sum
@@ -338,8 +338,6 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
 github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
 github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
 github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
-github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285 h1:d54EL9l+XteliUfUCGsEwwuk65dmmxX85VXF+9T6+50=
-github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285/go.mod h1:fxIDly1xtudczrZeOOlfaUvd2OPb2qZAPuWdU2BsBTk=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
 github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
54 changes: 2 additions & 52 deletions pkg/backup/download.go
@@ -15,7 +15,6 @@ import (
     "github.com/eapache/go-resiliency/retrier"
     "io"
     "io/fs"
-    "math/rand"
     "os"
     "path"
     "path/filepath"
@@ -220,16 +219,9 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
     if !schemaOnly {
         for _, t := range tableMetadataAfterDownload {
             for disk := range t.Parts {
-                // https://github.com/Altinity/clickhouse-backup/issues/561, @todo think about calculate size of each part and rebalancing t.Parts
                 if _, diskExists := b.DiskToPathMap[disk]; !diskExists && disk != b.cfg.ClickHouse.EmbeddedBackupDisk {
-                    var diskPath string
-                    var diskPathErr error
-                    diskPath, disks, diskPathErr = b.getDiskPathForNonExistsDisk(disk, disks, remoteBackup, t)
-                    if diskPathErr != nil {
-                        return diskPathErr
-                    }
-                    b.DiskToPathMap[disk] = diskPath
-                    log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %s", t.Database, t.Table, disk, b.DiskToPathMap[disk])
+                    b.DiskToPathMap[disk] = b.DiskToPathMap["default"]
+                    log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %s", t.Database, t.Table, disk, b.DiskToPathMap["default"])
                 }
             }
         }
@@ -972,45 +964,3 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
     }
     return nil
 }
-
-// getDiskPathForNonExistsDisk - https://github.com/Altinity/clickhouse-backup/issues/561
-// allow to Restore to new server with different storage policy, different disk count,
-// implements `least_used` for normal disk and `round_robin` for Object disks
-func (b *Backuper) getDiskPathForNonExistsDisk(disk string, disks []clickhouse.Disk, remoteBackup storage.Backup, t metadata.TableMetadata) (string, []clickhouse.Disk, error) {
-    diskType, ok := remoteBackup.DiskTypes[disk]
-    if !ok {
-        return "", disks, fmt.Errorf("diskType for %s not found in %#v in %s/metadata.json", disk, remoteBackup.DiskTypes, remoteBackup.BackupName)
-    }
-    var filteredDisks []clickhouse.Disk
-    for _, d := range disks {
-        if !d.IsBackup && d.Type == diskType {
-            filteredDisks = append(filteredDisks, d)
-        }
-    }
-    if len(filteredDisks) == 0 {
-        return "", disks, fmt.Errorf("diskType: %s not found in system.disks", diskType)
-    }
-    // round_robin for non-local disks
-    if diskType != "local" {
-        roundRobinIdx := rand.Intn(len(filteredDisks))
-        return filteredDisks[roundRobinIdx].Path, disks, nil
-    }
-    // least_used for local
-    freeSpace := uint64(0)
-    leastUsedIdx := -1
-    for idx, d := range filteredDisks {
-        if filteredDisks[idx].FreeSpace > freeSpace {
-            freeSpace = d.FreeSpace
-            leastUsedIdx = idx
-        }
-    }
-    if leastUsedIdx < 0 {
-        return "", disks, fmt.Errorf("%s free space, not found in system.disks with `local` type", utils.FormatBytes(t.TotalBytes))
-    }
-    for idx := range disks {
-        if disks[idx].Name == filteredDisks[leastUsedIdx].Name {
-            disks[idx].FreeSpace -= t.TotalBytes
-        }
-    }
-    return filteredDisks[leastUsedIdx].Path, disks, nil
-}
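The warning in the hunk above points operators at `disk_mapping` in the `clickhouse` config section, which is an existing option in this project. As a hedged sketch of the resolution order the simplified code implies (the helper below is hypothetical, not the commit's code):

```go
// resolveDiskPath is a hypothetical illustration of the fallback order
// after this revert: a disk name from the backup resolves to the server's
// known disk paths first, then to an operator-supplied disk_mapping entry,
// and finally to the "default" disk.
func resolveDiskPath(diskToPath, diskMapping map[string]string, disk string) string {
    if p, ok := diskToPath[disk]; ok {
        return p // disk exists on this server
    }
    if p, ok := diskMapping[disk]; ok {
        return p // explicit mapping from the clickhouse config section
    }
    return diskToPath["default"] // behavior restored by this commit
}
```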
3 changes: 1 addition & 2 deletions pkg/backup/list.go
@@ -4,7 +4,6 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "github.com/ricochet2200/go-disk-usage/du"
     "io"
     "os"
     "path"
@@ -174,7 +173,7 @@ func (b *Backuper) GetLocalBackups(ctx context.Context, disks []clickhouse.Disk)
     }
     if disks == nil {
         disks = []clickhouse.Disk{
-            {Name: "default", Path: "/var/lib/clickhouse", Type: "local", FreeSpace: du.NewDiskUsage("/var/lib/clickhouse").Free()},
+            {Name: "default", Path: "/var/lib/clickhouse"},
         }
     }
     defaultDataPath, err := b.ch.GetDefaultPath(disks)
11 changes: 6 additions & 5 deletions pkg/backup/upload.go
@@ -5,11 +5,6 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "github.com/Altinity/clickhouse-backup/pkg/clickhouse"
-    "github.com/Altinity/clickhouse-backup/pkg/custom"
-    "github.com/Altinity/clickhouse-backup/pkg/resumable"
-    "github.com/Altinity/clickhouse-backup/pkg/status"
-    "github.com/eapache/go-resiliency/retrier"
     "io"
     "os"
     "path"
@@ -20,6 +15,12 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/Altinity/clickhouse-backup/pkg/clickhouse"
+    "github.com/Altinity/clickhouse-backup/pkg/custom"
+    "github.com/Altinity/clickhouse-backup/pkg/resumable"
+    "github.com/Altinity/clickhouse-backup/pkg/status"
+    "github.com/eapache/go-resiliency/retrier"
+
     "golang.org/x/sync/errgroup"
     "golang.org/x/sync/semaphore"
 
27 changes: 10 additions & 17 deletions pkg/clickhouse/clickhouse.go
@@ -10,7 +10,6 @@ import (
     "github.com/Altinity/clickhouse-backup/pkg/common"
     "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
     "github.com/antchfx/xmlquery"
-    "github.com/ricochet2200/go-disk-usage/du"
     "os"
     "path"
     "path/filepath"
@@ -30,6 +29,7 @@ type ClickHouse struct {
     Config               *config.ClickHouseConfig
     Log                  *apexLog.Entry
     conn                 driver.Conn
+    disks                []Disk
     version              int
     isPartsColumnPresent int8
     IsOpen               bool
@@ -218,10 +218,9 @@ func (ch *ClickHouse) getDisksFromSystemSettings(ctx context.Context) ([]Disk, e
     dataPathArray := strings.Split(metadataPath, "/")
     clickhouseData := path.Join(dataPathArray[:len(dataPathArray)-1]...)
     return []Disk{{
-        Name:      "default",
-        Path:      path.Join("/", clickhouseData),
-        Type:      "local",
-        FreeSpace: du.NewDiskUsage(path.Join("/", clickhouseData)).Free(),
+        Name: "default",
+        Path: path.Join("/", clickhouseData),
+        Type: "local",
     }}, nil
     }
 }
@@ -252,24 +251,18 @@ func (ch *ClickHouse) getDisksFromSystemDisks(ctx context.Context) ([]Disk, erro
     case <-ctx.Done():
         return nil, ctx.Err()
     default:
-        type DiskFields struct {
-            DiskTypePresent  uint64 `ch:"is_disk_type_present"`
-            FreeSpacePresent uint64 `ch:"is_free_space_present"`
-        }
-        diskFields := make([]DiskFields, 0)
-        if err := ch.SelectContext(ctx, &diskFields, "SELECT countIf(name='type') AS is_disk_type_present, countIf(name='free_space') AS is_free_space_present FROM system.columns WHERE database='system' AND table='disks'"); err != nil {
+        isDiskType := make([]struct {
+            Present uint64 `ch:"is_disk_type_present"`
+        }, 0)
+        if err := ch.SelectContext(ctx, &isDiskType, "SELECT count() is_disk_type_present FROM system.columns WHERE database='system' AND table='disks' AND name='type'"); err != nil {
             return nil, err
         }
         diskTypeSQL := "'local'"
-        if len(diskFields) > 0 && diskFields[0].DiskTypePresent > 0 {
+        if len(isDiskType) > 0 && isDiskType[0].Present > 0 {
             diskTypeSQL = "any(type)"
         }
-        diskFreeSpaceSQL := "0"
-        if len(diskFields) > 0 && diskFields[0].FreeSpacePresent > 0 {
-            diskFreeSpaceSQL = "min(free_space)"
-        }
         var result []Disk
-        query := fmt.Sprintf("SELECT path, any(name) AS name, %s AS type, %s AS free_space FROM system.disks GROUP BY path", diskTypeSQL, diskFreeSpaceSQL)
+        query := fmt.Sprintf("SELECT path, any(name) AS name, %s AS type FROM system.disks GROUP BY path", diskTypeSQL)
         err := ch.SelectContext(ctx, &result, query)
         return result, err
     }
9 changes: 4 additions & 5 deletions pkg/clickhouse/structs.go
@@ -38,11 +38,10 @@ type IsSystemTablesFieldPresent struct {
 }
 
 type Disk struct {
-    Name      string `ch:"name"`
-    Path      string `ch:"path"`
-    Type      string `ch:"type"`
-    FreeSpace uint64 `ch:"free_space"`
-    IsBackup  bool
+    Name     string `ch:"name"`
+    Path     string `ch:"path"`
+    Type     string `ch:"type"`
+    IsBackup bool
 }
 
 // Database - Clickhouse system.databases struct
7 changes: 6 additions & 1 deletion pkg/config/config.go
@@ -3,13 +3,14 @@ package config
 import (
     "crypto/tls"
     "fmt"
-    s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
     "math"
     "os"
     "runtime"
     "strings"
     "time"
 
+    s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+
     "github.com/apex/log"
     "github.com/kelseyhightower/envconfig"
     "github.com/urfave/cli"
@@ -76,6 +77,9 @@ type GCSConfig struct {
     StorageClass          string            `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
     ObjectLabels          map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
     CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"`
+    // NOTE: ClientPoolSize should be at least 2 times bigger than
+    // UploadConcurrency or DownloadConcurrency in each upload and download case
+    ClientPoolSize int `yaml:"client_pool_size" envconfig:"GCS_CLIENT_POOL_SIZE"`
 }
 
 // AzureBlobConfig - Azure Blob settings section
@@ -544,6 +548,7 @@ func DefaultConfig() *Config {
             CompressionLevel:  1,
             CompressionFormat: "tar",
             StorageClass:      "STANDARD",
+            ClientPoolSize:    500,
         },
         COS: COSConfig{
             RowURL: "",
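The pool implementation that consumes `client_pool_size` is not among the hunks shown on this page; a minimal sketch of a channel-backed pool consistent with this configuration might look as follows (the type and function names are illustrative assumptions, not this project's API):

```go
package gcspool

import (
    "context"

    "cloud.google.com/go/storage"
)

// ClientPool is an illustrative, channel-backed pool of GCS clients,
// sized by the new client_pool_size / GCS_CLIENT_POOL_SIZE setting.
type ClientPool struct {
    clients chan *storage.Client
}

// NewClientPool eagerly creates size clients and parks them in a
// buffered channel.
func NewClientPool(ctx context.Context, size int) (*ClientPool, error) {
    p := &ClientPool{clients: make(chan *storage.Client, size)}
    for i := 0; i < size; i++ {
        c, err := storage.NewClient(ctx)
        if err != nil {
            return nil, err
        }
        p.clients <- c
    }
    return p, nil
}

// Acquire blocks until a pooled client is free.
func (p *ClientPool) Acquire() *storage.Client { return <-p.clients }

// Release returns a client to the pool for reuse.
func (p *ClientPool) Release(c *storage.Client) { p.clients <- c }
```

Because receiving from an empty buffered channel blocks, a pool smaller than the number of concurrent uploads and downloads would stall them; that is what the in-code note about sizing the pool at least twice the upload/download concurrency guards against, and the default of 500 leaves generous headroom.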
