Skip to content

Commit

Permalink
fix station and consumer for old sdks (#1234)
Browse files Browse the repository at this point in the history
  • Loading branch information
shay23b committed Aug 10, 2023
1 parent 6464111 commit 7ab086c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
19 changes: 12 additions & 7 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam

func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
var ccr createConsumerRequestV1
var resp createConsumerResponseV1
var resp createConsumerResponse

tenantName, message, err := s.getTenantNameAndMessage(msg)
if err != nil {
Expand All @@ -279,30 +279,35 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
if ccr.StartConsumeFromSequence <= 0 {
err := errors.New("startConsumeFromSequence has to be a positive number")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

if ccr.LastMessages < -1 {
err := errors.New("min value for LastMessages is -1")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

if ccr.StartConsumeFromSequence > 1 && ccr.LastMessages > -1 {
err := errors.New("consumer creation options can't contain both startConsumeFromSequence and lastMessages")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, ccr.RequestVersion, ccr.StartConsumeFromSequence, ccr.LastMessages)
if err != nil {
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
}
if len(partitions) == 0 && ccr.RequestVersion < 2 {
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
} else {
v1Resp := createConsumerResponseV1{PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &v1Resp)

}
resp.PartitionsUpdate = models.PartitionsUpdate{PartitionsList: partitions}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &resp)
}

func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station models.Station) ([]models.Cg, []models.Cg, []models.Cg, error) { // for socket io endpoint
Expand Down
38 changes: 19 additions & 19 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,6 @@ func (s *Server) createStationDirectIntern(c *client,
jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
memphisGlobalAcc := s.MemphisGlobalAccount()

if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}
partitionsList := make([]int, 0)

stationName, err := StationNameFromStr(csr.StationName)
if err != nil {
serv.Warnf("[tenant: %v][user:%v]createStationDirect at StationNameFromStr: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
Expand Down Expand Up @@ -227,20 +210,37 @@ func (s *Server) createStationDirectIntern(c *client,
csr.TenantName = t.Name
}

exist, _, err := db.GetStationByName(stationName.Ext(), csr.TenantName)
exist, station, err := db.GetStationByName(stationName.Ext(), csr.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createStationDirect at db.GetStationByName: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
jsApiResp.Error = NewJSStreamCreateError(err)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, err)
return
}

if exist {
if exist && ((csr.PartitionsNumber == 0 && len(station.PartitionsList) == 0) || (csr.PartitionsNumber > 0 && len(station.PartitionsList) > 0)) {
jsApiResp.Error = NewJSStreamNameExistError()
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, err)
return
}

if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}
partitionsList := make([]int, 0)

schemaName := csr.SchemaName
var schemaDetails models.SchemaDetails
if schemaName != "" {
Expand Down

0 comments on commit 7ab086c

Please sign in to comment.