Skip to content

Commit

Permalink
Station partitioning (#1180)
Browse files Browse the repository at this point in the history
* added partition number to creat station direct intern

* now can create partitions by passing a paramether

* get overview works

* get avgMsgSize works with partitions

* add partition to ui create station form

* now working with a slice of partitions

* added get messages per partition

* fixed remove zombie resources, added version for station and changed partition to start with 1

* stations, station  and main overview works

* returning partition list and count at GetStation

* returning station data per partition and getting a message from a partition

* created max partitions

* connect to endpoint

* added ws station overview partition filtering support

* Update accounts_cycles_test.go

* fixed bug

* fix alterStationsTable

* .

* created return of partition list on producer creation

* fixed returnings at create producer direct commom

* socket

* getstarted and partition limit

* partition num

* cahnged default value to -1 and fixed purge and delete message

* added purge for spacific partition

* changed logs at GetStationOverviewData

* purge and remove msg

* fix ws

* consumer wip

* changed dls to work with partitions

* resolved comments

* cg info

* fix consumer queries + handle dls

* fixes

* dls

* cr fixes

* fix createConsumerDirectCommon

* added partitions to the producer struct and table

* selectedRowPartition name change

* review fixes

* purge move to comment

* .

* changed the create station request as the sdk

* purge station back to prev

---------

Co-authored-by: daniel-davidd <daniel@ip-192-168-1-113.eu-central-1.compute.internal>
Co-authored-by: daniel-davidd <daniel@ip-192-168-1-103.eu-central-1.compute.internal>
Co-authored-by: svetaStrech <sveta@strech.io>
Co-authored-by: shay23b <shay@memphis.dev>
Co-authored-by: Sveta Gimpelson <74717402+SvetaMemphis@users.noreply.github.com>
Co-authored-by: Avraham Neeman <74565114+avrhamNeeman@users.noreply.github.com>
  • Loading branch information
7 people committed Aug 7, 2023
1 parent 27d3c78 commit 7081026
Show file tree
Hide file tree
Showing 33 changed files with 1,187 additions and 445 deletions.
182 changes: 123 additions & 59 deletions db/db.go

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions models/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Consumer struct {
StartConsumeFromSeq uint64 `json:"start_consume_from_seq"`
LastMessages int64 `json:"last_messages"`
TenantName string `json:"tenant_name"`
PartitionsList []int `json:"partitions_list"`
}

type ExtendedConsumer struct {
Expand All @@ -40,6 +41,7 @@ type ExtendedConsumer struct {
MaxAckTimeMs int64 `json:"max_ack_time_ms"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
StationName string `json:"station_name,omitempty"`
PartitionsList []int `json:"partitions_list"`
Count int `json:"count"`
}

Expand All @@ -61,6 +63,7 @@ type Cg struct {
DisconnectedConsumers []ExtendedConsumer `json:"disconnected_consumers"`
DeletedConsumers []ExtendedConsumer `json:"deleted_consumers"`
LastStatusChangeDate time.Time `json:"last_status_change_date"`
PartitionsList []int `json:"partitions_list"`
}

type GetAllConsumersByStationSchema struct {
Expand Down Expand Up @@ -88,6 +91,7 @@ type CgMember struct {
IsActive bool `json:"is_active"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
MaxAckTimeMs int64 `json:"max_ack_time_ms"`
PartitionsList []int `json:"partitions_list"`
Count int `json:"count"`
}

Expand All @@ -102,8 +106,9 @@ type DelayedCgResp struct {
}

type LightCG struct {
CGName string `json:"cg_name"`
StationName string `json:"station_name"`
StationId int `json:"station_id"`
TenantName string `json:"tenant_name"`
CGName string `json:"cg_name"`
StationName string `json:"station_name"`
StationId int `json:"station_id"`
TenantName string `json:"tenant_name"`
PartitionsList []int `json:"partitions_list"`
}
2 changes: 2 additions & 0 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type SchemaVerseDlsMessageSdk struct {
Producer ProducerDetails `json:"producer"`
Message MessagePayload `json:"message"`
ValidationError string `json:"validation_error"`
PartitionNumber int `json:"partition_number"`
}

type DlsMessage struct {
Expand All @@ -74,6 +75,7 @@ type DlsMessage struct {
ValidationError string `json:"validation_error"`
TenantName string `json:"tenant_name"`
ProducerName string `json:"producer_name"`
PartitionNumber int `json:"partition_number"`
}

type DlsMsgResendAll struct {
Expand Down
3 changes: 2 additions & 1 deletion models/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type SystemComponentsStatus struct {
}

type GetStationOverviewDataSchema struct {
StationName string `form:"station_name" json:"station_name" binding:"required"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"`
}

type SystemLogsRequest struct {
Expand Down
17 changes: 9 additions & 8 deletions models/producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
)

type Producer struct {
ID int `json:"id"`
Name string `json:"name"`
StationId int `json:"station_id"`
Type string `json:"type"`
ConnectionId string `json:"connection_id"`
IsActive bool `json:"is_active"`
UpdatedAt time.Time `json:"updated_at"`
TenantName string `json:"tenant_name"`
ID int `json:"id"`
Name string `json:"name"`
StationId int `json:"station_id"`
Type string `json:"type"`
ConnectionId string `json:"connection_id"`
IsActive bool `json:"is_active"`
UpdatedAt time.Time `json:"updated_at"`
TenantName string `json:"tenant_name"`
PartitionsList []int `json:"partitions"`
}

type ExtendedProducer struct {
Expand Down
39 changes: 29 additions & 10 deletions models/stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type MessageDetails struct {
ConnectionId string `json:"connection_id" `
Size int `json:"size"`
Headers map[string]string `json:"headers"`
Partition int `json:"partition"`
}

type Station struct {
Expand All @@ -60,6 +61,8 @@ type Station struct {
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
TenantName string `json:"tenant_name"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
Version int `json:"version"`
}

type GetStationResponseSchema struct {
Expand All @@ -80,6 +83,8 @@ type GetStationResponseSchema struct {
DlsConfiguration DlsConfiguration `json:"dls_configuration"`
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
PartitionsNumber int `json:"partitions_number"`
}

type ExtendedStation struct {
Expand Down Expand Up @@ -133,6 +138,8 @@ type ExtendedStationLight struct {
TieredStorageEnabled bool `json:"tiered_storage_enabled,omitempty"`
TenantName string `json:"tenant_name"`
ResendDisabled bool `json:"resend_disabled"`
PartitionsList []int `json:"partitions_list"`
Version int `json"version"`
}

type ActiveProducersConsumersDetails struct {
Expand Down Expand Up @@ -165,6 +172,7 @@ type CreateStationSchema struct {
IdempotencyWindow int64 `json:"idempotency_window_in_ms"`
DlsConfiguration DlsConfiguration `json:"dls_configuration"`
TieredStorageEnabled bool `json:"tiered_storage_enabled"`
PartitionsNumber int `json:"partitions_number"`
}

type DlsConfiguration struct {
Expand All @@ -185,14 +193,20 @@ type DropDlsMessagesSchema struct {
}

type PurgeStationSchema struct {
StationName string `json:"station_name" binding:"required"`
PurgeDls bool `json:"purge_dls"`
PurgeStation bool `json:"purge_station"`
StationName string `json:"station_name" binding:"required"`
PurgeDls bool `json:"purge_dls"`
PurgeStation bool `json:"purge_station"`
PartitionsList []int `json:"partitions_list"`
}

type RemoveMessagesSchema struct {
StationName string `json:"station_name" binding:"required"`
MessageSeqs []uint64 `json:"message_seqs" binding:"required"`
StationName string `json:"station_name" binding:"required"`
Messages []MessageToDelete `json:"messages" binding:"required"`
}

type MessageToDelete struct {
MessageSeq uint64 `json:"message_seq" binding:"required"`
PartitionNumber int `json:"partition_number" binding:"required"`
}

type ResendPoisonMessagesSchema struct {
Expand All @@ -209,11 +223,12 @@ type GetPoisonMessageJourneySchema struct {
}

type GetMessageDetailsSchema struct {
IsDls bool `form:"is_dls" json:"is_dls"`
DlsType string `form:"dls_type" json:"dls_type"`
MessageId int `form:"message_id" json:"message_id"`
MessageSeq int `form:"message_seq" json:"message_seq"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
IsDls bool `form:"is_dls" json:"is_dls"`
DlsType string `form:"dls_type" json:"dls_type"`
MessageId int `form:"message_id" json:"message_id"`
MessageSeq int `form:"message_seq" json:"message_seq"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"`
}

type UseSchema struct {
Expand All @@ -240,3 +255,7 @@ type StationOverviewSchemaDetails struct {
type GetUpdatesForSchema struct {
StationName string `form:"station_name" json:"station_name" binding:"required"`
}

type PartitionsUpdate struct {
PartitionsList []int `json:"partitions_list"`
}
2 changes: 1 addition & 1 deletion server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (s *Server) RemoveOldProducersAndConsumers() {
if _, ok := CGmap[cg.CGName]; !ok {
stationName, err := StationNameFromStr(cg.StationName)
if err == nil {
err = s.RemoveConsumer(cg.TenantName, stationName, cg.CGName)
err = s.RemoveConsumer(cg.TenantName, stationName, cg.CGName, cg.PartitionsList)
if err != nil {
serv.Errorf("RemoveOldProducersAndConsumers at RemoveConsumer: %v", err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

const shouldCreateRootUserforGlobalAcc = true
const TENANT_SEQUENCE_START_ID = 2
const MAX_PARTITIONS = 10000

type BillingHandler struct{ S *Server }
type TenantHandler struct{ S *Server }
Expand Down Expand Up @@ -1999,7 +2000,7 @@ func (s *Server) SetDlsRetentionForExistTenants() error {
}

func validateRetentionType(retentionType string) error {
if retentionType == "ack_based"{
if retentionType == "ack_based" {
return errors.New("this type of retention is supported only on the cloud version of Memphis, available on cloud.memphis.dev")
}
if retentionType != "message_age_sec" && retentionType != "messages" && retentionType != "bytes" {
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func getUserDetailsFromMiddleware(c *gin.Context) (models.User, error) {
func CreateDefaultStation(tenantName string, s *Server, sn StationName, userId int, username string) (models.Station, bool, error) {
stationName := sn.Ext()
replicas := getDefaultReplicas()
err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false)
err := s.CreateStream(tenantName, sn, "message_age_sec", 604800, "file", 120000, replicas, false, 1)
if err != nil {
return models.Station{}, false, err
}

schemaName := ""
schemaVersionNumber := 0

newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName)
newStation, rowsUpdated, err := db.InsertNewStation(stationName, userId, username, "message_age_sec", 604800, "file", replicas, schemaName, schemaVersionNumber, 120000, true, models.DlsConfiguration{Poison: true, Schemaverse: true}, false, tenantName, []int{1}, 1)
if err != nil {
return models.Station{}, false, err
}
Expand Down
Loading

0 comments on commit 7081026

Please sign in to comment.