Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indication for broken integration #1352

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
SELECT 1 FROM information_schema.tables WHERE table_name = 'integrations' AND table_schema = 'public'
) THEN
ALTER TABLE integrations ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE integrations ADD COLUMN IF NOT EXISTS is_valid BOOL NOT NULL DEFAULT true;
ALTER TABLE integrations DROP CONSTRAINT IF EXISTS integrations_name_key;
ALTER TABLE integrations DROP CONSTRAINT IF EXISTS tenant_name_name;
ALTER TABLE integrations ADD CONSTRAINT tenant_name_name UNIQUE(name, tenant_name);
Expand All @@ -209,6 +210,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
keys JSON NOT NULL DEFAULT '{}',
properties JSON NOT NULL DEFAULT '{}',
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
is_valid BOOL NOT NULL DEFAULT true,
PRIMARY KEY (id),
CONSTRAINT fk_tenant_name
FOREIGN KEY(tenant_name)
Expand Down Expand Up @@ -4184,6 +4186,29 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface
return integrations[0], nil
}

func UpdateIsValidIntegration(tenantName, integrationName string, isValid bool) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
query := `UPDATE integrations SET is_valid = $2 WHERE name = $1 AND tenant_name=$3`
stmt, err := conn.Conn().Prepare(ctx, "update_is_valid_integration", query)
if err != nil {
return err
}
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
_, err = conn.Conn().Query(ctx, stmt.Name, integrationName, isValid, tenantName)
if err != nil {
return err
}
return nil
}

// User Functions
func UpdatePendingUser(tenantName, username string, pending bool) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
Expand Down
1 change: 1 addition & 0 deletions models/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Integration struct {
Keys map[string]interface{} `json:"keys"`
Properties map[string]bool `json:"properties"`
TenantName string `json:"tenant_name"`
IsValid bool `json:"is_valid"`
}

type SlackIntegration struct {
Expand Down
129 changes: 129 additions & 0 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
package server

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/memphisdev/memphis/db"
"github.com/memphisdev/memphis/memphis_cache"
"github.com/memphisdev/memphis/models"
Expand Down Expand Up @@ -328,6 +332,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.RemoveOldProducersAndConsumers()
go ScheduledCloudCacheRefresh()
go s.SendBillingAlertWhenNeeded()
go s.CheckBrokenConnectedIntegrations()

return nil
}
Expand Down Expand Up @@ -637,3 +642,127 @@ func (s *Server) RemoveOldProducersAndConsumers() {
}
}
}

func (s *Server) CheckBrokenConnectedIntegrations() error {
ticker := time.NewTicker(15 * time.Minute)
for range ticker.C {
_, integrations, err := db.GetAllIntegrations()
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at GetAllIntegrations: %v", err.Error())
}

for _, integration := range integrations {
switch integration.Name {
case "github":
if _, ok := integration.Keys["installation_id"].(string); !ok {
integration.Keys["installation_id"] = ""
}
err := testGithubIntegration(integration.Keys["installation_id"].(string))
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at testGithubIntegration: %v", err.Error())
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
} else {
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
}
case "slack":
key := getAESKey()
if _, ok := integration.Keys["auth_token"].(string); !ok {
integration.Keys["auth_token"] = ""
}
if _, ok := integration.Keys["channel_id"].(string); !ok {
integration.Keys["channel_id"] = ""
}
authToken, err := DecryptAES(key, integration.Keys["auth_token"].(string))
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error())
}
err = testSlackIntegration(authToken, integration.Keys["channel_id"].(string), "Slack integration sanity test for broken connected integration was successful")
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at testSlackIntegration: %v", err.Error())
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
} else {
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
}
case "s3":
key := getAESKey()
if _, ok := integration.Keys["access_key"].(string); !ok {
integration.Keys["access_key"] = ""
}
if _, ok := integration.Keys["secret_key"].(string); !ok {
integration.Keys["secret_key"] = ""
}
if _, ok := integration.Keys["region"].(string); !ok {
integration.Keys["region"] = ""
}
if _, ok := integration.Keys["url"].(string); !ok {
integration.Keys["url"] = ""
}
if _, ok := integration.Keys["s3_path_style"].(string); !ok {
integration.Keys["s3_path_style"] = ""
}
if _, ok := integration.Keys["bucket_name"].(string); !ok {
integration.Keys["bucket_name"] = ""
}
accessKey := integration.Keys["access_key"].(string)
secretKey, err := DecryptAES(key, integration.Keys["secret_key"].(string))
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at DecryptAES: %v", err.Error())
}

provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
_, err = provider.Retrieve(context.Background())
if err != nil {
if strings.Contains(err.Error(), "static credentials are empty") {
serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: credentials are empty %v", err.Error())
} else {
serv.Errorf("CheckBrokenConnectedIntegrations at provider.Retrieve: %v", err.Error())
}
}
cfg, err := awsconfig.LoadDefaultConfig(context.Background(),
awsconfig.WithCredentialsProvider(provider),
awsconfig.WithRegion(integration.Keys["region"].(string)),
awsconfig.WithEndpointResolverWithOptions(getS3EndpointResolver(integration.Keys["region"].(string), integration.Keys["url"].(string))),
)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at awsconfig.LoadDefaultConfig: %v", err.Error())
}
var usePathStyle bool
svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
switch integration.Keys["s3_path_style"].(string) {
case "true":
usePathStyle = true
case "false":
usePathStyle = false
}
o.UsePathStyle = usePathStyle
})
_, err = testS3Integration(svc, integration.Keys["bucket_name"].(string), integration.Keys["url"].(string))
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at testS3Integration: %v", err.Error())
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, false)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
} else {
err = db.UpdateIsValidIntegration(integration.TenantName, integration.Name, true)
if err != nil {
serv.Errorf("CheckBrokenConnectedIntegrations at UpdateIsValidIntegration: %v", err.Error())
}
}
}
}
}
return nil
}
1 change: 1 addition & 0 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func (it IntegrationsHandler) GetIntegrationDetails(c *gin.Context) {
githubIntegration.Keys = map[string]interface{}{}
githubIntegration.Name = sourceCodeIntegration.Name
githubIntegration.TenantName = sourceCodeIntegration.TenantName
githubIntegration.IsValid = integration.IsValid
githubIntegration.Keys["connected_repos"] = sourceCodeIntegration.Keys["connected_repos"]
githubIntegration.Keys["memphis_functions"] = memphisFunctions
githubIntegration.Keys["application_name"] = applicationName
Expand Down
4 changes: 4 additions & 0 deletions server/source_code_management_github_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func getGithubClient(tenantName string) (string, string, *github.Client, error)
return "", "", client, nil
}

func testGithubIntegration(installationId string) error {
return nil
}

func (s *Server) getGithubRepositories(integration models.Integration, body interface{}) (models.Integration, interface{}, error) {
return models.Integration{}, nil, nil
}
Expand Down