From 983065ca2b6916f025e43a22a3aaf19cad042d0c Mon Sep 17 00:00:00 2001 From: shay23b Date: Thu, 10 Aug 2023 01:18:11 +0300 Subject: [PATCH] fix station and consumer for old sdks --- server/memphis_handlers_consumers.go | 19 +++++++++----- server/memphis_handlers_stations.go | 38 ++++++++++++++-------------- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 61b11f4ab..9cf4fec1b 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -256,7 +256,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) { var ccr createConsumerRequestV1 - var resp createConsumerResponseV1 + var resp createConsumerResponse tenantName, message, err := s.getTenantNameAndMessage(msg) if err != nil { @@ -279,30 +279,35 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) { if ccr.StartConsumeFromSequence <= 0 { err := errors.New("startConsumeFromSequence has to be a positive number") serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) - respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) + respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) return } if ccr.LastMessages < -1 { err := errors.New("min value for LastMessages is -1") serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) - respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) + respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) return } if ccr.StartConsumeFromSequence > 1 && ccr.LastMessages > -1 { err := errors.New("consumer creation options can't contain both startConsumeFromSequence and lastMessages") serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) - respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) + respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) return } partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, ccr.RequestVersion, ccr.StartConsumeFromSequence, ccr.LastMessages) if err != nil { - respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) + respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) + } + if len(partitions) == 0 && ccr.RequestVersion < 2 { + respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) + } else { + v1Resp := createConsumerResponseV1{PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""} + respondWithResp(s.MemphisGlobalAccountString(), s, reply, &v1Resp) + } - resp.PartitionsUpdate = models.PartitionsUpdate{PartitionsList: partitions} - respondWithResp(s.MemphisGlobalAccountString(), s, reply, &resp) } func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station models.Station) ([]models.Cg, []models.Cg, []models.Cg, error) { // for socket io endpoint diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index b9922f833..96e1ceadb 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -175,23 +175,6 @@ func (s *Server) createStationDirectIntern(c *client, jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} memphisGlobalAcc := s.MemphisGlobalAccount() - if csr.PartitionsNumber == 0 && isNative { - errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK") - serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username) - jsApiResp.Error = NewJSStreamCreateError(errMsg) - respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg) - return - } - - if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative { - errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName) - serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg) - jsApiResp.Error = NewJSStreamCreateError(errMsg) - respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg) - return - } - partitionsList := make([]int, 0) - stationName, err := StationNameFromStr(csr.StationName) if err != nil { serv.Warnf("[tenant: %v][user:%v]createStationDirect at StationNameFromStr: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) @@ -227,7 +210,7 @@ func (s *Server) createStationDirectIntern(c *client, csr.TenantName = t.Name } - exist, _, err := db.GetStationByName(stationName.Ext(), csr.TenantName) + exist, station, err := db.GetStationByName(stationName.Ext(), csr.TenantName) if err != nil { serv.Errorf("[tenant: %v][user:%v]createStationDirect at db.GetStationByName: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) jsApiResp.Error = NewJSStreamCreateError(err) @@ -235,12 +218,29 @@ func (s *Server) createStationDirectIntern(c *client, return } - if exist { + if exist && ((csr.PartitionsNumber == 0 && len(station.PartitionsList) == 0) || (csr.PartitionsNumber > 0 && len(station.PartitionsList) > 0)) { jsApiResp.Error = NewJSStreamNameExistError() respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, err) return } + if csr.PartitionsNumber == 0 && isNative { + errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK") + serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username) + jsApiResp.Error = NewJSStreamCreateError(errMsg) + respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg) + return + } + + if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative { + errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName) + serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg) + jsApiResp.Error = NewJSStreamCreateError(errMsg) + respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg) + return + } + partitionsList := make([]int, 0) + schemaName := csr.SchemaName var schemaDetails models.SchemaDetails if schemaName != "" {