Skip to content

Commit

Permalink
Add notifications by webhook.
Browse files Browse the repository at this point in the history
Add a new config entry moving to version 13.
```
		"webhook": {
			"1": {
				"enable": true,
				"address": "http://requestb.in/1i9al7m1"
			}
		}
```
  • Loading branch information
alexellis authored and harshavardhana committed Jan 12, 2017
1 parent f247538 commit d6a327f
Show file tree
Hide file tree
Showing 17 changed files with 623 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -21,4 +21,4 @@ after_success:
- bash <(curl -s https://codecov.io/bash)

go:
- 1.7.3
- 1.7.4
3 changes: 1 addition & 2 deletions cmd/api-headers.go
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/xml"
"fmt"
"net/http"
"runtime"
"strconv"
"time"
)
Expand All @@ -36,7 +35,7 @@ func mustGetRequestID(t time.Time) string {
func setCommonHeaders(w http.ResponseWriter) {
// Set unique request ID for each reply.
w.Header().Set(responseRequestIDKey, mustGetRequestID(time.Now().UTC()))
w.Header().Set("Server", ("Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")"))
w.Header().Set("Server", globalServerUserAgent)
w.Header().Set("Accept-Ranges", "bytes")
}

Expand Down
7 changes: 7 additions & 0 deletions cmd/bucket-notification-utils.go
Expand Up @@ -131,6 +131,7 @@ func isValidQueueID(queueARN string) bool {
// Unmarshals QueueARN into structured object.
sqsARN := unmarshalSqsARN(queueARN)
// Is Queue identifier valid?.

if isAMQPQueue(sqsARN) { // AMQP eueue.
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
Expand All @@ -151,6 +152,9 @@ func isValidQueueID(queueARN string) bool {
kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID)
return (kafkaN.Enable && len(kafkaN.Brokers) > 0 &&
kafkaN.Topic != "")
} else if isWebhookQueue(sqsARN) {
webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID)
return webhookN.Enable && webhookN.Endpoint != ""
}
return false
}
Expand Down Expand Up @@ -241,6 +245,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
// - redis
// - postgresql
// - kafka
// - webhook
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs = arnSQS{}
if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
Expand All @@ -260,6 +265,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs.Type = queueTypePostgreSQL
case strings.HasSuffix(sqsType, queueTypeKafka):
mSqs.Type = queueTypeKafka
case strings.HasSuffix(sqsType, queueTypeWebhook):
mSqs.Type = queueTypeWebhook
} // Add more queues here.
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
return mSqs
Expand Down
11 changes: 11 additions & 0 deletions cmd/bucket-notification-utils_test.go
Expand Up @@ -228,6 +228,12 @@ func TestQueueARN(t *testing.T) {
queueARN string
errCode APIErrorCode
}{

// Valid webhook queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:webhook",
errCode: ErrNone,
},
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
Expand Down Expand Up @@ -306,6 +312,11 @@ func TestUnmarshalSQSARN(t *testing.T) {
queueARN string
Type string
}{
// Valid webhook queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:webhook",
Type: "webhook",
},
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
Expand Down
98 changes: 98 additions & 0 deletions cmd/config-migrate.go
Expand Up @@ -70,6 +70,10 @@ func migrateConfig() error {
if err := migrateV11ToV12(); err != nil {
return err
}
// Migration version '12' to '13'.
if err := migrateV12ToV13(); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -836,3 +840,97 @@ func migrateV11ToV12() error {
)
return nil
}

// Version '12' to '13' migration. Add support for custom webhook endpoint.
func migrateV12ToV13() error {
cv12, err := loadConfigV12()
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("Unable to load config version ‘12’. %v", err)
}
if cv12.Version != "12" {
return nil
}

// Copy over fields from V12 into V13 config struct
srvConfig := &serverConfigV13{}
srvConfig.Version = "13"
srvConfig.Credential = cv12.Credential
srvConfig.Region = cv12.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = "us-east-1"
}
srvConfig.Logger.Console = cv12.Logger.Console
srvConfig.Logger.File = cv12.Logger.File

// check and set notifiers config
if len(cv12.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
srvConfig.Notify.AMQP = cv12.Notify.AMQP
}
if len(cv12.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
} else {
srvConfig.Notify.ElasticSearch = cv12.Notify.ElasticSearch
}
if len(cv12.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.Redis["1"] = redisNotify{}
} else {
srvConfig.Notify.Redis = cv12.Notify.Redis
}
if len(cv12.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
} else {
srvConfig.Notify.PostgreSQL = cv12.Notify.PostgreSQL
}
if len(cv12.Notify.Kafka) == 0 {
srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
srvConfig.Notify.Kafka["1"] = kafkaNotify{}
} else {
srvConfig.Notify.Kafka = cv12.Notify.Kafka
}
if len(cv12.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = cv12.Notify.NATS
}

// V12 will not have a webhook config. So we initialize one here.
srvConfig.Notify.Webhook = make(map[string]webhookNotify)
srvConfig.Notify.Webhook["1"] = webhookNotify{}

qc, err := quick.New(srvConfig)
if err != nil {
return fmt.Errorf("Unable to initialize the quick config. %v",
err)
}
configFile, err := getConfigFile()
if err != nil {
return fmt.Errorf("Unable to get config file. %v", err)
}

err = qc.Save(configFile)
if err != nil {
return fmt.Errorf(
"Failed to migrate config from ‘"+
cv12.Version+"’ to ‘"+srvConfig.Version+
"’ failed. %v", err,
)
}

console.Println(
"Migration from version ‘" +
cv12.Version + "’ to ‘" + srvConfig.Version +
"’ completed successfully.",
)
return nil
}
9 changes: 7 additions & 2 deletions cmd/config-migrate_test.go
Expand Up @@ -101,7 +101,10 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
}
if err := migrateV11ToV12(); err != nil {
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
t.Fatal("migrate v11 to v12 should succeed when no config file is found")
}
if err := migrateV12ToV13(); err != nil {
t.Fatal("migrate v12 to v13 should succeed when no config file is found")
}
}

Expand Down Expand Up @@ -212,5 +215,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV11ToV12(); err == nil {
t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json")
}

if err := migrateV12ToV13(); err == nil {
t.Fatal("migrateConfigV12ToV13() should fail with a corrupted json")
}
}
50 changes: 49 additions & 1 deletion cmd/config-old.go
Expand Up @@ -325,7 +325,8 @@ func loadConfigV6() (*configV6, error) {
return c, nil
}

// Notifier represents collection of supported notification queues.
// Notifier represents collection of supported notification queues in version
// 1 without NATS streaming.
type notifierV1 struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotifyV1 `json:"nats"`
Expand All @@ -335,6 +336,17 @@ type notifierV1 struct {
Kafka map[string]kafkaNotify `json:"kafka"`
}

// Notifier represents collection of supported notification queues in version 2
// with NATS streaming but without webhook.
type notifierV2 struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotify `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
}

// configV7 server configuration version '7'.
type serverConfigV7 struct {
Version string `json:"version"`
Expand Down Expand Up @@ -538,3 +550,39 @@ func loadConfigV11() (*serverConfigV11, error) {
}
return srvCfg, nil
}

// serverConfigV12 server configuration version '12' which is like
// version '11' except it adds support for NATS streaming notifications.
type serverConfigV12 struct {
Version string `json:"version"`

// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`

// Additional error logging configuration.
Logger logger `json:"logger"`

// Notification queue configuration.
Notify notifierV2 `json:"notify"`
}

func loadConfigV12() (*serverConfigV12, error) {
configFile, err := getConfigFile()
if err != nil {
return nil, err
}
if _, err = os.Stat(configFile); err != nil {
return nil, err
}
srvCfg := &serverConfigV12{}
srvCfg.Version = "12"
qc, err := quick.New(srvCfg)
if err != nil {
return nil, err
}
if err := qc.Load(configFile); err != nil {
return nil, err
}
return srvCfg, nil
}

0 comments on commit d6a327f

Please sign in to comment.