Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer schema update listener added for SDKs #1292

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions models/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,25 @@ type ExtendedSchemaDetails struct {
CreatedByUsername string `json:"created_by_username"`
}

type ProducerSchemaUpdateType int
type SchemaUpdateType int

const (
SchemaUpdateTypeInit ProducerSchemaUpdateType = iota + 1
SchemaUpdateTypeInit SchemaUpdateType = iota + 1
SchemaUpdateTypeDrop
)

type ProducerSchemaUpdate struct {
UpdateType ProducerSchemaUpdateType
Init ProducerSchemaUpdateInit `json:"init,omitempty"`
type SchemaUpdate struct {
UpdateType SchemaUpdateType
Init SchemaUpdateInit `json:"init,omitempty"`
}

type ProducerSchemaUpdateInit struct {
SchemaName string `json:"schema_name"`
ActiveVersion ProducerSchemaUpdateVersion `json:"active_version"`
SchemaType string `json:"type"`
type SchemaUpdateInit struct {
SchemaName string `json:"schema_name"`
ActiveVersion SchemaUpdateVersion `json:"active_version"`
SchemaType string `json:"type"`
}

type ProducerSchemaUpdateVersion struct {
type SchemaUpdateVersion struct {
VersionNumber int `json:"version_number"`
Descriptor string `json:"descriptor"`
Content string `json:"schema_content"`
Expand Down
23 changes: 21 additions & 2 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,31 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
if err != nil {
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
}

sn, err := StationNameFromStr(ccr.StationName)
if err != nil {
s.Warnf("[tenant: %v][user: %v]createConsumerDirect at StationNameFromStr: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}

schemaUpdate, err := getSchemaUpdateInitFromStation(sn, ccr.TenantName)
if err == ErrNoSchema {
v1Resp := createConsumerResponseV1{PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &v1Resp)
return
}
if err != nil {
s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}

if len(partitions) == 0 && ccr.RequestVersion < 2 {
respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err)
} else {
v1Resp := createConsumerResponseV1{PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""}
v1Resp := createConsumerResponseV1{SchemaUpdate: *schemaUpdate, PartitionsUpdate: models.PartitionsUpdate{PartitionsList: partitions}, Err: ""}
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &v1Resp)

}
}

Expand Down
10 changes: 5 additions & 5 deletions server/memphis_handlers_schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,15 @@ func validateMessageStructName(messageStructName string) error {
return nil
}

func generateSchemaUpdateInit(schema models.Schema) (*models.ProducerSchemaUpdateInit, error) {
func generateSchemaUpdateInit(schema models.Schema) (*models.SchemaUpdateInit, error) {
activeVersion, err := getActiveVersionBySchemaId(schema.ID)
if err != nil {
return nil, err
}

return &models.ProducerSchemaUpdateInit{
return &models.SchemaUpdateInit{
SchemaName: schema.Name,
ActiveVersion: models.ProducerSchemaUpdateVersion{
ActiveVersion: models.SchemaUpdateVersion{
VersionNumber: activeVersion.VersionNumber,
Descriptor: activeVersion.Descriptor,
Content: activeVersion.SchemaContent,
Expand All @@ -203,7 +203,7 @@ func generateSchemaUpdateInit(schema models.Schema) (*models.ProducerSchemaUpdat
}, nil
}

func getSchemaUpdateInitFromStation(sn StationName, tenantName string) (*models.ProducerSchemaUpdateInit, error) {
func getSchemaUpdateInitFromStation(sn StationName, tenantName string) (*models.SchemaUpdateInit, error) {
schema, err := getSchemaByStationName(sn, tenantName)
if err != nil {
return nil, err
Expand All @@ -212,7 +212,7 @@ func getSchemaUpdateInitFromStation(sn StationName, tenantName string) (*models.
return generateSchemaUpdateInit(schema)
}

func (s *Server) updateStationProducersOfSchemaChange(tenantName string, sn StationName, schemaUpdate models.ProducerSchemaUpdate) {
func (s *Server) updateStationProducersOfSchemaChange(tenantName string, sn StationName, schemaUpdate models.SchemaUpdate) {
subject := fmt.Sprintf(schemaUpdatesSubjectTemplate, sn.Intern())
msg, err := json.Marshal(schemaUpdate)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,7 @@ func (sh StationsHandler) UseSchema(c *gin.Context) {
serv.Errorf("[tenant: %v][user: %v]UseSchema at generateSchemaUpdateInit: Schema %v at station %v : %v", user.TenantName, user.Username, body.SchemaName, stationName.Ext(), err.Error())
return
}
update := models.ProducerSchemaUpdate{
update := models.SchemaUpdate{
UpdateType: models.SchemaUpdateTypeInit,
Init: *updateContent,
}
Expand Down Expand Up @@ -1884,7 +1884,7 @@ func (s *Server) useSchemaDirect(c *client, reply string, msg []byte) {
return
}

update := models.ProducerSchemaUpdate{
update := models.SchemaUpdate{
UpdateType: models.SchemaUpdateTypeInit,
Init: *updateContent,
}
Expand All @@ -1909,7 +1909,7 @@ func removeSchemaFromStation(s *Server, sn StationName, updateDB bool, tenantNam
}
}

update := models.ProducerSchemaUpdate{
update := models.SchemaUpdate{
UpdateType: models.SchemaUpdateTypeDrop,
}

Expand Down
3 changes: 2 additions & 1 deletion server/memphis_sdk_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ type createConsumerResponse struct {
}

type createConsumerResponseV1 struct {
SchemaUpdate models.SchemaUpdateInit `json:"schema_update"`
PartitionsUpdate models.PartitionsUpdate `json:"partitions_update"`
Err string `json:"error"`
}

type createProducerResponse struct {
SchemaUpdate models.ProducerSchemaUpdateInit `json:"schema_update"`
SchemaUpdate models.SchemaUpdateInit `json:"schema_update"`
PartitionsUpdate models.PartitionsUpdate `json:"partitions_update"`
SchemaVerseToDls bool `json:"schemaverse_to_dls"`
ClusterSendNotification bool `json:"send_notification"`
Expand Down