Skip to content

Commit

Permalink
fix: error message for deltalake during test connection (#3883)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 19, 2023
1 parent e188b94 commit 1fa2f45
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 16 deletions.
7 changes: 4 additions & 3 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func TestClickhouse_TestConnection(t *testing.T) {
workspaceID := "test_workspace_id"
namespace := "test_namespace"
provider := "MINIO"
timeout := 5 * time.Second

dsn := fmt.Sprintf("tcp://%s:%d?compress=false&database=%s&password=%s&secure=false&skip_verify=true&username=%s",
"localhost", clickhousePort, databaseName, password, user,
Expand All @@ -653,16 +654,16 @@ func TestClickhouse_TestConnection(t *testing.T) {
},
{
name: "Success",
timeout: warehouseutils.TestConnectionTimeout,
timeout: timeout,
},
{
name: "TLS config",
timeout: warehouseutils.TestConnectionTimeout,
timeout: timeout,
tlConfig: "test-tls-config",
},
{
name: "No such host",
timeout: warehouseutils.TestConnectionTimeout,
timeout: timeout,
wantError: errors.New(`dial tcp: lookup clickhouse`),
host: "clickhouse",
},
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ func (*Deltalake) IsEmpty(context.Context, model.Warehouse) (bool, error) {
func (d *Deltalake) TestConnection(ctx context.Context, _ model.Warehouse) error {
err := d.DB.PingContext(ctx)
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("connection timeout: %w", err)
return errors.New("connection timeout: verify the availability of the SQL warehouse/cluster on Databricks (this process may take up to 15 minutes). Once the SQL warehouse/cluster is ready, please attempt your connection again")
}
if err != nil {
return fmt.Errorf("pinging: %w", err)
Expand Down
7 changes: 3 additions & 4 deletions warehouse/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ var DiscardsSchema = map[string]string{
}

const (
LoadFileTypeCsv = "csv"
LoadFileTypeJson = "json"
LoadFileTypeParquet = "parquet"
TestConnectionTimeout = 15 * time.Second
LoadFileTypeCsv = "csv"
LoadFileTypeJson = "json"
LoadFileTypeParquet = "parquet"
)

func Init() {
Expand Down
6 changes: 3 additions & 3 deletions warehouse/validations/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (os *objectStorage) Validate(ctx context.Context) error {
func (c *connections) Validate(ctx context.Context) error {
defer c.manager.Cleanup(ctx)

ctx, cancel := context.WithTimeout(ctx, warehouseutils.TestConnectionTimeout)
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

return c.manager.TestConnection(ctx, createDummyWarehouse(c.destination))
Expand Down Expand Up @@ -467,7 +467,7 @@ func createFileManager(dest *backendconfig.DestinationT) (filemanager.FileManage
return nil, fmt.Errorf("creating file manager: %w", err)
}

fileManager.SetTimeout(objectStorageValidationTimeout)
fileManager.SetTimeout(objectStorageTimeout)

return fileManager, nil
}
Expand All @@ -485,7 +485,7 @@ func createManager(ctx context.Context, dest *backendconfig.DestinationT) (manag
return nil, fmt.Errorf("getting manager: %w", err)
}

operations.SetConnectionTimeout(warehouseutils.TestConnectionTimeout)
operations.SetConnectionTimeout(queryTimeout)

if err = operations.Setup(ctx, warehouse, &dummyUploader{
dest: dest,
Expand Down
14 changes: 9 additions & 5 deletions warehouse/validations/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ const (
)

var (
connectionTestingFolder string
pkgLogger logger.Logger
fileManagerFactory filemanager.Factory
objectStorageValidationTimeout time.Duration
connectionTestingFolder string
pkgLogger logger.Logger
fileManagerFactory filemanager.Factory
objectStorageTimeout time.Duration
queryTimeout time.Duration
)

var (
Expand All @@ -50,7 +51,10 @@ func Init() {
connectionTestingFolder = config.GetString("RUDDER_CONNECTION_TESTING_BUCKET_FOLDER_NAME", misc.RudderTestPayload)
pkgLogger = logger.NewLogger().Child("warehouse").Child("validations")
fileManagerFactory = filemanager.New
objectStorageValidationTimeout = 15 * time.Second
objectStorageTimeout = config.GetDuration("Warehouse.Validations.ObjectStorageTimeout", 15, time.Second)

// Since we have a cp-router default timeout of 30 seconds, keeping the query timeout to 25 seconds
queryTimeout = config.GetDuration("Warehouse.Validations.QueryTimeout", 25, time.Second)
}

// Validate the destination by running all the validation steps
Expand Down

0 comments on commit 1fa2f45

Please sign in to comment.