Skip to content

Commit

Permalink
Add stream subject transform
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Sep 22, 2023
1 parent 92ccb76 commit c8510d6
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 52 deletions.
59 changes: 39 additions & 20 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
}))
}

if spec.SubjectTransform != nil {
opts = append(opts, func(o *api.StreamConfig) error {
o.SubjectTransform = &jsmapi.SubjectTransformConfig{
Source: spec.SubjectTransform.Source,
Destination: spec.SubjectTransform.Dest,
}
return nil
})
}

if spec.AllowDirect {
opts = append(opts, jsm.AllowDirect())
}
Expand Down Expand Up @@ -454,27 +464,36 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
return err
}

var subjectTransform *jsmapi.SubjectTransformConfig
if spec.SubjectTransform != nil {
subjectTransform = &jsmapi.SubjectTransformConfig{
Source: spec.SubjectTransform.Source,
Destination: spec.SubjectTransform.Dest,
}
}

config := jsmapi.StreamConfig{
Name: spec.Name,
Description: spec.Description,
Retention: retention,
Subjects: spec.Subjects,
MaxConsumers: spec.MaxConsumers,
MaxMsgs: int64(spec.MaxMsgs),
MaxBytes: int64(spec.MaxBytes),
MaxMsgsPer: int64(spec.MaxMsgsPerSubject),
MaxAge: maxAge,
MaxMsgSize: int32(spec.MaxMsgSize),
Storage: storage,
Discard: discard,
DiscardNewPer: spec.DiscardPerSubject,
Replicas: spec.Replicas,
NoAck: spec.NoAck,
Duplicates: duplicates,
AllowDirect: spec.AllowDirect,
DenyDelete: spec.DenyDelete,
RollupAllowed: spec.AllowRollup,
FirstSeq: spec.FirstSequence,
Name: spec.Name,
Description: spec.Description,
Retention: retention,
Subjects: spec.Subjects,
MaxConsumers: spec.MaxConsumers,
MaxMsgs: int64(spec.MaxMsgs),
MaxBytes: int64(spec.MaxBytes),
MaxMsgsPer: int64(spec.MaxMsgsPerSubject),
MaxAge: maxAge,
MaxMsgSize: int32(spec.MaxMsgSize),
Storage: storage,
Discard: discard,
DiscardNewPer: spec.DiscardPerSubject,
Replicas: spec.Replicas,
NoAck: spec.NoAck,
Duplicates: duplicates,
AllowDirect: spec.AllowDirect,
DenyDelete: spec.DenyDelete,
RollupAllowed: spec.AllowRollup,
FirstSeq: spec.FirstSequence,
SubjectTransform: subjectTransform,
}
if spec.Republish != nil {
config.RePublish = &jsmapi.RePublish{
Expand Down
10 changes: 10 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ spec:
- s2
- none
default: ''
subjectTransform:
description: SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
type: object
properties:
source:
type: string
description: Source subject
dest:
type: string
description: Destination subject to transform to
preventDelete:
description: When true, the managed Stream will not be deleted when the resource is deleted
type: boolean
Expand Down
70 changes: 38 additions & 32 deletions pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,44 @@ func (s *Stream) GetSpec() interface{} {

// StreamSpec is the spec for a Stream resource
type StreamSpec struct {
Account string `json:"account"`
AllowDirect bool `json:"allowDirect"`
AllowRollup bool `json:"allowRollup"`
Creds string `json:"creds"`
DenyDelete bool `json:"denyDelete"`
Description string `json:"description"`
DiscardPerSubject bool `json:"discardPerSubject"`
PreventDelete bool `json:"preventDelete"`
PreventUpdate bool `json:"preventUpdate"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Republish *RePublish `json:"republish"`
FirstSequence uint64 `json:"firstSequence"`
Compression string `json:"compression"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
Account string `json:"account"`
AllowDirect bool `json:"allowDirect"`
AllowRollup bool `json:"allowRollup"`
Creds string `json:"creds"`
DenyDelete bool `json:"denyDelete"`
Description string `json:"description"`
DiscardPerSubject bool `json:"discardPerSubject"`
PreventDelete bool `json:"preventDelete"`
PreventUpdate bool `json:"preventUpdate"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Republish *RePublish `json:"republish"`
SubjectTransform *SubjectTransform `json:subjectTransform`
FirstSequence uint64 `json:"firstSequence"`
Compression string `json:"compression"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
}

type SubjectTransform struct {
Source string `json:"source"`
Dest string `json:"dest"`
}

type StreamPlacement struct {
Expand Down
21 changes: 21 additions & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/jetstream/generated/applyconfiguration/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c8510d6

Please sign in to comment.