Skip to content

Commit

Permalink
Revert "[ADDED] Support for stream subject transform" (#1220)
Browse files Browse the repository at this point in the history
* Revert "[ADDED] Support for stream subject transform (#1200)"

This reverts commit c860828.

* Revert "js: mirror test updates"

This reverts commit 424de47.
  • Loading branch information
piotrpio committed Feb 24, 2023
1 parent 090ea10 commit c86a1a1
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 222 deletions.
7 changes: 3 additions & 4 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546
github.com/nats-io/nats-server/v2 v2.9.6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.15.11 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
Expand Down
22 changes: 12 additions & 10 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,27 @@ github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a h1:iZE9RvG9JCC0TployL6UsxJ3dbwMKBrJakYfvPMYDRc=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8 h1:MnuDEDsBCO0yU2MVA7BMbiob17nDN1TklcqlxJi6py8=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.6 h1:RTtK+rv/4CcliOuqGsy58g7MuWkBaWmF5TUNwuUo9Uw=
github.com/nats-io/nats-server/v2 v2.9.6/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
48 changes: 0 additions & 48 deletions js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,51 +1268,3 @@ func TestStreamNameBySubject(t *testing.T) {
}
}
}

func TestJetStreamTransform(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&StreamConfig{
Name: "ORIGIN",
Subjects: []string{"test"},
SubjectTransform: &SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
Storage: MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

err = nc.Publish("test", []byte("1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&StreamConfig{
Subjects: []string{},
Name: "SOURCING",
Sources: []*StreamSource{{Name: "ORIGIN", SubjectTransformDest: "fromtest.>"}},
Storage: MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create a sync subscription with an in-memory ephemeral consumer.
sub, err := js.SubscribeSync("fromtest.>", ConsumerMemoryStorage(), BindStream("SOURCING"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if m.Subject != "fromtest.transformed.test" {
t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject)
}

}
3 changes: 1 addition & 2 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ const (

JSErrCodeMessageNotFound ErrorCode = 10037

JSErrCodeBadRequest ErrorCode = 10003
JSStreamInvalidConfig ErrorCode = 10052
JSErrCodeBadRequest ErrorCode = 10003

JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
Expand Down
34 changes: 11 additions & 23 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ type StreamConfig struct {
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`

Expand All @@ -139,12 +136,6 @@ type StreamConfig struct {
MirrorDirect bool `json:"mirror_direct"`
}

// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
type SubjectTransformConfig struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
}

// RePublish is for republishing messages once committed to a stream. The original
// subject cis remapped from the subject pattern to the destination pattern.
type RePublish struct {
Expand All @@ -161,13 +152,12 @@ type Placement struct {

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
Expand Down Expand Up @@ -898,13 +888,11 @@ type StreamAlternate struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
}

// StreamState is information about the given stream.
Expand Down
14 changes: 3 additions & 11 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,22 +414,14 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}

if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.FilterSubject = fmt.Sprintf(kvSubjectsTmpl, sourceBucketName)
ss.SubjectTransformDest = fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
Expand Down
63 changes: 0 additions & 63 deletions kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,66 +251,3 @@ func TestKeyValueCreate(t *testing.T) {
t.Fatalf("Unexpected error code, got: %v", kerr.APIError().ErrorCode)
}
}

func TestKeyValueSourcing(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kvA, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "A"})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

_, err = kvA.Create("keyA", []byte("1"))
if err != nil {
t.Fatalf("Error creating key: %v", err)
}

if _, err := kvA.Get("keyA"); err != nil {
t.Fatalf("Got error getting keyA from A: %v", err)
}

kvB, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "B"})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

_, err = kvB.Create("keyB", []byte("1"))
if err != nil {
t.Fatalf("Error creating key: %v", err)
}

kvC, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "C", Sources: []*StreamSource{{Name: "A"}, {Name: "B"}}})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

// Wait half a second to make sure it has time to populate the stream from it's sources
i := 0
for {
status, err := kvC.Status()
if err != nil {
t.Fatalf("Error getting bucket status: %v", err)
}
if status.Values() == 2 {
break
} else {
i++
if i > 3 {
t.Fatalf("Error sourcing bucket does not contain the expected number of values")
}
}
time.Sleep(20 * time.Millisecond)
}

if _, err := kvC.Get("keyA"); err != nil {
t.Fatalf("Got error getting keyA from C: %v", err)
}

if _, err := kvC.Get("keyB"); err != nil {
t.Fatalf("Got error getting keyB from C: %v", err)
}
}
Loading

0 comments on commit c86a1a1

Please sign in to comment.