Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Station partitioning #1180

Merged
merged 57 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
c54a0d9
added partition number to creat station direct intern
Jul 27, 2023
ec4bffc
now can create partitions by passing a paramether
Jul 30, 2023
8d2f475
merged changes
Jul 30, 2023
4aea2cf
get overview works
Jul 30, 2023
ccba5ac
get avgMsgSize works with partitions
Jul 30, 2023
123297d
add partition to ui create station form
Jul 31, 2023
ef92ef9
now working with a slice of partitions
Jul 31, 2023
9a31c26
added get messages per partition
Jul 31, 2023
90ae168
fixed remove zombie resources, added version for station and changed …
Aug 1, 2023
eca02b5
stations, station and main overview works
Aug 1, 2023
c9bd94f
returning partition list and count at GetStation
Aug 1, 2023
eafdf0d
returning station data per partition and getting a message from a par…
Aug 2, 2023
bd2dde1
created max partitions
Aug 2, 2023
aa94e65
connect to endpoint
Aug 2, 2023
85e8a94
added ws station overview partition filtering support
Aug 3, 2023
e636bac
Update accounts_cycles_test.go
daniel-davidd Aug 3, 2023
d7caf1f
fixed bug
Aug 3, 2023
9aec31e
Merge branch 'station_partitioning' of https://github.com/memphisdev/…
Aug 3, 2023
00b9dab
fix alterStationsTable
shay23b Aug 3, 2023
cf764d5
.
Aug 3, 2023
76935b3
created return of partition list on producer creation
Aug 3, 2023
dfa78b4
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 3, 2023
599fa56
fixed returnings at create producer direct commom
Aug 3, 2023
4b4ecbe
socket
Aug 3, 2023
e9483f9
getstarted and partition limit
Aug 4, 2023
8ba798a
partition num
Aug 4, 2023
e4f1996
cahnged default value to -1 and fixed purge and delete message
Aug 5, 2023
a279989
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 5, 2023
b560735
added purge for spacific partition
Aug 5, 2023
a92d04a
changed logs at GetStationOverviewData
Aug 6, 2023
85c4e18
purge and remove msg
Aug 6, 2023
4ab8fa9
fix ws
Aug 6, 2023
feddbae
consumer wip
shay23b Aug 6, 2023
0c949bd
changed dls to work with partitions
Aug 6, 2023
5910a03
resolved comments
Aug 6, 2023
375712c
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 6, 2023
912f11c
pulled master
Aug 7, 2023
ab5486d
cg info
shay23b Aug 7, 2023
3f8f718
pulled master
Aug 7, 2023
b5212a1
Merge pull request #1205 from memphisdev/producer_partitioning
daniel-davidd Aug 7, 2023
feb87df
Merge remote-tracking branch 'origin/station_partitioning' into stati…
shay23b Aug 7, 2023
4b15a31
fix consumer queries + handle dls
shay23b Aug 7, 2023
9cd496c
fixes
shay23b Aug 7, 2023
efef8e3
dls
Aug 7, 2023
e7ca9fd
Merge branch 'station_partitioning' into partitioning_ui
SvetaMemphis Aug 7, 2023
f0799fa
cr fixes
shay23b Aug 7, 2023
a2744ee
fix createConsumerDirectCommon
shay23b Aug 7, 2023
028879e
added partitions to the producer struct and table
Aug 7, 2023
b42ec25
Merge pull request #1206 from memphisdev/station_partitioning_consumer
shay23b Aug 7, 2023
f495c74
selectedRowPartition name change
Aug 7, 2023
86fed6c
Merge branch 'partitioning_ui' of https://github.com/memphisdev/memph…
Aug 7, 2023
e4e06d6
review fixes
Aug 7, 2023
3bc5d51
purge move to comment
Aug 7, 2023
30ad5c7
.
Aug 7, 2023
4e0222e
changed the create station request as the sdk
Aug 7, 2023
9557f3f
purge station back to prev
Aug 7, 2023
924c0a2
Merge pull request #1207 from memphisdev/partitioning_ui
avrhamNeeman Aug 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func createTables(MetadataDbClient MetadataStorage) error {
ALTER TYPE enum_retention_type ADD VALUE 'ack_based';
ALTER TABLE stations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE stations ADD COLUMN IF NOT EXISTS resend_disabled BOOL NOT NULL DEFAULT false;
ALTER TABLE stations ADD COLUMN IF NOT EXISTS partitions INTEGER[];
ALTER TABLE stations ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 0;
DROP INDEX IF EXISTS unique_station_name_deleted;
CREATE UNIQUE INDEX unique_station_name_deleted ON stations(name, is_deleted, tenant_name) WHERE is_deleted = false;
END IF;
Expand Down Expand Up @@ -371,6 +373,8 @@ func createTables(MetadataDbClient MetadataStorage) error {
tiered_storage_enabled BOOL NOT NULL,
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
resend_disabled BOOL NOT NULL DEFAULT false,
partitions INTEGER[],
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id),
CONSTRAINT fk_tenant_name_stations
FOREIGN KEY(tenant_name)
Expand Down Expand Up @@ -1157,7 +1161,9 @@ func InsertNewStation(
isNative bool,
dlsConfiguration models.DlsConfiguration,
tieredStorageEnabled bool,
tenantName string) (models.Station, int64, error) {
tenantName string,
partitionsList []int,
version int) (models.Station, int64, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand Down Expand Up @@ -1185,9 +1191,11 @@ func InsertNewStation(
dls_configuration_poison,
dls_configuration_schemaverse,
tiered_storage_enabled,
tenant_name
tenant_name,
partitions,
version
)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) RETURNING id`
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING id`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_station", query)
if err != nil {
Expand All @@ -1202,7 +1210,7 @@ func InsertNewStation(
}
rows, err := conn.Conn().Query(ctx, stmt.Name,
stationName, retentionType, retentionValue, storageType, replicas, userId, username, createAt, updatedAt,
false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName)
false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName, partitionsList, version)
if err != nil {
return models.Station{}, 0, err
}
Expand Down Expand Up @@ -1254,6 +1262,8 @@ func InsertNewStation(
DlsConfigurationSchemaverse: dlsConfiguration.Schemaverse,
TieredStorageEnabled: tieredStorageEnabled,
TenantName: tenantName,
PartitionsList: partitionsList,
Version: version,
}

rowsAffected := rows.CommandTag().RowsAffected()
Expand Down Expand Up @@ -1487,6 +1497,8 @@ func GetAllStationsDetailsLight(tenantName string) ([]models.ExtendedStationLigh
&stationRes.TieredStorageEnabled,
&stationRes.TenantName,
&stationRes.ResendDisabled,
&stationRes.PartitionsList,
&stationRes.Version,
&stationRes.Activity,
); err != nil {
return []models.ExtendedStationLight{}, err
Expand Down
3 changes: 2 additions & 1 deletion models/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type SystemComponentsStatus struct {
}

type GetStationOverviewDataSchema struct {
StationName string `form:"station_name" json:"station_name" binding:"required"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"`
}

type SystemLogsRequest struct {
Expand Down
19 changes: 14 additions & 5 deletions models/stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type MessageDetails struct {
ConnectionId string `json:"connection_id" `
Size int `json:"size"`
Headers map[string]string `json:"headers"`
Partition int `json:"partition"`
}

type Station struct {
Expand All @@ -60,6 +61,8 @@ type Station struct {
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
TenantName string `json:"tenant_name"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
Version int `json:"version"`
}

type GetStationResponseSchema struct {
Expand All @@ -80,6 +83,8 @@ type GetStationResponseSchema struct {
DlsConfiguration DlsConfiguration `json:"dls_configuration"`
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
PartitionsNumber int `json:"partitions_number"`
}

type ExtendedStation struct {
Expand Down Expand Up @@ -133,6 +138,8 @@ type ExtendedStationLight struct {
TieredStorageEnabled bool `json:"tiered_storage_enabled,omitempty"`
TenantName string `json:"tenant_name"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
Version int `json"version"`
}

type ActiveProducersConsumersDetails struct {
Expand Down Expand Up @@ -165,6 +172,7 @@ type CreateStationSchema struct {
IdempotencyWindow int64 `json:"idempotency_window_in_ms"`
DlsConfiguration DlsConfiguration `json:"dls_configuration"`
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
PartitionsNumber int `json:"partitions_number"`
}

type DlsConfiguration struct {
Expand Down Expand Up @@ -209,11 +217,12 @@ type GetPoisonMessageJourneySchema struct {
}

type GetMessageDetailsSchema struct {
IsDls bool `form:"is_dls" json:"is_dls"`
DlsType string `form:"dls_type" json:"dls_type"`
MessageId int `form:"message_id" json:"message_id"`
MessageSeq int `form:"message_seq" json:"message_seq"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
IsDls bool `form:"is_dls" json:"is_dls"`
DlsType string `form:"dls_type" json:"dls_type"`
MessageId int `form:"message_id" json:"message_id"`
MessageSeq int `form:"message_seq" json:"message_seq"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"`
}

type UseSchema struct {
Expand Down
1 change: 1 addition & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

const shouldCreateRootUserforGlobalAcc = true
const TENANT_SEQUENCE_START_ID = 2
const MAX_PARTITIONS = 5
daniel-davidd marked this conversation as resolved.
Show resolved Hide resolved

type BillingHandler struct{ S *Server }
type TenantHandler struct{ S *Server }
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func getUserDetailsFromMiddleware(c *gin.Context) (models.User, error) {
func CreateDefaultStation(tenantName string, s *Server, sn StationName, userId int, username string) (models.Station, bool, error) {
stationName := sn.Ext()
replicas := getDefaultReplicas()
err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false)
err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false, 1)
if err != nil {
return models.Station{}, false, err
}

schemaName := ""
schemaVersionNumber := 0

newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName)
newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName, []int{}, 1)
if err != nil {
return models.Station{}, false, err
}
Expand Down
102 changes: 75 additions & 27 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,40 +525,88 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
totalMessages, err := stationsHandler.GetTotalMessages(station.TenantName, station.Name)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAuditLogsByStation: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})

var totalMessages int
if body.PartitionNumber == 0 {
totalMessages, err = stationsHandler.GetTotalMessages(station.TenantName, station.Name, station.PartitionsList)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAuditLogsByStation: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}
return
} else {
totalMessages, err = stationsHandler.GetTotalPartitionMessages(station.TenantName, station.Name, body.PartitionNumber)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAuditLogsByStation: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}

}
avgMsgSize, err := stationsHandler.GetAvgMsgSize(station)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})

var avgMsgSize int64
if body.PartitionNumber == 0 {
avgMsgSize, err = stationsHandler.GetAvgMsgSize(station)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}
} else {
avgMsgSize, err = stationsHandler.GetPartitionAvgMsgSize(station.TenantName, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber))
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetAvgMsgSize: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}
return
}

messagesToFetch := 1000
messages, err := stationsHandler.GetMessages(station, messagesToFetch)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
messages := make([]models.MessageDetails, 0)
if body.PartitionNumber == 0 {
messages, err = stationsHandler.GetMessages(station, messagesToFetch)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}
} else {
messages, err = stationsHandler.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber), messagesToFetch, body.PartitionNumber)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
}
return
}

poisonMessages, schemaFailedMessages, totalDlsAmount, err := poisonMsgsHandler.GetDlsMsgsByStationLight(station)
Expand Down
Loading