From 4527cbfcb44dadccbf8e9042d6df54e6001aaef7 Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 9 Aug 2023 10:51:45 +0300 Subject: [PATCH 1/2] nats compatibility + handle produce/consume from old sdk to new station --- server/memphis_handlers_consumers.go | 6 ++++ server/memphis_handlers_dls_messages.go | 3 +- server/memphis_handlers_producers.go | 6 ++++ server/memphis_handlers_stations.go | 19 ++++-------- server/memphis_helper.go | 8 ----- server/memphis_jsapi_wrappers.go | 41 +++++++++++++++++++++++-- 6 files changed, 58 insertions(+), 25 deletions(-) diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 6e26e5f9c..37aba19f0 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -165,6 +165,12 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk") } } + } else { + if version < 2 { + err := errors.New("To consume from this station please upgrade your SDK version") + serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } } consumerGroupExist, consumerFromGroup, err := isConsumerGroupExist(consumerGroup, station.ID) diff --git a/server/memphis_handlers_dls_messages.go b/server/memphis_handlers_dls_messages.go index de95918aa..eb561b72e 100644 --- a/server/memphis_handlers_dls_messages.go +++ b/server/memphis_handlers_dls_messages.go @@ -99,8 +99,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error { if producedByHeader == "" { producedByHeader = headersJson["producedBy"] if producedByHeader == "" { - serv.Warnf("handleNewUnackedMsg: Error while getting notified about a poison message: Missing mandatory message headers, please upgrade the SDK version you are using") - return nil + producedByHeader = "unknown" } } diff --git a/server/memphis_handlers_producers.go b/server/memphis_handlers_producers.go index f5a90404a..1ddd826ac 100644 --- a/server/memphis_handlers_producers.go +++ b/server/memphis_handlers_producers.go @@ -109,6 +109,12 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk") } } + } else { + if version < 2 { + err := errors.New("To produce to this station please upgrade your SDK version") + serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error()) + return false, false, err, models.Station{} + } } newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList) diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 01cf09141..1b02b33a2 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -175,7 +175,7 @@ func (s *Server) createStationDirectIntern(c *client, jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} memphisGlobalAcc := s.MemphisGlobalAccount() - if csr.PartitionsNumber == 0 { + if csr.PartitionsNumber == 0 && isNative { errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK") serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username) jsApiResp.Error = NewJSStreamCreateError(errMsg) @@ -183,7 +183,7 @@ func (s *Server) createStationDirectIntern(c *client, return } - if csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1 { + if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative { errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName) serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg) jsApiResp.Error = NewJSStreamCreateError(errMsg) @@ -1570,17 +1570,6 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) { } } - // This check for backward compatability - if connectionIdHeader == "" || producedByHeader == "" { - connectionIdHeader = headersJson["connectionId"] - producedByHeader = strings.ToLower(headersJson["producedBy"]) - if connectionIdHeader == "" || producedByHeader == "" { - serv.Warnf("[tenant: %v][user: %v]GetMessageDetails: missing mandatory message headers, please upgrade the SDK version you are using", user.TenantName, user.Username) - c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "missing mandatory message headers, please upgrade the SDK version you are using"}) - return - } - } - connectionId := connectionIdHeader poisonedCgs := make([]models.PoisonedCg, 0) // Only native stations have CGs @@ -1601,6 +1590,10 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) { } if exist { isActive = producer.IsActive + } else { + if producedByHeader == "" { + producedByHeader = "unknown" + } } msg := models.MessageResponse{ diff --git a/server/memphis_helper.go b/server/memphis_helper.go index c86e8b39d..1ab7dd9d4 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -1068,14 +1068,6 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str connectionIdHeader := headersJson["$memphis_connectionId"] producedByHeader := strings.ToLower(headersJson["$memphis_producedBy"]) - // This check for backward compatability - if connectionIdHeader == "" || producedByHeader == "" { - connectionIdHeader = headersJson["connectionId"] - producedByHeader = strings.ToLower(headersJson["producedBy"]) - if connectionIdHeader == "" || producedByHeader == "" { - return []models.MessageDetails{}, errors.New("missing mandatory message headers, please upgrade the SDK version you are using") - } - } for header := range headersJson { if strings.HasPrefix(header, "$memphis") { diff --git a/server/memphis_jsapi_wrappers.go b/server/memphis_jsapi_wrappers.go index c27a0ff74..ebf7adc1c 100644 --- a/server/memphis_jsapi_wrappers.go +++ b/server/memphis_jsapi_wrappers.go @@ -15,6 +15,7 @@ import ( "encoding/json" "errors" "memphis/models" + "regexp" "strings" "time" ) @@ -176,11 +177,13 @@ func (s *Server) memphisJSApiWrapStreamCreate(sub *subscription, c *client, _ *A return } - if len(cfg.Name) > 32 { - resp.Error = NewJSStreamCreateError(errors.New("stream name can not be greater than 32 characters")) + streamName, err := validateStreamName(cfg.Name) + if err != nil { + resp.Error = NewJSStreamCreateError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + cfg.Name = streamName go memphisCreateNonNativeStationIfNeeded(s, reply, cfg, c) @@ -192,3 +195,37 @@ func (s *Server) memphisJSApiWrapStreamDelete(sub *subscription, c *client, acc s.jsStreamDeleteRequestIntern(sub, c, acc, subject, reply, rmsg) } + +func validateStreamName(streamName string) (string, error) { + name := strings.ToLower(streamName) + emptyErrStr := "stream name can not be empty" + tooLongErrStr := "stream name should be under 128 characters" + invalidCharErrStr := "only alphanumeric and the '_', '-' characters are allowed in stream name" + firstLetterErrStr := "stream name can not start or end with non alphanumeric character" + + emptyErr := errors.New(emptyErrStr) + tooLongErr := errors.New(tooLongErrStr) + invalidCharErr := errors.New(invalidCharErrStr) + firstLetterErr := errors.New(firstLetterErrStr) + + if len(name) == 0 { + return "", emptyErr + } + + if len(name) > 128 { + return "", tooLongErr + } + + re := regexp.MustCompile("^[a-z0-9_-]*$") + + validName := re.MatchString(name) + if !validName { + return "", invalidCharErr + } + + if name[0:1] == "-" || name[0:1] == "_" || name[len(name)-1:] == "." || name[len(name)-1:] == "-" || name[len(name)-1:] == "_" { + return "", firstLetterErr + } + + return name, nil +} From d2f06752a686d7672a4218e8bc004a63dd01c93a Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 9 Aug 2023 11:39:22 +0300 Subject: [PATCH 2/2] fix --- server/memphis_handlers_dls_messages.go | 7 +------ server/memphis_handlers_stations.go | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/server/memphis_handlers_dls_messages.go b/server/memphis_handlers_dls_messages.go index eb561b72e..ab6a5170a 100644 --- a/server/memphis_handlers_dls_messages.go +++ b/server/memphis_handlers_dls_messages.go @@ -94,13 +94,8 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error { poisonedCgs := []string{} if station.IsNative { producedByHeader = headersJson["$memphis_producedBy"] - - // This check for backward compatability if producedByHeader == "" { - producedByHeader = headersJson["producedBy"] - if producedByHeader == "" { - producedByHeader = "unknown" - } + producedByHeader = "unknown" } if producedByHeader == "$memphis_dls" { // skip poison messages which have been resent diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 1b02b33a2..b9922f833 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -1552,7 +1552,7 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) { Headers: headersJson, }, Producer: models.ProducerDetailsResp{ - Name: "", + Name: "unknown", IsActive: false, }, PoisonedCgs: []models.PoisonedCg{},