Skip to content

Commit

Permalink
Merge pull request #145 from nats-io/add-compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Sep 22, 2023
2 parents abee178 + c0bf9eb commit 0e208aa
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 92 deletions.
11 changes: 10 additions & 1 deletion controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts := []jsm.ConsumerOption{
jsm.DurableName(spec.DurableName),
jsm.DeliverySubject(spec.DeliverSubject),
jsm.FilterStreamBySubject(spec.FilterSubject),
jsm.RateLimitBitsPerSecond(uint64(spec.RateLimitBps)),
jsm.MaxAckPending(uint(spec.MaxAckPending)),
jsm.ConsumerDescription(spec.Description),
Expand All @@ -319,6 +318,16 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
jsm.ConsumerOverrideReplicas(spec.Replicas),
}

if spec.FilterSubject != "" && len(spec.FilterSubjects) > 0 {
return nil, fmt.Errorf("cannot specify both FilterSubject and FilterSubjects")
}

if spec.FilterSubject != "" {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubject))
} else if len(spec.FilterSubjects) > 0 {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubjects...))
}

switch spec.DeliverPolicy {
case "all":
opts = append(opts, jsm.DeliverAllAvailable())
Expand Down
81 changes: 61 additions & 20 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

jsm "github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
jsmapi "github.com/nats-io/jsm.go/api"
apis "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
typed "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned/typed/jetstream/v1beta2"
Expand Down Expand Up @@ -337,6 +338,13 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
opts = append(opts, jsm.DiscardNew())
}

switch spec.Compression {
case "s2":
opts = append(opts, jsm.Compression(api.S2Compression))
case "none":
opts = append(opts, jsm.Compression(api.NoCompression))
}

if spec.NoAck {
opts = append(opts, jsm.NoAck())
}
Expand Down Expand Up @@ -397,6 +405,16 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
}))
}

if spec.SubjectTransform != nil {
opts = append(opts, func(o *api.StreamConfig) error {
o.SubjectTransform = &jsmapi.SubjectTransformConfig{
Source: spec.SubjectTransform.Source,
Destination: spec.SubjectTransform.Dest,
}
return nil
})
}

if spec.AllowDirect {
opts = append(opts, jsm.AllowDirect())
}
Expand Down Expand Up @@ -446,27 +464,36 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
return err
}

var subjectTransform *jsmapi.SubjectTransformConfig
if spec.SubjectTransform != nil {
subjectTransform = &jsmapi.SubjectTransformConfig{
Source: spec.SubjectTransform.Source,
Destination: spec.SubjectTransform.Dest,
}
}

config := jsmapi.StreamConfig{
Name: spec.Name,
Description: spec.Description,
Retention: retention,
Subjects: spec.Subjects,
MaxConsumers: spec.MaxConsumers,
MaxMsgs: int64(spec.MaxMsgs),
MaxBytes: int64(spec.MaxBytes),
MaxMsgsPer: int64(spec.MaxMsgsPerSubject),
MaxAge: maxAge,
MaxMsgSize: int32(spec.MaxMsgSize),
Storage: storage,
Discard: discard,
DiscardNewPer: spec.DiscardPerSubject,
Replicas: spec.Replicas,
NoAck: spec.NoAck,
Duplicates: duplicates,
AllowDirect: spec.AllowDirect,
DenyDelete: spec.DenyDelete,
RollupAllowed: spec.AllowRollup,
FirstSeq: spec.FirstSequence,
Name: spec.Name,
Description: spec.Description,
Retention: retention,
Subjects: spec.Subjects,
MaxConsumers: spec.MaxConsumers,
MaxMsgs: int64(spec.MaxMsgs),
MaxBytes: int64(spec.MaxBytes),
MaxMsgsPer: int64(spec.MaxMsgsPerSubject),
MaxAge: maxAge,
MaxMsgSize: int32(spec.MaxMsgSize),
Storage: storage,
Discard: discard,
DiscardNewPer: spec.DiscardPerSubject,
Replicas: spec.Replicas,
NoAck: spec.NoAck,
Duplicates: duplicates,
AllowDirect: spec.AllowDirect,
DenyDelete: spec.DenyDelete,
RollupAllowed: spec.AllowRollup,
FirstSeq: spec.FirstSequence,
SubjectTransform: subjectTransform,
}
if spec.Republish != nil {
config.RePublish = &jsmapi.RePublish{
Expand All @@ -492,6 +519,13 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
config.Sources[i] = jss
}

switch spec.Compression {
case "s2":
config.Compression = api.S2Compression
case "none":
config.Compression = api.NoCompression
}

return js.UpdateConfiguration(config)
}

Expand Down Expand Up @@ -647,5 +681,12 @@ func getStreamSource(ss *apis.StreamSource) (*jsmapi.StreamSource, error) {
}
}

for _, transform := range ss.SubjectTransforms {
jss.SubjectTransforms = append(jss.SubjectTransforms, jsmapi.SubjectTransformConfig{
Source: transform.Source,
Destination: transform.Dest,
})
}

return jss, nil
}
49 changes: 49 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ spec:
type: string
externalDeliverPrefix:
type: string
subjectTransforms:
description: List of subject transforms for this mirror.
type: array
items:
description: A subject transform pair.
type: object
properties:
source:
description: Source subject.
type: string
dest:
description: Destination subject.
type: string
placement:
description: A stream's placement.
type: object
Expand Down Expand Up @@ -146,6 +159,19 @@ spec:
type: string
externalDeliverPrefix:
type: string
subjectTransforms:
description: List of subject transforms for this mirror.
type: array
items:
description: A subject transform pair.
type: object
properties:
source:
description: Source subject.
type: string
dest:
description: Destination subject.
type: string
servers:
description: A list of servers for creating stream
type: array
Expand Down Expand Up @@ -193,6 +219,24 @@ spec:
description: Sequence number from which the Stream will start.
type: number
default: 0
compression:
description: Stream specific compression.
type: string
enum:
- s2
- none
- ''
default: ''
subjectTransform:
description: SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
type: object
properties:
source:
type: string
description: Source subject
dest:
type: string
description: Destination subject to transform into
preventDelete:
description: When true, the managed Stream will not be deleted when the resource is deleted
type: boolean
Expand Down Expand Up @@ -494,6 +538,11 @@ spec:
filterSubject:
description: Select only a specific incoming subjects, supports wildcards.
type: string
filterSubjects:
description: List of incoming subjects, supports wildcards. Available since 2.10.
type: array
items:
type: string
replayPolicy:
description: How messages are sent.
type: string
Expand Down
1 change: 1 addition & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ConsumerSpec struct {
PreventUpdate bool `json:"preventUpdate"`
DurableName string `json:"durableName"`
FilterSubject string `json:"filterSubject"`
FilterSubjects []string `json:"filterSubjects"`
FlowControl bool `json:"flowControl"`
HeadersOnly bool `json:"headersOnly"`
HeartbeatInterval string `json:"heartbeatInterval"`
Expand Down
71 changes: 40 additions & 31 deletions pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,44 @@ func (s *Stream) GetSpec() interface{} {

// StreamSpec is the spec for a Stream resource
type StreamSpec struct {
Account string `json:"account"`
AllowDirect bool `json:"allowDirect"`
AllowRollup bool `json:"allowRollup"`
Creds string `json:"creds"`
DenyDelete bool `json:"denyDelete"`
Description string `json:"description"`
DiscardPerSubject bool `json:"discardPerSubject"`
PreventDelete bool `json:"preventDelete"`
PreventUpdate bool `json:"preventUpdate"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Republish *RePublish `json:"republish"`
FirstSequence uint64 `json:"firstSequence"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
Account string `json:"account"`
AllowDirect bool `json:"allowDirect"`
AllowRollup bool `json:"allowRollup"`
Creds string `json:"creds"`
DenyDelete bool `json:"denyDelete"`
Description string `json:"description"`
DiscardPerSubject bool `json:"discardPerSubject"`
PreventDelete bool `json:"preventDelete"`
PreventUpdate bool `json:"preventUpdate"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Republish *RePublish `json:"republish"`
SubjectTransform *SubjectTransform `json:"subjectTransform"`
FirstSequence uint64 `json:"firstSequence"`
Compression string `json:"compression"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
}

type SubjectTransform struct {
Source string `json:"source"`
Dest string `json:"dest"`
}

type StreamPlacement struct {
Expand All @@ -68,6 +75,8 @@ type StreamSource struct {

ExternalAPIPrefix string `json:"externalApiPrefix"`
ExternalDeliverPrefix string `json:"externalDeliverPrefix"`

SubjectTransforms []*SubjectTransform `json:"subjectTransforms"`
}

type RePublish struct {
Expand Down

0 comments on commit 0e208aa

Please sign in to comment.