Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nats compatibility + handle produce/consume from old sdk to new station #1224

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk")
}
}
} else {
if version < 2 {
err := errors.New("To consume from this station please upgrade your SDK version")
serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
}

consumerGroupExist, consumerFromGroup, err := isConsumerGroupExist(consumerGroup, station.ID)
Expand Down
8 changes: 1 addition & 7 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,8 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error {
poisonedCgs := []string{}
if station.IsNative {
producedByHeader = headersJson["$memphis_producedBy"]

// This check for backward compatability
if producedByHeader == "" {
producedByHeader = headersJson["producedBy"]
if producedByHeader == "" {
serv.Warnf("handleNewUnackedMsg: Error while getting notified about a poison message: Missing mandatory message headers, please upgrade the SDK version you are using")
return nil
}
producedByHeader = "unknown"
}

if producedByHeader == "$memphis_dls" { // skip poison messages which have been resent
Expand Down
6 changes: 6 additions & 0 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-station-sdk")
}
}
} else {
if version < 2 {
err := errors.New("To produce to this station please upgrade your SDK version")
serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error())
return false, false, err, models.Station{}
}
}

newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList)
Expand Down
21 changes: 7 additions & 14 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ func (s *Server) createStationDirectIntern(c *client,
jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
memphisGlobalAcc := s.MemphisGlobalAccount()

if csr.PartitionsNumber == 0 {
if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1 {
if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
Expand Down Expand Up @@ -1552,7 +1552,7 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
Headers: headersJson,
},
Producer: models.ProducerDetailsResp{
Name: "",
Name: "unknown",
IsActive: false,
},
PoisonedCgs: []models.PoisonedCg{},
Expand All @@ -1570,17 +1570,6 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
}
}

// This check for backward compatability
if connectionIdHeader == "" || producedByHeader == "" {
connectionIdHeader = headersJson["connectionId"]
producedByHeader = strings.ToLower(headersJson["producedBy"])
if connectionIdHeader == "" || producedByHeader == "" {
serv.Warnf("[tenant: %v][user: %v]GetMessageDetails: missing mandatory message headers, please upgrade the SDK version you are using", user.TenantName, user.Username)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "missing mandatory message headers, please upgrade the SDK version you are using"})
return
}
}

connectionId := connectionIdHeader
poisonedCgs := make([]models.PoisonedCg, 0)
// Only native stations have CGs
Expand All @@ -1601,6 +1590,10 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
}
if exist {
isActive = producer.IsActive
} else {
if producedByHeader == "" {
producedByHeader = "unknown"
}
}

msg := models.MessageResponse{
Expand Down
8 changes: 0 additions & 8 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,14 +1070,6 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str
connectionIdHeader := headersJson["$memphis_connectionId"]
producedByHeader := strings.ToLower(headersJson["$memphis_producedBy"])

// This check for backward compatability
if connectionIdHeader == "" || producedByHeader == "" {
connectionIdHeader = headersJson["connectionId"]
producedByHeader = strings.ToLower(headersJson["producedBy"])
if connectionIdHeader == "" || producedByHeader == "" {
return []models.MessageDetails{}, errors.New("missing mandatory message headers, please upgrade the SDK version you are using")
}
}

for header := range headersJson {
if strings.HasPrefix(header, "$memphis") {
Expand Down
41 changes: 39 additions & 2 deletions server/memphis_jsapi_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"errors"
"memphis/models"
"regexp"
"strings"
"time"
)
Expand Down Expand Up @@ -176,11 +177,13 @@ func (s *Server) memphisJSApiWrapStreamCreate(sub *subscription, c *client, _ *A
return
}

if len(cfg.Name) > 32 {
resp.Error = NewJSStreamCreateError(errors.New("stream name can not be greater than 32 characters"))
streamName, err := validateStreamName(cfg.Name)
if err != nil {
resp.Error = NewJSStreamCreateError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cfg.Name = streamName

go memphisCreateNonNativeStationIfNeeded(s, reply, cfg, c)

Expand All @@ -192,3 +195,37 @@ func (s *Server) memphisJSApiWrapStreamDelete(sub *subscription, c *client, acc

s.jsStreamDeleteRequestIntern(sub, c, acc, subject, reply, rmsg)
}

func validateStreamName(streamName string) (string, error) {
name := strings.ToLower(streamName)
emptyErrStr := "stream name can not be empty"
tooLongErrStr := "stream name should be under 128 characters"
invalidCharErrStr := "only alphanumeric and the '_', '-' characters are allowed in stream name"
firstLetterErrStr := "stream name can not start or end with non alphanumeric character"

emptyErr := errors.New(emptyErrStr)
tooLongErr := errors.New(tooLongErrStr)
invalidCharErr := errors.New(invalidCharErrStr)
firstLetterErr := errors.New(firstLetterErrStr)

if len(name) == 0 {
return "", emptyErr
}

if len(name) > 128 {
return "", tooLongErr
}

re := regexp.MustCompile("^[a-z0-9_-]*$")

validName := re.MatchString(name)
if !validName {
return "", invalidCharErr
}

if name[0:1] == "-" || name[0:1] == "_" || name[len(name)-1:] == "." || name[len(name)-1:] == "-" || name[len(name)-1:] == "_" {
return "", firstLetterErr
}

return name, nil
}