From a8d41437be7a16dd024c66f4dac289d7b9449532 Mon Sep 17 00:00:00 2001 From: shay23b Date: Wed, 3 Jan 2024 17:17:26 +0200 Subject: [PATCH] from cloud --- db/db.go | 48 +++++++++++++++++++++++ server/memphis_handlers_consumers.go | 57 ++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/db/db.go b/db/db.go index 75285bbbc..42f729e3d 100644 --- a/db/db.go +++ b/db/db.go @@ -2824,6 +2824,26 @@ func DeleteProducerByNameAndStationID(name string, stationId int) (bool, error) return true, nil } +func DeleteConnectorProducerByNameAndStationID(name string, stationId int) (bool, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return false, err + } + defer conn.Release() + query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND type = 'connector' LIMIT 1` + stmt, err := conn.Conn().Prepare(ctx, "delete_producer_by_name_and_station_id", query) + if err != nil { + return false, err + } + _, err = conn.Conn().Query(ctx, stmt.Name, name, stationId) + if err != nil { + return false, err + } + return true, nil +} + func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId string) (bool, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() @@ -3223,6 +3243,34 @@ func DeleteConsumerByNameStationIDAndConnID(connectionId, name string, stationId return true, consumers[0], nil } +func DeleteConsumerByNameStationIDAndType(consumerType, name string, stationId int) (bool, models.Consumer, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return false, models.Consumer{}, err + } + defer conn.Release() + query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE type = $1 AND name = $2 AND station_id = $3 LIMIT 1) RETURNING *` + deleteStmt, err := conn.Conn().Prepare(ctx, "delete_consumers", query) + if err != nil { + return false, models.Consumer{}, err + } + rows, err := conn.Conn().Query(ctx, deleteStmt.Name, consumerType, name, stationId) + if err != nil { + return false, models.Consumer{}, err + } + defer rows.Close() + consumers, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.Consumer]) + if err != nil { + return false, models.Consumer{}, err + } + if len(consumers) == 0 { + return false, models.Consumer{}, err + } + return true, consumers[0], nil +} + func DeleteConsumerByNameAndStationId(name string, stationId int) (bool, models.Consumer, error) { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) defer cancelfunc() diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 5e0f5fe5b..025ddf1c6 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -757,6 +757,63 @@ func (s *Server) destroyCGFromNats(c *client, reply, userName, tenantName string } +func (s *Server) destroyCGFromNatsInternal(username, tenantName string, stationName StationName, consumer models.Consumer, station models.Station) error { + // ensure not part of an active consumer group + count, err := db.CountActiveConsumersInCG(consumer.ConsumersGroup, station.ID) + if err != nil { + return err + } + + deleted := false + if count == 0 { // no other members in this group + err = s.RemoveConsumer(station.TenantName, stationName, consumer.ConsumersGroup, consumer.PartitionsList) + if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) { + return err + } + if err == nil { + deleted = true + } + + err = db.RemovePoisonedCg(station.ID, consumer.ConsumersGroup) + if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) { + return err + } + } + + name := strings.ToLower(consumer.Name) + if deleted { + _, user, err := memphis_cache.GetUser(username, consumer.TenantName, false) + if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) { + return err + } + message := fmt.Sprintf("Consumer %v has been destroyed", name) + serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message) + var auditLogs []interface{} + newAuditLog := models.AuditLog{ + StationName: stationName.Ext(), + Message: message, + CreatedBy: user.ID, + CreatedByUsername: user.Username, + CreatedAt: time.Now(), + TenantName: user.TenantName, + } + auditLogs = append(auditLogs, newAuditLog) + err = CreateAuditLogs(auditLogs) + if err != nil { + serv.Errorf("[tenant: %v]destroyCGFromNats at CreateAuditLogs: Consumer %v at station %v: %v", user.TenantName, consumer.Name, station.Name, err.Error()) + } + + shouldSendAnalytics, _ := shouldSendAnalytics() + if shouldSendAnalytics { + analyticsParams := make(map[string]interface{}) + analytics.SendEvent(user.TenantName, username, analyticsParams, "user-remove-consumer-sdk") + } + } + + return nil + +} + func comparePartitionsList(pList1, pList2 []int) bool { if len(pList1) != len(pList2) { return false