Skip to content

Commit

Permalink
Merge pull request #1470 from memphisdev/RND-271-connectors-needed-en…
Browse files Browse the repository at this point in the history
…dpoints-and-administration

changes from cloud
  • Loading branch information
shay23b committed Dec 3, 2023
2 parents 4b4626b + 3a1a2ad commit f46ceef
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 9 deletions.
2 changes: 1 addition & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
db := MetadataDbClient.Client
ctx := MetadataDbClient.Ctx

tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable, sharedLocksTable, functionsEngineWorkersTable, scheduledFunctionWorkersTable}
tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable, sharedLocksTable, functionsEngineWorkersTable, scheduledFunctionWorkersTable, connectorsEngineWorkersTable, connectorsConnectionsTable, connectorsTable}

for _, table := range tables {
_, err := db.Exec(ctx, table)
Expand Down
15 changes: 10 additions & 5 deletions db/db_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package db

const testEventsTable = ``
const functionsTable = ``
const attachedFunctionsTable = ``
const functionsEngineWorkersTable = ``
const scheduledFunctionWorkersTable = ``
const (
testEventsTable = ``
functionsTable = ``
attachedFunctionsTable = ``
functionsEngineWorkersTable = ``
scheduledFunctionWorkersTable = ``
connectorsEngineWorkersTable = ``
connectorsConnectionsTable = ``
connectorsTable = ``
)

type FunctionSchema struct {
ID int `json:"id"`
Expand Down
1 change: 0 additions & 1 deletion server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const SCHEMAVERSE_DLS_CONSUMER = "$memphis_schemaverse_dls_consumer"
const FUNCTIONS_DLS_INNER_SUBJ = "$memphis_functions_inner_dls"
const FUNCTIONS_DLS_CONSUMER = "$memphis_functions_dls_consumer"
const CACHE_UDATES_SUBJ = "$memphis_cache_updates"
const INTEGRATIONS_AUDIT_LOGS_CONSUMER = "$memphis_integrations_audit_logs_consumer"
const FUNCTION_TASKS_CONSUMER = "$memphis_function_tasks_consumer"

var LastReadThroughputMap map[string]models.Throughput
Expand Down
2 changes: 2 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
SYSTEM_TASKS_STREAM_CREATED = true
case dlsFunctionsStream:
DLS_FUNCTIONS_STREAM_CREATED = true
case connectorsLogsStream:
CONNECTORS_LOGS_STREAM_CREATED = true
}
// added by Memphis ***

Expand Down
8 changes: 8 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2349,6 +2349,10 @@ func shouldCreateFunctionDlsStream() bool {
return false
}

func shouldCreateConnectorsStream() bool {
return false
}

func (pmh PoisonMessagesHandler) GetDlsMessageDetails(messageId int, dlsType string, tenantName string) (models.DlsMessageResponseWithFunc, error) {
dlsMsg, err := pmh.GetDlsMessageDetailsById(messageId, dlsType, tenantName)
if err != nil {
Expand All @@ -2374,3 +2378,7 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetails(messageId int, dlsType str
func getUsageLimitProduersLimitPerStation(tenantName, stationName string) (float64, error) {
return -1, nil
}

func (s *Server) GetConnectorsByStationAndPartition(stationID, partitionNumber, numOfPartitions int) ([]map[string]string, error) {
return []map[string]string{}, nil
}
1 change: 1 addition & 0 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

const sendNotificationType = "send_notification"
const INTEGRATIONS_AUDIT_LOGS_CONSUMER = "$memphis_integrations_audit_logs_consumer"

type IntegrationsHandler struct{ S *Server }

Expand Down
10 changes: 10 additions & 0 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,13 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
station.TieredStorageEnabled = false
}
}

connectors, err := mh.S.GetConnectorsByStationAndPartition(station.ID, body.PartitionNumber, len(station.PartitionsList))
if err != nil {
serv.Errorf("[tenant: %v][user: %v]GetStationOverviewData at GetConnectorsByStationAndPartition: At station %v: %v", user.TenantName, user.Username, body.StationName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
var response gin.H

// Check when the schema object in station is not empty, not optional for non native stations
Expand Down Expand Up @@ -749,6 +756,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
Expand Down Expand Up @@ -782,6 +790,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
} else {
Expand Down Expand Up @@ -812,6 +821,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
"resend_disabled": station.ResendDisabled,
"functions_enabled": functionsEnabled,
"max_amount_of_allowed_producers": usageLimit,
"connectors": connectors,
"act_as_dls_station_in_stations": usedAsDlsStations,
}
}
Expand Down
23 changes: 23 additions & 0 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
MEMPHIS_GLOBAL_ACCOUNT = "$memphis"
integrationsAuditLogsStream = "$memphis_integrations_audit_logs"
systemTasksStreamName = "$memphis_system_tasks"
connectorsLogsStream = "$memphis_connectors_logs"
)

var noLimit = -1
Expand Down Expand Up @@ -142,6 +143,7 @@ var (
INTEGRATIONS_AUDIT_LOGS_STREAM_CREATED bool
SYSTEM_TASKS_STREAM_CREATED bool
FUNCTIONS_TASKS_CONSUMER_CREATED bool
CONNECTORS_LOGS_STREAM_CREATED bool
)

type Messages []models.MessageDetails
Expand Down Expand Up @@ -466,6 +468,7 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration,
}
DLS_FUNCTIONS_CONSUMER_CREATED = true
}

// delete the old version throughput stream
if THROUGHPUT_LEGACY_STREAM_EXIST {
err = s.memphisDeleteStream(s.MemphisGlobalAccountString(), throughputStreamName)
Expand Down Expand Up @@ -557,6 +560,26 @@ func tryCreateInternalJetStreamResources(s *Server, retentionDur time.Duration,
FUNCTIONS_TASKS_CONSUMER_CREATED = true
}

// create connectors logs stream
if shouldCreateConnectorsStream() && !CONNECTORS_LOGS_STREAM_CREATED {
err = s.memphisAddStream(s.MemphisGlobalAccountString(), &StreamConfig{
Name: connectorsLogsStream,
Subjects: []string{connectorsLogsStream + ".>"},
Retention: LimitsPolicy,
MaxAge: time.Hour * 24 * 7, // 7 days
MaxConsumers: -1,
MaxMsgsPer: 200,
Discard: DiscardOld,
Storage: FileStorage,
Replicas: replicas,
})
if err != nil && !IsNatsErr(err, JSStreamNameExistErr) {
successCh <- err
return
}
CONNECTORS_LOGS_STREAM_CREATED = true
}

successCh <- nil
}

Expand Down
6 changes: 4 additions & 2 deletions server/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,8 @@ func (c *client) parse(buf []byte) error {
!c.isWebsocket() &&
!strings.Contains(c.opts.Name, "MEMPHIS HTTP LOGGER") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ADMINISTRATION") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ENGINE") {
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ENGINE") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD CONNECTORS ENGINE") {
if !s.validateAccIdInUsername(c.opts.Username) {
goto accountIdErr
}
Expand All @@ -963,7 +964,8 @@ func (c *client) parse(buf []byte) error {
!c.isWebsocket() &&
!strings.Contains(c.opts.Name, "MEMPHIS HTTP LOGGER") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ADMINISTRATION") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ENGINE") {
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD FUNCTIONS ENGINE") &&
!strings.Contains(c.opts.Name, "MEMPHIS CLOUD CONNECTORS ENGINE") {
if db.MetadataDbClient.Client == nil { // server is not ready yet to get new connections
goto authErr
}
Expand Down

0 comments on commit f46ceef

Please sign in to comment.