diff --git a/db/db.go b/db/db.go index 0aec3f2dd..c7e5415b0 100644 --- a/db/db.go +++ b/db/db.go @@ -197,6 +197,7 @@ func createTables(MetadataDbClient MetadataStorage) error { SELECT 1 FROM information_schema.tables WHERE table_name = 'integrations' AND table_schema = 'public' ) THEN ALTER TABLE integrations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; + ALTER TABLE integrations ADD COLUMN IF NOT EXISTS is_valid BOOL NOT NULL DEFAULT true; ALTER TABLE integrations DROP CONSTRAINT IF EXISTS integrations_name_key; ALTER TABLE integrations DROP CONSTRAINT IF EXISTS tenant_name_name; ALTER TABLE integrations ADD CONSTRAINT tenant_name_name UNIQUE(name, tenant_name); @@ -209,6 +210,7 @@ func createTables(MetadataDbClient MetadataStorage) error { keys JSON NOT NULL DEFAULT '{}', properties JSON NOT NULL DEFAULT '{}', tenant_name VARCHAR NOT NULL DEFAULT '$memphis', + is_valid BOOL NOT NULL DEFAULT true, PRIMARY KEY (id), CONSTRAINT fk_tenant_name FOREIGN KEY(tenant_name) @@ -4184,6 +4186,29 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface return integrations[0], nil } +func UpdateIsValidIntegration(tenantName, integrationName string, isValid bool) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + query := `UPDATE integrations SET is_valid = $2 WHERE name = $1 AND tenant_name=$3` + stmt, err := conn.Conn().Prepare(ctx, "update_is_valid_integration", query) + if err != nil { + return err + } + if tenantName != conf.GlobalAccount { + tenantName = strings.ToLower(tenantName) + } + _, err = conn.Conn().Query(ctx, stmt.Name, integrationName, isValid, tenantName) + if err != nil { + return err + } + return nil +} + // User Functions func UpdatePendingUser(tenantName, username string, pending bool) error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) diff --git a/models/integrations.go b/models/integrations.go index 1d94a72bc..80fc758e4 100644 --- a/models/integrations.go +++ b/models/integrations.go @@ -23,6 +23,7 @@ type Integration struct { Keys map[string]interface{} `json:"keys"` Properties map[string]bool `json:"properties"` TenantName string `json:"tenant_name"` + IsValid bool `json:"is_valid"` } type SlackIntegration struct { diff --git a/server/background_tasks.go b/server/background_tasks.go index d4165ce4d..26a1e3b08 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -12,11 +12,15 @@ package server import ( + "context" "encoding/json" "errors" "fmt" "sync" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/memphisdev/memphis/db" "github.com/memphisdev/memphis/memphis_cache" "github.com/memphisdev/memphis/models" @@ -328,6 +332,7 @@ func (s *Server) StartBackgroundTasks() error { go s.RemoveOldProducersAndConsumers() go ScheduledCloudCacheRefresh() go s.SendBillingAlertWhenNeeded() + go s.CheckBrokenConnectedIntegrations() return nil } @@ -637,3 +642,127 @@ func (s *Server) RemoveOldProducersAndConsumers() { } } } + +func (s *Server) CheckBrokenConnectedIntegrations() error { + ticker := time.NewTicker(15 * time.Minute) + for range ticker.C { + _, integrations, err := db.GetAllIntegrations() + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at GetAllIntegrations: %v", err.Error()) + } + + for _, integration := range integrations { + switch integration.Name { + case "github": + if _, ok := integration.Keys["installation_id"].(string); !ok { + integration.Keys["installation_id"] = "" + } + err := testGithubIntegration(integration.Keys["installation_id"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at testGithubIntegration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } + case "slack": + key := getAESKey() + if _, ok := integration.Keys["auth_token"].(string); !ok { + integration.Keys["auth_token"] = "" + } + if _, ok := integration.Keys["channel_id"].(string); !ok { + integration.Keys["channel_id"] = "" + } + authToken, err := DecryptAES(key, integration.Keys["auth_token"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error()) + } + err = testSlackIntegration(authToken, integration.Keys["channel_id"].(string), "Slack integration sanity test for broken connected integration was successful") + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at testSlackIntegration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } + case "s3": + key := getAESKey() + if _, ok := integration.Keys["access_key"].(string); !ok { + integration.Keys["access_key"] = "" + } + if _, ok := integration.Keys["secret_key"].(string); !ok { + integration.Keys["secret_key"] = "" + } + if _, ok := integration.Keys["region"].(string); !ok { + integration.Keys["region"] = "" + } + if _, ok := integration.Keys["url"].(string); !ok { + integration.Keys["url"] = "" + } + if _, ok := integration.Keys["s3_path_style"].(string); !ok { + integration.Keys["s3_path_style"] = "" + } + if _, ok := integration.Keys["bucket_name"].(string); !ok { + integration.Keys["bucket_name"] = "" + } + accessKey := integration.Keys["access_key"].(string) + secretKey, err := DecryptAES(key, integration.Keys["secret_key"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error()) + } + + provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") + _, err = provider.Retrieve(context.Background()) + if err != nil { + if strings.Contains(err.Error(), "static credentials are empty") { + serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: credentials are empty %v", err.Error()) + } else { + serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: %v", err.Error()) + } + } + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithCredentialsProvider(provider), + awsconfig.WithRegion(integration.Keys["region"].(string)), + awsconfig.WithEndpointResolverWithOptions(getS3EndpointResolver(integration.Keys["region"].(string), integration.Keys["url"].(string))), + ) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at awsconfig.LoadDefaultConfig: %v", err.Error()) + } + var usePathStyle bool + svc := s3.NewFromConfig(cfg, func(o *s3.Options) { + switch integration.Keys["s3_path_style"].(string) { + case "true": + usePathStyle = true + case "false": + usePathStyle = false + } + o.UsePathStyle = usePathStyle + }) + _, err = testS3Integration(svc, integration.Keys["bucket_name"].(string), integration.Keys["url"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at testS3Integration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) + } + } + } + } + } + return nil +} diff --git a/server/memphis_handlers_integrations.go b/server/memphis_handlers_integrations.go index 523777775..c9065dd8d 100644 --- a/server/memphis_handlers_integrations.go +++ b/server/memphis_handlers_integrations.go @@ -396,6 +396,7 @@ func (it IntegrationsHandler) GetIntegrationDetails(c *gin.Context) { githubIntegration.Keys = map[string]interface{}{} githubIntegration.Name = sourceCodeIntegration.Name githubIntegration.TenantName = sourceCodeIntegration.TenantName + githubIntegration.IsValid = integration.IsValid githubIntegration.Keys["connected_repos"] = sourceCodeIntegration.Keys["connected_repos"] githubIntegration.Keys["memphis_functions"] = memphisFunctions githubIntegration.Keys["application_name"] = applicationName diff --git a/server/source_code_management_github_cloud.go b/server/source_code_management_github_cloud.go index 45b004f61..bb06832e0 100644 --- a/server/source_code_management_github_cloud.go +++ b/server/source_code_management_github_cloud.go @@ -52,6 +52,10 @@ func getGithubClient(tenantName string) (string, string, *github.Client, error) return "", "", client, nil } +func testGithubIntegration(installationId string) error { + return nil +} + func (s *Server) getGithubRepositories(integration models.Integration, body interface{}) (models.Integration, interface{}, error) { return models.Integration{}, nil, nil }