Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix station and consumer for old sdks #1234

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam

func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
var ccr createConsumerRequestV1
var resp createConsumerResponseV1
var resp createConsumerResponse

tenantName, message, err := s.getTenantNameAndMessage(msg)
if err != nil {
Expand All @@ -279,30 +279,35 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
if ccr.StartConsumeFromSequence <= 0 {
err := errors.New("startConsumeFromSequence has to be a positive number")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

if ccr.LastMessages < -1 {
err := errors.New("min value for LastMessages is -1")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

if ccr.StartConsumeFromSequence > 1 && ccr.LastMessages > -1 {
err := errors.New("consumer creation options can't contain both startConsumeFromSequence and lastMessages")
serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error())
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
return
}

partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, ccr.RequestVersion, ccr.StartConsumeFromSequence, ccr.LastMessages)
if err != nil {
respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp)
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
}
if len(partitions) == 0 && ccr.RequestVersion < 2 {
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
} else {
v1Resp := createConsumerResponseV1{PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &v1Resp)

}
resp.PartitionsUpdate = models.PartitionsUpdate{PartitionsList: partitions}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &resp)
}

func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station models.Station) ([]models.Cg, []models.Cg, []models.Cg, error) { // for socket io endpoint
Expand Down
38 changes: 19 additions & 19 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,6 @@ func (s *Server) createStationDirectIntern(c *client,
jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
memphisGlobalAcc := s.MemphisGlobalAccount()

if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}
partitionsList := make([]int, 0)

stationName, err := StationNameFromStr(csr.StationName)
if err != nil {
serv.Warnf("[tenant: %v][user:%v]createStationDirect at StationNameFromStr: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
Expand Down Expand Up @@ -227,20 +210,37 @@ func (s *Server) createStationDirectIntern(c *client,
csr.TenantName = t.Name
}

exist, _, err := db.GetStationByName(stationName.Ext(), csr.TenantName)
exist, station, err := db.GetStationByName(stationName.Ext(), csr.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createStationDirect at db.GetStationByName: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
jsApiResp.Error = NewJSStreamCreateError(err)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, err)
return
}

if exist {
if exist && ((csr.PartitionsNumber == 0 && len(station.PartitionsList) == 0) || (csr.PartitionsNumber > 0 && len(station.PartitionsList) > 0)) {
jsApiResp.Error = NewJSStreamNameExistError()
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, err)
return
}

if csr.PartitionsNumber == 0 && isNative {
errMsg := fmt.Errorf("you are using an old SDK, make sure to update your SDK")
serv.Warnf("[tenant: %v][user:%v]createStationDirect - tried to use an old SDK", csr.TenantName, csr.Username)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}

if (csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1) && isNative {
errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station - %v, ", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName)
serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg)
jsApiResp.Error = NewJSStreamCreateError(errMsg)
respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg)
return
}
partitionsList := make([]int, 0)

schemaName := csr.SchemaName
var schemaDetails models.SchemaDetails
if schemaName != "" {
Expand Down