From a7f60245e28aead17da184a6daa4204fc937c3cf Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 1 Jun 2018 17:03:17 -0700 Subject: [PATCH] Allow usage check to be configurable --- cmd/common-main.go | 9 +++ cmd/config-current.go | 26 ++++++- cmd/config-migrate.go | 122 +++++++++++++++++++++++++++++++++ cmd/config-versions.go | 27 ++++++++ cmd/disk-usage.go | 49 ++++++++++++- cmd/fs-v1.go | 32 +++++++-- cmd/fs-v1_test.go | 31 +++------ cmd/globals.go | 8 +++ cmd/posix.go | 35 +++++++--- cmd/ui-errors.go | 8 +++ docs/config/README.md | 12 ++++ docs/config/config.sample.json | 7 +- 12 files changed, 320 insertions(+), 46 deletions(-) diff --git a/cmd/common-main.go b/cmd/common-main.go index 9c026db2caddb3..579c5dc762e64f 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -153,6 +153,15 @@ func handleCommonEnvVars() { globalCacheExpiry = expiry } + if intervalStr := os.Getenv("MINIO_USAGE_CHECK_INTERVAL"); intervalStr != "" { + interval, err := parseDuration(intervalStr) + if err != nil { + logger.Fatal(uiErrInvalidUsageCheckIntervalValue(err), "Unable to parse MINIO_USAGE_CHECK_INTERVAL value (`%s`)", intervalStr) + } + globalUsageCheckInterval = interval + globalIsEnvUsageCheck = true + } + // In place update is true by default if the MINIO_UPDATE is not set // or is not set to 'off', if MINIO_UPDATE is set to 'off' then // in-place update is off. diff --git a/cmd/config-current.go b/cmd/config-current.go index 6a4d378d2c6ed1..649157a8ab1820 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "sync" + "time" "github.com/minio/minio/cmd/logger" @@ -39,9 +40,9 @@ import ( // 6. Make changes in config-current_test.go for any test change // Config version -const serverConfigVersion = "23" +const serverConfigVersion = "24" -type serverConfig = serverConfigV23 +type serverConfig = serverConfigV24 var ( // globalServerConfig server config. 
@@ -104,6 +105,17 @@ func (s *serverConfig) GetBrowser() bool { return bool(s.Browser) } +// Set new usage configuration, currently only supports configuring +// usage check interval. +func (s *serverConfig) SetUsageConfig(checkUsageInterval time.Duration) { + s.Usage = usageConfig{checkUsageInterval} +} + +// Get current usage configuration. +func (s *serverConfig) GetUsageConfig() usageConfig { + return s.Usage +} + // SetCacheConfig sets the current cache config func (s *serverConfig) SetCacheConfig(drives, exclude []string, expiry int) { s.Cache.Drives = drives @@ -141,6 +153,8 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string { return "StorageClass configuration differs" case !reflect.DeepEqual(s.Cache, t.Cache): return "Cache configuration differs" + case !reflect.DeepEqual(s.Usage, t.Usage): + return "Usage configuration differs" case !reflect.DeepEqual(s.Notify.AMQP, t.Notify.AMQP): return "AMQP Notification configuration differs" case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS): @@ -186,6 +200,7 @@ func newServerConfig() *serverConfig { Exclude: []string{}, Expiry: globalCacheExpiry, }, + Usage: usageConfig{globalDefaultUsageCheckInterval}, Notify: notifier{}, } @@ -246,6 +261,10 @@ func newConfig() error { srvCfg.SetCacheConfig(globalCacheDrives, globalCacheExcludes, globalCacheExpiry) } + if globalIsEnvUsageCheck { + srvCfg.SetUsageConfig(globalUsageCheckInterval) + } + // hold the mutex lock before a new config is assigned. // Save the new config globally. // unlock the mutex. 
@@ -339,6 +358,9 @@ func loadConfig() error { globalCacheExcludes = cacheConf.Exclude globalCacheExpiry = cacheConf.Expiry } + if !globalIsEnvUsageCheck { + globalUsageCheckInterval = globalServerConfig.GetUsageConfig().UsageCheckInterval + } globalServerConfigMu.Unlock() return nil diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index dd428dd548527e..861058d72bfe1e 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -172,6 +172,11 @@ func migrateConfig() error { return err } fallthrough + case "23": + if err = migrateV23ToV24(); err != nil { + return err + } + fallthrough case serverConfigVersion: // No migration needed. this always points to current version. err = nil @@ -1951,3 +1956,120 @@ func migrateV22ToV23() error { logger.Info(configMigrateMSGTemplate, configFile, cv22.Version, srvConfig.Version) return nil } + +func migrateV23ToV24() error { + configFile := getConfigFile() + + cv23 := &serverConfigV23{} + _, err := quick.Load(configFile, cv23) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return fmt.Errorf("Unable to load config version ‘23’. %v", err) + } + if cv23.Version != "23" { + return nil + } + + // Copy over fields from V23 into V24 config struct + srvConfig := &serverConfigV24{ + Notify: notifier{}, + } + srvConfig.Version = "24" + srvConfig.Credential = cv23.Credential + srvConfig.Region = cv23.Region + if srvConfig.Region == "" { + // Region needs to be set for AWS Signature Version 4. 
+ srvConfig.Region = globalMinioDefaultRegion + } + + if len(cv23.Notify.AMQP) == 0 { + srvConfig.Notify.AMQP = make(map[string]target.AMQPArgs) + srvConfig.Notify.AMQP["1"] = target.AMQPArgs{} + } else { + srvConfig.Notify.AMQP = cv23.Notify.AMQP + } + if len(cv23.Notify.Elasticsearch) == 0 { + srvConfig.Notify.Elasticsearch = make(map[string]target.ElasticsearchArgs) + srvConfig.Notify.Elasticsearch["1"] = target.ElasticsearchArgs{ + Format: event.NamespaceFormat, + } + } else { + srvConfig.Notify.Elasticsearch = cv23.Notify.Elasticsearch + } + if len(cv23.Notify.Redis) == 0 { + srvConfig.Notify.Redis = make(map[string]target.RedisArgs) + srvConfig.Notify.Redis["1"] = target.RedisArgs{ + Format: event.NamespaceFormat, + } + } else { + srvConfig.Notify.Redis = cv23.Notify.Redis + } + if len(cv23.Notify.PostgreSQL) == 0 { + srvConfig.Notify.PostgreSQL = make(map[string]target.PostgreSQLArgs) + srvConfig.Notify.PostgreSQL["1"] = target.PostgreSQLArgs{ + Format: event.NamespaceFormat, + } + } else { + srvConfig.Notify.PostgreSQL = cv23.Notify.PostgreSQL + } + if len(cv23.Notify.Kafka) == 0 { + srvConfig.Notify.Kafka = make(map[string]target.KafkaArgs) + srvConfig.Notify.Kafka["1"] = target.KafkaArgs{} + } else { + srvConfig.Notify.Kafka = cv23.Notify.Kafka + } + if len(cv23.Notify.NATS) == 0 { + srvConfig.Notify.NATS = make(map[string]target.NATSArgs) + srvConfig.Notify.NATS["1"] = target.NATSArgs{} + } else { + srvConfig.Notify.NATS = cv23.Notify.NATS + } + if len(cv23.Notify.Webhook) == 0 { + srvConfig.Notify.Webhook = make(map[string]target.WebhookArgs) + srvConfig.Notify.Webhook["1"] = target.WebhookArgs{} + } else { + srvConfig.Notify.Webhook = cv23.Notify.Webhook + } + if len(cv23.Notify.MySQL) == 0 { + srvConfig.Notify.MySQL = make(map[string]target.MySQLArgs) + srvConfig.Notify.MySQL["1"] = target.MySQLArgs{ + Format: event.NamespaceFormat, + } + } else { + srvConfig.Notify.MySQL = cv23.Notify.MySQL + } + + if len(cv23.Notify.MQTT) == 0 { + 
srvConfig.Notify.MQTT = make(map[string]target.MQTTArgs) + srvConfig.Notify.MQTT["1"] = target.MQTTArgs{} + } else { + srvConfig.Notify.MQTT = cv23.Notify.MQTT + } + + // Load browser config from existing config in the file. + srvConfig.Browser = cv23.Browser + + // Load domain config from existing config in the file. + srvConfig.Domain = cv23.Domain + + // Load storage class config from existing storage class config in the file. + srvConfig.StorageClass.RRS = cv23.StorageClass.RRS + srvConfig.StorageClass.Standard = cv23.StorageClass.Standard + + // Load cache config from existing cache config in the file. + srvConfig.Cache.Drives = cv23.Cache.Drives + srvConfig.Cache.Exclude = cv23.Cache.Exclude + srvConfig.Cache.Expiry = cv23.Cache.Expiry + + // Init usage config. For future migration, usage config needs + // to be copied over from previous version. + srvConfig.Usage = usageConfig{globalDefaultUsageCheckInterval} + + if err = quick.Save(configFile, srvConfig); err != nil { + return fmt.Errorf("Failed to migrate config from ‘%s’ to ‘%s’. %v", cv23.Version, srvConfig.Version, err) + } + + logger.Info(configMigrateMSGTemplate, configFile, cv23.Version, srvConfig.Version) + return nil +} diff --git a/cmd/config-versions.go b/cmd/config-versions.go index 82ec0e58ca8847..6631f678a45cc5 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -602,3 +602,30 @@ type serverConfigV23 struct { // Notification queue configuration. Notify notifier `json:"notify"` } + +// serverConfigV24 is just like version '23' with addition of usage interval +// field. +// +// IMPORTANT NOTE: When updating this struct make sure that +// serverConfig.ConfigDiff() is updated as necessary. +type serverConfigV24 struct { + Version string `json:"version"` + + // S3 API configuration. 
+ Credential auth.Credentials `json:"credential"` + Region string `json:"region"` + Browser BrowserFlag `json:"browser"` + Domain string `json:"domain"` + + // Storage class configuration + StorageClass storageClassConfig `json:"storageclass"` + + // Cache configuration + Cache CacheConfig `json:"cache"` + + // Usage configuration + Usage usageConfig `json:"usage"` + + // Notification queue configuration. + Notify notifier `json:"notify"` +} diff --git a/cmd/disk-usage.go b/cmd/disk-usage.go index 456d1e41f241b3..4ef6e94f212877 100644 --- a/cmd/disk-usage.go +++ b/cmd/disk-usage.go @@ -18,12 +18,55 @@ package cmd import ( "context" + "encoding/json" + "fmt" "time" ) -const ( - usageCheckInterval = 12 * time.Hour // 12 hours -) +// Captures configurable parameters of usage check. +type usageConfig struct { + UsageCheckInterval time.Duration +} + +// MarshalJSON - encodes to JSON data. +func (u usageConfig) MarshalJSON() ([]byte, error) { + type _usageConfig struct { + UsageCheckInterval string `json:"interval"` + } + return json.Marshal(_usageConfig{u.UsageCheckInterval.String()}) +} + +// parseDuration - parse duration string +func parseDuration(dStr string) (time.Duration, error) { + d, err := time.ParseDuration(dStr) + if err != nil { + return d, err + } + if d < globalMinimumUsageCheckInterval { + return d, fmt.Errorf("interval %s is not allowed, minimum required value is %s", + d, globalMinimumUsageCheckInterval) + } + return d, nil +} + +// UnmarshalJSON - decodes JSON data. 
+func (u *usageConfig) UnmarshalJSON(data []byte) error { + type _usageConfig struct { + UsageCheckInterval string `json:"interval"` + } + var u1 = _usageConfig{} + if err := json.Unmarshal(data, &u1); err != nil { + return err + } + if !globalIsEnvUsageCheck { + d, err := parseDuration(u1.UsageCheckInterval) + if err != nil { + return err + } + u.UsageCheckInterval = d + } + return nil +} // getDiskUsage walks the file tree rooted at root, calling usageFn // for each file or directory in the tree, including root. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 81cb9feabdff3d..2e0b796f6afe0d 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -66,8 +66,10 @@ type FSObjects struct { nsMutex *nsLockMap // Disk usage metrics - totalUsed uint64 - usageCheckInterval time.Duration + totalUsed uint64 + + // Disk usage running routine + usageRunning int32 } // Represents the background append file. @@ -134,10 +136,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { rwPool: &fsIOPool{ readersMap: make(map[string]*lock.RLockedFile), }, - nsMutex: newNSLock(false), - listPool: newTreeWalkPool(globalLookupTimeout), - appendFileMap: make(map[string]*fsAppendFile), - usageCheckInterval: usageCheckInterval, + nsMutex: newNSLock(false), + listPool: newTreeWalkPool(globalLookupTimeout), + appendFileMap: make(map[string]*fsAppendFile), } // Once the filesystem has initialized hold the read lock for @@ -173,7 +174,7 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error { // diskUsage returns du information for the posix path, in a continuous routine. func (fs *FSObjects) diskUsage(doneCh chan struct{}) { - ticker := time.NewTicker(fs.usageCheckInterval) + ticker := time.NewTicker(globalUsageCheckInterval) defer ticker.Stop() usageFn := func(ctx context.Context, entry string) error { @@ -191,6 +192,13 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) { return nil } + // Check if disk usage routine is running, if yes then return. 
+ if atomic.LoadInt32(&fs.usageRunning) == 1 { + return + } + atomic.StoreInt32(&fs.usageRunning, 1) + defer atomic.StoreInt32(&fs.usageRunning, 0) + if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil { return } @@ -200,6 +208,12 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) { case <-doneCh: return case <-ticker.C: + // Check if disk usage routine is running, if yes let it finish. + if atomic.LoadInt32(&fs.usageRunning) == 1 { + continue + } + atomic.StoreInt32(&fs.usageRunning, 1) + var usage uint64 usageFn = func(ctx context.Context, entry string) error { var fi os.FileInfo @@ -215,9 +229,13 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) { usage = usage + uint64(fi.Size()) return nil } + if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil { + atomic.StoreInt32(&fs.usageRunning, 0) continue } + + atomic.StoreInt32(&fs.usageRunning, 0) atomic.StoreUint64(&fs.totalUsed, usage) } } diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index b9f84519fe9a62..c9288359fcbdc9 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "fmt" "os" "path/filepath" "testing" @@ -191,14 +190,13 @@ func TestFSGetBucketInfo(t *testing.T) { t.Fatal("BucketNotFound error not returned") } - globalServiceDoneCh <- struct{}{} - // Check for buckets and should get disk not found. - fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + os.RemoveAll(disk) - _, err = fs.GetBucketInfo(context.Background(), bucketName) - if !isSameType(err, BucketNotFound{}) { - t.Fatal("BucketNotFound error not returned") + if _, err = fs.GetBucketInfo(context.Background(), bucketName); err != nil { + if !isSameType(err, BucketNotFound{}) { + t.Fatal("BucketNotFound error not returned") + } } } @@ -303,10 +301,8 @@ func TestFSDeleteObject(t *testing.T) { t.Fatal("Unexpected error: ", err) } - globalServiceDoneCh <- struct{}{} - // Delete object should err disk not found. 
- fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + os.RemoveAll(disk) if err := fs.DeleteObject(context.Background(), bucketName, objectName); err != nil { if !isSameType(err, BucketNotFound{}) { t.Fatal("Unexpected error: ", err) @@ -346,10 +342,8 @@ func TestFSDeleteBucket(t *testing.T) { obj.MakeBucketWithLocation(context.Background(), bucketName, "") - globalServiceDoneCh <- struct{}{} - // Delete bucket should get error disk not found. - fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + os.RemoveAll(disk) if err = fs.DeleteBucket(context.Background(), bucketName); err != nil { if !isSameType(err, BucketNotFound{}) { t.Fatal("Unexpected error: ", err) @@ -393,21 +387,12 @@ func TestFSListBuckets(t *testing.T) { } // Test ListBuckets with disk not found. - fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) - + os.RemoveAll(disk) if _, err := fs.ListBuckets(context.Background()); err != nil { if err != errDiskNotFound { t.Fatal("Unexpected error: ", err) } } - - longPath := fmt.Sprintf("%0256d", 1) - fs.fsPath = longPath - if _, err := fs.ListBuckets(context.Background()); err != nil { - if err != errFileNameTooLong { - t.Fatal("Unexpected error: ", err) - } - } } // TestFSHealObject - tests for fs HealObject diff --git a/cmd/globals.go b/cmd/globals.go index c1d1512734ab67..c4005d0711363f 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -188,6 +188,14 @@ var ( globalCacheExpiry = 90 // Add new variable global values here. + // Minimum required usage check interval value. + globalMinimumUsageCheckInterval = 2 * time.Hour // 2 hours + // Default usage check interval value. + globalDefaultUsageCheckInterval = 12 * time.Hour // 12 hours + // Usage check interval value. + globalUsageCheckInterval = globalDefaultUsageCheckInterval + // Is env usage check interval set. + globalIsEnvUsageCheck bool ) // global colors. 
diff --git a/cmd/posix.go b/cmd/posix.go index ec06c31be95c25..502b6e64ce474b 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -50,9 +50,9 @@ type posix struct { connected bool // Disk usage metrics - stopUsageCh chan struct{} - totalUsage uint64 - usageCheckInterval time.Duration + stopUsageCh chan struct{} + totalUsage uint64 + usageRunning int32 } // checkPathLength - returns error if given path name length more than 255 @@ -164,13 +164,12 @@ func newPosix(path string) (StorageAPI, error) { return &b }, }, - stopUsageCh: make(chan struct{}), - usageCheckInterval: usageCheckInterval, + stopUsageCh: make(chan struct{}), } st.connected = true - go st.diskUsage() + go st.diskUsage(globalServiceDoneCh) // Success. return st, nil @@ -315,8 +314,8 @@ func (s *posix) checkDiskFound() (err error) { } // diskUsage returns du information for the posix path, in a continuous routine. -func (s *posix) diskUsage() { - ticker := time.NewTicker(s.usageCheckInterval) +func (s *posix) diskUsage(doneCh chan struct{}) { + ticker := time.NewTicker(globalUsageCheckInterval) defer ticker.Stop() usageFn := func(ctx context.Context, entry string) error { @@ -333,6 +332,13 @@ func (s *posix) diskUsage() { } } + // Check if disk usage routine is running, if yes then return. + if atomic.LoadInt32(&s.usageRunning) == 1 { + return + } + atomic.StoreInt32(&s.usageRunning, 1) + defer atomic.StoreInt32(&s.usageRunning, 0) + if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil { return } @@ -341,9 +347,16 @@ func (s *posix) diskUsage() { select { case <-s.stopUsageCh: return - case <-globalServiceDoneCh: + case <-doneCh: return case <-ticker.C: + // Check if disk usage routine is running, if yes let it + // finish, before starting a new one. 
+ if atomic.LoadInt32(&s.usageRunning) == 1 { + continue + } + atomic.StoreInt32(&s.usageRunning, 1) + var usage uint64 usageFn = func(ctx context.Context, entry string) error { select { @@ -358,9 +371,13 @@ func (s *posix) diskUsage() { return nil } } + if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil { + atomic.StoreInt32(&s.usageRunning, 0) continue } + + atomic.StoreInt32(&s.usageRunning, 0) atomic.StoreUint64(&s.totalUsage, usage) } } diff --git a/cmd/ui-errors.go b/cmd/ui-errors.go index 194e5998e3c8ae..e043a5f4a964da 100644 --- a/cmd/ui-errors.go +++ b/cmd/ui-errors.go @@ -47,6 +47,14 @@ var ( "MINIO_CACHE_EXPIRY: Valid cache expiry duration is in days.", ) + uiErrInvalidUsageCheckIntervalValue = newUIErrFn( + "Invalid usage check interval value", + "Please check the passed value", + `MINIO_USAGE_CHECK_INTERVAL: Valid usage check interval duration string is a signed sequence of decimal numbers, +each with optional fraction and a unit suffix, such as "2h45m". +Valid time units are "ns", "us", "ms", "s", "m", "h".`, + ) + uiErrInvalidCredentials = newUIErrFn( "Invalid credentials", "Please provide correct credentials", diff --git a/docs/config/README.md b/docs/config/README.md index fd239236779d6a..b54b153bad3d86 100644 --- a/docs/config/README.md +++ b/docs/config/README.md @@ -99,6 +99,18 @@ By default, parity for objects with standard storage class is set to `N/2`, and |``exclude`` | _[]string_ | List of wildcard patterns for prefixes to exclude from cache | |``expiry`` | _int_ | Days to cache expiry | +### Usage +|Field|Type|Description| +|:---|:---|:---| +|``interval``| _string_ | Valid duration string is a signed sequence of decimal numbers, each with optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time units are "ns", "us", "ms", "s", "m", "h".| + +Example: Run usage check every 4 hours 3 minutes 10 seconds. 
+ +```sh +export MINIO_USAGE_CHECK_INTERVAL="4h3m10s" +minio server /data +``` + #### Notify |Field|Type|Description| |:---|:---|:---| diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index ea89c3c7459fda..3b0c9691158564 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -1,5 +1,5 @@ { - "version": "22", + "version": "24", "credential": { "accessKey": "USWUXHGYZQYFYFFIT3RE", "secretKey": "MOJRH0mkL1IPauahWITSVvyDrQbEEIwljvmxdq03" }, @@ -16,6 +16,9 @@ "expiry": 90, "exclude": [] }, + "usage": { + "interval": "3h" + }, "notify": { "amqp": { "1": { @@ -120,4 +123,4 @@ } } } -} \ No newline at end of file +}