Skip to content

Commit

Permalink
fix: warehouse flaky test (#3402)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed May 26, 2023
1 parent 990a405 commit 3f88f50
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func TestDownloader(t *testing.T) {
require.NoError(t, err)

var (
minioResource *destination.MINIOResource
destType = "POSTGRES"
provider = "MINIO"
workers = 12
Expand Down Expand Up @@ -121,7 +120,7 @@ func TestDownloader(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

minioResource, err = destination.SetupMINIO(pool, t)
minioResource, err := destination.SetupMINIO(pool, t)
require.NoError(t, err)

t.Log("minio:", minioResource.Endpoint)
Expand Down
138 changes: 64 additions & 74 deletions warehouse/validations/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ func TestValidator(t *testing.T) {
t.Run("Object Storage", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

t.Run("Non Datalakes", func(t *testing.T) {
t.Parallel()
tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

v, err := validations.NewValidator(ctx, model.VerifyingObjectStorage, &backendconfig.DestinationT{
DestinationDefinition: backendconfig.DestinationDefinitionT{
Expand All @@ -109,9 +107,7 @@ func TestValidator(t *testing.T) {
})

t.Run("Datalakes", func(t *testing.T) {
t.Parallel()

minioResource, err = destination.SetupMINIO(pool, t)
minioResource, err := destination.SetupMINIO(pool, t)
require.NoError(t, err)

t.Log("minio:", minioResource.Endpoint)
Expand Down Expand Up @@ -148,9 +144,6 @@ func TestValidator(t *testing.T) {
t.Run("Connections", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

testCases := []struct {
name string
config map[string]interface{}
Expand All @@ -172,7 +165,8 @@ func TestValidator(t *testing.T) {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()
tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

conf := map[string]interface{}{
"host": pgResource.Host,
Expand Down Expand Up @@ -212,9 +206,6 @@ func TestValidator(t *testing.T) {
t.Run("Create Schema", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

var (
password = "test_password"
userWithNoPrivilege = "test_user_with_no_privilege"
Expand All @@ -239,17 +230,18 @@ func TestValidator(t *testing.T) {
},
}

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()
tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

conf := map[string]interface{}{
"host": pgResource.Host,
Expand Down Expand Up @@ -290,37 +282,13 @@ func TestValidator(t *testing.T) {
t.Run("Create And Alter Table", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

var (
password = "test_password"
userWithNoPrivilege = "test_user_with_no_privilege"
userWithCreateTablePrivilege = "test_user_with_create_table_privilege"
userWithAlterPrivilege = "test_user_with_alter_privilege"
)

_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", namespace))
require.NoError(t, err)

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege, userWithCreateTablePrivilege, userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

t.Log("Granting create table privilege to users")
for _, user := range []string{userWithCreateTablePrivilege, userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT CREATE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

t.Log("Granting insert privilege to users")
for _, user := range []string{userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT USAGE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

testCases := []struct {
name string
config map[string]interface{}
Expand Down Expand Up @@ -358,6 +326,30 @@ func TestValidator(t *testing.T) {
tc := tc

t.Run(tc.name, func(t *testing.T) {
tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", namespace))
require.NoError(t, err)

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege, userWithCreateTablePrivilege, userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

t.Log("Granting create table privilege to users")
for _, user := range []string{userWithCreateTablePrivilege, userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT CREATE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

t.Log("Granting insert privilege to users")
for _, user := range []string{userWithAlterPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT USAGE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

conf := map[string]interface{}{
"host": pgResource.Host,
"port": pgResource.Port,
Expand Down Expand Up @@ -398,8 +390,6 @@ func TestValidator(t *testing.T) {
})

t.Run("Fetch schema", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

Expand Down Expand Up @@ -436,40 +426,13 @@ func TestValidator(t *testing.T) {
t.Run("Load table", func(t *testing.T) {
t.Parallel()

tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

var (
password = "test_password"
userWithNoPrivilege = "test_user_with_no_privilege"
userWithCreateTablePrivilege = "test_user_with_create_table_privilege"
userWithInsertPrivilege = "test_user_with_insert_privilege"
)

_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", namespace))
require.NoError(t, err)

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege, userWithCreateTablePrivilege, userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

t.Log("Granting create table privilege to users")
for _, user := range []string{userWithCreateTablePrivilege, userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT CREATE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

t.Log("Granting insert privilege to users")
for _, user := range []string{userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT USAGE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)

_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT INSERT ON ALL TABLES IN SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

testCases := []struct {
name string
config map[string]interface{}
Expand Down Expand Up @@ -516,6 +479,33 @@ func TestValidator(t *testing.T) {
tc := tc

t.Run(tc.name, func(t *testing.T) {
tr := setup(t, pool)
pgResource, minioResource := tr.pgResource, tr.minioResource

_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", namespace))
require.NoError(t, err)

t.Log("Creating users with no privileges")
for _, user := range []string{userWithNoPrivilege, userWithCreateTablePrivilege, userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
require.NoError(t, err)
}

t.Log("Granting create table privilege to users")
for _, user := range []string{userWithCreateTablePrivilege, userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT CREATE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

t.Log("Granting insert privilege to users")
for _, user := range []string{userWithInsertPrivilege} {
_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT USAGE ON SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)

_, err = pgResource.DB.Exec(fmt.Sprintf("GRANT INSERT ON ALL TABLES IN SCHEMA %s TO %s;", namespace, user))
require.NoError(t, err)
}

conf := map[string]interface{}{
"host": pgResource.Host,
"port": pgResource.Port,
Expand Down

0 comments on commit 3f88f50

Please sign in to comment.