diff --git a/.circleci/config.yml b/.circleci/config.yml index 0622db2627e..b5492251161 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -35,7 +35,7 @@ jobs: - run: name: "Run unit tests." environment: - THANOS_TEST_OBJSTORE_SKIP: AZURE,COS,ALIYUNOSS,BOS + THANOS_TEST_OBJSTORE_SKIP: AZURE,COS,ALIYUNOSS,BOS,OCI # Variables for Swift testing. OS_AUTH_URL: http://127.0.0.1:5000/v2.0 OS_PASSWORD: s3cr3t diff --git a/Makefile b/Makefile index 1a9cbaeffdf..fa7cb8b294c 100644 --- a/Makefile +++ b/Makefile @@ -288,12 +288,12 @@ test: export THANOS_TEST_PROMETHEUS_PATHS= $(PROMETHEUS_ARRAY) test: export THANOS_TEST_ALERTMANAGER_PATH= $(ALERTMANAGER) test: check-git install-deps @echo ">> install thanos GOOPTS=${GOOPTS}" - @echo ">> running unit tests (without /test/e2e). Do export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS if you want to skip e2e tests against all real store buckets. Current value: ${THANOS_TEST_OBJSTORE_SKIP}" + @echo ">> running unit tests (without /test/e2e). Do export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI if you want to skip e2e tests against all real store buckets. Current value: ${THANOS_TEST_OBJSTORE_SKIP}" @go test $(shell go list ./... | grep -v /vendor/ | grep -v /test/e2e); .PHONY: test-local test-local: ## Runs test excluding tests for ALL object storage integrations. -test-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS +test-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI test-local: $(MAKE) test @@ -311,7 +311,7 @@ test-e2e: docker $(GOTESPLIT) .PHONY: test-e2e-local test-e2e-local: ## Runs all thanos e2e tests locally. -test-e2e-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS +test-e2e-local: export THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI test-e2e-local: $(MAKE) test-e2e diff --git a/docs/storage.md b/docs/storage.md index b2f03165377..29df440d285 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -37,15 +37,16 @@ In Kubernetes it is as easy as (on Thanos sidecar example): Current object storage client implementations: -| Provider | Maturity | Aimed For | Auto-tested on CI | Maintainers | -|----------------------------------------------------------------------------------------|--------------------|-----------------------|-------------------|-------------------------| -| [Google Cloud Storage](#gcs) | Stable | Production Usage | yes | @bwplotka | -| [AWS/S3](#s3) (and all S3-compatible storages e.g disk-based [Minio](https://min.io/)) | Stable | Production Usage | yes | @bwplotka | -| [Azure Storage Account](#azure) | Stable | Production Usage | no | @vglafirov | -| [OpenStack Swift](#openstack-swift) | Beta (working PoC) | Production Usage | yes | @FUSAKLA | -| [Tencent COS](#tencent-cos) | Beta | Production Usage | no | @jojohappy,@hanjm | -| [AliYun OSS](#aliyun-oss) | Beta | Production Usage | no | @shaulboozhiao,@wujinhu | -| [Local Filesystem](#filesystem) | Stable | Testing and Demo only | yes | @bwplotka | +| Provider | Maturity | Aimed For | Auto-tested on CI | Maintainers | +|----------------------------------------------------------------------------------------|--------------------|-----------------------|-------------------|----------------------------------| +| [Google Cloud Storage](#gcs) | Stable | Production Usage | yes | @bwplotka | +| [AWS/S3](#s3) (and all S3-compatible storages e.g disk-based [Minio](https://min.io/)) | Stable | Production Usage | yes | @bwplotka | +| [Azure Storage Account](#azure) | Stable | Production Usage | no | @vglafirov | +| [OpenStack Swift](#openstack-swift) | Beta (working PoC) | Production Usage | yes | @FUSAKLA | +| [Tencent COS](#tencent-cos) | Beta | Production Usage | no | @jojohappy,@hanjm | +| [AliYun OSS](#aliyun-oss) | Beta | Production Usage | no | @shaulboozhiao,@wujinhu | +| [Local Filesystem](#filesystem) | Stable | Testing and Demo only | yes | @bwplotka | +| [OCI Object Storage](#oci---oracle-cloud-infrastructure) | Beta | Production Usage | yes | @aarontams,@gaurav-05,@ericrrath | **Missing support to some object storage?** Check out [how to add your client section](#how-to-add-a-new-client-to-thanos) @@ -206,7 +207,7 @@ Example working AWS IAM policy for user: To test the policy, set env vars for S3 access for *empty, not used* bucket as well as: ``` -THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS +THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI THANOS_ALLOW_EXISTING_BUCKET_USE=true ``` @@ -240,7 +241,7 @@ We need access to CreateBucket and DeleteBucket and access to all buckets: } ``` -With this policy you should be able to run set `THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS` and unset `S3_BUCKET` and run all tests using `make test`. +With this policy you should be able to run set `THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI` and unset `S3_BUCKET` and run all tests using `make test`. Details about AWS policies: https://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html @@ -484,6 +485,68 @@ config: prefix: "" ``` +### OCI - Oracle Cloud Infrastructure + +To configure OCI Object Storage as Thanos Object Store, you need access to OCI Object Storage. +OCI has 3 different providers; default, instance principal, and raw, which are supported by this object storage client implementation + +#### Default Provider +The default config provider will look for configurations in 3 places: file in `$HOME/.oci/config`, `$HOME/.obmcs/config`, and +variables names starting with the string TF_VAR. If the same configuration is found in multiple places the provider will +prefer the first one. + +For Example: +```yaml +type: OCI +config: + provider: "default" + bucket: "" + compartment_ocid: "" + part_size: "" // Optional part size to override the OCI default of 128 MiB, value is in bytes. + max_request_retries: "" // Optional maximum number of retries for a request. + request_retry_interval: "" // Optional sleep duration in seconds between retry requests. + http_config: + idle_conn_timeout: 1m30s // Optional maximum amount of time an idle (keep-alive) connection will remain idle before closing itself. Zero means no limit. + response_header_timeout: 2m // Optional amount of time to wait for a server's response headers after fully writing the request. + tls_handshake_timeout: 10s // Optional maximum amount of time waiting to wait for a TLS handshake. Zero means no timeout. + expect_continue_timeout: 1s // Optional amount of time to wait for a server's first response headers. Zero means no timeout and causes the body to be sent immediately. + insecure_skip_verify: false // Optional. If true, crypto/tls accepts any certificate presented by the server and any host name in that certificate. + max_idle_conns: 100 // Optional maximum number of idle (keep-alive) connections across all hosts. Zero means no limit. + max_idle_conns_per_host: 100 // Optional maximum idle (keep-alive) connections to keep per-host. If zero, DefaultMaxIdleConnsPerHost=2 is used. + max_conns_per_host: 0 // Optional maximum total number of connections per host. + disable_compression: false // Optional. If true, prevents the Transport from requesting compression. + client_timeout: 90s // Optional time limit for requests made by the HTTP Client. +``` + + +#### Instance Principal Provider +For Example: +```yaml +type: OCI +config: + provider: "instance-principal" + bucket: "" + compartment_ocid: "" +``` +You can also include any of the optional configuration just like the example in `Default Provider`. + +#### Raw Provider +For Example: +```yaml +type: OCI +config: + provider: "raw" + bucket: "" + compartment_ocid: "" + tenancy_ocid: "" + user_ocid: "" + region: "" + fingerprint: "" + privatekey: "" + passphrase: "" // Optional passphrase to encrypt the private API Signing key +``` +You can also include any of the optional configuration just like the example in `Default Provider`. + ### How to add a new client to Thanos? Following checklist allows adding new Go code client to supported providers: diff --git a/go.mod b/go.mod index 70ab85dbf9c..26d49e89caf 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/olekukonko/tablewriter v0.0.2 github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.2.0 + github.com/oracle/oci-go-sdk/v65 v65.8.1 github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a @@ -149,6 +150,7 @@ require ( github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect github.com/gobwas/pool v0.2.0 // indirect github.com/gobwas/ws v1.0.2 // indirect + github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/googleapis v1.4.0 // indirect github.com/golang-jwt/jwt/v4 v4.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect @@ -186,7 +188,7 @@ require ( github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect github.com/sercand/kuberesolver v2.4.0+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect - github.com/sony/gobreaker v0.4.1 // indirect + github.com/sony/gobreaker v0.5.0 // indirect github.com/stretchr/objx v0.2.0 // indirect github.com/weaveworks/promrus v1.2.0 // indirect github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect @@ -195,7 +197,7 @@ require ( go.mongodb.org/mongo-driver v1.7.5 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/mod v0.5.1 // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect + golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect golang.org/x/tools v0.1.9-0.20211209172050-90a85b2969be // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index 7c5e6ab006b..7ed3cf6bd60 100644 --- a/go.sum +++ b/go.sum @@ -839,6 +839,8 @@ github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6 github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -1497,6 +1499,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.5/go.mod h1:KpXfKdgRDnnhsxw4pNIH9Md5lyFqKUa4YDFlwRYAMyE= +github.com/oracle/oci-go-sdk/v65 v65.8.1 h1:Mn/IgGZMWkmg7RbcdPAogcmfoDFrYP1CvU9x/4r/lT8= +github.com/oracle/oci-go-sdk/v65 v65.8.1/go.mod h1:oyMrMa1vOzzKTmPN+kqrTR9y9kPA2tU1igN3NUSNTIE= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -1691,8 +1695,9 @@ github.com/snowflakedb/gosnowflake v1.3.13/go.mod h1:6nfka9aTXkUNha1p1cjeeyjDvcy github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= -github.com/sony/gobreaker v0.4.1 h1:oMnRNZXX5j85zso6xCPRNPtmAycat+WcoKbklScLDgQ= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/soundcloud/go-runit v0.0.0-20150630195641-06ad41a06c4a/go.mod h1:LeFCbQYJ3KJlPs/FvPz2dy1tkpxyeNESVyCNNzRXFR0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -2340,8 +2345,9 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go index 20610790142..f2653f57760 100644 --- a/pkg/objstore/client/factory.go +++ b/pkg/objstore/client/factory.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/objstore/gcs" + "github.com/thanos-io/thanos/pkg/objstore/oci" "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" @@ -36,6 +37,7 @@ const ( COS ObjProvider = "COS" ALIYUNOSS ObjProvider = "ALIYUNOSS" BOS ObjProvider = "BOS" + OCI ObjProvider = "OCI" ) type BucketConfig struct { @@ -76,6 +78,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe bucket, err = filesystem.NewBucketFromConfig(config) case string(BOS): bucket, err = bos.NewBucket(logger, config, component) + case string(OCI): + bucket, err = oci.NewBucket(logger, config) default: return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type) } diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go index 139e7242716..164a789b615 100644 --- a/pkg/objstore/objtesting/foreach.go +++ b/pkg/objstore/objtesting/foreach.go @@ -17,6 +17,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/gcs" + "github.com/thanos-io/thanos/pkg/objstore/oci" "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" @@ -24,7 +25,7 @@ import ( ) // IsObjStoreSkipped returns true if given provider ID is found in THANOS_TEST_OBJSTORE_SKIP array delimited by comma e.g: -// THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS. +// THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI. func IsObjStoreSkipped(t *testing.T, provider client.ObjProvider) bool { if e, ok := os.LookupEnv("THANOS_TEST_OBJSTORE_SKIP"); ok { obstores := strings.Split(e, ",") @@ -170,4 +171,18 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix")) }) } + + // Optional OCI. + if !IsObjStoreSkipped(t, client.OCI) { + t.Run("oci", func(t *testing.T) { + bkt, closeFn, err := oci.NewTestBucket(t) + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() + + testFn(t, bkt) + }) + } + } diff --git a/pkg/objstore/oci/helper.go b/pkg/objstore/oci/helper.go new file mode 100644 index 00000000000..eac8f684abf --- /dev/null +++ b/pkg/objstore/oci/helper.go @@ -0,0 +1,277 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package oci + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/oracle/oci-go-sdk/v65/common" + "github.com/oracle/oci-go-sdk/v65/objectstorage" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/thanos-io/thanos/pkg/objstore" +) + +func DefaultTransport(config Config) *http.Transport { + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + + IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout), + ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout), + TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout), + ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout), + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify}, + MaxIdleConns: config.HTTPConfig.MaxIdleConns, + MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost, + MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost, + DisableCompression: config.HTTPConfig.DisableCompression, + } +} + +func getNamespace(client objectstorage.ObjectStorageClient, requestMetadata common.RequestMetadata) (namespace *string, err error) { + response, err := client.GetNamespace( + context.Background(), + objectstorage.GetNamespaceRequest{RequestMetadata: requestMetadata}, + ) + if err != nil { + return nil, err + } + return response.Value, nil +} + +func getObject(ctx context.Context, bkt Bucket, objectName string, byteRange string) (response objectstorage.GetObjectResponse, err error) { + if len(objectName) == 0 { + err = fmt.Errorf("value cannot be empty for field ObjectName in path") + return + } + request := objectstorage.GetObjectRequest{ + NamespaceName: &bkt.namespace, + BucketName: &bkt.name, + ObjectName: &objectName, + RequestMetadata: bkt.requestMetadata, + } + if byteRange != "" { + request.Range = &byteRange + } + return bkt.client.GetObject(ctx, request) +} + +func listAllObjects(ctx context.Context, bkt Bucket, prefix string, options ...objstore.IterOption) (objectNames []string, err error) { + var allObjectNames []string + var nextStartWith *string = nil + init := true + + for init || nextStartWith != nil { + init = false + objectNames, nextStartWith, err = listObjects(ctx, bkt, prefix, nextStartWith) + if err != nil { + return nil, err + } + + if objstore.ApplyIterOptions(options...).Recursive { + for _, objectName := range objectNames { + if strings.HasSuffix(objectName, DirDelim) { + subObjectNames, err := listAllObjects(ctx, bkt, objectName, options...) + if err != nil { + return nil, err + } + allObjectNames = append(allObjectNames, subObjectNames...) + } else { + allObjectNames = append(allObjectNames, objectName) + } + } + } else { + allObjectNames = append(allObjectNames, objectNames...) + } + } + return allObjectNames, nil +} + +func listObjects(ctx context.Context, bkt Bucket, prefix string, start *string) (objectNames []string, nextStartWith *string, err error) { + request := objectstorage.ListObjectsRequest{ + NamespaceName: &bkt.namespace, + BucketName: &bkt.name, + Delimiter: common.String(DirDelim), + Prefix: &prefix, + Start: start, + RequestMetadata: bkt.requestMetadata, + } + response, err := bkt.client.ListObjects(ctx, request) + if err != nil { + return nil, nil, err + } + + for _, object := range response.ListObjects.Objects { + objectNames = append(objectNames, *object.Name) + } + objectNames = append(objectNames, response.ListObjects.Prefixes...) + + return objectNames, response.NextStartWith, nil +} + +func (config *Config) validateConfig() (err error) { + var errMsg []string + + if config.Tenancy == "" { + errMsg = append(errMsg, "no OCI tenancy ocid specified") + } + if config.User == "" { + errMsg = append(errMsg, "no OCI user ocid specified") + } + if config.Region == "" { + errMsg = append(errMsg, "no OCI region specified") + } + if config.Fingerprint == "" { + errMsg = append(errMsg, "no OCI fingerprint specified") + } + if config.PrivateKey == "" { + errMsg = append(errMsg, "no OCI privatekey specified") + } + + if len(errMsg) > 0 { + return errors.New(strings.Join(errMsg, ", ")) + } + + return +} + +func getRequestMetadata(maxRequestRetries int, requestRetryInterval int) common.RequestMetadata { + if maxRequestRetries <= 1 { + retryPolicy := common.NoRetryPolicy() + return common.RequestMetadata{ + RetryPolicy: &retryPolicy, + } + } + retryPolicy := common.NewRetryPolicyWithOptions(common.WithMaximumNumberAttempts(uint(maxRequestRetries)), + common.WithFixedBackoff(time.Duration(requestRetryInterval)*time.Second)) + return common.RequestMetadata{ + RetryPolicy: &retryPolicy, + } +} + +func getConfigFromEnv() (config Config, err error) { + config = Config{ + Provider: strings.ToLower(os.Getenv("OCI_PROVIDER")), + Bucket: os.Getenv("OCI_BUCKET"), + Compartment: os.Getenv("OCI_COMPARTMENT"), + Tenancy: os.Getenv("OCI_TENANCY_OCID"), + User: os.Getenv("OCI_USER_OCID"), + Region: os.Getenv("OCI_REGION"), + Fingerprint: os.Getenv("OCI_FINGERPRINT"), + PrivateKey: os.Getenv("OCI_PRIVATEKEY"), + Passphrase: os.Getenv("OCI_PASSPHRASE"), + } + + // [Optional] Override the default part size of 128 MiB, value is in bytes. The max part size is 50GiB + if os.Getenv("OCI_PART_SIZE") != "" { + partSize, err := strconv.ParseInt(os.Getenv("OCI_PART_SIZE"), 10, 64) + if err != nil { + return Config{}, err + } + config.PartSize = partSize + } + + if os.Getenv("OCI_MAX_REQUEST_RETRIES") != "" { + maxRequestRetries, err := strconv.Atoi(os.Getenv("OCI_MAX_REQUEST_RETRIES")) + if err != nil { + return Config{}, err + } + config.MaxRequestRetries = maxRequestRetries + } + + if os.Getenv("OCI_REQUEST_RETRY_INTERVAL") != "" { + requestRetryInterval, err := strconv.Atoi(os.Getenv("OCI_REQUEST_RETRY_INTERVAL")) + if err != nil { + return Config{}, err + } + config.RequestRetryInterval = requestRetryInterval + } + + if os.Getenv("HTTP_CONFIG_IDLE_CONN_TIMEOUT") != "" { + idleConnTimeout, err := model.ParseDuration(os.Getenv("HTTP_CONFIG_IDLE_CONN_TIMEOUT")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.IdleConnTimeout = idleConnTimeout + } + + if os.Getenv("HTTP_CONFIG_RESPONSE_HEADER_TIMEOUT") != "" { + responseHeaderTimeout, err := model.ParseDuration(os.Getenv("HTTP_CONFIG_RESPONSE_HEADER_TIMEOUT")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.ResponseHeaderTimeout = responseHeaderTimeout + } + + if os.Getenv("HTTP_CONFIG_TLS_HANDSHAKE_TIMEOUT") != "" { + tlsHandshakeTimeout, err := model.ParseDuration(os.Getenv("HTTP_CONFIG_TLS_HANDSHAKE_TIMEOUT")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.TLSHandshakeTimeout = tlsHandshakeTimeout + } + + if os.Getenv("HTTP_CONFIG_EXPECT_CONTINUE_TIMEOUT") != "" { + expectContinueTimeout, err := model.ParseDuration(os.Getenv("HTTP_CONFIG_EXPECT_CONTINUE_TIMEOUT")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.ExpectContinueTimeout = expectContinueTimeout + } + + if os.Getenv("HTTP_CONFIG_INSECURE_SKIP_VERIFY") != "" { + insecureSkipVerify, err := strconv.ParseBool(os.Getenv("HTTP_CONFIG_INSECURE_SKIP_VERIFY")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.InsecureSkipVerify = insecureSkipVerify + } + + if os.Getenv("HTTP_CONFIG_MAX_IDLE_CONNS") != "" { + maxIdleConns, err := strconv.Atoi(os.Getenv("HTTP_CONFIG_MAX_IDLE_CONNS")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.MaxIdleConns = maxIdleConns + } + + if os.Getenv("HTTP_CONFIG_MAX_IDLE_CONNS_PER_HOST") != "" { + maxIdleConnsPerHost, err := strconv.Atoi(os.Getenv("HTTP_CONFIG_MAX_IDLE_CONNS_PER_HOST")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.MaxIdleConnsPerHost = maxIdleConnsPerHost + } + + if os.Getenv("HTTP_CONFIG_MAX_CONNS_PER_HOST") != "" { + maxConnsPerHost, err := strconv.Atoi(os.Getenv("HTTP_CONFIG_MAX_CONNS_PER_HOST")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.MaxConnsPerHost = maxConnsPerHost + } + + if os.Getenv("HTTP_CONFIG_DISABLE_COMPRESSION") != "" { + disableCompression, err := strconv.ParseBool(os.Getenv("HTTP_CONFIG_DISABLE_COMPRESSION")) + if err != nil { + return Config{}, err + } + config.HTTPConfig.DisableCompression = disableCompression + } + + return config, nil +} diff --git a/pkg/objstore/oci/oci.go b/pkg/objstore/oci/oci.go new file mode 100644 index 00000000000..026f110ad1a --- /dev/null +++ b/pkg/objstore/oci/oci.go @@ -0,0 +1,375 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package oci + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oracle/oci-go-sdk/v65/common" + "github.com/oracle/oci-go-sdk/v65/common/auth" + "github.com/oracle/oci-go-sdk/v65/objectstorage" + "github.com/oracle/oci-go-sdk/v65/objectstorage/transfer" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/thanos-io/thanos/pkg/objstore" + "gopkg.in/yaml.v2" +) + +// DirDelim is the delimiter used to model a directory structure in an object store bucket. +const DirDelim = "/" + +type Provider string + +const ( + DefaultConfigProvider = Provider("default") + InstancePrincipalConfigProvider = Provider("instance-principal") + RawConfigProvider = Provider("raw") +) + +var DefaultConfig = Config{ + HTTPConfig: HTTPConfig{ + IdleConnTimeout: model.Duration(90 * time.Second), + ResponseHeaderTimeout: model.Duration(2 * time.Minute), + TLSHandshakeTimeout: model.Duration(10 * time.Second), + ExpectContinueTimeout: model.Duration(1 * time.Second), + InsecureSkipVerify: false, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + MaxConnsPerHost: 0, + DisableCompression: false, + ClientTimeout: 90 * time.Second, + }, +} + +// HTTPConfig stores the http.Transport configuration for the OCI client. +type HTTPConfig struct { + IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"` + ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"` + InsecureSkipVerify bool `yaml:"insecure_skip_verify"` + + TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"` + ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"` + MaxIdleConns int `yaml:"max_idle_conns"` + MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` + MaxConnsPerHost int `yaml:"max_conns_per_host"` + DisableCompression bool `yaml:"disable_compression"` + ClientTimeout time.Duration `yaml:"client_timeout"` +} + +// Config stores the configuration for oci bucket. +type Config struct { + Provider string `yaml:"provider"` + Bucket string `yaml:"bucket"` + Compartment string `yaml:"compartment_ocid"` + Tenancy string `yaml:"tenancy_ocid"` + User string `yaml:"user_ocid"` + Region string `yaml:"region"` + Fingerprint string `yaml:"fingerprint"` + PrivateKey string `yaml:"privatekey"` + Passphrase string `yaml:"passphrase"` + PartSize int64 `yaml:"part_size"` + MaxRequestRetries int `yaml:"max_request_retries"` + RequestRetryInterval int `yaml:"request_retry_interval"` + HTTPConfig HTTPConfig `yaml:"http_config"` +} + +// Bucket implements the store.Bucket interface against OCI APIs. +type Bucket struct { + logger log.Logger + name string + namespace string + client *objectstorage.ObjectStorageClient + partSize int64 + requestMetadata common.RequestMetadata +} + +// Name returns the bucket name for the provider. +func (b *Bucket) Name() string { + return b.name +} + +// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the + // object itself as one prefix item. + if dir != "" { + dir = strings.TrimSuffix(dir, DirDelim) + DirDelim + } + + objectNames, err := listAllObjects(ctx, *b, dir, options...) + if err != nil { + return errors.Wrapf(err, "cannot list objects in directory '%s'", dir) + } + + level.Debug(b.logger).Log("NumberOfObjects", len(objectNames)) + + for _, objectName := range objectNames { + if objectName == "" || objectName == dir { + continue + } + if err := f(objectName); err != nil { + return err + } + } + + return nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + response, err := getObject(ctx, *b, name, "") + if err != nil { + return nil, err + } + return response.Content, nil +} + +// GetRange returns a new range reader for the given object name and range. +func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { + level.Debug(b.logger).Log("msg", "getting object", "name", name, "off", offset, "length", length) + + // A single byte range to fetch, as described in RFC 7233 (https://tools.ietf.org/html/rfc7233#section-2.1). + byteRange := "" + + if offset >= 0 { + if length > 0 { + byteRange = fmt.Sprintf("bytes=%d-%d", offset, offset+length-1) + } else { + byteRange = fmt.Sprintf("bytes=%d-", offset) + } + } else { + if length > 0 { + byteRange = fmt.Sprintf("bytes=-%d", length) + } else { + return nil, errors.New(fmt.Sprintf("invalid range specified: offset=%d length=%d", offset, length)) + } + } + + level.Debug(b.logger).Log("byteRange", byteRange) + + response, err := getObject(ctx, *b, name, byteRange) + if err != nil { + return nil, err + } + return response.Content, nil +} + +// Upload the contents of the reader as an object into the bucket. +// Upload should be idempotent. +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { + req := transfer.UploadStreamRequest{ + UploadRequest: transfer.UploadRequest{ + NamespaceName: common.String(b.namespace), + BucketName: common.String(b.name), + ObjectName: &name, + EnableMultipartChecksumVerification: common.Bool(true), // TODO: should we check? + ObjectStorageClient: b.client, + RequestMetadata: b.requestMetadata, + }, + StreamReader: r, + } + if b.partSize > 0 { + req.UploadRequest.PartSize = &b.partSize + } + + uploadManager := transfer.NewUploadManager() + _, err = uploadManager.UploadStream(ctx, req) + + return err +} + +// Exists checks if the given object exists in the bucket. +func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { + _, err := getObject(ctx, *b, name, "") + if err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrapf(err, "cannot get OCI object '%s'", name) + } + return true, nil +} + +// Delete removes the object with the given name. +// If object does not exists in the moment of deletion, Delete should throw error. +func (b *Bucket) Delete(ctx context.Context, name string) (err error) { + request := objectstorage.DeleteObjectRequest{ + NamespaceName: &b.namespace, + BucketName: &b.name, + ObjectName: &name, + RequestMetadata: b.requestMetadata, + } + _, err = b.client.DeleteObject(ctx, request) + return err +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + failure, isServiceError := common.IsServiceError(err) + if isServiceError { + k := failure.GetHTTPStatusCode() + match := k == http.StatusNotFound + level.Debug(b.logger).Log("msg", match) + return failure.GetHTTPStatusCode() == http.StatusNotFound + } + return false +} + +// ObjectSize returns the size of the specified object. +func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { + response, err := getObject(ctx, *b, name, "") + if err != nil { + return 0, err + } + return uint64(*response.ContentLength), nil +} + +// Close closes bucket. +func (b *Bucket) Close() error { + return nil +} + +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + response, err := getObject(ctx, *b, name, "") + if err != nil { + return objstore.ObjectAttributes{}, err + } + return objstore.ObjectAttributes{ + Size: *response.ContentLength, + LastModified: response.LastModified.Time, + }, nil +} + +// createBucket creates bucket. +func (b *Bucket) createBucket(ctx context.Context, compartmentId string) (err error) { + request := objectstorage.CreateBucketRequest{ + NamespaceName: &b.namespace, + RequestMetadata: b.requestMetadata, + } + request.CompartmentId = &compartmentId + request.Name = &b.name + request.Metadata = make(map[string]string) + request.PublicAccessType = objectstorage.CreateBucketDetailsPublicAccessTypeNopublicaccess + _, err = b.client.CreateBucket(ctx, request) + return err +} + +// deleteBucket deletes bucket. +func (b *Bucket) deleteBucket(ctx context.Context) (err error) { + request := objectstorage.DeleteBucketRequest{ + NamespaceName: &b.namespace, + BucketName: &b.name, + RequestMetadata: b.requestMetadata, + } + _, err = b.client.DeleteBucket(ctx, request) + return err +} + +// NewBucket returns a new Bucket using the provided oci config values. +func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) { + level.Debug(logger).Log("msg", "creating new oci bucket connection") + var config = DefaultConfig + var configurationProvider common.ConfigurationProvider + var err error + + if err := yaml.Unmarshal(ociConfig, &config); err != nil { + return nil, errors.Wrapf(err, "unable to unmarshal the given oci configurations") + } + + provider := Provider(strings.ToLower(config.Provider)) + level.Info(logger).Log("msg", "creating OCI client", "provider", provider) + switch provider { + case DefaultConfigProvider: + configurationProvider = common.DefaultConfigProvider() + case InstancePrincipalConfigProvider: + configurationProvider, err = auth.InstancePrincipalConfigurationProvider() + if err != nil { + return nil, errors.Wrapf(err, "unable to create OCI instance principal config provider") + } + case RawConfigProvider: + if err := config.validateConfig(); err != nil { + return nil, errors.Wrapf(err, "invalid oci configurations") + } + configurationProvider = common.NewRawConfigurationProvider(config.Tenancy, config.User, config.Region, + config.Fingerprint, config.PrivateKey, &config.Passphrase) + default: + return nil, errors.Wrapf(err, fmt.Sprintf("unsupported OCI provider: %s", provider)) + } + + client, err := objectstorage.NewObjectStorageClientWithConfigurationProvider(configurationProvider) + if err != nil { + return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations") + } + + httpClient := http.Client{ + Transport: DefaultTransport(config), + Timeout: config.HTTPConfig.ClientTimeout, + } + client.HTTPClient = &httpClient + + requestMetadata := getRequestMetadata(config.MaxRequestRetries, config.RequestRetryInterval) + + level.Info(logger).Log("msg", "getting namespace, it might take some time") + namespace, err := getNamespace(client, requestMetadata) + if err != nil { + return nil, err + } + level.Debug(logger).Log("msg", fmt.Sprintf("OCI tenancy namespace: %s", *namespace)) + + bkt := Bucket{ + logger: logger, + name: config.Bucket, + namespace: *namespace, + client: &client, + partSize: config.PartSize, + requestMetadata: requestMetadata, + } + + return &bkt, nil +} + +// NewTestBucket creates test bkt client that before returning creates temporary bucket. +// In a close function it empties and deletes the bucket. +func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { + config, err := getConfigFromEnv() + if err != nil { + return nil, nil, err + } + + ociConfig, err := yaml.Marshal(config) + if err != nil { + return nil, nil, err + } + + bkt, err := NewBucket(log.NewNopLogger(), ociConfig) + if err != nil { + return nil, nil, err + } + + ctx := context.Background() + bkt.name = objstore.CreateTemporaryTestBucketName(t) + if err := bkt.createBucket(ctx, config.Compartment); err != nil { + t.Errorf("failed to create temporary OCI bucket '%s' for OCI tests", bkt.name) + return nil, nil, err + } + + t.Logf("created temporary OCI bucket '%s' for OCI tests", bkt.name) + return bkt, func() { + objstore.EmptyBucket(t, ctx, bkt) + if err := bkt.deleteBucket(ctx); err != nil { + t.Logf("failed to delete temporary OCI bucket %s for OCI tests: %s", bkt.name, err) + } + t.Logf("deleted temporary OCI bucket '%s' for OCI tests", bkt.name) + }, nil +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 9571f159f1a..9f6bf605fd7 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/cos" "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/objstore/gcs" + "github.com/thanos-io/thanos/pkg/objstore/oci" "github.com/thanos-io/thanos/pkg/objstore/oss" "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/objstore/swift" @@ -54,6 +55,7 @@ var ( client.ALIYUNOSS: oss.Config{}, client.FILESYSTEM: filesystem.Config{}, client.BOS: bos.Config{}, + client.OCI: oci.Config{}, } tracingConfigs = map[trclient.TracingProvider]interface{}{