Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Station partitioning #1180

Merged
merged 57 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
c54a0d9
added partition number to creat station direct intern
Jul 27, 2023
ec4bffc
now can create partitions by passing a paramether
Jul 30, 2023
8d2f475
merged changes
Jul 30, 2023
4aea2cf
get overview works
Jul 30, 2023
ccba5ac
get avgMsgSize works with partitions
Jul 30, 2023
123297d
add partition to ui create station form
Jul 31, 2023
ef92ef9
now working with a slice of partitions
Jul 31, 2023
9a31c26
added get messages per partition
Jul 31, 2023
90ae168
fixed remove zombie resources, added version for station and changed …
Aug 1, 2023
eca02b5
stations, station and main overview works
Aug 1, 2023
c9bd94f
returning partition list and count at GetStation
Aug 1, 2023
eafdf0d
returning station data per partition and getting a message from a par…
Aug 2, 2023
bd2dde1
created max partitions
Aug 2, 2023
aa94e65
connect to endpoint
Aug 2, 2023
85e8a94
added ws station overview partition filtering support
Aug 3, 2023
e636bac
Update accounts_cycles_test.go
daniel-davidd Aug 3, 2023
d7caf1f
fixed bug
Aug 3, 2023
9aec31e
Merge branch 'station_partitioning' of https://github.com/memphisdev/…
Aug 3, 2023
00b9dab
fix alterStationsTable
shay23b Aug 3, 2023
cf764d5
.
Aug 3, 2023
76935b3
created return of partition list on producer creation
Aug 3, 2023
dfa78b4
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 3, 2023
599fa56
fixed returnings at create producer direct commom
Aug 3, 2023
4b4ecbe
socket
Aug 3, 2023
e9483f9
getstarted and partition limit
Aug 4, 2023
8ba798a
partition num
Aug 4, 2023
e4f1996
cahnged default value to -1 and fixed purge and delete message
Aug 5, 2023
a279989
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 5, 2023
b560735
added purge for spacific partition
Aug 5, 2023
a92d04a
changed logs at GetStationOverviewData
Aug 6, 2023
85c4e18
purge and remove msg
Aug 6, 2023
4ab8fa9
fix ws
Aug 6, 2023
feddbae
consumer wip
shay23b Aug 6, 2023
0c949bd
changed dls to work with partitions
Aug 6, 2023
5910a03
resolved comments
Aug 6, 2023
375712c
Merge branch 'station_partitioning', remote-tracking branch 'origin' …
Aug 6, 2023
912f11c
pulled master
Aug 7, 2023
ab5486d
cg info
shay23b Aug 7, 2023
3f8f718
pulled master
Aug 7, 2023
b5212a1
Merge pull request #1205 from memphisdev/producer_partitioning
daniel-davidd Aug 7, 2023
feb87df
Merge remote-tracking branch 'origin/station_partitioning' into stati…
shay23b Aug 7, 2023
4b15a31
fix consumer queries + handle dls
shay23b Aug 7, 2023
9cd496c
fixes
shay23b Aug 7, 2023
efef8e3
dls
Aug 7, 2023
e7ca9fd
Merge branch 'station_partitioning' into partitioning_ui
SvetaMemphis Aug 7, 2023
f0799fa
cr fixes
shay23b Aug 7, 2023
a2744ee
fix createConsumerDirectCommon
shay23b Aug 7, 2023
028879e
added partitions to the producer struct and table
Aug 7, 2023
b42ec25
Merge pull request #1206 from memphisdev/station_partitioning_consumer
shay23b Aug 7, 2023
f495c74
selectedRowPartition name change
Aug 7, 2023
86fed6c
Merge branch 'partitioning_ui' of https://github.com/memphisdev/memph…
Aug 7, 2023
e4e06d6
review fixes
Aug 7, 2023
3bc5d51
purge move to comment
Aug 7, 2023
30ad5c7
.
Aug 7, 2023
4e0222e
changed the create station request as the sdk
Aug 7, 2023
9557f3f
purge station back to prev
Aug 7, 2023
924c0a2
Merge pull request #1207 from memphisdev/partitioning_ui
avrhamNeeman Aug 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 104 additions & 32 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,11 @@ func createTables(MetadataDbClient MetadataStorage) error {
IF EXISTS (
SELECT 1 FROM information_schema.tables WHERE table_name = 'stations' AND table_schema = 'public'
) THEN
ALTER TYPE enum_retention_type ADD VALUE 'ack_based';
ALTER TYPE enum_retention_type ADD VALUE IF NOT EXISTS 'ack_based';
daniel-davidd marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE stations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE stations ADD COLUMN IF NOT EXISTS resend_disabled BOOL NOT NULL DEFAULT false;
ALTER TABLE stations ADD COLUMN IF NOT EXISTS partitions INTEGER[];
ALTER TABLE stations ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 0;
DROP INDEX IF EXISTS unique_station_name_deleted;
CREATE UNIQUE INDEX unique_station_name_deleted ON stations(name, is_deleted, tenant_name) WHERE is_deleted = false;
END IF;
Expand Down Expand Up @@ -372,6 +374,8 @@ func createTables(MetadataDbClient MetadataStorage) error {
tiered_storage_enabled BOOL NOT NULL,
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
resend_disabled BOOL NOT NULL DEFAULT false,
partitions INTEGER[],
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id),
CONSTRAINT fk_tenant_name_stations
FOREIGN KEY(tenant_name)
Expand Down Expand Up @@ -436,6 +440,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
CREATE INDEX IF NOT EXISTS producer_name ON producers(name);
CREATE INDEX IF NOT EXISTS producer_tenant_name ON producers(tenant_name);
CREATE INDEX IF NOT EXISTS producer_connection_id ON producers(connection_id);
ALTER TABLE producers ADD COLUMN IF NOT EXISTS partitions INTEGER[];
IF EXISTS (
SELECT 1
FROM information_schema.columns
Expand All @@ -458,6 +463,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
is_active BOOL NOT NULL DEFAULT true,
updated_at TIMESTAMPTZ NOT NULL,
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
partitions INTEGER[],
PRIMARY KEY (id),
CONSTRAINT fk_station_id
FOREIGN KEY(station_id)
Expand All @@ -478,6 +484,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
) THEN
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS producer_name VARCHAR NOT NULL DEFAULT '';
ALTER TABLE dls_messages ADD COLUMN IF NOT EXISTS partition_number INTEGER NOT NULL DEFAULT -1;
DROP INDEX IF EXISTS dls_producer_id;
IF EXISTS (
SELECT 1 FROM information_schema.columns WHERE table_name = 'dls_messages' AND column_name = 'producer_id'
Expand Down Expand Up @@ -509,6 +516,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
validation_error VARCHAR DEFAULT '',
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
producer_name VARCHAR NOT NULL,
partition_number INTEGER NOT NULL DEFAULT -1,
PRIMARY KEY (id),
CONSTRAINT fk_station_id
FOREIGN KEY(station_id)
Expand Down Expand Up @@ -1168,7 +1176,9 @@ func InsertNewStation(
isNative bool,
dlsConfiguration models.DlsConfiguration,
tieredStorageEnabled bool,
tenantName string) (models.Station, int64, error) {
tenantName string,
partitionsList []int,
version int) (models.Station, int64, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand Down Expand Up @@ -1196,9 +1206,11 @@ func InsertNewStation(
dls_configuration_poison,
dls_configuration_schemaverse,
tiered_storage_enabled,
tenant_name
tenant_name,
partitions,
version
)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) RETURNING id`
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) RETURNING id`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_station", query)
if err != nil {
Expand All @@ -1213,7 +1225,7 @@ func InsertNewStation(
}
rows, err := conn.Conn().Query(ctx, stmt.Name,
stationName, retentionType, retentionValue, storageType, replicas, userId, username, createAt, updatedAt,
false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName)
false, schemaName, schemaVersionUpdate, idempotencyWindow, isNative, dlsConfiguration.Poison, dlsConfiguration.Schemaverse, tieredStorageEnabled, tenantName, partitionsList, version)
if err != nil {
return models.Station{}, 0, err
}
Expand Down Expand Up @@ -1265,6 +1277,8 @@ func InsertNewStation(
DlsConfigurationSchemaverse: dlsConfiguration.Schemaverse,
TieredStorageEnabled: tieredStorageEnabled,
TenantName: tenantName,
PartitionsList: partitionsList,
Version: version,
}

rowsAffected := rows.CommandTag().RowsAffected()
Expand Down Expand Up @@ -1498,6 +1512,8 @@ func GetAllStationsDetailsLight(tenantName string) ([]models.ExtendedStationLigh
&stationRes.TieredStorageEnabled,
&stationRes.TenantName,
&stationRes.ResendDisabled,
&stationRes.PartitionsList,
&stationRes.Version,
&stationRes.Activity,
); err != nil {
return []models.ExtendedStationLight{}, err
Expand Down Expand Up @@ -2332,7 +2348,7 @@ func GetActiveProducerByStationID(producerName string, stationId int) (bool, mod
return true, producers[0], nil
}

func InsertNewProducer(name string, stationId int, producerType string, connectionIdObj string, tenantName string) (models.Producer, error) {
func InsertNewProducer(name string, stationId int, producerType string, connectionIdObj string, tenantName string, partitionsList []int) (models.Producer, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand All @@ -2349,8 +2365,9 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti
is_active,
updated_at,
type,
tenant_name)
VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING id`
tenant_name,
partitions)
VALUES($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_producer", query)
if err != nil {
Expand All @@ -2363,7 +2380,7 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, name, stationId, connectionIdObj, isActive, updatedAt, producerType, tenantName)
rows, err := conn.Conn().Query(ctx, stmt.Name, name, stationId, connectionIdObj, isActive, updatedAt, producerType, tenantName, partitionsList)
if err != nil {
return models.Producer{}, err
}
Expand Down Expand Up @@ -2393,13 +2410,14 @@ func InsertNewProducer(name string, stationId int, producerType string, connecti
}

newProducer := models.Producer{
ID: producerId,
Name: name,
StationId: stationId,
Type: producerType,
ConnectionId: connectionIdObj,
IsActive: isActive,
UpdatedAt: time.Now(),
ID: producerId,
Name: name,
StationId: stationId,
Type: producerType,
ConnectionId: connectionIdObj,
IsActive: isActive,
UpdatedAt: time.Now(),
PartitionsList: partitionsList,
}
return newProducer, nil
}
Expand Down Expand Up @@ -4622,7 +4640,7 @@ func GetAllActiveUsersStations(tenantName string) ([]models.FilteredUser, error)
FROM users AS u
JOIN stations AS s ON u.id = s.created_by
WHERE s.tenant_name=$1 AND username NOT LIKE '$%'` // filter memphis internal users

stmt, err := conn.Conn().Prepare(ctx, "get_all_active_users_stations", query)
if err != nil {
return []models.FilteredUser{}, err
Expand Down Expand Up @@ -4658,7 +4676,7 @@ func GetAllActiveUsersSchemaVersions(tenantName string) ([]models.FilteredUser,
FROM users AS u
JOIN schema_versions AS s ON u.id = s.created_by
WHERE s.tenant_name=$1 AND username NOT LIKE '$%'` // filter memphis internal users

stmt, err := conn.Conn().Prepare(ctx, "get_all_active_users_schema_versions", query)
if err != nil {
return []models.FilteredUser{}, err
Expand Down Expand Up @@ -5146,7 +5164,7 @@ func GetImage(name string, tenantName string) (bool, models.Image, error) {
}

// dls Functions
func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, validationError string, tenantName string) (models.DlsMessage, error) {
func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, validationError string, tenantName string, partitionNumber int) (models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
connection, err := MetadataDbClient.Client.Acquire(ctx)
Expand All @@ -5164,8 +5182,9 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string,
message_type,
validation_error,
tenant_name,
producer_name)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
producer_name,
partition_number)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING id`

stmt, err := connection.Conn().Prepare(ctx, "insert_dls_messages", query)
Expand All @@ -5176,7 +5195,7 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string,
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, poisonedCgs, messageDetails, updatedAt, "schema", validationError, tenantName, producerName)
rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, poisonedCgs, messageDetails, updatedAt, "schema", validationError, tenantName, producerName, partitionNumber)
if err != nil {
return models.DlsMessage{}, err
}
Expand Down Expand Up @@ -5207,8 +5226,9 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string,
Data: messageDetails.Data,
Headers: messageDetails.Headers,
},
UpdatedAt: updatedAt,
TenantName: tenantName,
UpdatedAt: updatedAt,
TenantName: tenantName,
PartitionNumber: partitionNumber,
}

if err := rows.Err(); err != nil {
Expand All @@ -5231,7 +5251,7 @@ func InsertSchemaverseDlsMsg(stationId int, messageSeq int, producerName string,
return deadLetterPayload, nil
}

func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMessage, error) {
func GetMsgByStationIdAndMsgSeq(stationId, messageSeq, partitionNumber int) (bool, models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand All @@ -5241,14 +5261,14 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMess
}
defer connection.Release()

query := `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 LIMIT 1`
query := `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND partition_number = $3 LIMIT 1`

stmt, err := connection.Conn().Prepare(ctx, "get_dls_messages_by_station_id_and_message_seq", query)
if err != nil {
return false, models.DlsMessage{}, err
}

rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq)
rows, err := connection.Conn().Query(ctx, stmt.Name, stationId, messageSeq, partitionNumber)
if err != nil {
return false, models.DlsMessage{}, err
}
Expand All @@ -5265,7 +5285,7 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq int) (bool, models.DlsMess
return true, message[0], nil
}

func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string) (int, error) {
func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int) (int, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand Down Expand Up @@ -5302,12 +5322,12 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
}
checkRows.Close()

query = `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND tenant_name =$3 LIMIT 1 FOR UPDATE`
query = `SELECT * FROM dls_messages WHERE station_id = $1 AND message_seq = $2 AND tenant_name =$3 AND partition_number = $4 LIMIT 1 FOR UPDATE`
stmt, err = tx.Prepare(ctx, "handle_insert_dls_message", query)
if err != nil {
return 0, err
}
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, tenantName)
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, tenantName, partitionNumber)
if err != nil {
return 0, err
}
Expand All @@ -5329,7 +5349,8 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
updated_at,
message_type,
validation_error,
tenant_name
tenant_name,
partition_number
)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id`
Expand All @@ -5342,7 +5363,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName)
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName, partitionNumber)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -5497,6 +5518,28 @@ func PurgeDlsMsgsFromStation(station_id int) error {
return nil
}

func PurgeDlsMsgsFromPartition(station_id, partitionNumber int) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return errors.New("PurgeDlsMsgsFromPartition: " + err.Error())
}
defer conn.Release()

query := `DELETE FROM dls_messages where station_id=$1 AND partition_number = $2`
stmt, err := conn.Conn().Prepare(ctx, "purge_dls_messages", query)
if err != nil {
return err
}

_, err = conn.Conn().Exec(ctx, stmt.Name, station_id, partitionNumber)
if err != nil {
return errors.New("PurgeDlsMsgsFromPartition: " + err.Error())
}
return nil
}

func RemoveCgFromDlsMsg(msgId int, cgName string, tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down Expand Up @@ -5550,6 +5593,35 @@ func GetDlsMsgsByStationId(stationId int) ([]models.DlsMessage, error) {
return dlsMsgs, nil
}

func GetDlsMsgsByStationAndPartition(stationId, partitionNumber int) ([]models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.DlsMessage{}, err
}
defer conn.Release()
query := `SELECT * from dls_messages where station_id=$1 AND partition_number = $2 ORDER BY updated_at DESC limit 1000`
stmt, err := conn.Conn().Prepare(ctx, "get_dls_msg_by_station_and_partition", query)
if err != nil {
return []models.DlsMessage{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, partitionNumber)
if err != nil {
return []models.DlsMessage{}, err
}
defer rows.Close()
dlsMsgs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.DlsMessage])
if err != nil {
return []models.DlsMessage{}, err
}
if len(dlsMsgs) == 0 {
return []models.DlsMessage{}, nil
}

return dlsMsgs, nil
}

func GetDlsMessageById(messageId int) (bool, models.DlsMessage, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
Expand Down
2 changes: 2 additions & 0 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type SchemaVerseDlsMessageSdk struct {
Producer ProducerDetails `json:"producer"`
Message MessagePayload `json:"message"`
ValidationError string `json:"validation_error"`
PartitionNumber int `json:"partition_number"`
}

type DlsMessage struct {
Expand All @@ -74,6 +75,7 @@ type DlsMessage struct {
ValidationError string `json:"validation_error"`
TenantName string `json:"tenant_name"`
ProducerName string `json:"producer_name"`
PartitionNumber int `json:"partition_number"`
}

type DlsMsgResendAll struct {
Expand Down
3 changes: 2 additions & 1 deletion models/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type SystemComponentsStatus struct {
}

type GetStationOverviewDataSchema struct {
StationName string `form:"station_name" json:"station_name" binding:"required"`
StationName string `form:"station_name" json:"station_name" binding:"required"`
PartitionNumber int `form:"partition_number" json:"partition_number" binding:"required"`
}

type SystemLogsRequest struct {
Expand Down
Loading