diff --git a/db/db.go b/db/db.go index 430a4b6ae..2c38296e9 100644 --- a/db/db.go +++ b/db/db.go @@ -293,6 +293,7 @@ func createTables(MetadataDbClient MetadataStorage) error { ALTER TABLE consumers DROP CONSTRAINT IF EXISTS fk_connection_id; CREATE INDEX IF NOT EXISTS consumer_tenant_name ON consumers(tenant_name); CREATE INDEX IF NOT EXISTS consumer_connection_id ON consumers(connection_id); + ALTER TABLE consumers ADD COLUMN IF NOT EXISTS partitions INTEGER[]; IF EXISTS ( SELECT 1 FROM information_schema.columns @@ -321,6 +322,7 @@ func createTables(MetadataDbClient MetadataStorage) error { start_consume_from_seq SERIAL NOT NULL, last_msgs SERIAL NOT NULL, tenant_name VARCHAR NOT NULL DEFAULT '$memphis', + partitions INTEGER[], PRIMARY KEY (id), CONSTRAINT fk_station_id FOREIGN KEY(station_id) @@ -340,9 +342,11 @@ func createTables(MetadataDbClient MetadataStorage) error { IF EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_name = 'stations' AND table_schema = 'public' ) THEN - ALTER TYPE enum_retention_type ADD VALUE 'ack_based'; + ALTER TYPE enum_retention_type ADD VALUE IF NOT EXISTS 'ack_based'; ALTER TABLE stations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; ALTER TABLE stations ADD COLUMN IF NOT EXISTS resend_disabled BOOL NOT NULL DEFAULT false; + ALTER TABLE stations ADD COLUMN IF NOT EXISTS partitions INTEGER[]; + ALTER TABLE stations ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 0; DROP INDEX IF EXISTS unique_station_name_deleted; CREATE UNIQUE INDEX unique_station_name_deleted ON stations(name, is_deleted, tenant_name) WHERE is_deleted = false; END IF; @@ -372,6 +376,8 @@ func createTables(MetadataDbClient MetadataStorage) error { tiered_storage_enabled BOOL NOT NULL, tenant_name VARCHAR NOT NULL DEFAULT '$memphis', resend_disabled BOOL NOT NULL DEFAULT false, + partitions INTEGER[], + version INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (id), CONSTRAINT fk_tenant_name_stations FOREIGN KEY(tenant_name) @@ -436,6 +442,7 @@ func createTables(MetadataDbClient MetadataStorage) error { CREATE INDEX IF NOT EXISTS producer_name ON producers(name); CREATE INDEX IF NOT EXISTS producer_tenant_name ON producers(tenant_name); CREATE INDEX IF NOT EXISTS producer_connection_id ON producers(connection_id); + ALTER TABLE producers ADD COLUMN IF NOT EXISTS partitions INTEGER[]; IF EXISTS ( SELECT 1 FROM information_schema.columns @@ -458,6 +465,7 @@ func createTables(MetadataDbClient MetadataStorage) error { is_active BOOL NOT NULL DEFAULT true, updated_at TIMESTAMPTZ NOT NULL, tenant_name VARCHAR NOT NULL DEFAULT '$memphis', + partitions INTEGER[], PRIMARY KEY (id), CONSTRAINT fk_station_id FOREIGN KEY(station_id) @@ -478,6 +486,7 @@ func createTables(MetadataDbClient MetadataStorage) error { ) THEN ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS producer_name VARCHAR NOT NULL DEFAULT ''; + ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS partition_number INTEGER NOT NULL DEFAULT -1; DROP INDEX IF EXISTS dls_producer_id; IF EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'dls_messages' AND column_name = 'producer_id' @@ -509,6 +518,7 @@ func createTables(MetadataDbClient MetadataStorage) error { validation_error VARCHAR DEFAULT '', tenant_name VARCHAR NOT NULL DEFAULT '$memphis', producer_name VARCHAR NOT NULL, + partition_number INTEGER NOT NULL DEFAULT -1, PRIMARY KEY (id), CONSTRAINT fk_station_id FOREIGN KEY(station_id) @@ -1168,7 +1178,9 @@ func InsertNewStation( isNative bool, dlsConfiguration models.DlsConfiguration, tieredStorageEnabled bool, - tenantName string) (models.Station, int64, error) { + tenantName string, + partitionsList []int, + version int) (models.Station, int64, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -1196,9 +1208,11 @@ func InsertNewStation( dls_configuration_poison, dls_configuration_schemaverse, tiered_storage_enabled, - tenant_name + tenant_name, + partitions, + version ) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) RETURNING id` + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING id` stmt, err := conn.Conn().Prepare(ctx, "insert_new_station", query) if err != nil { @@ -1213,7 +1227,7 @@ func InsertNewStation( } rows, err := conn.Conn().Query(ctx, stmt.Name, stationName, retentionType, retentionValue, storageType, replicas, userId, username, createAt, updatedAt, - false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName) + false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName, partitionsList, version) if err != nil { return models.Station{}, 0, err } @@ -1265,6 +1279,8 @@ func InsertNewStation( DlsConfigurationSchemaverse: dlsConfiguration.Schemaverse, TieredStorageEnabled: tieredStorageEnabled, TenantName: tenantName, + PartitionsList: partitionsList, + Version: version, } rowsAffected := rows.CommandTag().RowsAffected() @@ -1498,6 +1514,8 @@ func GetAllStationsDetailsLight(tenantName string) ([]models.ExtendedStationLigh &stationRes.TieredStorageEnabled, &stationRes.TenantName, &stationRes.ResendDisabled, + &stationRes.PartitionsList, + &stationRes.Version, &stationRes.Activity, ); err != nil { return []models.ExtendedStationLight{}, err @@ -2332,7 +2350,7 @@ func GetActiveProducerByStationID(producerName string, stationId int) (bool, mod return true, producers[0], nil } -func InsertNewProducer(name string, stationId int, producerType string, connectionIdObj string, tenantName string) (models.Producer, error) { +func InsertNewProducer(name string, stationId int, producerType string, connectionIdObj string, tenantName string, partitionsList []int) (models.Producer, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -2349,8 +2367,9 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti is_active, updated_at, type, - tenant_name) - VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING id` + tenant_name, + partitions) + VALUES($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id` stmt, err := conn.Conn().Prepare(ctx, "insert_new_producer", query) if err != nil { @@ -2363,7 +2382,7 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti if tenantName != conf.GlobalAccount { tenantName = strings.ToLower(tenantName) } - rows, err := conn.Conn().Query(ctx, stmt.Name, name, stationId, connectionIdObj, isActive, updatedAt, producerType, tenantName) + rows, err := conn.Conn().Query(ctx, stmt.Name, name, stationId, connectionIdObj, isActive, updatedAt, producerType, tenantName, partitionsList) if err != nil { return models.Producer{}, err } @@ -2393,13 +2412,14 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti } newProducer := models.Producer{ - ID: producerId, - Name: name, - StationId: stationId, - Type: producerType, - ConnectionId: connectionIdObj, - IsActive: isActive, - UpdatedAt: time.Now(), + ID: producerId, + Name: name, + StationId: stationId, + Type: producerType, + ConnectionId: connectionIdObj, + IsActive: isActive, + UpdatedAt: time.Now(), + PartitionsList: partitionsList, } return newProducer, nil } @@ -2683,13 +2703,13 @@ func InsertNewConsumer(name string, maxMsgDeliveries int, startConsumeFromSequence uint64, lastMessages int64, - tenantName string) (bool, models.Consumer, int64, error) { + tenantName string, partitionsList []int) (models.Consumer, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() conn, err := MetadataDbClient.Client.Acquire(ctx) if err != nil { - return false, models.Consumer{}, 0, err + return models.Consumer{}, err } defer conn.Release() @@ -2705,13 +2725,14 @@ func InsertNewConsumer(name string, start_consume_from_seq, last_msgs, type, - tenant_name) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + tenant_name, + partitions) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id` stmt, err := conn.Conn().Prepare(ctx, "insert_new_consumer", query) if err != nil { - return false, models.Consumer{}, 0, err + return models.Consumer{}, err } var consumerId int @@ -2719,48 +2740,35 @@ func InsertNewConsumer(name string, isActive := true rows, err := conn.Conn().Query(ctx, stmt.Name, - name, stationId, connectionIdObj, cgName, maxAckTime, isActive, updatedAt, maxMsgDeliveries, startConsumeFromSequence, lastMessages, consumerType, tenantName) + name, stationId, connectionIdObj, cgName, maxAckTime, isActive, updatedAt, maxMsgDeliveries, startConsumeFromSequence, lastMessages, consumerType, tenantName, partitionsList) if err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code == "23505" { - // Handle unique constraint violation error - return true, models.Consumer{}, 0, nil - } else { - return false, models.Consumer{}, 0, err - } + return models.Consumer{}, err } defer rows.Close() for rows.Next() { err := rows.Scan(&consumerId) if err != nil { - return false, models.Consumer{}, 0, err + return models.Consumer{}, err } } if err := rows.Err(); err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code == "23505" { - // Handle unique constraint violation error - return true, models.Consumer{}, 0, nil - } else { - return false, models.Consumer{}, 0, err - } + return models.Consumer{}, err } if err := rows.Err(); err != nil { var pgErr *pgconn.PgError if errors.As(err, &pgErr) { if pgErr.Detail != "" { - return false, models.Consumer{}, 0, errors.New(pgErr.Detail) + return models.Consumer{}, errors.New(pgErr.Detail) } else { - return false, models.Consumer{}, 0, errors.New(pgErr.Message) + return models.Consumer{}, errors.New(pgErr.Message) } } else { - return false, models.Consumer{}, 0, err + return models.Consumer{}, err } } - rowsAffected := rows.CommandTag().RowsAffected() newConsumer := models.Consumer{ ID: consumerId, Name: name, @@ -2775,8 +2783,9 @@ func InsertNewConsumer(name string, StartConsumeFromSeq: startConsumeFromSequence, LastMessages: lastMessages, TenantName: tenantName, + PartitionsList: partitionsList, } - return false, newConsumer, rowsAffected, nil + return newConsumer, nil } func GetConsumers() ([]models.Consumer, error) { @@ -2815,7 +2824,7 @@ func GetAllConsumersByStation(stationId int) ([]models.ExtendedConsumer, error) return []models.ExtendedConsumer{}, err } defer conn.Release() - query := `SELECT DISTINCT ON (c.name, c.consumers_group) c.id, c.name, c.updated_at, c.is_active, c.consumers_group, c.max_ack_time_ms, c.max_msg_deliveries, s.name, + query := `SELECT DISTINCT ON (c.name, c.consumers_group) c.id, c.name, c.updated_at, c.is_active, c.consumers_group, c.max_ack_time_ms, c.max_msg_deliveries, s.name, c.partitions, COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count FROM consumers AS c LEFT JOIN stations AS s ON s.id = c.station_id @@ -3017,6 +3026,7 @@ func GetConsumerGroupMembers(cgName string, stationId int) ([]models.CgMember, e c.is_active, c.max_msg_deliveries, c.max_ack_time_ms, + c.partitions, COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count FROM consumers AS c @@ -5146,7 +5156,7 @@ func GetImage(name string, tenantName string) (bool, models.Image, error) { } // dls Functions -func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, validationError string, tenantName string) (models.DlsMessage, error) { +func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, validationError string, tenantName string, partitionNumber int) (models.DlsMessage, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() connection, err := MetadataDbClient.Client.Acquire(ctx) @@ -5164,8 +5174,9 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, message_type, validation_error, tenant_name, - producer_name) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) + producer_name, + partition_number) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id` stmt, err := connection.Conn().Prepare(ctx, "insert_dls_messages", query) @@ -5176,7 +5187,7 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, if tenantName != conf.GlobalAccount { tenantName = strings.ToLower(tenantName) } - rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, poisonedCgs, messageDetails, updatedAt, "schema", validationError, tenantName, producerName) + rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, poisonedCgs, messageDetails, updatedAt, "schema", validationError, tenantName, producerName, partitionNumber) if err != nil { return models.DlsMessage{}, err } @@ -5207,8 +5218,9 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, Data: messageDetails.Data, Headers: messageDetails.Headers, }, - UpdatedAt: updatedAt, - TenantName: tenantName, + UpdatedAt: updatedAt, + TenantName: tenantName, + PartitionNumber: partitionNumber, } if err := rows.Err(); err != nil { @@ -5231,7 +5243,7 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, return deadLetterPayload, nil } -func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMessage, error) { +func GetMsgByStationIdAndMsgSeq(stationId, messageSeq, partitionNumber int) (bool, models.DlsMessage, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -5241,14 +5253,14 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMess } defer connection.Release() - query := `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 LIMIT 1` + query := `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND partition_number = $3 LIMIT 1` stmt, err := connection.Conn().Prepare(ctx, "get_dls_messages_by_station_id_and_message_seq", query) if err != nil { return false, models.DlsMessage{}, err } - rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq) + rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, partitionNumber) if err != nil { return false, models.DlsMessage{}, err } @@ -5265,7 +5277,7 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMess return true, message[0], nil } -func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string) (int, error) { +func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int) (int, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -5302,12 +5314,12 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin } checkRows.Close() - query = `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND tenant_name =$3 LIMIT 1 FOR UPDATE` + query = `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND tenant_name =$3 AND partition_number = $4 LIMIT 1 FOR UPDATE` stmt, err = tx.Prepare(ctx, "handle_insert_dls_message", query) if err != nil { return 0, err } - rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, tenantName) + rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, tenantName, partitionNumber) if err != nil { return 0, err } @@ -5329,9 +5341,10 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin updated_at, message_type, validation_error, - tenant_name + tenant_name, + partition_number ) - VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id` stmt, err := tx.Prepare(ctx, "insert_dls_message", query) @@ -5342,7 +5355,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin if tenantName != conf.GlobalAccount { tenantName = strings.ToLower(tenantName) } - rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName) + rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName, partitionNumber) if err != nil { return 0, err } @@ -5497,6 +5510,28 @@ func PurgeDlsMsgsFromStation(station_id int) error { return nil } +func PurgeDlsMsgsFromPartition(station_id, partitionNumber int) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return errors.New("PurgeDlsMsgsFromPartition: " + err.Error()) + } + defer conn.Release() + + query := `DELETE FROM dls_messages where station_id=$1 AND partition_number = $2` + stmt, err := conn.Conn().Prepare(ctx, "purge_dls_messages", query) + if err != nil { + return err + } + + _, err = conn.Conn().Exec(ctx, stmt.Name, station_id, partitionNumber) + if err != nil { + return errors.New("PurgeDlsMsgsFromPartition: " + err.Error()) + } + return nil +} + func RemoveCgFromDlsMsg(msgId int, cgName string, tenantName string) error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -5550,6 +5585,35 @@ func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) { return dlsMsgs, nil } +func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return []models.DlsMessage{}, err + } + defer conn.Release() + query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 1000` + stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query) + if err != nil { + return []models.DlsMessage{}, err + } + rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber) + if err != nil { + return []models.DlsMessage{}, err + } + defer rows.Close() + dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage]) + if err != nil { + return []models.DlsMessage{}, err + } + if len(dlsMsgs) == 0 { + return []models.DlsMessage{}, nil + } + + return dlsMsgs, nil +} + func GetDlsMessageById(messageId int) (bool, models.DlsMessage, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -6187,7 +6251,7 @@ func DeleteOldProducersAndConsumers(timeInterval time.Time) ([]models.LightCG, e var queries []string queries = append(queries, "DELETE FROM producers WHERE is_active = false AND updated_at < $1") - queries = append(queries, "WITH deleted AS (DELETE FROM consumers WHERE is_active = false AND updated_at < $1 RETURNING *) SELECT deleted.consumers_group, s.name as station_name, deleted.station_id , deleted.tenant_name FROM deleted INNER JOIN stations s ON deleted.station_id = s.id GROUP BY deleted.consumers_group, s.name, deleted.station_id, deleted.tenant_name") + queries = append(queries, "WITH deleted AS (DELETE FROM consumers WHERE is_active = false AND updated_at < $1 RETURNING *) SELECT deleted.consumers_group, s.name as station_name, deleted.station_id , deleted.tenant_name, deleted.partitions FROM deleted INNER JOIN stations s ON deleted.station_id = s.id GROUP BY deleted.consumers_group, s.name, deleted.station_id, deleted.tenant_name, deleted.partitions") batch := &pgx.Batch{} for _, q := range queries { diff --git a/models/consumers.go b/models/consumers.go index 0e64c14f0..deca9cedb 100644 --- a/models/consumers.go +++ b/models/consumers.go @@ -29,6 +29,7 @@ type Consumer struct { StartConsumeFromSeq uint64 `json:"start_consume_from_seq"` LastMessages int64 `json:"last_messages"` TenantName string `json:"tenant_name"` + PartitionsList []int `json:"partitions_list"` } type ExtendedConsumer struct { @@ -40,6 +41,7 @@ type ExtendedConsumer struct { MaxAckTimeMs int64 `json:"max_ack_time_ms"` MaxMsgDeliveries int `json:"max_msg_deliveries"` StationName string `json:"station_name,omitempty"` + PartitionsList []int `json:"partitions_list"` Count int `json:"count"` } @@ -61,6 +63,7 @@ type Cg struct { DisconnectedConsumers []ExtendedConsumer `json:"disconnected_consumers"` DeletedConsumers []ExtendedConsumer `json:"deleted_consumers"` LastStatusChangeDate time.Time `json:"last_status_change_date"` + PartitionsList []int `json:"partitions_list"` } type GetAllConsumersByStationSchema struct { @@ -88,6 +91,7 @@ type CgMember struct { IsActive bool `json:"is_active"` MaxMsgDeliveries int `json:"max_msg_deliveries"` MaxAckTimeMs int64 `json:"max_ack_time_ms"` + PartitionsList []int `json:"partitions_list"` Count int `json:"count"` } @@ -102,8 +106,9 @@ type DelayedCgResp struct { } type LightCG struct { - CGName string `json:"cg_name"` - StationName string `json:"station_name"` - StationId int `json:"station_id"` - TenantName string `json:"tenant_name"` + CGName string `json:"cg_name"` + StationName string `json:"station_name"` + StationId int `json:"station_id"` + TenantName string `json:"tenant_name"` + PartitionsList []int `json:"partitions_list"` } diff --git a/models/dead_letter_station.go b/models/dead_letter_station.go index 6c8fc6ae8..af59e46cb 100644 --- a/models/dead_letter_station.go +++ b/models/dead_letter_station.go @@ -61,6 +61,7 @@ type SchemaVerseDlsMessageSdk struct { Producer ProducerDetails `json:"producer"` Message MessagePayload `json:"message"` ValidationError string `json:"validation_error"` + PartitionNumber int `json:"partition_number"` } type DlsMessage struct { @@ -74,6 +75,7 @@ type DlsMessage struct { ValidationError string `json:"validation_error"` TenantName string `json:"tenant_name"` ProducerName string `json:"producer_name"` + PartitionNumber int `json:"partition_number"` } type DlsMsgResendAll struct { diff --git a/models/monitoring.go b/models/monitoring.go index 0368f2999..b77e69c84 100644 --- a/models/monitoring.go +++ b/models/monitoring.go @@ -53,7 +53,8 @@ type SystemComponentsStatus struct { } type GetStationOverviewDataSchema struct { - StationName string `form:"station_name" json:"station_name" binding:"required"` + StationName string `form:"station_name" json:"station_name" binding:"required"` + PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"` } type SystemLogsRequest struct { diff --git a/models/producers.go b/models/producers.go index b1297b941..5d9074afd 100644 --- a/models/producers.go +++ b/models/producers.go @@ -16,14 +16,15 @@ import ( ) type Producer struct { - ID int `json:"id"` - Name string `json:"name"` - StationId int `json:"station_id"` - Type string `json:"type"` - ConnectionId string `json:"connection_id"` - IsActive bool `json:"is_active"` - UpdatedAt time.Time `json:"updated_at"` - TenantName string `json:"tenant_name"` + ID int `json:"id"` + Name string `json:"name"` + StationId int `json:"station_id"` + Type string `json:"type"` + ConnectionId string `json:"connection_id"` + IsActive bool `json:"is_active"` + UpdatedAt time.Time `json:"updated_at"` + TenantName string `json:"tenant_name"` + PartitionsList []int `json:"partitions"` } type ExtendedProducer struct { diff --git a/models/stations.go b/models/stations.go index ef910559f..3b54f7efb 100644 --- a/models/stations.go +++ b/models/stations.go @@ -37,6 +37,7 @@ type MessageDetails struct { ConnectionId string `json:"connection_id" ` Size int `json:"size"` Headers map[string]string `json:"headers"` + Partition int `json:"partition"` } type Station struct { @@ -60,6 +61,8 @@ type Station struct { TieredStorageEnabled bool `json:"tiered_storage_enabled"` TenantName string `json:"tenant_name"` ResendDisabled bool `json:"resend_disabled"` + PartitionsList []int `json:"partitions_list"` + Version int `json:"version"` } type GetStationResponseSchema struct { @@ -80,6 +83,8 @@ type GetStationResponseSchema struct { DlsConfiguration DlsConfiguration `json:"dls_configuration"` TieredStorageEnabled bool `json:"tiered_storage_enabled"` ResendDisabled bool `json:"resend_disabled"` + PartitionsList []int `json:"partitions_list"` + PartitionsNumber int `json:"partitions_number"` } type ExtendedStation struct { @@ -133,6 +138,8 @@ type ExtendedStationLight struct { TieredStorageEnabled bool `json:"tiered_storage_enabled,omitempty"` TenantName string `json:"tenant_name"` ResendDisabled bool `json:"resend_disabled"` + PartitionsList []int `json:"partitions_list"` + Version int `json"version"` } type ActiveProducersConsumersDetails struct { @@ -165,6 +172,7 @@ type CreateStationSchema struct { IdempotencyWindow int64 `json:"idempotency_window_in_ms"` DlsConfiguration DlsConfiguration `json:"dls_configuration"` TieredStorageEnabled bool `json:"tiered_storage_enabled"` + PartitionsNumber int `json:"partitions_number"` } type DlsConfiguration struct { @@ -185,14 +193,20 @@ type DropDlsMessagesSchema struct { } type PurgeStationSchema struct { - StationName string `json:"station_name" binding:"required"` - PurgeDls bool `json:"purge_dls"` - PurgeStation bool `json:"purge_station"` + StationName string `json:"station_name" binding:"required"` + PurgeDls bool `json:"purge_dls"` + PurgeStation bool `json:"purge_station"` + PartitionsList []int `json:"partitions_list"` } type RemoveMessagesSchema struct { - StationName string `json:"station_name" binding:"required"` - MessageSeqs []uint64 `json:"message_seqs" binding:"required"` + StationName string `json:"station_name" binding:"required"` + Messages []MessageToDelete `json:"messages" binding:"required"` +} + +type MessageToDelete struct { + MessageSeq uint64 `json:"message_seq" binding:"required"` + PartitionNumber int `json:"partition_number" binding:"required"` } type ResendPoisonMessagesSchema struct { @@ -209,11 +223,12 @@ type GetPoisonMessageJourneySchema struct { } type GetMessageDetailsSchema struct { - IsDls bool `form:"is_dls" json:"is_dls"` - DlsType string `form:"dls_type" json:"dls_type"` - MessageId int `form:"message_id" json:"message_id"` - MessageSeq int `form:"message_seq" json:"message_seq"` - StationName string `form:"station_name" json:"station_name" binding:"required"` + IsDls bool `form:"is_dls" json:"is_dls"` + DlsType string `form:"dls_type" json:"dls_type"` + MessageId int `form:"message_id" json:"message_id"` + MessageSeq int `form:"message_seq" json:"message_seq"` + StationName string `form:"station_name" json:"station_name" binding:"required"` + PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"` } type UseSchema struct { @@ -240,3 +255,7 @@ type StationOverviewSchemaDetails struct { type GetUpdatesForSchema struct { StationName string `form:"station_name" json:"station_name" binding:"required"` } + +type PartitionsUpdate struct { + PartitionsList []int `json:"partitions_list"` +} diff --git a/server/background_tasks.go b/server/background_tasks.go index f4cbad255..7dda03ee8 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -608,7 +608,7 @@ func (s *Server) RemoveOldProducersAndConsumers() { if _, ok := CGmap[cg.CGName]; !ok { stationName, err := StationNameFromStr(cg.StationName) if err == nil { - err = s.RemoveConsumer(cg.TenantName, stationName, cg.CGName) + err = s.RemoveConsumer(cg.TenantName, stationName, cg.CGName, cg.PartitionsList) if err != nil { serv.Errorf("RemoveOldProducersAndConsumers at RemoveConsumer: %v", err.Error()) } diff --git a/server/memphis_cloud.go b/server/memphis_cloud.go index 5263e29e9..a137e225c 100644 --- a/server/memphis_cloud.go +++ b/server/memphis_cloud.go @@ -46,6 +46,7 @@ import ( const shouldCreateRootUserforGlobalAcc = true const TENANT_SEQUENCE_START_ID = 2 +const MAX_PARTITIONS = 10000 type BillingHandler struct{ S *Server } type TenantHandler struct{ S *Server } @@ -1999,7 +2000,7 @@ func (s *Server) SetDlsRetentionForExistTenants() error { } func validateRetentionType(retentionType string) error { - if retentionType == "ack_based"{ + if retentionType == "ack_based" { return errors.New("this type of retention is supported only on the cloud version of Memphis, available on cloud.memphis.dev") } if retentionType != "message_age_sec" && retentionType != "messages" && retentionType != "bytes" { diff --git a/server/memphis_handlers.go b/server/memphis_handlers.go index 75a4ff35a..128480dd4 100644 --- a/server/memphis_handlers.go +++ b/server/memphis_handlers.go @@ -79,7 +79,7 @@ func getUserDetailsFromMiddleware(c *gin.Context) (models.User, error) { func CreateDefaultStation(tenantName string, s *Server, sn StationName, userId int, username string) (models.Station, bool, error) { stationName := sn.Ext() replicas := getDefaultReplicas() - err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false) + err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false, 1) if err != nil { return models.Station{}, false, err } @@ -87,7 +87,7 @@ func CreateDefaultStation(tenantName string, s *Server, sn StationName, userId i schemaName := "" schemaVersionNumber := 0 - newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName) + newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName, []int{1}, 1) if err != nil { return models.Station{}, false, err } diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 70021c4b1..5beba3cd4 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -16,6 +16,7 @@ import ( "errors" "fmt" "sort" + "strconv" "memphis/analytics" "memphis/db" @@ -75,16 +76,16 @@ func GetConsumerGroupMembers(cgName string, station models.Station) ([]models.Cg } func (s *Server) createConsumerDirectV0(c *client, reply, tenantName string, ccr createConsumerRequestV0, requestVersion int) { - err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, requestVersion, 1, -1) + _, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, requestVersion, 1, -1) respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) } -func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationName, cGroup, cType, connectionId, tenantName, userName string, maxAckTime, maxMsgDeliveries, requestVersion int, startConsumeFromSequence uint64, lastMessages int64) error { +func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationName, cGroup, cType, connectionId, tenantName, userName string, maxAckTime, maxMsgDeliveries, requestVersion int, startConsumeFromSequence uint64, lastMessages int64) ([]int, error) { name := strings.ToLower(consumerName) err := validateConsumerName(name) if err != nil { serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon at validateConsumerName: Failed creating consumer %v at station %v : %v", tenantName, userName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } consumerGroup := strings.ToLower(cGroup) @@ -92,7 +93,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam err = validateConsumerName(consumerGroup) if err != nil { serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon at validateConsumerName: Failed creating consumer %v at station %v : %v", tenantName, userName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } } else { consumerGroup = name @@ -102,36 +103,42 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam err = validateConsumerType(consumerType) if err != nil { serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon at validateConsumerType: Failed creating consumer %v at station %v : %v", tenantName, userName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } stationName, err := StationNameFromStr(cStationName) if err != nil { serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon at StationNameFromStr: Consumer %v at station %v : %v", tenantName, userName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } exist, user, err := memphis_cache.GetUser(userName, tenantName, false) if err != nil { serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at GetUser from cache: Consumer %v at station %v : %v", tenantName, userName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } else if !exist { - serv.Warnf("[tenant: %v][user: %v] createConsumerDirectCommon at GetUser from cache: user does not exist", tenantName, userName) - return fmt.Errorf("user does not exist in db") + err := errors.New("user does not exist") + serv.Warnf("[tenant: %v][user: %v] createConsumerDirectCommon at GetUser from cache: %s", tenantName, userName, err.Error()) + return []int{}, err } exist, station, err := db.GetStationByName(stationName.Ext(), user.TenantName) if err != nil { serv.Errorf("[tenant: %v]createConsumerDirectCommon at GetStationByName: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } if !exist { + if requestVersion < 2 { + err := errors.New("This station does not exist, a default station can not be created automatically, please upgrade your SDK version") + serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } var created bool station, created, err = CreateDefaultStation(user.TenantName, s, stationName, user.ID, user.Username) if err != nil { serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } if created { @@ -163,82 +170,87 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam consumerGroupExist, consumerFromGroup, err := isConsumerGroupExist(consumerGroup, station.ID) if err != nil { serv.Errorf("[tenant: %v]createConsumerDirectCommon at isConsumerGroupExist: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) - return err + return []int{}, err } - exist, newConsumer, rowsUpdated, err := db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName) + newConsumer, err := db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList) if err != nil { serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) - return err - } - if exist { - errMsg := fmt.Sprintf("Consumer %v at station %v: Consumer name has to be unique per station", consumerName, cStationName) - serv.Errorf("[tenant: %v]createConsumerDirectCommon: %v", user.TenantName, errMsg) - return fmt.Errorf("memphis: %v", errMsg) + return []int{}, err } - if rowsUpdated == 1 { - message := "Consumer " + name + " connected" - serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message) - if consumerGroupExist { - if requestVersion == 1 { - if newConsumer.StartConsumeFromSeq != consumerFromGroup.StartConsumeFromSeq || newConsumer.LastMessages != consumerFromGroup.LastMessages { - errMsg := errors.New("consumer already exists with different uneditable configuration parameters (StartConsumeFromSequence/LastMessages)") - serv.Warnf("createConsumerDirectCommon: %v", errMsg.Error()) - return errMsg - } + message := "Consumer " + name + " connected" + serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message) + if consumerGroupExist { + if requestVersion == 1 { + if newConsumer.StartConsumeFromSeq != consumerFromGroup.StartConsumeFromSeq || newConsumer.LastMessages != consumerFromGroup.LastMessages { + err := errors.New("consumer already exists with different uneditable configuration parameters (StartConsumeFromSequence/LastMessages)") + serv.Warnf("createConsumerDirectCommon: %v", err.Error()) + return []int{}, err } - - if newConsumer.MaxAckTimeMs != consumerFromGroup.MaxAckTimeMs || newConsumer.MaxMsgDeliveries != consumerFromGroup.MaxMsgDeliveries { - err := s.CreateConsumer(station.TenantName, newConsumer, station) - if err != nil { - if IsNatsErr(err, JSStreamNotFoundErr) { - serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon: Consumer %v at station %v: station does not exist", user.TenantName, user.Username, consumerName, cStationName) - } else { - serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at CreateConsumer: Consumer %v at station %v: %v", user.TenantName, user.Username, consumerName, cStationName, err.Error()) + if !comparePartitionsList(consumerFromGroup.PartitionsList, newConsumer.PartitionsList) { + existingPartitions := "" + for i, pl := range consumerFromGroup.PartitionsList { + existingPartitions += strconv.Itoa(pl) + if i < len(consumerFromGroup.PartitionsList)-1 { + existingPartitions += ", " } - return err } + err := errors.New("consumer already exists with different uneditable partition list: partition numbers: ") + serv.Warnf("createConsumerDirectCommon: %v", err.Error()) + return []int{}, err } - } else { - err := s.CreateConsumer(station.TenantName, newConsumer, station) + } + + if newConsumer.MaxAckTimeMs != consumerFromGroup.MaxAckTimeMs || newConsumer.MaxMsgDeliveries != consumerFromGroup.MaxMsgDeliveries { + err := s.CreateConsumer(station.TenantName, newConsumer, station, station.PartitionsList) if err != nil { if IsNatsErr(err, JSStreamNotFoundErr) { serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon: Consumer %v at station %v: station does not exist", user.TenantName, user.Username, consumerName, cStationName) } else { serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at CreateConsumer: Consumer %v at station %v: %v", user.TenantName, user.Username, consumerName, cStationName, err.Error()) } - return err + return []int{}, err } } - var auditLogs []interface{} - newAuditLog := models.AuditLog{ - StationName: stationName.Ext(), - Message: message, - CreatedBy: user.ID, - CreatedByUsername: user.Username, - CreatedAt: time.Now(), - TenantName: user.TenantName, - } - auditLogs = append(auditLogs, newAuditLog) - err = CreateAuditLogs(auditLogs) + } else { + err := s.CreateConsumer(station.TenantName, newConsumer, station, station.PartitionsList) if err != nil { - serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at CreateAuditLogs: Consumer %v at station %v: %v", user.TenantName, user.Username, consumerName, cStationName, err.Error()) + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]createConsumerDirectCommon: Consumer %v at station %v: station does not exist", user.TenantName, user.Username, consumerName, cStationName) + } else { + serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at CreateConsumer: Consumer %v at station %v: %v", user.TenantName, user.Username, consumerName, cStationName, err.Error()) + } + return []int{}, err } + } + var auditLogs []interface{} + newAuditLog := models.AuditLog{ + StationName: stationName.Ext(), + Message: message, + CreatedBy: user.ID, + CreatedByUsername: user.Username, + CreatedAt: time.Now(), + TenantName: user.TenantName, + } + auditLogs = append(auditLogs, newAuditLog) + err = CreateAuditLogs(auditLogs) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]createConsumerDirectCommon at CreateAuditLogs: Consumer %v at station %v: %v", user.TenantName, user.Username, consumerName, cStationName, err.Error()) + } - shouldSendAnalytics, _ := shouldSendAnalytics() - if shouldSendAnalytics { - ip := serv.getIp() - analyticsParams := map[string]interface{}{"consumer-name": newConsumer.Name, "ip": ip} - analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-consumer-sdk") - } + shouldSendAnalytics, _ := shouldSendAnalytics() + if shouldSendAnalytics { + ip := serv.getIp() + analyticsParams := map[string]interface{}{"consumer-name": newConsumer.Name, "ip": ip} + analytics.SendEvent(user.TenantName, user.Username, analyticsParams, "user-create-consumer-sdk") } - return nil + return station.PartitionsList, nil } func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) { var ccr createConsumerRequestV1 - var resp createConsumerResponse + var resp createConsumerResponseV1 tenantName, message, err := s.getTenantNameAndMessage(msg) if err != nil { @@ -259,28 +271,32 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) { ccr.TenantName = tenantName if ccr.StartConsumeFromSequence <= 0 { - errMsg := errors.New("startConsumeFromSequence has to be a positive number") - serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, errMsg.Error()) - respondWithErr(serv.MemphisGlobalAccountString(), s, reply, errMsg) + err := errors.New("startConsumeFromSequence has to be a positive number") + serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) + respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) return } if ccr.LastMessages < -1 { - errMsg := errors.New("min value for LastMessages is -1") - serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, errMsg.Error()) - respondWithErr(serv.MemphisGlobalAccountString(), s, reply, errMsg) + err := errors.New("min value for LastMessages is -1") + serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) + respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) return } if ccr.StartConsumeFromSequence > 1 && ccr.LastMessages > -1 { - errMsg := errors.New("consumer creation options can't contain both startConsumeFromSequence and lastMessages") - serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, errMsg.Error()) - respondWithErr(serv.MemphisGlobalAccountString(), s, reply, errMsg) + err := errors.New("consumer creation options can't contain both startConsumeFromSequence and lastMessages") + serv.Warnf("[tenant: %v]createConsumerDirect: %v", tenantName, err.Error()) + respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) return } - err = s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, 1, ccr.StartConsumeFromSequence, ccr.LastMessages) - respondWithErr(serv.MemphisGlobalAccountString(), s, reply, err) + partitions, err := s.createConsumerDirectCommon(c, ccr.Name, ccr.StationName, ccr.ConsumerGroup, ccr.ConsumerType, ccr.ConnectionId, tenantName, ccr.Username, ccr.MaxAckTimeMillis, ccr.MaxMsgDeliveries, 1, ccr.StartConsumeFromSequence, ccr.LastMessages) + if err != nil { + respondWithRespErr(serv.MemphisGlobalAccountString(), s, reply, err, &resp) + } + resp.Partitions = partitions + respondWithResp(s.MemphisGlobalAccountString(), s, reply, &resp) } func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station models.Station) ([]models.Cg, []models.Cg, []models.Cg, error) { // for socket io endpoint @@ -314,6 +330,7 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode DeletedConsumers: []models.ExtendedConsumer{}, IsActive: consumer.IsActive, LastStatusChangeDate: consumer.UpdatedAt, + PartitionsList: consumer.PartitionsList, } m[consumer.ConsumersGroup] = cg } else { @@ -334,6 +351,7 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode MaxMsgDeliveries: consumer.MaxMsgDeliveries, StationName: consumer.StationName, Count: consumer.Count, + PartitionsList: consumer.PartitionsList, } if consumer.IsActive { @@ -348,8 +366,7 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode var deletedCgs []models.Cg for _, cg := range m { - - cgInfo, err := ch.S.GetCgInfo(station.TenantName, stationName, cg.Name) + cgInfo, err := ch.S.GetCgInfo(station.TenantName, stationName, cg.Name, cg.PartitionsList) if err != nil { continue // ignoring cases where the consumer exist in memphis but not in nats } @@ -359,9 +376,9 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode return []models.Cg{}, []models.Cg{}, []models.Cg{}, err } - cg.InProcessMessages = cgInfo.NumAckPending - cg.UnprocessedMessages = int(cgInfo.NumPending) - cg.PoisonMessages = totalPoisonMsgs + cg.InProcessMessages += cgInfo.NumAckPending + cg.UnprocessedMessages += int(cgInfo.NumPending) + cg.PoisonMessages += totalPoisonMsgs if len(cg.ConnectedConsumers) > 0 { cg.IsActive = true @@ -574,7 +591,7 @@ func (s *Server) destroyCGFromNats(c *client, reply, userName, tenantName string deleted := false if count == 0 { // no other members in this group - err = s.RemoveConsumer(station.TenantName, stationName, consumer.ConsumersGroup) + err = s.RemoveConsumer(station.TenantName, stationName, consumer.ConsumersGroup, consumer.PartitionsList) if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) { errMsg := fmt.Sprintf("[tenant: %v]Consumer group %v at station %v: %v", tenantName, consumer.ConsumersGroup, station.Name, err.Error()) serv.Errorf("destroyCGFromNats at RemoveConsumer: %v", errMsg) @@ -634,3 +651,15 @@ func (s *Server) destroyCGFromNats(c *client, reply, userName, tenantName string respondWithErr(serv.MemphisGlobalAccountString(), s, reply, nil) } + +func comparePartitionsList(pList1, pList2 []int) bool { + if len(pList1) != len(pList2) { + return false + } + for i := 0; i < len(pList1); i++ { + if pList1[i] != pList2[i] { + return false + } + } + return true +} diff --git a/server/memphis_handlers_dls_messages.go b/server/memphis_handlers_dls_messages.go index 2ccb81399..de95918aa 100644 --- a/server/memphis_handlers_dls_messages.go +++ b/server/memphis_handlers_dls_messages.go @@ -39,7 +39,21 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error { return err } - streamName := message.Stream + var streamName string + var partitionNumber int + if strings.Contains(message.Stream, "$") { + streamName = strings.Split(message.Stream, "$")[0] + partitionNumber, err = strconv.Atoi(strings.Split(message.Stream, "$")[1]) + if err != nil { + serv.Errorf("handleNewUnackedMsg: Error while converting partition to int: %v", err.Error()) + return err + } + + } else { + streamName = message.Stream + partitionNumber = -1 + } + accountName := message.Account // backward compatibility if accountName == "" { @@ -58,7 +72,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error { cgName := message.Consumer cgName = revertDelimiters(cgName) messageSeq := message.StreamSeq - poisonMessageContent, err := s.memphisGetMessage(accountName, stationName.Intern(), uint64(messageSeq)) + poisonMessageContent, err := s.memphisGetMessage(accountName, message.Stream, uint64(messageSeq)) if err != nil { if IsNatsErr(err, JSNoMessageFoundErr) { return nil @@ -103,7 +117,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error { Headers: headersJson, } - dlsMsgId, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName) + dlsMsgId, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName, partitionNumber) if err != nil { serv.Errorf("[tenant: %v]handleNewUnackedMsg at StorePoisonMsg: Error while getting notified about a poison message: %v", station.TenantName, err.Error()) return err @@ -146,20 +160,29 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) { } message.Message.TimeSent = time.Now() - _, err = db.InsertSchemaverseDlsMsg(station.ID, 0, message.Producer.Name, []string{}, models.MessagePayload(message.Message), message.ValidationError, tenantName) + _, err = db.InsertSchemaverseDlsMsg(station.ID, 0, message.Producer.Name, []string{}, models.MessagePayload(message.Message), message.ValidationError, tenantName, message.PartitionNumber) if err != nil { serv.Errorf("[tenant: %v]handleSchemaverseDlsMsg: %v", tenantName, err.Error()) return } } -func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { +func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) { poisonMessages := make([]models.LightDlsMessageResponse, 0) schemaMessages := make([]models.LightDlsMessageResponse, 0) - dlsMsgs, err := db.GetDlsMsgsByStationId(station.ID) - if err != nil { - return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + var dlsMsgs []models.DlsMessage + var err error + if partitionNumber == -1 { + dlsMsgs, err = db.GetDlsMsgsByStationId(station.ID) + if err != nil { + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + } + } else { + dlsMsgs, err = db.GetDlsMsgsByStationAndPartition(station.ID, partitionNumber) + if err != nil { + return []models.LightDlsMessageResponse{}, []models.LightDlsMessageResponse{}, 0, err + } } for _, v := range dlsMsgs { @@ -236,6 +259,7 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType dlsMsg := models.DlsMessage{ ID: dlsMessage.ID, StationId: dlsMessage.StationId, + PartitionNumber: dlsMessage.PartitionNumber, MessageSeq: dlsMessage.MessageSeq, ProducerName: dlsMessage.ProducerName, PoisonedCgs: dlsMessage.PoisonedCgs, @@ -257,14 +281,14 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType pc := models.PoisonedCg{} pCg := dlsMsg.PoisonedCgs for _, v := range pCg { - cgInfo, err := serv.GetCgInfo(station.TenantName, sn, v) + cgMembers, err := GetConsumerGroupMembers(v, station) if err != nil { - serv.Errorf("[tenant: %v]GetDlsMessageDetailsById at GetCgInfo: %v", station.TenantName, err.Error()) + serv.Errorf("[tenant: %v]GetDlsMessageDetailsById at GetConsumerGroupMembers: %v", station.TenantName, err.Error()) return models.DlsMessageResponse{}, err } - cgMembers, err := GetConsumerGroupMembers(v, station) + cgInfo, err := serv.GetCgInfo(station.TenantName, sn, v, cgMembers[0].PartitionsList) if err != nil { - serv.Errorf("[tenant: %v]GetDlsMessageDetailsById at GetConsumerGroupMembers: %v", station.TenantName, err.Error()) + serv.Errorf("[tenant: %v]GetDlsMessageDetailsById at GetCgInfo: %v", station.TenantName, err.Error()) return models.DlsMessageResponse{}, err } pc.IsActive, pc.IsDeleted = getCgStatus(cgMembers) @@ -325,9 +349,9 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType return result, nil } -func GetPoisonedCgsByMessage(station models.Station, messageSeq int) ([]models.PoisonedCg, error) { +func GetPoisonedCgsByMessage(station models.Station, messageSeq, partitionNumber int) ([]models.PoisonedCg, error) { var dlsMsg models.DlsMessage - _, dlsMsg, err := db.GetMsgByStationIdAndMsgSeq(station.ID, messageSeq) + _, dlsMsg, err := db.GetMsgByStationIdAndMsgSeq(station.ID, messageSeq, partitionNumber) if err != nil { return []models.PoisonedCg{}, err } @@ -341,16 +365,18 @@ func GetPoisonedCgsByMessage(station models.Station, messageSeq int) ([]models.P if err != nil { return []models.PoisonedCg{}, err } - cgInfo, err := serv.GetCgInfo(station.TenantName, stationName, cg) + cgMembers, err := GetConsumerGroupMembers(cg, station) if err != nil { - serv.Errorf("[tenant: %v]GetPoisonedCgsByMessage at GetCgInfo: %v", station.TenantName, err.Error()) + serv.Errorf("[tenant: %v]GetPoisonedCgsByMessage at GetConsumerGroupMembers: %v", station.TenantName, err.Error()) return []models.PoisonedCg{}, err } - cgMembers, err := GetConsumerGroupMembers(cg, station) + + cgInfo, err := serv.GetCgInfo(station.TenantName, stationName, cg, cgMembers[0].PartitionsList) if err != nil { - serv.Errorf("[tenant: %v]GetPoisonedCgsByMessage at GetConsumerGroupMembers: %v", station.TenantName, err.Error()) + serv.Errorf("[tenant: %v]GetPoisonedCgsByMessage at GetCgInfo: %v", station.TenantName, err.Error()) return []models.PoisonedCg{}, err } + poisonedCg.IsActive, poisonedCg.IsDeleted = getCgStatus(cgMembers) poisonedCg.CgName = cg diff --git a/server/memphis_handlers_monitoring.go b/server/memphis_handlers_monitoring.go index 361356f8d..d39c872c8 100644 --- a/server/memphis_handlers_monitoring.go +++ b/server/memphis_handlers_monitoring.go @@ -525,43 +525,91 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) return } - totalMessages, err := stationsHandler.GetTotalMessages(station.TenantName, station.Name) - if err != nil { - if IsNatsErr(err, JSStreamNotFoundErr) { - serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAuditLogsByStation: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) - c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) - } else { - serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + + var totalMessages int + if body.PartitionNumber == -1 { + totalMessages, err = stationsHandler.GetTotalMessages(station.TenantName, station.Name, station.PartitionsList) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetTotalMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return } - return + } else { + totalMessages, err = stationsHandler.GetTotalPartitionMessages(station.TenantName, station.Name, body.PartitionNumber) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetTotalPartitionMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return + } + } - avgMsgSize, err := stationsHandler.GetAvgMsgSize(station) - if err != nil { - if IsNatsErr(err, JSStreamNotFoundErr) { - serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName) - c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) - } else { - serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + + var avgMsgSize int64 + if body.PartitionNumber == -1 { + avgMsgSize, err = stationsHandler.GetAvgMsgSize(station) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return + } + } else { + avgMsgSize, err = stationsHandler.GetPartitionAvgMsgSize(station.TenantName, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber)) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return } - return } messagesToFetch := 1000 - messages, err := stationsHandler.GetMessages(station, messagesToFetch) - if err != nil { - if IsNatsErr(err, JSStreamNotFoundErr) { - serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) - c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) - } else { - serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + messages := make([]models.MessageDetails, 0) + if body.PartitionNumber == -1 { + messages, err = stationsHandler.GetMessages(station, messagesToFetch) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return + } + } else { + messages, err = stationsHandler.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber), messagesToFetch, body.PartitionNumber) + if err != nil { + if IsNatsErr(err, JSStreamNotFoundErr) { + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) + } else { + serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + } + return } - return } - poisonMessages, schemaFailedMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station) + poisonMessages, schemaFailedMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station, body.PartitionNumber) if err != nil { if IsNatsErr(err, JSStreamNotFoundErr) { serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetDlsMsgsByStationLight: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) diff --git a/server/memphis_handlers_producers.go b/server/memphis_handlers_producers.go index 2cef1af5c..1e156f938 100644 --- a/server/memphis_handlers_producers.go +++ b/server/memphis_handlers_producers.go @@ -43,42 +43,42 @@ func validateProducerType(producerType string) error { return nil } -func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string) (bool, bool, error) { +func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnectionId string, pStationName StationName, username string, tenantName string) (bool, bool, error, models.Station) { name := strings.ToLower(pName) err := validateProducerName(name) if err != nil { serv.Warnf("createProducerDirectCommon at validateProducerName: Producer %v at station %v: %v", pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } producerType := strings.ToLower(pType) err = validateProducerType(producerType) if err != nil { serv.Warnf("createProducerDirectCommon at validateProducerType: Producer %v at station %v: %v", pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } exist, user, err := memphis_cache.GetUser(username, tenantName, false) if err != nil { serv.Errorf("createProducerDirectCommon at GetUser: Producer %v at station %v: %v", pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } if !exist { serv.Warnf("createProducerDirectCommon: User %v does not exist", username) - return false, false, errors.New("User " + username + " does not exist") + return false, false, errors.New("User " + username + " does not exist"), models.Station{} } exist, station, err := db.GetStationByName(pStationName.Ext(), user.TenantName) if err != nil { serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at GetStationByName: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } if !exist { var created bool station, created, err = CreateDefaultStation(user.TenantName, s, pStationName, user.ID, user.Username) if err != nil { serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateDefaultStation: creating default station error - producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } if created { message := "Station " + pStationName.Ext() + " has been created by user " + user.Username @@ -106,10 +106,10 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection } } - newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName) + newProducer, err := db.InsertNewProducer(name, station.ID, producerType, pConnectionId, station.TenantName, station.PartitionsList) if err != nil { serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at InsertNewProducer: %v", user.TenantName, user.Username, err.Error()) - return false, false, err + return false, false, err, models.Station{} } message := "Producer " + name + " connected" serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message) @@ -126,7 +126,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection err = CreateAuditLogs(auditLogs) if err != nil { serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateAuditLogs: Producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error()) - return false, false, err + return false, false, err, models.Station{} } shouldSendAnalytics, _ := shouldSendAnalytics() @@ -141,7 +141,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection } shouldSendNotifications := shouldSendNotification(user.TenantName, SchemaVAlert) - return shouldSendNotifications, station.DlsConfigurationSchemaverse, nil + return shouldSendNotifications, station.DlsConfigurationSchemaverse, nil, station } func (s *Server) createProducerDirectV0(c *client, reply string, cpr createProducerRequestV0, tenantName string) { @@ -150,7 +150,7 @@ func (s *Server) createProducerDirectV0(c *client, reply string, cpr createProdu respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) return } - _, _, err = s.createProducerDirectCommon(c, cpr.Name, + _, _, err, _ = s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName) respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) } @@ -183,12 +183,14 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) { return } - clusterSendNotification, schemaVerseToDls, err := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName) + clusterSendNotification, schemaVerseToDls, err, station := s.createProducerDirectCommon(c, cpr.Name, cpr.ProducerType, cpr.ConnectionId, sn, cpr.Username, tenantName) if err != nil { respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp) return } + partitions := models.PartitionsUpdate{PartitionsList: station.PartitionsList} + resp.PartitionsUpdate = partitions resp.SchemaVerseToDls = schemaVerseToDls resp.ClusterSendNotification = clusterSendNotification schemaUpdate, err := getSchemaUpdateInitFromStation(sn, cpr.TenantName) diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index 92812f36a..98dc35820 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -22,7 +22,6 @@ import ( "memphis/models" "memphis/utils" "regexp" - "sort" "strconv" "strings" "time" @@ -110,9 +109,19 @@ func removeStationResources(s *Server, station models.Station, shouldDeleteStrea } if shouldDeleteStream { - err = s.RemoveStream(station.TenantName, stationName.Intern()) - if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { - return err + if len(station.PartitionsList) == 0 { + err = s.RemoveStream(station.TenantName, stationName.Intern()) + if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { + return err + } + } else { + for _, p := range station.PartitionsList { + streamName := fmt.Sprintf("%v$%v", stationName.Intern(), p) + err = s.RemoveStream(station.TenantName, streamName) + if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { + return err + } + } } } @@ -165,6 +174,16 @@ func (s *Server) createStationDirectIntern(c *client, isNative := shouldCreateStream jsApiResp := JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} memphisGlobalAcc := s.MemphisGlobalAccount() + + if csr.PartitionsNumber > MAX_PARTITIONS || csr.PartitionsNumber < 1 { + errMsg := fmt.Errorf("cannot create station with %v partitions (max:%v min:1): Station %v", csr.PartitionsNumber, MAX_PARTITIONS, csr.StationName) + serv.Warnf("[tenant: %v][user:%v]createStationDirect %v", csr.TenantName, csr.Username, errMsg) + jsApiResp.Error = NewJSStreamCreateError(errMsg) + respondWithErrOrJsApiRespWithEcho(!isNative, c, memphisGlobalAcc, _EMPTY_, reply, _EMPTY_, jsApiResp, errMsg) + return + } + partitionsList := make([]int, 0) + stationName, err := StationNameFromStr(csr.StationName) if err != nil { serv.Warnf("[tenant: %v][user:%v]createStationDirect at StationNameFromStr: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) @@ -304,17 +323,20 @@ func (s *Server) createStationDirectIntern(c *client, } if shouldCreateStream { - err = s.CreateStream(csr.TenantName, stationName, retentionType, retentionValue, storageType, csr.IdempotencyWindow, replicas, csr.TieredStorageEnabled) - if err != nil { - if IsNatsErr(err, JSStreamReplicasNotSupportedErr) { - serv.Warnf("[tenant: %v][user:%v]CreateStationDirect: Station %v: Station can not be created, probably since replicas count is larger than the cluster size", csr.TenantName, csr.Username, stationName.Ext()) - respondWithErr(s.MemphisGlobalAccountString(), s, reply, errors.New("station can not be created, probably since replicas count is larger than the cluster size")) + for p := 1; p <= csr.PartitionsNumber; p++ { + err = s.CreateStream(csr.TenantName, stationName, retentionType, retentionValue, storageType, csr.IdempotencyWindow, replicas, csr.TieredStorageEnabled, p) + if err != nil { + if IsNatsErr(err, JSStreamReplicasNotSupportedErr) { + serv.Warnf("[tenant: %v][user:%v]CreateStationDirect: Station %v: Station can not be created, probably since replicas count is larger than the cluster size", csr.TenantName, csr.Username, stationName.Ext()) + respondWithErr(s.MemphisGlobalAccountString(), s, reply, errors.New("station can not be created, probably since replicas count is larger than the cluster size")) + return + } + + serv.Errorf("[tenant: %v][user:%v]createStationDirect: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) + respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) return } - - serv.Errorf("[tenant: %v][user:%v]createStationDirect: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) - respondWithErr(s.MemphisGlobalAccountString(), s, reply, err) - return + partitionsList = append(partitionsList, p) } } @@ -330,7 +352,7 @@ func (s *Server) createStationDirectIntern(c *client, return } - _, rowsUpdated, err := db.InsertNewStation(stationName.Ext(), user.ID, user.Username, retentionType, retentionValue, storageType, replicas, schemaDetails.SchemaName, schemaDetails.VersionNumber, csr.IdempotencyWindow, isNative, csr.DlsConfiguration, csr.TieredStorageEnabled, user.TenantName) + _, rowsUpdated, err := db.InsertNewStation(stationName.Ext(), user.ID, user.Username, retentionType, retentionValue, storageType, replicas, schemaDetails.SchemaName, schemaDetails.VersionNumber, csr.IdempotencyWindow, isNative, csr.DlsConfiguration, csr.TieredStorageEnabled, user.TenantName, partitionsList, 1) if err != nil { if !strings.Contains(err.Error(), "already exist") { serv.Errorf("[tenant: %v][user:%v]createStationDirect at InsertNewStation: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error()) @@ -437,6 +459,8 @@ func (sh StationsHandler) GetStation(c *gin.Context) { TieredStorageEnabled: station.TieredStorageEnabled, Tags: tags, ResendDisabled: station.ResendDisabled, + PartitionsList: station.PartitionsList, + PartitionsNumber: len(station.PartitionsList), } c.IndentedJSON(200, stationResponse) @@ -459,7 +483,12 @@ func (sh StationsHandler) GetStationsDetails(tenantName string) ([]models.Extend for _, info := range allStreamInfo { streamName := info.Config.Name if !strings.Contains(streamName, "$memphis") { - stationTotalMsgs[streamName] = int(info.State.Msgs) + if strings.Contains(streamName, "$") { + stationNameAndPartition := strings.Split(streamName, "$") + stationTotalMsgs[stationNameAndPartition[0]] += int(info.State.Msgs) + } else { + stationTotalMsgs[streamName] = int(info.State.Msgs) + } } } stationIdsDlsMsgs, err := db.GetStationIdsFromDlsMsgs(tenantName) @@ -524,6 +553,8 @@ func (sh StationsHandler) GetStationsDetails(tenantName string) ([]models.Extend IsNative: station.IsNative, TieredStorageEnabled: station.TieredStorageEnabled, ResendDisabled: station.ResendDisabled, + PartitionsList: station.PartitionsList, + Version: station.Version, } exStations = append(exStations, models.ExtendedStationDetails{Station: stationRes, HasDlsMsgs: hasDlsMsgs, TotalMessages: totalMsgInfo, Tags: tags, Activity: activity}) @@ -676,7 +707,12 @@ func (sh StationsHandler) GetAllStationsDetailsLight(shouldExtend bool, tenantNa streamName := info.Config.Name if !strings.Contains(streamName, "$memphis") { totalMessages += info.State.Msgs - stationTotalMsgs[streamName] = int(info.State.Msgs) + if strings.Contains(streamName, "$") { + stationNameAndPartition := strings.Split(streamName, "$") + stationTotalMsgs[stationNameAndPartition[0]] += int(info.State.Msgs) + } else { + stationTotalMsgs[streamName] = int(info.State.Msgs) + } } } stationIdsDlsMsgs, err := db.GetStationIdsFromDlsMsgs(tenantName) @@ -778,6 +814,14 @@ func (sh StationsHandler) CreateStation(c *gin.Context) { return } + if body.PartitionsNumber > MAX_PARTITIONS || body.PartitionsNumber < 1 { + errMsg := fmt.Errorf("cannot create station with %v replicas (max:%v min:1): Station %v", body.PartitionsNumber, MAX_PARTITIONS, body.Name) + serv.Errorf("[tenant: %v][user:%v]CreateStation %v", user.TenantName, user.Username, errMsg) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + partitionsList := make([]int, 0) + stationName, err := StationNameFromStr(body.Name) if err != nil { serv.Warnf("[tenant: %v][user: %v]CreateStation at StationNameFromStr: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error()) @@ -885,7 +929,23 @@ func (sh StationsHandler) CreateStation(c *gin.Context) { body.IdempotencyWindow = 100 // minimum is 100 millis } - newStation, rowsUpdated, err := db.InsertNewStation(stationName.Ext(), user.ID, user.Username, retentionType, body.RetentionValue, body.StorageType, body.Replicas, schemaName, schemaVersionNumber, body.IdempotencyWindow, true, body.DlsConfiguration, body.TieredStorageEnabled, tenantName) + for p := 1; p <= body.PartitionsNumber; p++ { + err = sh.S.CreateStream(tenantName, stationName, retentionType, body.RetentionValue, body.StorageType, body.IdempotencyWindow, body.Replicas, body.TieredStorageEnabled, p) + if err != nil { + if IsNatsErr(err, JSInsufficientResourcesErr) { + serv.Warnf("[tenant: %v][user: %v]CreateStation: Station %v: Station can not be created, probably since replicas count is larger than the cluster size", user.TenantName, user.Username, body.Name) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station can not be created, probably since replicas count is larger than the cluster size"}) + return + } + + serv.Errorf("[tenant: %v][user: %v]CreateStation at CreateStream: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + partitionsList = append(partitionsList, p) + } + + newStation, rowsUpdated, err := db.InsertNewStation(stationName.Ext(), user.ID, user.Username, retentionType, body.RetentionValue, body.StorageType, body.Replicas, schemaName, schemaVersionNumber, body.IdempotencyWindow, true, body.DlsConfiguration, body.TieredStorageEnabled, tenantName, partitionsList, 1) if err != nil { serv.Errorf("[tenant: %v][user: %v]CreateStation at db.InsertNewStation: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) @@ -900,19 +960,6 @@ func (sh StationsHandler) CreateStation(c *gin.Context) { return } - err = sh.S.CreateStream(tenantName, stationName, retentionType, body.RetentionValue, body.StorageType, body.IdempotencyWindow, body.Replicas, body.TieredStorageEnabled) - if err != nil { - if IsNatsErr(err, JSInsufficientResourcesErr) { - serv.Warnf("[tenant: %v][user: %v]CreateStation: Station %v: Station can not be created, probably since replicas count is larger than the cluster size", user.TenantName, user.Username, body.Name) - c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station can not be created, probably since replicas count is larger than the cluster size"}) - return - } - - serv.Errorf("[tenant: %v][user: %v]CreateStation at CreateStream: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) - return - } - if len(body.Tags) > 0 { err = AddTagsToEntity(body.Tags, "station", newStation.ID, newStation.TenantName) if err != nil { @@ -1190,13 +1237,43 @@ func (s *Server) removeStationDirectIntern(c *client, respondWithErr(s.MemphisGlobalAccountString(), s, reply, nil) } -func (sh StationsHandler) GetTotalMessages(tenantName, stationNameExt string) (int, error) { +func (sh StationsHandler) GetTotalMessages(tenantName, stationNameExt string, partitionsList []int) (int, error) { stationName, err := StationNameFromStr(stationNameExt) if err != nil { return 0, err } - totalMessages, err := sh.S.GetTotalMessagesInStation(tenantName, stationName) - return totalMessages, err + + totalMessages := 0 + if len(partitionsList) == 0 { + totalMessages, err = sh.S.GetTotalMessagesInStation(tenantName, stationName.Intern()) + return totalMessages, err + } else { + for _, p := range partitionsList { + streamMessages, err := sh.S.GetTotalMessagesInStation(tenantName, fmt.Sprintf("%v$%v", stationName.Intern(), p)) + if err != nil { + return totalMessages, err + } + + totalMessages = totalMessages + streamMessages + } + return totalMessages, nil + } + +} + +func (sh StationsHandler) GetTotalPartitionMessages(tenantName, stationNameExt string, partitionNumber int) (int, error) { + stationName, err := StationNameFromStr(stationNameExt) + if err != nil { + return 0, err + } + + totalMessages, err := sh.S.GetTotalMessagesInStation(tenantName, fmt.Sprintf("%v$%v", stationName.Intern(), partitionNumber)) + if err != nil { + return 0, err + } + + return totalMessages, nil + } func (sh StationsHandler) GetAvgMsgSize(station models.Station) (int64, error) { @@ -1204,6 +1281,11 @@ func (sh StationsHandler) GetAvgMsgSize(station models.Station) (int64, error) { return avgMsgSize, err } +func (sh StationsHandler) GetPartitionAvgMsgSize(tenantName, streamName string) (int64, error) { + avgPartitionMsgSize, err := sh.S.GetAvgMsgSizeInPartition(tenantName, streamName) + return avgPartitionMsgSize, err +} + func (sh StationsHandler) GetMessages(station models.Station, messagesToFetch int) ([]models.MessageDetails, error) { messages, err := sh.S.GetMessages(station, messagesToFetch) if err != nil { @@ -1213,6 +1295,15 @@ func (sh StationsHandler) GetMessages(station models.Station, messagesToFetch in return messages, nil } +func (sh StationsHandler) GetMessagesFromPartition(station models.Station, streamName string, messagesToFetch int, partition int) ([]models.MessageDetails, error) { + messages, err := sh.S.GetMessagesFromPartition(station, streamName, messagesToFetch, partition) + if err != nil { + return messages, err + } + + return messages, nil +} + func (sh StationsHandler) GetLeaderAndFollowers(station models.Station) (string, []string, error) { if sh.S.JetStreamIsClustered() { leader, followers, err := sh.S.GetLeaderAndFollowers(station) @@ -1426,7 +1517,7 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) { return } - sm, err := sh.S.GetMessage(station.TenantName, stationName, uint64(body.MessageSeq)) + sm, err := sh.S.GetMessage(station.TenantName, stationName, uint64(body.MessageSeq), body.PartitionNumber) if err != nil { if IsNatsErr(err, JSNoMessageFoundErr) { c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "The message was not found since it had probably already been deleted"}) @@ -1491,40 +1582,12 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) { poisonedCgs := make([]models.PoisonedCg, 0) // Only native stations have CGs if station.IsNative { - poisonedCgs, err = GetPoisonedCgsByMessage(station, int(sm.Sequence)) + poisonedCgs, err = GetPoisonedCgsByMessage(station, int(sm.Sequence), body.PartitionNumber) if err != nil { serv.Errorf("[tenant: %v][user: %v]GetMessageDetails at GetPoisonedCgsByMessage: Message ID: %v: %v", user.TenantName, user.Username, strconv.Itoa(msgId), err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) return } - - for i, cg := range poisonedCgs { - cgInfo, err := serv.GetCgInfo(station.TenantName, stationName, cg.CgName) - if err != nil { - serv.Errorf("[tenant: %v][user: %v]GetMessageDetails at GetCgInfo: Message ID: %v: %v", user.TenantName, user.Username, strconv.Itoa(msgId), err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) - return - } - cgMembers, err := GetConsumerGroupMembers(cg.CgName, station) - if err != nil { - serv.Errorf("[tenant: %v][user: %v]GetMessageDetails at GetConsumerGroupMembers: Message ID: %v: %v", user.TenantName, user.Username, strconv.Itoa(msgId), err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) - return - } - - isActive, isDeleted := getCgStatus(cgMembers) - - poisonedCgs[i].MaxAckTimeMs = cgMembers[0].MaxAckTimeMs - poisonedCgs[i].MaxMsgDeliveries = cgMembers[0].MaxMsgDeliveries - poisonedCgs[i].UnprocessedMessages = int(cgInfo.NumPending) - poisonedCgs[i].InProcessMessages = cgInfo.NumAckPending - poisonedCgs[i].TotalPoisonMessages = -1 - poisonedCgs[i].IsActive = isActive - poisonedCgs[i].IsDeleted = isDeleted - } - sort.Slice(poisonedCgs, func(i, j int) bool { - return poisonedCgs[i].CgName < poisonedCgs[j].CgName - }) } isActive := false exist, producer, err := db.GetProducerByStationIDAndConnectionId(producedByHeader, station.ID, connectionId) @@ -2055,20 +2118,51 @@ func (sh StationsHandler) PurgeStation(c *gin.Context) { } if body.PurgeStation { - err = sh.S.PurgeStream(station.TenantName, stationName.Intern()) - if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { - serv.Errorf("[tenant: %v][user: %v]PurgeStation: %v", user.TenantName, user.Username, err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) - return + if len(station.PartitionsList) == 0 && body.PartitionsList[0] == -1 { + err = sh.S.PurgeStream(station.TenantName, stationName.Intern(), -1) + if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { + serv.Errorf("[tenant: %v][user: %v]PurgeStation: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } else if body.PartitionsList[0] == -1 { + for _, p := range station.PartitionsList { + err = sh.S.PurgeStream(station.TenantName, stationName.Intern(), p) + if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { + serv.Errorf("[tenant: %v][user: %v]PurgeStation: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } + } else { + for _, p := range body.PartitionsList { + err = sh.S.PurgeStream(station.TenantName, stationName.Intern(), p) + if err != nil && !IsNatsErr(err, JSStreamNotFoundErr) { + serv.Errorf("[tenant: %v][user: %v]PurgeStation: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } } } if body.PurgeDls { - err := db.PurgeDlsMsgsFromStation(station.ID) - if err != nil { - serv.Errorf("[tenant: %v][user: %v]PurgeStation dls at PurgeDlsMsgsFromStation: %v", user.TenantName, user.Username, err.Error()) - c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) - return + if body.PartitionsList[0] == -1 { + err := db.PurgeDlsMsgsFromStation(station.ID) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]PurgeStation dls at PurgeDlsMsgsFromStation: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } else { + for _, p := range body.PartitionsList { + err := db.PurgeDlsMsgsFromPartition(station.ID, p) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]PurgeStation dls at PurgeDlsMsgsFromStation: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } } } @@ -2114,8 +2208,8 @@ func (sh StationsHandler) RemoveMessages(c *gin.Context) { return } - for _, msg := range body.MessageSeqs { - err = sh.S.RemoveMsg(station.TenantName, stationName, msg) + for _, msg := range body.Messages { + err = sh.S.RemoveMsg(station.TenantName, stationName, msg.MessageSeq, msg.PartitionNumber) if err != nil { if IsNatsErr(err, JSStreamNotFoundErr) || IsNatsErr(err, JSStreamMsgDeleteFailedF) { continue diff --git a/server/memphis_handlers_ws.go b/server/memphis_handlers_ws.go index 7837e7366..9fa4ddc21 100644 --- a/server/memphis_handlers_ws.go +++ b/server/memphis_handlers_ws.go @@ -206,12 +206,20 @@ func memphisWSGetReqFillerFromSubj(s *Server, h *Handlers, subj string, tenantNa }, nil case memphisWS_Subj_StationOverviewData: - stationName := strings.Join(strings.Split(subj, ".")[1:], ".") + splitedResp := strings.Split(subj, ".") + partitionNumberStr := splitedResp[len(splitedResp)-1] + + partitionNumber, err := strconv.Atoi(partitionNumberStr) + if err != nil { + return nil, fmt.Errorf("invalid partition number - %v", partitionNumberStr) + } + + stationName := strings.Join(splitedResp[1:len(splitedResp)-1], ".") if stationName == _EMPTY_ { return nil, errors.New("invalid station name") } return func(string) (any, error) { - return memphisWSGetStationOverviewData(s, h, stationName, tenantName) + return memphisWSGetStationOverviewData(s, h, stationName, tenantName, partitionNumber) }, nil case memphisWS_Subj_PoisonMsgJourneyData: @@ -256,7 +264,7 @@ func memphisWSGetReqFillerFromSubj(s *Server, h *Handlers, subj string, tenantNa } } -func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, tenantName string) (map[string]any, error) { +func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, tenantName string, partitionNumber int) (map[string]any, error) { sn, err := StationNameFromStr(stationName) if err != nil { return map[string]any{}, err @@ -281,22 +289,40 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string, if err != nil { return map[string]any{}, err } - totalMessages, err := h.Stations.GetTotalMessages(station.TenantName, station.Name) - if err != nil { - return map[string]any{}, err - } - avgMsgSize, err := h.Stations.GetAvgMsgSize(station) - if err != nil { - return map[string]any{}, err - } + var totalMessages int + var avgMsgSize int64 + messages := make([]models.MessageDetails, 0) messagesToFetch := 1000 - messages, err := h.Stations.GetMessages(station, messagesToFetch) - if err != nil { - return map[string]any{}, err + if partitionNumber == -1 { + totalMessages, err = h.Stations.GetTotalMessages(station.TenantName, station.Name, station.PartitionsList) + if err != nil { + return map[string]any{}, err + } + avgMsgSize, err = h.Stations.GetAvgMsgSize(station) + if err != nil { + return map[string]any{}, err + } + messages, err = h.Stations.GetMessages(station, messagesToFetch) + if err != nil { + return map[string]any{}, err + } + } else { + totalMessages, err = h.Stations.GetTotalPartitionMessages(station.TenantName, station.Name, partitionNumber) + if err != nil { + return map[string]any{}, err + } + avgMsgSize, err = h.Stations.GetPartitionAvgMsgSize(station.TenantName, fmt.Sprintf("%v$%v", sn.Intern(), partitionNumber)) + if err != nil { + return map[string]any{}, err + } + messages, err = h.Stations.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", sn.Intern(), partitionNumber), messagesToFetch, partitionNumber) + if err != nil { + return map[string]any{}, err + } } - poisonMessages, schemaFailMessages, totalDlsAmount, err := h.PoisonMsgs.GetDlsMsgsByStationLight(station) + poisonMessages, schemaFailMessages, totalDlsAmount, err := h.PoisonMsgs.GetDlsMsgsByStationLight(station, partitionNumber) if err != nil { return map[string]any{}, err } diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 6624189ed..5430808eb 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -134,6 +134,8 @@ var ( THROUGHPUT_LEGACY_STREAM_EXIST bool ) +type Messages []models.MessageDetails + func createReplyHandler(s *Server, respCh chan []byte) simplifiedMsgHandler { return func(_ *client, subject, _ string, msg []byte) { go func(msg []byte) { @@ -187,7 +189,7 @@ func RemoveUser(username string) error { return nil } -func (s *Server) CreateStream(tenantName string, sn StationName, retentionType string, retentionValue int, storageType string, idempotencyW int64, replicas int, tieredStorageEnabled bool) error { +func (s *Server) CreateStream(tenantName string, sn StationName, retentionType string, retentionValue int, storageType string, idempotencyW int64, replicas int, tieredStorageEnabled bool, partition_number int) error { var maxMsgs int if retentionType == "messages" && retentionValue > 0 { maxMsgs = retentionValue @@ -221,10 +223,17 @@ func (s *Server) CreateStream(tenantName string, sn StationName, retentionType s idempotencyWindow = time.Duration(idempotencyW) * time.Millisecond } + var internName string + if partition_number > 0 { + internName = fmt.Sprintf("%v$%v", sn.Intern(), partition_number) + } else { + internName = sn.Intern() + } + return s. memphisAddStream(tenantName, &StreamConfig{ - Name: sn.Intern(), - Subjects: []string{sn.Intern() + ".>"}, + Name: internName, + Subjects: []string{internName + ".>"}, Retention: retentionPolicy, MaxConsumers: -1, MaxMsgs: int64(maxMsgs), @@ -565,7 +574,7 @@ func getInternalConsumerName(cn string) string { return replaceDelimiters(cn) } -func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, station models.Station) error { +func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, station models.Station, partitionsList []int) error { var consumerName string if consumer.ConsumersGroup != "" { consumerName = consumer.ConsumersGroup @@ -594,6 +603,10 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta return err } + if len(partitionsList) > len(station.PartitionsList) { + partitionsList = station.PartitionsList + } + var deliveryPolicy DeliverPolicy var optStartSeq uint64 // This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 ) @@ -617,26 +630,52 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta } else if consumer.StartConsumeFromSeq == 1 || consumer.LastMessages == -1 { deliveryPolicy = DeliverAll } + if len(partitionsList) == 0 { + consumerConfig := &ConsumerConfig{ + Durable: consumerName, + DeliverPolicy: deliveryPolicy, + AckPolicy: AckExplicit, + AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, + MaxDeliver: MaxMsgDeliveries, + FilterSubject: stationName.Intern() + ".final", + ReplayPolicy: ReplayInstant, + MaxAckPending: -1, + HeadersOnly: false, + // RateLimit: ,// Bits per sec + // Heartbeat: // time.Duration, + } - consumerConfig := &ConsumerConfig{ - Durable: consumerName, - DeliverPolicy: deliveryPolicy, - AckPolicy: AckExplicit, - AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, - MaxDeliver: MaxMsgDeliveries, - FilterSubject: stationName.Intern() + ".final", - ReplayPolicy: ReplayInstant, - MaxAckPending: -1, - HeadersOnly: false, - // RateLimit: ,// Bits per sec - // Heartbeat: // time.Duration, - } + if deliveryPolicy == DeliverByStartSequence { + consumerConfig.OptStartSeq = optStartSeq + } + err = s.memphisAddConsumer(tenantName, stationName.Intern(), consumerConfig) + return err + } else { + for _, pl := range partitionsList { + consumerConfig := &ConsumerConfig{ + Durable: consumerName, + DeliverPolicy: deliveryPolicy, + AckPolicy: AckExplicit, + AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, + MaxDeliver: MaxMsgDeliveries, + FilterSubject: stationName.Intern() + "$" + strconv.Itoa(pl) + ".final", + ReplayPolicy: ReplayInstant, + MaxAckPending: -1, + HeadersOnly: false, + // RateLimit: ,// Bits per sec + // Heartbeat: // time.Duration, + } - if deliveryPolicy == DeliverByStartSequence { - consumerConfig.OptStartSeq = optStartSeq + if deliveryPolicy == DeliverByStartSequence { + consumerConfig.OptStartSeq = optStartSeq + } + err = s.memphisAddConsumer(tenantName, stationName.Intern() + "$" + strconv.Itoa(pl), consumerConfig) + if err != nil { + return err + } + } } - err = s.memphisAddConsumer(tenantName, stationName.Intern(), consumerConfig) - return err + return nil } func (s *Server) memphisAddConsumer(tenantName, streamName string, cc *ConsumerConfig) error { @@ -659,9 +698,19 @@ func (s *Server) memphisAddConsumer(tenantName, streamName string, cc *ConsumerC return resp.ToError() } -func (s *Server) RemoveConsumer(tenantName string, stationName StationName, cn string) error { +func (s *Server) RemoveConsumer(tenantName string, stationName StationName, cn string, partitionsList []int) error { cn = getInternalConsumerName(cn) - return s.memphisRemoveConsumer(tenantName, stationName.Intern(), cn) + if len(partitionsList) == 0 { + return s.memphisRemoveConsumer(tenantName, stationName.Intern(), cn) + } else { + for _, pl := range partitionsList { + err := s.memphisRemoveConsumer(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl), cn) + if err != nil { + return err + } + } + } + return nil } func (s *Server) memphisRemoveConsumer(tenantName, streamName, cn string) error { @@ -675,22 +724,47 @@ func (s *Server) memphisRemoveConsumer(tenantName, streamName, cn string) error return resp.ToError() } -func (s *Server) GetCgInfo(tenantName string, stationName StationName, cgName string) (*ConsumerInfo, error) { - cgName = replaceDelimiters(cgName) - requestSubject := fmt.Sprintf(JSApiConsumerInfoT, stationName.Intern(), cgName) - +func (s *Server) GetCgInfo(tenantName string, stationName StationName, cgName string, partitionsList []int) (*ConsumerInfo, error) { var resp JSApiConsumerInfoResponse - err := jsApiRequest(tenantName, s, requestSubject, kindConsumerInfo, []byte(_EMPTY_), &resp) - if err != nil { - return nil, err - } - - err = resp.ToError() - if err != nil { - return nil, err + cgName = replaceDelimiters(cgName) + var cgInfo *ConsumerInfo + if len(partitionsList) == 0 { + requestSubject := fmt.Sprintf(JSApiConsumerInfoT, stationName.Intern(), cgName) + err := jsApiRequest(tenantName, s, requestSubject, kindConsumerInfo, []byte(_EMPTY_), &resp) + if err != nil { + return nil, err + } + err = resp.ToError() + if err != nil { + return nil, err + } + cgInfo = resp.ConsumerInfo + } else { + init := false + for _, pl := range partitionsList { + stationWithPartition := fmt.Sprintf("%s$%s", stationName.Intern(), strconv.Itoa(pl)) + requestSubject := fmt.Sprintf(JSApiConsumerInfoT, stationWithPartition, cgName) + err := jsApiRequest(tenantName, s, requestSubject, kindConsumerInfo, []byte(_EMPTY_), &resp) + if err != nil { + return nil, err + } + err = resp.ToError() + if err != nil { + return nil, err + } + if !init { + cgInfo = resp.ConsumerInfo + init = true + } else { + cgInfo.NumAckPending += resp.ConsumerInfo.NumAckPending + cgInfo.NumRedelivered += resp.ConsumerInfo.NumRedelivered + cgInfo.NumWaiting += resp.ConsumerInfo.NumWaiting + cgInfo.NumPending += resp.ConsumerInfo.NumPending + } + } } - return resp.ConsumerInfo, nil + return cgInfo, nil } func (s *Server) RemoveStream(tenantName, streamName string) error { @@ -705,8 +779,14 @@ func (s *Server) RemoveStream(tenantName, streamName string) error { return resp.ToError() } -func (s *Server) PurgeStream(tenantName, streamName string) error { - requestSubject := fmt.Sprintf(JSApiStreamPurgeT, streamName) +func (s *Server) PurgeStream(tenantName, streamName string, partitionNumber int) error { + var streamAndPartition string + if partitionNumber == -1 { + streamAndPartition = streamName + } else { + streamAndPartition = fmt.Sprintf("%v$%v", streamName, partitionNumber) + } + requestSubject := fmt.Sprintf(JSApiStreamPurgeT, streamAndPartition) var resp JSApiStreamPurgeResponse err := jsApiRequest(tenantName, s, requestSubject, kindPurgeStream, []byte(_EMPTY_), &resp) @@ -726,8 +806,14 @@ func (s *Server) MemphisVersion() string { return string(data) } -func (s *Server) RemoveMsg(tenantName string, stationName StationName, msgSeq uint64) error { - requestSubject := fmt.Sprintf(JSApiMsgDeleteT, stationName.Intern()) +func (s *Server) RemoveMsg(tenantName string, stationName StationName, msgSeq uint64, partitionNumber int) error { + var streamName string + if partitionNumber == -1 { + streamName = stationName.Intern() + } else { + streamName = fmt.Sprintf("%v$%v", stationName.Intern(), partitionNumber) + } + requestSubject := fmt.Sprintf(JSApiMsgDeleteT, streamName) var resp JSApiMsgDeleteResponse req := JSApiMsgDeleteRequest{Seq: msgSeq} @@ -740,8 +826,8 @@ func (s *Server) RemoveMsg(tenantName string, stationName StationName, msgSeq ui return resp.ToError() } -func (s *Server) GetTotalMessagesInStation(tenantName string, stationName StationName) (int, error) { - streamInfo, err := s.memphisStreamInfo(tenantName, stationName.Intern()) +func (s *Server) GetTotalMessagesInStation(tenantName string, streamName string) (int, error) { + streamInfo, err := s.memphisStreamInfo(tenantName, streamName) if err != nil { return 0, err } @@ -790,12 +876,45 @@ func (s *Server) GetAvgMsgSizeInStation(station models.Station) (int64, error) { return 0, err } - streamInfo, err := s.memphisStreamInfo(station.TenantName, stationName.Intern()) + var msgBytes uint64 + var msgCount uint64 + if len(station.PartitionsList) == 0 { + streamInfo, err := s.memphisStreamInfo(station.TenantName, stationName.Intern()) + if err != nil || streamInfo.State.Bytes == 0 { + return 0, err + } + msgBytes = streamInfo.State.Bytes + msgCount = streamInfo.State.Msgs + } else { + for _, p := range station.PartitionsList { + streamInfo, err := s.memphisStreamInfo(station.TenantName, fmt.Sprintf("%v$%v", stationName.Intern(), p)) + if err != nil { + return 0, err + } + msgBytes = msgBytes + streamInfo.State.Bytes + msgCount = msgCount + streamInfo.State.Msgs + } + } + + if err != nil || msgBytes == 0 { + return 0, err + } + + return int64(msgBytes / msgCount), nil +} + +func (s *Server) GetAvgMsgSizeInPartition(tenantName, streamName string) (int64, error) { + var msgBytes uint64 + var msgCount uint64 + + streamInfo, err := s.memphisStreamInfo(tenantName, streamName) if err != nil || streamInfo.State.Bytes == 0 { return 0, err } + msgBytes = streamInfo.State.Bytes + msgCount = streamInfo.State.Msgs - return int64(streamInfo.State.Bytes / streamInfo.State.Msgs), nil + return int64(msgBytes / msgCount), nil } func (s *Server) memphisAllStreamsInfo(tenantName string) ([]*StreamInfo, error) { @@ -849,7 +968,44 @@ func (s *Server) GetMessages(station models.Station, messagesToFetch int) ([]mod if err != nil { return []models.MessageDetails{}, err } - streamInfo, err := s.memphisStreamInfo(station.TenantName, stationName.Intern()) + + if len(station.PartitionsList) == 0 { + return s.GetMessagesFromPartition(station, stationName.Intern(), messagesToFetch, 0) + } else { + var messages Messages + for _, p := range station.PartitionsList { + partitionMessages, err := s.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", stationName.Intern(), p), messagesToFetch, p) + if err != nil { + return []models.MessageDetails{}, err + } + messages = append(messages, partitionMessages...) + } + + if len(messages) == 0 { + return []models.MessageDetails{}, nil + } else if len(messages) <= 1000 { + return messages, nil + } else { + sort.Sort(messages) + return messages[:1000], nil + } + } +} + +func (msgs Messages) Len() int { + return len(msgs) +} + +func (msgs Messages) Less(i, j int) bool { + return msgs[i].TimeSent.Before(msgs[j].TimeSent) +} + +func (msgs Messages) Swap(i, j int) { + msgs[i], msgs[j] = msgs[j], msgs[i] +} + +func (s *Server) GetMessagesFromPartition(station models.Station, streamName string, messagesToFetch int, partition int) ([]models.MessageDetails, error) { + streamInfo, err := s.memphisStreamInfo(station.TenantName, streamName) if err != nil { return []models.MessageDetails{}, err } @@ -863,13 +1019,13 @@ func (s *Server) GetMessages(station models.Station, messagesToFetch int) ([]mod messagesToFetch = int(totalMessages) } - filterSubj := stationName.Intern() + ".final" + filterSubj := streamName + ".final" if !station.IsNative { filterSubj = "" } msgs, err := s.memphisGetMsgs(station.TenantName, filterSubj, - stationName.Intern(), + streamName, startSequence, messagesToFetch, 5*time.Second, @@ -927,6 +1083,7 @@ func (s *Server) GetMessages(station models.Station, messagesToFetch int) ([]mod messageDetails.ProducedBy = producedByHeader messageDetails.ConnectionId = connectionIdHeader messageDetails.Headers = headersJson + messageDetails.Partition = partition } messages = append(messages, messageDetails) @@ -1047,8 +1204,15 @@ cleanup: return msgs, nil } -func (s *Server) GetMessage(tenantName string, stationName StationName, msgSeq uint64) (*StoredMsg, error) { - return s.memphisGetMessage(tenantName, stationName.Intern(), msgSeq) +func (s *Server) GetMessage(tenantName string, stationName StationName, msgSeq uint64, paritionNumber int) (*StoredMsg, error) { + var streamName string + if paritionNumber != -1 { + streamName = fmt.Sprintf("%v$%v", stationName.Intern(), paritionNumber) + } else { + streamName = stationName.Intern() + } + + return s.memphisGetMessage(tenantName, streamName, msgSeq) } func (s *Server) GetLeaderAndFollowers(station models.Station) (string, []string, error) { @@ -1478,7 +1642,7 @@ func (s *Server) MoveResourcesFromOldToNewDefaultAcc() error { return err } stationsMap[station.ID] = station - err = s.CreateStream(MEMPHIS_GLOBAL_ACCOUNT, stationName, station.RetentionType, station.RetentionValue, station.StorageType, station.IdempotencyWindow, station.Replicas, station.TieredStorageEnabled) + err = s.CreateStream(MEMPHIS_GLOBAL_ACCOUNT, stationName, station.RetentionType, station.RetentionValue, station.StorageType, station.IdempotencyWindow, station.Replicas, station.TieredStorageEnabled, 0) if err != nil { return err } @@ -1493,7 +1657,7 @@ func (s *Server) MoveResourcesFromOldToNewDefaultAcc() error { } for _, consumer := range consumers { station := stationsMap[consumer.StationId] - err = s.CreateConsumer(consumer.TenantName, consumer, station) + err = s.CreateConsumer(consumer.TenantName, consumer, station, station.PartitionsList) if err != nil { return err } diff --git a/server/memphis_sdk_handlers.go b/server/memphis_sdk_handlers.go index 9817d3273..6204e6b83 100644 --- a/server/memphis_sdk_handlers.go +++ b/server/memphis_sdk_handlers.go @@ -36,6 +36,7 @@ type createStationRequest struct { Username string `json:"username"` TieredStorageEnabled bool `json:"tiered_storage_enabled"` TenantName string `json:"tenant_name"` + PartitionsNumber int `json:"partitions_number"` } type destroyStationRequest struct { @@ -66,8 +67,14 @@ type createConsumerResponse struct { Err string `json:"error"` } +type createConsumerResponseV1 struct { + Partitions []int `json:"partitions"` + Err string `json:"error"` +} + type createProducerResponse struct { SchemaUpdate models.ProducerSchemaUpdateInit `json:"schema_update"` + PartitionsUpdate models.PartitionsUpdate `json:"partitions_update"` SchemaVerseToDls bool `json:"schemaverse_to_dls"` ClusterSendNotification bool `json:"send_notification"` Err string `json:"error"` @@ -164,6 +171,10 @@ func (ccr *createConsumerResponse) SetError(err error) { ccr.Err = err.Error() } +func (ccr *createConsumerResponseV1) SetError(err error) { + ccr.Err = err.Error() +} + func (csresp *SchemaResponse) SetError(err error) { if err != nil { csresp.Err = err.Error() diff --git a/server/memphis_zombie_resources.go b/server/memphis_zombie_resources.go index a5b977c88..51ca58631 100644 --- a/server/memphis_zombie_resources.go +++ b/server/memphis_zombie_resources.go @@ -13,6 +13,7 @@ package server import ( "encoding/json" + "fmt" "memphis/db" "memphis/models" "sync" @@ -20,6 +21,7 @@ import ( ) func (srv *Server) removeStaleStations() { + // TODO - handle stale partition and deleting its resources stations, err := db.GetActiveStations() if err != nil { srv.Errorf("removeStaleStations: %v", err.Error()) @@ -27,9 +29,15 @@ func (srv *Server) removeStaleStations() { for _, s := range stations { go func(srv *Server, s models.Station) { stationName, _ := StationNameFromStr(s.Name) - _, err = srv.memphisStreamInfo(s.TenantName, stationName.Intern()) - if IsNatsErr(err, JSStreamNotFoundErr) { - srv.Warnf("[tenant: %v]removeStaleStations: Found zombie station to delete: %v", s.TenantName, s.Name) + var partitionsToDelete []int + for _, p := range s.PartitionsList { + _, err = srv.memphisStreamInfo(s.TenantName, fmt.Sprintf("%v$%v", stationName.Intern(), p)) + if IsNatsErr(err, JSStreamNotFoundErr) { + partitionsToDelete = append(partitionsToDelete, p) + } + } + + if 0 < len(partitionsToDelete) { err := removeStationResources(srv, s, false) if err != nil { srv.Errorf("[tenant: %v]removeStaleStations at removeStationResources: %v", s.TenantName, err.Error()) diff --git a/ui_src/src/assets/images/partitionIcon.svg b/ui_src/src/assets/images/partitionIcon.svg new file mode 100644 index 000000000..767b852d5 --- /dev/null +++ b/ui_src/src/assets/images/partitionIcon.svg @@ -0,0 +1,4 @@ + + + + diff --git a/ui_src/src/components/createStationForm/index.js b/ui_src/src/components/createStationForm/index.js index c15a1a8b1..1b1f00dbc 100644 --- a/ui_src/src/components/createStationForm/index.js +++ b/ui_src/src/components/createStationForm/index.js @@ -11,7 +11,6 @@ // A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. import './style.scss'; -import CheckCircleIcon from '@material-ui/icons/CheckCircle'; import React, { useState, useEffect } from 'react'; import { useHistory } from 'react-router-dom'; import { Form } from 'antd'; @@ -168,7 +167,8 @@ const CreateStationForm = ({ createStationFormRef, getStartedStateRef, finishUpd dls_configuration: { poison: dlsConfiguration, schemaverse: dlsConfiguration - } + }, + partitions_number: formFields.partitions_number }; if ((getStarted && getStartedStateRef?.completedSteps === 0) || !getStarted) createStation(bodyRequest); else finishUpdate(); @@ -295,33 +295,57 @@ const CreateStationForm = ({ createStationFormRef, getStartedStateRef, finishUpd )} - {!isCloud() && ( +
+ {!isCloud() && ( +
+ +
+ + getStarted && updateFormState('replicas', e)} + disabled={!allowEdit} + /> + +
+
+ )}
- +
- - getStarted && updateFormState('replicas', e)} + + getStarted && updateFormState('partitions_number', e)} disabled={!allowEdit} />
- )} +
diff --git a/ui_src/src/components/createStationForm/style.scss b/ui_src/src/components/createStationForm/style.scss index a7c5501a8..abd75b78c 100644 --- a/ui_src/src/components/createStationForm/style.scss +++ b/ui_src/src/components/createStationForm/style.scss @@ -11,7 +11,7 @@ display: flex; flex-direction: column; } - .left-gs{ + .left-gs { gap: 20px; } .right-side { @@ -48,7 +48,7 @@ border-bottom-right-radius: 8px; .description { font-size: 13px; - color: #A9A9A9; + color: #a9a9a9; } } } @@ -67,10 +67,15 @@ display: flex; flex-direction: column; } - .replicas-container { + .replicas-partition-container { + display: grid; + grid-template-columns: 47% 47%; justify-content: space-between; - display: flex; - flex-direction: column; + .replicas-container { + justify-content: space-between; + display: flex; + flex-direction: column; + } } .header-getstarted-form { width: 168px; @@ -140,30 +145,30 @@ .minutes-section, .seconds-section { text-align: center; - + margin-bottom: 15px; p { font-size: 11px; font-family: 'Inter'; - line-height: 23px; + line-height: 18px; + margin: 0; } } .separator { - margin: 3px 5px 0px 5px; + margin: 8px 5px 0px 5px; font-size: 14px; font-family: 'InterBold'; - line-height: 23px; color: var(--gray); } } .ant-input-number { - width: 60px; - height: 38px; + width: 100%; + height: 40px; border-radius: 4px; border: 1px solid var(--gray); } .ant-input-number-input { - height: 38px; + height: 40px; } .ant-input-number-focused { box-shadow: unset; @@ -184,11 +189,11 @@ margin-bottom: 10px; } } - .ackbased-type{ + .ackbased-type { height: 82px; - p{ + p { font-size: 14px; - color: #A9A9A9; + color: #a9a9a9; } } .toggle-add-schema { diff --git a/ui_src/src/components/partitionsFilter/index.js b/ui_src/src/components/partitionsFilter/index.js new file mode 100644 index 000000000..46ed0bb1e --- /dev/null +++ b/ui_src/src/components/partitionsFilter/index.js @@ -0,0 +1,75 @@ +// Copyright 2022-2023 The Memphis.dev Authors +// Licensed under the Memphis Business Source License 1.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation. +// +// https://github.com/memphisdev/memphis/blob/master/LICENSE +// +// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service. +// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. + +import './style.scss'; + +import React, { useContext, useState } from 'react'; +import { Divider, Popover } from 'antd'; +import partitionIcon from '../../assets/images/partitionIcon.svg'; +import CollapseArrow from '../../assets/images/collapseArrow.svg'; +import { StationStoreContext } from '../../domain/stationOverview'; + +const PartitionsFilter = ({ partitions_number, height }) => { + const [stationState, stationDispatch] = useContext(StationStoreContext); + const [isOpen, setIsOpen] = useState(false); + const [selectedPartition, setSelectedPartition] = useState(-1); + + const handleApply = (i) => { + setSelectedPartition(i === 0 ? -1 : i); + stationDispatch({ type: 'SET_STATION_PARTITION', payload: i === 0 ? -1 : i }); + setIsOpen(false); + }; + + const handleOpenChange = () => { + setIsOpen(!isOpen); + }; + + const getItems = () => { + let elements = []; + for (let i = 0; i <= partitions_number; i++) { + elements.push( +
handleApply(i)}> + + PartitionIcon {i == 0 ? 'All partitions' : `Partition ${i}`} + +
+ ); + } + return elements; + }; + const getContent = () => { + return ( +
+
+

Filter

+ +
+
{getItems()}
+
+ ); + }; + + return ( + +
+
+
+ + {selectedPartition == -1 ? `All partitions` : `Partition ${selectedPartition}`} +
+ CollapseArrow +
+
+
+ ); +}; +export default PartitionsFilter; diff --git a/ui_src/src/components/partitionsFilter/style.scss b/ui_src/src/components/partitionsFilter/style.scss new file mode 100644 index 000000000..1b4603e6c --- /dev/null +++ b/ui_src/src/components/partitionsFilter/style.scss @@ -0,0 +1,65 @@ +.filter-partitions-container { + max-height: 300px; + overflow-y: auto; +} +.filter-partitions-title { + font-family: 'InterSemiBold'; + font-size: 16px; + color: #2e2c34; + p { + margin-bottom: 0; + padding: 10px 0 10px 20px; + } + .ant-divider { + margin: 0; + } +} + +.partition-item { + cursor: pointer; + width: 200px; + height: 40px; + vertical-align: middle; + display: flex; + flex-direction: column; + justify-content: center; + img { + padding-left: 15px; + padding-right: 10px; + } +} +.partition-item:last-child { + border-radius: 0px 0px 8px 8px; +} +.partition-item:hover { + background-color: #f5f5f5; +} + +.filter-partition-container { + display: flex; + justify-content: space-between; + width: 160px; + cursor: pointer; + .filter-title { + color: #737373; + } +} +.filter-partition-btn { + border-radius: 50px; + color: rgb(29, 29, 29); + background-color: rgb(255, 255, 255); + width: 200px; + height: 34px; + border-color: rgb(255, 255, 255); + font-size: 14px; + font-weight: bold; + font-family: Inter; + opacity: 1; + min-width: 60px; + box-shadow: rgba(0, 0, 0, 0.21) 0px 1px 2px 0px; + line-height: 14px; + display: flex; + align-items: center; + justify-content: center; + cursor: pointer; +} diff --git a/ui_src/src/domain/overview/failedStations/index.js b/ui_src/src/domain/overview/failedStations/index.js index 83f372bb4..f33855e9d 100644 --- a/ui_src/src/domain/overview/failedStations/index.js +++ b/ui_src/src/domain/overview/failedStations/index.js @@ -50,6 +50,7 @@ const FailedStations = ({ createStationTrigger }) => {
Name Creation date + Partitions Stored messages Status @@ -66,6 +67,12 @@ const FailedStations = ({ createStationTrigger }) => { {parsingDate(station.created_at)} + + {station?.partitions_list ? station?.partitions_list?.length?.toLocaleString() : 1} + {station.total_messages?.toLocaleString()} diff --git a/ui_src/src/domain/overview/failedStations/style.scss b/ui_src/src/domain/overview/failedStations/style.scss index 999315687..8fec14ad8 100644 --- a/ui_src/src/domain/overview/failedStations/style.scss +++ b/ui_src/src/domain/overview/failedStations/style.scss @@ -38,11 +38,11 @@ padding: 15px 0 15px 15px; width: 100%; display: grid; - grid-template-columns: 25% 20% 20% 13% 15%; - gap:5px; + grid-template-columns: 20% 15% 15% 15% 13% 15%; + gap: 5px; justify-content: space-between; color: #737373; - .title-center{ + .title-center { justify-content: center; } span { @@ -64,8 +64,8 @@ padding: 0 0 5px 15px; .stations-row { display: grid; - grid-template-columns: 25% 20% 20% 13% 15%; - gap:5px; + grid-template-columns: 20% 15% 15% 15% 13% 15%; + gap: 5px; color: #1d1d1d; justify-content: space-between; width: 100%; @@ -91,7 +91,7 @@ padding-left: 20px; } } - .even{ + .even { background-color: #f8f8f8; border-radius: 5px; } @@ -114,7 +114,7 @@ .centered { display: flex; justify-content: center; - text-align:center; + text-align: center; .staion-link { cursor: pointer; width: 120px; @@ -139,8 +139,8 @@ opacity: 0.8; } } - .lottie{ - svg{ + .lottie { + svg { width: 90px !important; } } diff --git a/ui_src/src/domain/overview/getStarted/index.js b/ui_src/src/domain/overview/getStarted/index.js index bd1ce6bed..af0f10e69 100644 --- a/ui_src/src/domain/overview/getStarted/index.js +++ b/ui_src/src/domain/overview/getStarted/index.js @@ -74,6 +74,7 @@ const initialState = { retention_value: 604800, storage_type: 'file', replicas: 'No HA (1)', + partitions_number: 1, days: 7, hours: 0, minutes: 0, diff --git a/ui_src/src/domain/stationOverview/hooks/reducer.js b/ui_src/src/domain/stationOverview/hooks/reducer.js index 4cb62d543..54fb95393 100644 --- a/ui_src/src/domain/stationOverview/hooks/reducer.js +++ b/ui_src/src/domain/stationOverview/hooks/reducer.js @@ -18,6 +18,11 @@ const Reducer = (stationState, action) => { ...stationState, stationMetaData: action.payload }; + case 'SET_STATION_PARTITION': + return { + ...stationState, + stationPartition: action.payload + }; case 'SET_SOCKET_DATA': return { ...stationState, @@ -58,6 +63,11 @@ const Reducer = (stationState, action) => { ...stationState, selectedRowId: action.payload }; + case 'SET_SELECTED_ROW_PARTITION': + return { + ...stationState, + selectedRowPartition: action.payload + }; case 'SET_SCHEMA_TYPE': return { ...stationState, diff --git a/ui_src/src/domain/stationOverview/index.js b/ui_src/src/domain/stationOverview/index.js index 83af0a478..73bf97b01 100644 --- a/ui_src/src/domain/stationOverview/index.js +++ b/ui_src/src/domain/stationOverview/index.js @@ -28,7 +28,8 @@ import { StringCodec, JSONCodec } from 'nats.ws'; const initializeState = { stationMetaData: { is_native: true }, - stationSocketData: {} + stationSocketData: {}, + stationPartition: -1 }; const StationOverview = () => { @@ -65,7 +66,10 @@ const StationOverview = () => { const getStationDetails = async () => { try { - const data = await httpRequest('GET', `${ApiEndpoints.GET_STATION_DATA}?station_name=${stationName}`); + const data = await httpRequest( + 'GET', + `${ApiEndpoints.GET_STATION_DATA}?station_name=${stationName}&partition_number=${stationState?.stationPartition || -1}` + ); await sortData(data); stationDispatch({ type: 'SET_SOCKET_DATA', payload: data }); stationDispatch({ type: 'SET_SCHEMA_TYPE', payload: data.schema.schema_type }); @@ -77,6 +81,9 @@ const StationOverview = () => { } } }; + useEffect(() => { + getStationDetails(); + }, [stationState?.stationPartition || stationState?.stationSocketData?.total_messages || stationState?.stationSocketData?.total_dls_messages]); useEffect(() => { setisLoading(true); @@ -92,10 +99,13 @@ const StationOverview = () => { const subscribeAndListen = async () => { try { (async () => { - const rawBrokerName = await state.socket?.request(`$memphis_ws_subs.station_overview_data.${stationName}`, sc.encode('SUB')); + const rawBrokerName = await state.socket?.request( + `$memphis_ws_subs.station_overview_data.${stationName}.${stationState?.stationPartition || -1}`, + sc.encode('SUB') + ); if (rawBrokerName) { const brokerName = JSON.parse(sc.decode(rawBrokerName?._rdata))['name']; - sub = state.socket?.subscribe(`$memphis_ws_pubs.station_overview_data.${stationName}.${brokerName}`); + sub = state.socket?.subscribe(`$memphis_ws_pubs.station_overview_data.${stationName}.${stationState?.stationPartition}.${brokerName}`); listenForUpdates(); } })(); @@ -129,7 +139,7 @@ const StationOverview = () => { } } }; - }, [state.socket]); + }, [stationState?.stationPartition]); return ( diff --git a/ui_src/src/domain/stationOverview/stationObservabilty/components/messageDetails/index.js b/ui_src/src/domain/stationOverview/stationObservabilty/components/messageDetails/index.js index f7485ae80..8b0770fa2 100644 --- a/ui_src/src/domain/stationOverview/stationObservabilty/components/messageDetails/index.js +++ b/ui_src/src/domain/stationOverview/stationObservabilty/components/messageDetails/index.js @@ -45,20 +45,22 @@ const MessageDetails = ({ isDls, isFailedSchemaMessage = false }) => { }, [messageDetails]); useEffect(() => { - if (stationState?.selectedRowId && !loadMessageData) { - getMessageDetails(stationState?.selectedRowId); + if ((isDls && stationState?.selectedRowId && stationState?.selectedRowPartition && !loadMessageData) || (stationState?.selectedRowId && !loadMessageData)) { + getMessageDetails(stationState?.selectedRowId, stationState?.selectedRowPartition === 0 ? -1 : stationState?.selectedRowPartition); } - }, [stationState?.selectedRowId]); + }, [stationState?.selectedRowId, stationState?.selectedRowPartition]); - const getMessageDetails = async (selectedRow) => { + const getMessageDetails = async (selectedRow, selectedRowPartition) => { setMessageDetails({}); setLoadMessageData(true); try { const data = await httpRequest( 'GET', - `${ApiEndpoints.GET_MESSAGE_DETAILS}?dls_type=${isFailedSchemaMessage ? 'schema' : 'poison'}&station_name=${stationName}&is_dls=${isDls}&message_id=${ - isDls ? parseInt(selectedRow) : -1 - }&message_seq=${isDls ? -1 : selectedRow}` + `${ApiEndpoints.GET_MESSAGE_DETAILS}?dls_type=${ + isFailedSchemaMessage ? 'schema' : 'poison' + }&station_name=${stationName}&is_dls=${isDls}&partition_number=${selectedRowPartition}&message_id=${isDls ? parseInt(selectedRow) : -1}&message_seq=${ + isDls ? -1 : selectedRow + }` ); arrangeData(data); } catch (error) { diff --git a/ui_src/src/domain/stationOverview/stationObservabilty/components/purgeStationModal/index.js b/ui_src/src/domain/stationOverview/stationObservabilty/components/purgeStationModal/index.js index b2619de42..04c606209 100644 --- a/ui_src/src/domain/stationOverview/stationObservabilty/components/purgeStationModal/index.js +++ b/ui_src/src/domain/stationOverview/stationObservabilty/components/purgeStationModal/index.js @@ -46,6 +46,7 @@ const PurgeStationModal = ({ title, desc, cancel, stationName, msgsDisabled = fa try { let purgeDataPayload = purgeData; purgeDataPayload['station_name'] = stationName; + purgeDataPayload['partitions_list'] = [stationState?.stationPartition]; await httpRequest('DELETE', `${ApiEndpoints.PURGE_STATION}`, purgeDataPayload); stationDispatch({ type: 'SET_SELECTED_ROW_ID', payload: null }); // let data = stationState?.stationSocketData; diff --git a/ui_src/src/domain/stationOverview/stationObservabilty/messages/index.js b/ui_src/src/domain/stationOverview/stationObservabilty/messages/index.js index cd4de1945..07f343cc4 100644 --- a/ui_src/src/domain/stationOverview/stationObservabilty/messages/index.js +++ b/ui_src/src/domain/stationOverview/stationObservabilty/messages/index.js @@ -43,6 +43,7 @@ import { Virtuoso } from 'react-virtuoso'; const Messages = () => { const [stationState, stationDispatch] = useContext(StationStoreContext); const [selectedRowIndex, setSelectedRowIndex] = useState(null); + const [selectedRowPartition, setSelectedRowPartition] = useState(null); const [modalPurgeIsOpen, modalPurgeFlip] = useState(false); const [resendProcced, setResendProcced] = useState(false); const [ignoreProcced, setIgnoreProcced] = useState(false); @@ -58,10 +59,12 @@ const Messages = () => { const url = window.location.href; const stationName = url.split('stations/')[1]; - const onSelectedRow = (id) => { + const onSelectedRow = (id, partition) => { setUserScrolled(false); setSelectedRowIndex(id); + setSelectedRowPartition(partition); stationDispatch({ type: 'SET_SELECTED_ROW_ID', payload: id }); + stationDispatch({ type: 'SET_SELECTED_ROW_PARTITION', payload: partition }); }; const handleCheckedClick = (e) => { @@ -79,6 +82,7 @@ const Messages = () => { const handleChangeMenuItem = (newValue) => { stationDispatch({ type: 'SET_SELECTED_ROW_ID', payload: null }); + stationDispatch({ type: 'SET_SELECTED_ROW_PARTITION', payload: null }); setSelectedRowIndex(null); setIsCheck([]); @@ -97,6 +101,7 @@ const Messages = () => { const handleChangeSubMenuItem = (newValue) => { stationDispatch({ type: 'SET_SELECTED_ROW_ID', payload: null }); + stationDispatch({ type: 'SET_SELECTED_ROW_PARTITION', payload: null }); setSelectedRowIndex(null); setSubTabValue(newValue); setIsCheck([]); @@ -107,11 +112,14 @@ const Messages = () => { let messages; try { if (tabValue === tabs[0]) { - await httpRequest('DELETE', `${ApiEndpoints.REMOVE_MESSAGES}`, { station_name: stationName, message_seqs: isCheck }); + const message_seqs = isCheck.map((item) => { + return { message_seq: Number(item.split('_')[0]), partition_number: Number(item.split('_')[1]) }; + }); + await httpRequest('DELETE', `${ApiEndpoints.REMOVE_MESSAGES}`, { station_name: stationName, messages: message_seqs }); messages = stationState?.stationSocketData?.messages; isCheck.map((messageId, index) => { messages = messages?.filter((item) => { - return item.message_seq !== messageId; + return `${item.message_seq}_${item.partition}` !== messageId; }); }); } else { @@ -135,6 +143,7 @@ const Messages = () => { ? stationDispatch({ type: 'SET_POISON_MESSAGES', payload: messages }) : stationDispatch({ type: 'SET_FAILED_MESSAGES', payload: messages }); stationDispatch({ type: 'SET_SELECTED_ROW_ID', payload: null }); + stationDispatch({ type: 'SET_SELECTED_ROW_PARTITION', payload: null }); setSelectedRowIndex(null); setIsCheck([]); }, 1500); @@ -183,11 +192,23 @@ const Messages = () => { const listGenerator = (index, message) => { const id = tabValue === tabs[1] ? message?.id : message?.message_seq; + const partition = tabValue === tabs[1] ? null : message?.partition; return (
- -
onSelectedRow(id)}> - {selectedRowIndex === id &&
} + +
onSelectedRow(id, partition)} + > + {selectedRowIndex === id && selectedRowPartition === partition &&
} {tabValue === tabs[1] ? messageParser('string', message?.message?.data) : messageParser('string', message?.data)} @@ -344,7 +365,7 @@ const Messages = () => { {tabValue === tabs[1] && subTabValue === subTabs[0].name && stationState?.stationSocketData?.poison_messages?.length > 0 && listGeneratorWrapper()} {tabValue === tabs[1] && subTabValue === subTabs[1].name && stationState?.stationSocketData?.schema_failed_messages?.length > 0 && listGeneratorWrapper()} - {tabValue === tabs[0] && stationState?.stationSocketData?.messages === null && ( + {tabValue === tabs[0] && (stationState?.stationSocketData?.messages === null || stationState?.stationSocketData?.messages?.length === 0) && (
waitingMessages

No messages

diff --git a/ui_src/src/domain/stationOverview/stationOverviewHeader/index.js b/ui_src/src/domain/stationOverview/stationOverviewHeader/index.js index 2413fcacf..821e8d7ed 100644 --- a/ui_src/src/domain/stationOverview/stationOverviewHeader/index.js +++ b/ui_src/src/domain/stationOverview/stationOverviewHeader/index.js @@ -23,6 +23,8 @@ import averageMesIcon from '../../../assets/images/averageMesIcon.svg'; import stopUsingIcon from '../../../assets/images/stopUsingIcon.svg'; import schemaIconActive from '../../../assets/images/schemaIconActive.svg'; import DeleteItemsModal from '../../../components/deleteItemsModal'; +import PartitionsFilter from '../../../components/partitionsFilter'; + import awaitingIcon from '../../../assets/images/awaitingIcon.svg'; import TooltipComponent from '../../../components/tooltip/tooltip'; import redirectIcon from '../../../assets/images/redirectIcon.svg'; @@ -156,6 +158,11 @@ const StationOverviewHeader = () => {
+ {stationState?.stationMetaData?.partitions_number ? ( + + ) : ( + '' + )}
diff --git a/ui_src/src/domain/stationOverview/stationOverviewHeader/style.scss b/ui_src/src/domain/stationOverview/stationOverviewHeader/style.scss index 3257d9ea2..a582d5c79 100644 --- a/ui_src/src/domain/stationOverview/stationOverviewHeader/style.scss +++ b/ui_src/src/domain/stationOverview/stationOverviewHeader/style.scss @@ -16,7 +16,7 @@ img { cursor: pointer; } - + .tags-list { justify-content: space-between; margin-bottom: 0px; @@ -89,15 +89,18 @@ b { font-family: 'InterSemiBold'; } - .storage-section{ + p { + width: 50%; + } + .storage-section { display: flex; justify-content: space-between; - align-items:center; - div{ + align-items: center; + div { display: flex; align-items: center; } - span{ + span { position: relative; top: 1px; } @@ -198,7 +201,7 @@ width: 165px; } } - .schema-box{ + .schema-box { min-width: 210px; } .schema-header { @@ -237,7 +240,7 @@ border-left: 1px solid #f3f3f3; border-right: 1px solid #f3f3f3; } - .pointer .number{ + .pointer .number { cursor: pointer; width: fit-content; } @@ -279,7 +282,7 @@ } } .modal-wrapper { - .ant-modal-body{ + .ant-modal-body { padding-top: 0px !important; } .tabs-headers {