diff --git a/modules/firehose/config.go b/modules/firehose/config.go index 0dc09ca3..5754db19 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -11,6 +11,8 @@ import ( "github.com/odpf/entropy/pkg/helm" ) +const firehoseConsumerIDStartingSequence = "0001" + var ( //go:embed schema/config.json completeConfigSchema string @@ -35,14 +37,23 @@ type moduleConfig struct { } `json:"firehose"` } -func (mc *moduleConfig) validate() error { +func (mc *moduleConfig) validateAndSanitize(r resource.Resource) error { if mc.StopTime != nil && mc.StopTime.Before(time.Now()) { return errors.ErrInvalid. WithMsgf("value for stop_time must be greater than current time") } + + if mc.Firehose.KafkaConsumerID == "" { + mc.Firehose.KafkaConsumerID = fmt.Sprintf("%s-%s", generateFirehoseName(r), firehoseConsumerIDStartingSequence) + } + return nil } +func generateFirehoseName(r resource.Resource) string { + return fmt.Sprintf("%s-%s-firehose", r.Project, r.Name) +} + func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseConfig, error) { var output Output err := json.Unmarshal(r.State.Output, &output) @@ -52,7 +63,7 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseC defaults := output.Defaults rc := helm.DefaultReleaseConfig() - rc.Name = fmt.Sprintf("%s-%s-firehose", r.Project, r.Name) + rc.Name = generateFirehoseName(r) rc.Repository = defaults.ChartRepository rc.Chart = defaults.ChartName rc.Namespace = defaults.Namespace diff --git a/modules/firehose/plan.go b/modules/firehose/plan.go index 49fc74a3..66707f13 100644 --- a/modules/firehose/plan.go +++ b/modules/firehose/plan.go @@ -28,7 +28,7 @@ func (m *firehoseModule) planCreate(res module.ExpandedResource, act module.Acti if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - if err := reqConf.validate(); err != nil { + if err := reqConf.validateAndSanitize(res.Resource); err != nil { return nil, err } @@ -68,7 +68,7 @@ func (m *firehoseModule) planChange(res module.ExpandedResource, act module.Acti if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - if err := reqConf.validate(); err != nil { + if err := reqConf.validateAndSanitize(r); err != nil { return nil, err } conf = reqConf