diff --git a/Makefile b/Makefile index 419bdf4..bf0cabd 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,16 @@ mock/mock_jetstream.go: mockgen: mock/mock_jetstream.go -test: +test: lint test-only + +test-only: go test ./ -v --cover -timeout 60s -.PHONY: test \ No newline at end of file +lint: check-cognitive-complexity + golangci-lint run --print-issued-lines=false --exclude-use-default=false --enable=revive --enable=goimports --enable=unconvert --fix --enable=gosec --timeout=10m + +check-cognitive-complexity: + find . -type f -name '*.go' -not -name "*.pb.go" -not -name "mock*.go" -not -name "*_test.go" \ + -exec gocognit -over 15 {} + + +.PHONY: test test-only lint check-cognitive-complexity mockgen \ No newline at end of file diff --git a/event_message.go b/event_message.go index 1bc4e20..f5e1fe6 100644 --- a/event_message.go +++ b/event_message.go @@ -1,6 +1,8 @@ package ferstream import ( + "time" + "github.com/kumparan/go-utils" "google.golang.org/protobuf/proto" @@ -27,6 +29,22 @@ type ( Error error `json:"error"` } + // NatsEventAuditLogMessage :nodoc: + NatsEventAuditLogMessage struct { + Subject string `json:"subject"` // empty on publish + ServiceName string `json:"service_name"` + UserID int64 `json:"user_id"` + AuditableType string `json:"auditable_type"` + AuditableID string `json:"auditable_id"` + Action string `json:"action"` + AuditedChanges string `json:"audited_changes"` + OldData string `json:"old_data,omitempty"` + NewData string `json:"new_data,omitempty"` + CreatedAt time.Time `json:"created_at"` + Error error `json:"error"` + } + + // MessageParser :nodoc: MessageParser interface { ParseFromBytes(data []byte) error AddSubject(subj string) @@ -91,7 +109,7 @@ func (n *NatsEventMessage) Build() (data []byte, err error) { return nil, n.Error } - message, err := tapao.Marshal(n) + message, err := tapao.Marshal(n, tapao.With(tapao.JSON)) if err != nil { n.wrapError(err) return nil, n.Error @@ -148,8 +166,9 @@ func (n *NatsEventMessage) wrapError(err error) { n.Error = err } +// ParseFromBytes :nodoc: func (n *NatsEventMessage) ParseFromBytes(data []byte) (err error) { - err = tapao.Unmarshal(data, &n, tapao.FallbackWith(tapao.JSON)) + err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON)) if err != nil { n.Error = errors.Wrap(n.Error, err.Error()) return err @@ -157,6 +176,7 @@ func (n *NatsEventMessage) ParseFromBytes(data []byte) (err error) { return } +// AddSubject :nodoc: func (n *NatsEventMessage) AddSubject(subj string) { n.NatsEvent.Subject = subj } @@ -172,9 +192,65 @@ func (n *NatsEventMessage) ToJSONByte() ([]byte, error) { return tapao.Marshal(n, tapao.With(tapao.JSON)) } -// ParseJSON parse JSON into message +// Build :nodoc: +func (n *NatsEventAuditLogMessage) Build() (data []byte, err error) { + if n.Error != nil { + return nil, n.Error + } + + message, err := tapao.Marshal(n, tapao.With(tapao.JSON)) + if err != nil { + n.wrapError(err) + return nil, n.Error + } + + return message, nil +} + +// ParseFromBytes :nodoc: +func (n *NatsEventAuditLogMessage) ParseFromBytes(data []byte) (err error) { + err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON)) + if err != nil { + n.Error = errors.Wrap(n.Error, err.Error()) + return err + } + return +} + +// AddSubject :nodoc: +func (n *NatsEventAuditLogMessage) AddSubject(subj string) { + n.Subject = subj +} + +// ToJSONString marshal message to JSON string +func (n *NatsEventAuditLogMessage) ToJSONString() (string, error) { + bt, err := tapao.Marshal(n, tapao.With(tapao.JSON)) + return string(bt), err +} + +// ToJSONByte marshal message to JSON byte +func (n *NatsEventAuditLogMessage) ToJSONByte() ([]byte, error) { + return tapao.Marshal(n, tapao.With(tapao.JSON)) +} + +func (n *NatsEventAuditLogMessage) wrapError(err error) { + if n.Error != nil { + n.Error = errors.Wrap(n.Error, err.Error()) + return + } + n.Error = err +} + +// ParseJSON parse JSON into Nats event message func ParseJSON(in string) (*NatsEventMessage, error) { msg := &NatsEventMessage{} err := tapao.Unmarshal([]byte(in), msg, tapao.With(tapao.JSON)) return msg, err } + +// ParseNatsEventAuditLogMessageFromBytes :nodoc: +func ParseNatsEventAuditLogMessageFromBytes(in []byte) (*NatsEventAuditLogMessage, error) { + msg := &NatsEventAuditLogMessage{} + err := tapao.Unmarshal(in, msg, tapao.With(tapao.JSON)) + return msg, err +} diff --git a/event_message_test.go b/event_message_test.go index e0fceff..dae27d5 100644 --- a/event_message_test.go +++ b/event_message_test.go @@ -1,6 +1,7 @@ package ferstream import ( + "encoding/json" "testing" "time" @@ -83,7 +84,7 @@ func TestNatsEventMessage_WithRequest(t *testing.T) { assert.NoError(t, result.Error) var requestResult pb.FindByIDRequest - err := tapao.Unmarshal(result.Request, &requestResult, tapao.FallbackWith(tapao.Protobuf)) + err := tapao.Unmarshal(result.Request, &requestResult, tapao.With(tapao.Protobuf)) assert.NoError(t, err) assert.Equal(t, body.Id, requestResult.GetId()) } @@ -113,14 +114,14 @@ func TestNatsEventMessage_Build(t *testing.T) { assert.NotNil(t, message) var result NatsEventMessage - err = tapao.Unmarshal(message, &result, tapao.FallbackWith(tapao.JSON)) + err = tapao.Unmarshal(message, &result, tapao.With(tapao.JSON)) assert.NoError(t, err) assert.Equal(t, event.ID, result.NatsEvent.ID) assert.Equal(t, event.UserID, result.NatsEvent.UserID) assert.Equal(t, utils.Dump(body), result.Body) var requestResult pb.Greeting - err = tapao.Unmarshal(result.Request, &requestResult, tapao.FallbackWith(tapao.Protobuf)) + err = tapao.Unmarshal(result.Request, &requestResult, tapao.With(tapao.Protobuf)) assert.NoError(t, err) assert.Equal(t, req.Id, requestResult.Id) assert.Equal(t, req.Name, requestResult.Name) @@ -139,7 +140,7 @@ func TestNatsEventMessage_Build(t *testing.T) { assert.NotNil(t, message) var result NatsEventMessage - err = tapao.Unmarshal(message, &result, tapao.FallbackWith(tapao.JSON)) + err = tapao.Unmarshal(message, &result, tapao.With(tapao.JSON)) assert.NoError(t, err) assert.Equal(t, event.ID, result.NatsEvent.ID) assert.Equal(t, event.UserID, result.NatsEvent.UserID) @@ -147,7 +148,7 @@ func TestNatsEventMessage_Build(t *testing.T) { assert.Equal(t, utils.Dump(oldBody), result.OldBody) var requestResult pb.Greeting - err = tapao.Unmarshal(result.Request, &requestResult, tapao.FallbackWith(tapao.Protobuf)) + err = tapao.Unmarshal(result.Request, &requestResult, tapao.With(tapao.Protobuf)) assert.NoError(t, err) assert.Equal(t, req.Id, requestResult.Id) assert.Equal(t, req.Name, requestResult.Name) @@ -249,3 +250,188 @@ func TestNatsEventMessage_ParseJSON(t *testing.T) { assert.Equal(t, msg, parsed) } + +func TestNatsEventAuditLogMessage_Build(t *testing.T) { + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + + msgByte, err := msg.Build() + require.NoError(t, err) + assert.NotNil(t, msgByte) + + var result NatsEventAuditLogMessage + err = tapao.Unmarshal(msgByte, &result, tapao.With(tapao.JSON)) + assert.NoError(t, err) + assert.Equal(t, msg.ServiceName, result.ServiceName) + assert.Equal(t, msg.OldData, result.OldData) + assert.Equal(t, msg.NewData, result.NewData) + }) +} + +func TestNatsEventAuditLogMessage_ToJSONString(t *testing.T) { + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + + parsed, err := msg.ToJSONString() + require.NoError(t, err) + expectedRes := "{\"subject\":\"\",\"service_name\":\"test-audit\",\"user_id\":123,\"auditable_type\":\"user\",\"auditable_id\":\"123\",\"action\":\"update\",\"audited_changes\":\"{\\\"id\\\":123,\\\"name\\\":\\\"new test name\\\"}\",\"old_data\":\"{\\\"id\\\":123,\\\"name\\\":\\\"test name\\\"}\",\"new_data\":\"{\\\"id\\\":123,\\\"name\\\":\\\"new test name\\\"}\",\"created_at\":\"2020-01-29T00:00:00Z\",\"error\":null}" + assert.Equal(t, expectedRes, parsed) +} + +func TestNatsEventAuditLogMessage_ToJSONByte(t *testing.T) { + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + + jsonByte, err := msg.ToJSONByte() + require.NoError(t, err) + + parsed, err := ParseNatsEventAuditLogMessageFromBytes(jsonByte) + require.NoError(t, err) + + assert.Equal(t, parsed, msg) +} + +func TestNatsEventAuditLogMessage_ParseNatsEventAuditLogMessageFromBytes(t *testing.T) { + payload := "{\"subject\":\"\",\"service_name\":\"test-audit\",\"user_id\":123,\"auditable_type\":\"user\",\"auditable_id\":\"123\",\"action\":\"update\",\"audited_changes\":\"{\\\"id\\\":123,\\\"name\\\":\\\"new test name\\\"}\",\"old_data\":\"{\\\"id\\\":123,\\\"name\\\":\\\"test name\\\"}\",\"new_data\":\"{\\\"id\\\":123,\\\"name\\\":\\\"new test name\\\"}\",\"created_at\":\"2020-01-29T00:00:00Z\",\"error\":null}" + + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + + parsed, err := ParseNatsEventAuditLogMessageFromBytes([]byte(payload)) + require.NoError(t, err) + + assert.Equal(t, msg, parsed) +} diff --git a/jetstream.go b/jetstream.go index 5c78c90..47ef4ad 100644 --- a/jetstream.go +++ b/jetstream.go @@ -24,22 +24,22 @@ type ( jsCtx nats.JetStreamContext } - // JetStreamRegistrar + // JetStreamRegistrar :nodoc: JetStreamRegistrar interface { RegisterNATSJetStream(js JetStream) } - // StreamRegistration + // StreamRegistrar :nodoc: StreamRegistrar interface { InitStream() error } - // Subscriber + // Subscriber :nodoc: Subscriber interface { SubscribeJetStreamEvent() error } - // Message Handler :nodoc: + // MessageHandler :nodoc: MessageHandler func(payload MessageParser) (err error) ) diff --git a/jetstream_test.go b/jetstream_test.go index f0f546f..ff20445 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -1,6 +1,7 @@ package ferstream import ( + "encoding/json" "log" "os" "testing" @@ -62,60 +63,150 @@ func TestPublish(t *testing.T) { } func TestQueueSubscribe(t *testing.T) { - n, err := NewNATSConnection(defaultURL) - if err != nil { - t.Fatal(err) - } - defer func() { - n.Close() - }() + t.Run("queue subscribe NatsEventMessage", func(t *testing.T) { + n, err := NewNATSConnection(defaultURL) + if err != nil { + t.Fatal(err) + } + defer func() { + n.Close() + }() + + streamConf := &nats.StreamConfig{ + Name: "STREAM_NAME_ANOTHER", + Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + MaxAge: 1 * time.Hour, + Storage: nats.FileStorage, + } - streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_ANOTHER", - Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, - MaxAge: 1 * time.Hour, - Storage: nats.FileStorage, - } + _, err = n.AddStream(streamConf) + if err != nil { + t.Fatal(err) + } - _, err = n.AddStream(streamConf) - if err != nil { - t.Fatal(err) - } + countMsg := 10 + subject := "STREAM_EVENT_ANOTHER.TEST" + queue := "test_queue_group" - countMsg := 10 - subject := "STREAM_EVENT_ANOTHER.TEST" - queue := "test_queue_group" + msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ + ID: int64(1232), + UserID: int64(21), + }).Build() - msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ - ID: int64(1232), - UserID: int64(21), - }).Build() + assert.NoError(t, err) - assert.NoError(t, err) + for i := 0; i < countMsg; i++ { + _, err = n.Publish(subject, msgBytes) + if err != nil { + t.Fatal(err) + } + } - for i := 0; i < countMsg; i++ { - _, err = n.Publish(subject, msgBytes) + receiverCh := make(chan *nats.Msg) + sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) { + receiverCh <- msg + }) if err != nil { t.Fatal(err) } - } - receiverCh := make(chan *nats.Msg) - sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) { - receiverCh <- msg + for i := 0; i < countMsg; i++ { + b := <-receiverCh + + assert.Equal(t, msgBytes, b.Data) + assert.Equal(t, subject, b.Subject, "test subject") + } + + _ = sub.Unsubscribe() }) - if err != nil { - t.Fatal(err) - } - for i := 0; i < countMsg; i++ { - b := <-receiverCh + t.Run("queue subscribe NatsEventAuditLogMessage", func(t *testing.T) { + n, err := NewNATSConnection(defaultURL) + if err != nil { + t.Fatal(err) + } + defer func() { + n.Close() + }() + + streamConf := &nats.StreamConfig{ + Name: "STREAM_NAME_ANOTHER", + Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + MaxAge: 1 * time.Hour, + Storage: nats.FileStorage, + } - assert.Equal(t, msgBytes, b.Data) - assert.Equal(t, subject, b.Subject, "test subject") - } + _, err = n.AddStream(streamConf) + if err != nil { + t.Fatal(err) + } + + countMsg := 10 + subject := "STREAM_EVENT_ANOTHER.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + queue := "test_queue_group" + + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + msgBytes, err := msg.Build() + assert.NoError(t, err) + + for i := 0; i < countMsg; i++ { + _, err = n.Publish(subject, msgBytes) + if err != nil { + t.Fatal(err) + } + } + + receiverCh := make(chan *nats.Msg) + sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) { + receiverCh <- msg + }) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < countMsg; i++ { + b := <-receiverCh - _ = sub.Unsubscribe() + assert.Equal(t, msgBytes, b.Data) + assert.Equal(t, subject, b.Subject, "test subject") + } + + _ = sub.Unsubscribe() + }) } func TestAddStream(t *testing.T) {