Skip to content

Commit

Permalink
Merge pull request #1205 from memphisdev/producer_partitioning
Browse files Browse the repository at this point in the history
Producer partitioning
  • Loading branch information
daniel-davidd committed Aug 7, 2023
2 parents 912f11c + 3f8f718 commit b5212a1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
4 changes: 4 additions & 0 deletions models/stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,7 @@ type StationOverviewSchemaDetails struct {
type GetUpdatesForSchema struct {
StationName string `form:"station_name" json:"station_name" binding:"required"`
}

type PartitionsUpdate struct {
PartitionsList []int `json:"partitions_list"`
}
26 changes: 14 additions & 12 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,42 +43,42 @@ func validateProducerType(producerType string) error {
return nil
}

func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string) (bool, bool, error) {
func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string) (bool, bool, error, models.Station) {
name := strings.ToLower(pName)
err := validateProducerName(name)
if err != nil {
serv.Warnf("createProducerDirectCommon at validateProducerName: Producer %v at station %v: %v", pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}

producerType := strings.ToLower(pType)
err = validateProducerType(producerType)
if err != nil {
serv.Warnf("createProducerDirectCommon at validateProducerType: Producer %v at station %v: %v", pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}

exist, user, err := memphis_cache.GetUser(username, tenantName, false)
if err != nil {
serv.Errorf("createProducerDirectCommon at GetUser: Producer %v at station %v: %v", pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}
if !exist {
serv.Warnf("createProducerDirectCommon: User %v does not exist", username)
return false, false, errors.New("User " + username + " does not exist")
return false, false, errors.New("User " + username + " does not exist"), models.Station{}
}

exist, station, err := db.GetStationByName(pStationName.Ext(), user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at GetStationByName: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}
if !exist {
var created bool
station, created, err = CreateDefaultStation(user.TenantName, s, pStationName, user.ID, user.Username)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateDefaultStation: creating default station error - producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}
if created {
message := "Station " + pStationName.Ext() + " has been created by user " + user.Username
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName)
if err != nil {
serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at InsertNewProducer: %v", user.TenantName, user.Username, err.Error())
return false, false, err
return false, false, err, models.Station{}
}
message := "Producer " + name + " connected"
serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message)
Expand All @@ -126,7 +126,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
err = CreateAuditLogs(auditLogs)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateAuditLogs: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error())
return false, false, err
return false, false, err, models.Station{}
}

shouldSendAnalytics, _ := shouldSendAnalytics()
Expand All @@ -141,7 +141,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
}

shouldSendNotifications := shouldSendNotification(user.TenantName, SchemaVAlert)
return shouldSendNotifications, station.DlsConfigurationSchemaverse, nil
return shouldSendNotifications, station.DlsConfigurationSchemaverse, nil, station
}

func (s *Server) createProducerDirectV0(c *client, reply string, cpr createProducerRequestV0, tenantName string) {
Expand All @@ -150,7 +150,7 @@ func (s *Server) createProducerDirectV0(c *client, reply string, cpr createProdu
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
return
}
_, _, err = s.createProducerDirectCommon(c, cpr.Name,
_, _, err, _ = s.createProducerDirectCommon(c, cpr.Name,
cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName)
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
}
Expand Down Expand Up @@ -183,12 +183,14 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
return
}

clusterSendNotification, schemaVerseToDls, err := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName)
clusterSendNotification, schemaVerseToDls, err, station := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName)
if err != nil {
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}

partitions := models.PartitionsUpdate{PartitionsList: station.PartitionsList}
resp.PartitionsUpdate = partitions
resp.SchemaVerseToDls = schemaVerseToDls
resp.ClusterSendNotification = clusterSendNotification
schemaUpdate, err := getSchemaUpdateInitFromStation(sn, cpr.TenantName)
Expand Down
1 change: 1 addition & 0 deletions server/memphis_sdk_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type createConsumerResponse struct {

type createProducerResponse struct {
SchemaUpdate models.ProducerSchemaUpdateInit `json:"schema_update"`
PartitionsUpdate models.PartitionsUpdate `json:"partitions_update"`
SchemaVerseToDls bool `json:"schemaverse_to_dls"`
ClusterSendNotification bool `json:"send_notification"`
Err string `json:"error"`
Expand Down

0 comments on commit b5212a1

Please sign in to comment.