From a5ee8c7a42b224d7499ca9cfb6915050dc98b59e Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 3 Oct 2023 17:10:30 +0300 Subject: [PATCH 1/3] add indication for broken integration --- db/db.go | 29 ++++ models/integrations.go | 1 + server/background_tasks.go | 129 ++++++++++++++++++ server/source_code_management_github_cloud.go | 4 + 4 files changed, 163 insertions(+) diff --git a/db/db.go b/db/db.go index 0aec3f2dd..6282b4c78 100644 --- a/db/db.go +++ b/db/db.go @@ -197,6 +197,7 @@ func createTables(MetadataDbClient MetadataStorage) error { SELECT 1 FROM information_schema.tables WHERE table_name = 'integrations' AND table_schema = 'public' ) THEN ALTER TABLE integrations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis'; + ALTER TABLE integrations ADD COLUMN IF NOT EXISTS is_valid BOOL NOT NULL DEFAULT true; ALTER TABLE integrations DROP CONSTRAINT IF EXISTS integrations_name_key; ALTER TABLE integrations DROP CONSTRAINT IF EXISTS tenant_name_name; ALTER TABLE integrations ADD CONSTRAINT tenant_name_name UNIQUE(name, tenant_name); @@ -209,6 +210,7 @@ func createTables(MetadataDbClient MetadataStorage) error { keys JSON NOT NULL DEFAULT '{}', properties JSON NOT NULL DEFAULT '{}', tenant_name VARCHAR NOT NULL DEFAULT '$memphis', + is_valid BOOL NOT NULL DEFAULT true, PRIMARY KEY (id), CONSTRAINT fk_tenant_name FOREIGN KEY(tenant_name) @@ -4184,6 +4186,33 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface return integrations[0], nil } +func UpdateIsValidIntegration(tenantName, integrationName string, isValid bool) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + query := `UPDATE integrations SET is_valid = $2 WHERE name = $1 AND tenant_name=$3` + stmt, err := conn.Conn().Prepare(ctx, "update_pending_user", query) + if err != nil { + return err + } + if tenantName != conf.GlobalAccount { + tenantName = strings.ToLower(tenantName) + } + _, err = conn.Conn().Query(ctx, stmt.Name, integrationName, isValid, tenantName) + if err != nil { + return err + } + + return nil + +} + +// + // User Functions func UpdatePendingUser(tenantName, username string, pending bool) error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) diff --git a/models/integrations.go b/models/integrations.go index 1d94a72bc..80fc758e4 100644 --- a/models/integrations.go +++ b/models/integrations.go @@ -23,6 +23,7 @@ type Integration struct { Keys map[string]interface{} `json:"keys"` Properties map[string]bool `json:"properties"` TenantName string `json:"tenant_name"` + IsValid bool `json:"is_valid"` } type SlackIntegration struct { diff --git a/server/background_tasks.go b/server/background_tasks.go index d4165ce4d..4092ddb5d 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -12,11 +12,15 @@ package server import ( + "context" "encoding/json" "errors" "fmt" "sync" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/memphisdev/memphis/db" "github.com/memphisdev/memphis/memphis_cache" "github.com/memphisdev/memphis/models" @@ -328,6 +332,7 @@ func (s *Server) StartBackgroundTasks() error { go s.RemoveOldProducersAndConsumers() go ScheduledCloudCacheRefresh() go s.SendBillingAlertWhenNeeded() + go s.CheckBrokenConnectedIntegration() return nil } @@ -637,3 +642,127 @@ func (s *Server) RemoveOldProducersAndConsumers() { } } } + +func (s *Server) CheckBrokenConnectedIntegration() error { + ticker := time.NewTicker(15 * time.Minute) + for range ticker.C { + _, integrations, err := db.GetAllIntegrations() + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at GetAllIntegrations: %v", err.Error()) + } + + for _, integration := range integrations { + switch integration.Name { + case "github": + if _, ok := integration.Keys["installation_id"].(string); !ok { + integration.Keys["installation_id"] = "" + } + err := testGithubIntegration(integration.Keys["installation_id"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at testGithubIntegration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } + case "slack": + key := getAESKey() + if _, ok := integration.Keys["auth_token"].(string); !ok { + integration.Keys["auth_token"] = "" + } + if _, ok := integration.Keys["channel_id"].(string); !ok { + integration.Keys["channel_id"] = "" + } + authToken, err := DecryptAES(key, integration.Keys["auth_token"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at DecryptAES: %v", err.Error()) + } + err = testSlackIntegration(authToken, integration.Keys["channel_id"].(string), "Slack integration sanity test for broken connected integration was successfully") + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at testSlackIntegration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } + case "s3": + key := getAESKey() + if _, ok := integration.Keys["access_key"].(string); !ok { + integration.Keys["access_key"] = "" + } + if _, ok := integration.Keys["secret_key"].(string); !ok { + integration.Keys["secret_key"] = "" + } + if _, ok := integration.Keys["region"].(string); !ok { + integration.Keys["region"] = "" + } + if _, ok := integration.Keys["url"].(string); !ok { + integration.Keys["url"] = "" + } + if _, ok := integration.Keys["s3_path_style"].(string); !ok { + integration.Keys["s3_path_style"] = "" + } + if _, ok := integration.Keys["bucket_name"].(string); !ok { + integration.Keys["bucket_name"] = "" + } + accessKey := integration.Keys["access_key"].(string) + secretKey, err := DecryptAES(key, integration.Keys["secret_key"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at DecryptAES: %v", err.Error()) + } + + provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") + _, err = provider.Retrieve(context.Background()) + if err != nil { + if strings.Contains(err.Error(), "static credentials are empty") { + serv.Errorf("CheckBrokenConnectedIntegration at provider.Retrieve: credentials are empty %v", err.Error()) + } else { + serv.Errorf("CheckBrokenConnectedIntegration at provider.Retrieve: %v", err.Error()) + } + } + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithCredentialsProvider(provider), + awsconfig.WithRegion(integration.Keys["region"].(string)), + awsconfig.WithEndpointResolverWithOptions(getS3EndpointResolver(integration.Keys["region"].(string), integration.Keys["url"].(string))), + ) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at awsconfig.LoadDefaultConfig: %v", err.Error()) + } + var usePathStyle bool + svc := s3.NewFromConfig(cfg, func(o *s3.Options) { + switch integration.Keys["s3_path_style"].(string) { + case "true": + usePathStyle = true + case "false": + usePathStyle = false + } + o.UsePathStyle = usePathStyle + }) + _, err = testS3Integration(svc, integration.Keys["bucket_name"].(string), integration.Keys["url"].(string)) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at testS3Integration: %v", err.Error()) + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } else { + err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) + if err != nil { + serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + } + } + } + } + } + return nil +} diff --git a/server/source_code_management_github_cloud.go b/server/source_code_management_github_cloud.go index 45b004f61..bb06832e0 100644 --- a/server/source_code_management_github_cloud.go +++ b/server/source_code_management_github_cloud.go @@ -52,6 +52,10 @@ func getGithubClient(tenantName string) (string, string, *github.Client, error) return "", "", client, nil } +func testGithubIntegration(installationId string) error { + return nil +} + func (s *Server) getGithubRepositories(integration models.Integration, body interface{}) (models.Integration, interface{}, error) { return models.Integration{}, nil, nil } From bc992925485bec251ff279efc1e2d41ce87e0453 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 3 Oct 2023 18:01:48 +0300 Subject: [PATCH 2/3] some changes --- server/background_tasks.go | 36 ++++++++++++------------- server/memphis_handlers_integrations.go | 1 + 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/server/background_tasks.go b/server/background_tasks.go index 4092ddb5d..26a1e3b08 100644 --- a/server/background_tasks.go +++ b/server/background_tasks.go @@ -332,7 +332,7 @@ func (s *Server) StartBackgroundTasks() error { go s.RemoveOldProducersAndConsumers() go ScheduledCloudCacheRefresh() go s.SendBillingAlertWhenNeeded() - go s.CheckBrokenConnectedIntegration() + go s.CheckBrokenConnectedIntegrations() return nil } @@ -643,12 +643,12 @@ func (s *Server) RemoveOldProducersAndConsumers() { } } -func (s *Server) CheckBrokenConnectedIntegration() error { +func (s *Server) CheckBrokenConnectedIntegrations() error { ticker := time.NewTicker(15 * time.Minute) for range ticker.C { _, integrations, err := db.GetAllIntegrations() if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at GetAllIntegrations: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at GetAllIntegrations: %v", err.Error()) } for _, integration := range integrations { @@ -659,15 +659,15 @@ func (s *Server) CheckBrokenConnectedIntegration() error { } err := testGithubIntegration(integration.Keys["installation_id"].(string)) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at testGithubIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at testGithubIntegration: %v", err.Error()) err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } else { err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } case "slack": @@ -680,19 +680,19 @@ func (s *Server) CheckBrokenConnectedIntegration() error { } authToken, err := DecryptAES(key, integration.Keys["auth_token"].(string)) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at DecryptAES: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error()) } - err = testSlackIntegration(authToken, integration.Keys["channel_id"].(string), "Slack integration sanity test for broken connected integration was successfully") + err = testSlackIntegration(authToken, integration.Keys["channel_id"].(string), "Slack integration sanity test for broken connected integration was successful") if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at testSlackIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at testSlackIntegration: %v", err.Error()) err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } else { err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } case "s3": @@ -718,16 +718,16 @@ func (s *Server) CheckBrokenConnectedIntegration() error { accessKey := integration.Keys["access_key"].(string) secretKey, err := DecryptAES(key, integration.Keys["secret_key"].(string)) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at DecryptAES: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error()) } provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") _, err = provider.Retrieve(context.Background()) if err != nil { if strings.Contains(err.Error(), "static credentials are empty") { - serv.Errorf("CheckBrokenConnectedIntegration at provider.Retrieve: credentials are empty %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: credentials are empty %v", err.Error()) } else { - serv.Errorf("CheckBrokenConnectedIntegration at provider.Retrieve: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: %v", err.Error()) } } cfg, err := awsconfig.LoadDefaultConfig(context.Background(), @@ -736,7 +736,7 @@ func (s *Server) CheckBrokenConnectedIntegration() error { awsconfig.WithEndpointResolverWithOptions(getS3EndpointResolver(integration.Keys["region"].(string), integration.Keys["url"].(string))), ) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at awsconfig.LoadDefaultConfig: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at awsconfig.LoadDefaultConfig: %v", err.Error()) } var usePathStyle bool svc := s3.NewFromConfig(cfg, func(o *s3.Options) { @@ -750,15 +750,15 @@ func (s *Server) CheckBrokenConnectedIntegration() error { }) _, err = testS3Integration(svc, integration.Keys["bucket_name"].(string), integration.Keys["url"].(string)) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at testS3Integration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at testS3Integration: %v", err.Error()) err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } else { err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true) if err != nil { - serv.Errorf("CheckBrokenConnectedIntegration at UpdateIsValidIntegration: %v", err.Error()) + serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error()) } } } diff --git a/server/memphis_handlers_integrations.go b/server/memphis_handlers_integrations.go index 523777775..c9065dd8d 100644 --- a/server/memphis_handlers_integrations.go +++ b/server/memphis_handlers_integrations.go @@ -396,6 +396,7 @@ func (it IntegrationsHandler) GetIntegrationDetails(c *gin.Context) { githubIntegration.Keys = map[string]interface{}{} githubIntegration.Name = sourceCodeIntegration.Name githubIntegration.TenantName = sourceCodeIntegration.TenantName + githubIntegration.IsValid = integration.IsValid githubIntegration.Keys["connected_repos"] = sourceCodeIntegration.Keys["connected_repos"] githubIntegration.Keys["memphis_functions"] = memphisFunctions githubIntegration.Keys["application_name"] = applicationName From e88bdcc36ae4da7c4490a8ad370363d4f76f08e3 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Tue, 3 Oct 2023 18:05:49 +0300 Subject: [PATCH 3/3] remove spaces --- db/db.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/db/db.go b/db/db.go index 6282b4c78..c7e5415b0 100644 --- a/db/db.go +++ b/db/db.go @@ -4195,7 +4195,7 @@ func UpdateIsValidIntegration(tenantName, integrationName string, isValid bool) } defer conn.Release() query := `UPDATE integrations SET is_valid = $2 WHERE name = $1 AND tenant_name=$3` - stmt, err := conn.Conn().Prepare(ctx, "update_pending_user", query) + stmt, err := conn.Conn().Prepare(ctx, "update_is_valid_integration", query) if err != nil { return err } @@ -4206,13 +4206,9 @@ func UpdateIsValidIntegration(tenantName, integrationName string, isValid bool) if err != nil { return err } - return nil - } -// - // User Functions func UpdatePendingUser(tenantName, username string, pending bool) error { ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)