Skip to content

Commit

Permalink
Merge pull request #1224 from memphisdev/nats-compatibility+backward-…
Browse files Browse the repository at this point in the history
…handle

nats compatibility + handle produce/consume from old sdk to new station
  • Loading branch information
shay23b committed Aug 9, 2023
2 parents ef155da + 83d72b0 commit 129fab8
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 31 deletions.
6 changes: 6 additions & 0 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk")
}
}
} else {
if version < 2 {
err := errors.New("To consume from this station please upgrade your SDK version")
serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
}

consumerGroupExist, consumerFromGroup, err := isConsumerGroupExist(consumerGroup, station.ID)
Expand Down
8 changes: 1 addition & 7 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,8 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error {
poisonedCgs := []string{}
if station.IsNative {
producedByHeader = headersJson["$memphis_producedBy"]

// This check for backward compatability
if producedByHeader == "" {
producedByHeader = headersJson["producedBy"]
if producedByHeader == "" {
serv.Warnf("handleNewUnackedMsg: Error while getting notified about a poison message: Missing mandatory message headers, please upgrade the SDK version you are using")
return nil
}
producedByHeader = "unknown"
}

if producedByHeader == "$memphis_dls" { // skip poison messages which have been resent
Expand Down
6 changes: 6 additions & 0 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk")
}
}
} else {
if version < 2 {
err := errors.New("To produce to this station please upgrade your SDK version")
serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error())
return false, false, err, models.Station{}
}
}

newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList)
Expand Down
21 changes: 7 additions & 14 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ func (s *Server) createStationDirectIntern(c *client,
jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
memphisGlobalAcc := s.MemphisGlobalAccount()

if csr.PartitionsNumber == 0 {
if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1 {
if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
Expand Down Expand Up @@ -1552,7 +1552,7 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
Headers: headersJson,
},
Producer: models.ProducerDetailsResp{
Name: "",
Name: "unknown",
IsActive: false,
},
PoisonedCgs: []models.PoisonedCg{},
Expand All @@ -1570,17 +1570,6 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
}
}

// This check for backward compatability
if connectionIdHeader == "" || producedByHeader == "" {
connectionIdHeader = headersJson["connectionId"]
producedByHeader = strings.ToLower(headersJson["producedBy"])
if connectionIdHeader == "" || producedByHeader == "" {
serv.Warnf("[tenant: %v][user: %v]GetMessageDetails: missing mandatory message headers, please upgrade the SDK version you are using", user.TenantName, user.Username)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "missing mandatory message headers, please upgrade the SDK version you are using"})
return
}
}

connectionId := connectionIdHeader
poisonedCgs := make([]models.PoisonedCg, 0)
// Only native stations have CGs
Expand All @@ -1601,6 +1590,10 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
}
if exist {
isActive = producer.IsActive
} else {
if producedByHeader == "" {
producedByHeader = "unknown"
}
}

msg := models.MessageResponse{
Expand Down
8 changes: 0 additions & 8 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,14 +1070,6 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str
connectionIdHeader := headersJson["$memphis_connectionId"]
producedByHeader := strings.ToLower(headersJson["$memphis_producedBy"])

// This check for backward compatability
if connectionIdHeader == "" || producedByHeader == "" {
connectionIdHeader = headersJson["connectionId"]
producedByHeader = strings.ToLower(headersJson["producedBy"])
if connectionIdHeader == "" || producedByHeader == "" {
return []models.MessageDetails{}, errors.New("missing mandatory message headers, please upgrade the SDK version you are using")
}
}

for header := range headersJson {
if strings.HasPrefix(header, "$memphis") {
Expand Down
41 changes: 39 additions & 2 deletions server/memphis_jsapi_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"errors"
"memphis/models"
"regexp"
"strings"
"time"
)
Expand Down Expand Up @@ -176,11 +177,13 @@ func (s *Server) memphisJSApiWrapStreamCreate(sub *subscription, c *client, _ *A
return
}

if len(cfg.Name) > 32 {
resp.Error = NewJSStreamCreateError(errors.New("stream name can not be greater than 32 characters"))
streamName, err := validateStreamName(cfg.Name)
if err != nil {
resp.Error = NewJSStreamCreateError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cfg.Name = streamName

go memphisCreateNonNativeStationIfNeeded(s, reply, cfg, c)

Expand All @@ -192,3 +195,37 @@ func (s *Server) memphisJSApiWrapStreamDelete(sub *subscription, c *client, acc

s.jsStreamDeleteRequestIntern(sub, c, acc, subject, reply, rmsg)
}

func validateStreamName(streamName string) (string, error) {
name := strings.ToLower(streamName)
emptyErrStr := "stream name can not be empty"
tooLongErrStr := "stream name should be under 128 characters"
invalidCharErrStr := "only alphanumeric and the '_', '-' characters are allowed in stream name"
firstLetterErrStr := "stream name can not start or end with non alphanumeric character"

emptyErr := errors.New(emptyErrStr)
tooLongErr := errors.New(tooLongErrStr)
invalidCharErr := errors.New(invalidCharErrStr)
firstLetterErr := errors.New(firstLetterErrStr)

if len(name) == 0 {
return "", emptyErr
}

if len(name) > 128 {
return "", tooLongErr
}

re := regexp.MustCompile("^[a-z0-9_-]*$")

validName := re.MatchString(name)
if !validName {
return "", invalidCharErr
}

if name[0:1] == "-" || name[0:1] == "_" || name[len(name)-1:] == "." || name[len(name)-1:] == "-" || name[len(name)-1:] == "_" {
return "", firstLetterErr
}

return name, nil
}

0 comments on commit 129fab8

Please sign in to comment.