Skip to content

Commit

Permalink
chore: remove gcs dependency for gcs datalake test using fakegcs
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 3, 2023
1 parent bed100d commit c9e50a5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 89 deletions.
161 changes: 74 additions & 87 deletions warehouse/integrations/datalake/datalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package datalake_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

"cloud.google.com/go/storage"

"google.golang.org/api/option"

"github.com/rudderlabs/compose-test/compose"

"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
Expand All @@ -35,33 +38,6 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// gcsTestCredentials is the decoded form of the JSON document that
// getGCSTestCredentials reads from the environment.
type gcsTestCredentials struct {
// BucketName is populated from the "bucketName" JSON key.
BucketName string `json:"bucketName"`
// Credentials is populated from the "credentials" JSON key and is kept as
// an opaque string.
Credentials string `json:"credentials"`
}

// gcsTestKey is the name of the environment variable that
// getGCSTestCredentials looks up to obtain the GCS test credentials.
const gcsTestKey = "BIGQUERY_INTEGRATION_TEST_CREDENTIALS"

// getGCSTestCredentials loads the GCS test credentials from the environment.
//
// It looks up the variable named by gcsTestKey and JSON-decodes its value into
// a gcsTestCredentials. An error is returned when the variable is not set or
// when its value cannot be decoded.
func getGCSTestCredentials() (*gcsTestCredentials, error) {
	raw, ok := os.LookupEnv(gcsTestKey)
	if !ok {
		return nil, fmt.Errorf("gcs credentials not found")
	}

	parsed := new(gcsTestCredentials)
	if err := json.Unmarshal([]byte(raw), parsed); err != nil {
		return nil, fmt.Errorf("failed to unmarshal gcs credentials: %w", err)
	}

	return parsed, nil
}

// isGCSTestCredentialsAvailable reports whether getGCSTestCredentials succeeds,
// i.e. whether the credentials are present in the environment and decodable.
func isGCSTestCredentialsAvailable() bool {
	if _, err := getGCSTestCredentials(); err != nil {
		return false
	}
	return true
}

func TestIntegration(t *testing.T) {
if os.Getenv("SLOW") != "1" {
t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.")
Expand All @@ -78,6 +54,7 @@ func TestIntegration(t *testing.T) {
jobsDBPort := c.Port("jobsDb", 5432)
minioPort := c.Port("minio", 9000)
azurePort := c.Port("azure", 10000)
gcsPort := c.Port("gcs", 4443)

httpPort, err := kithelper.GetFreePort()
require.NoError(t, err)
Expand All @@ -95,22 +72,21 @@ func TestIntegration(t *testing.T) {

azContainerName := "azure-datalake-test"
s3BucketName := "s3-datalake-test"
gcsBucketName := "gcs-datalake-test"
azAccountName := "MYACCESSKEY"
azAccountKey := "TVlTRUNSRVRLRVk="
azEndPoint := fmt.Sprintf("localhost:%d", azurePort)
s3Region := "us-east-1"
s3AccessKeyID := "MYACCESSKEY"
s3AccessKey := "MYSECRETKEY"
s3EndPoint := fmt.Sprintf("localhost:%d", minioPort)
gcsEndPoint := fmt.Sprintf("http://localhost:%d/storage/v1/", gcsPort)

accessKeyID := "MYACCESSKEY"
secretAccessKey := "MYSECRETKEY"

minioEndpoint := fmt.Sprintf("localhost:%d", minioPort)

var gcsBucketName string
var gcsCredentials string

templateConfigurations := map[string]any{
"workspaceID": workspaceID,
"azWriteKey": azWriteKey,
Expand All @@ -131,21 +107,8 @@ func TestIntegration(t *testing.T) {
"s3AccessKeyID": s3AccessKeyID,
"s3AccessKey": s3AccessKey,
"s3EndPoint": s3EndPoint,
}
if isGCSTestCredentialsAvailable() {
credentials, err := getGCSTestCredentials()
require.NoError(t, err)

escapedCredentials, err := json.Marshal(credentials.Credentials)
require.NoError(t, err)

escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`)

templateConfigurations["gcsBucketName"] = credentials.BucketName
templateConfigurations["gcsCredentials"] = escapedCredentialsTrimmedStr

gcsBucketName = credentials.BucketName
gcsCredentials = credentials.Credentials
"gcsBucketName": gcsBucketName,
"gcsEndPoint": gcsEndPoint,
}

workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)
Expand All @@ -159,6 +122,8 @@ func TestIntegration(t *testing.T) {
t.Setenv("MINIO_SSL", "false")
t.Setenv("RSERVER_WAREHOUSE_WEB_PORT", strconv.Itoa(httpPort))
t.Setenv("RSERVER_BACKEND_CONFIG_CONFIG_JSONPATH", workspaceConfigPath)
t.Setenv("STORAGE_EMULATOR_HOST", fmt.Sprintf("localhost:%d", gcsPort))
t.Setenv("RSERVER_WORKLOAD_IDENTITY_TYPE", "GKE")

svcDone := make(chan struct{})

Expand Down Expand Up @@ -208,20 +173,7 @@ func TestIntegration(t *testing.T) {
"syncFrequency": "30",
},
prerequisite: func(t testing.TB) {
t.Helper()

const (
secure = false
region = "us-east-1"
)

minioClient, err := minio.New(s3EndPoint, &minio.Options{
Creds: credentials.NewStaticV4(s3AccessKeyID, s3AccessKey, ""),
Secure: secure,
})
require.NoError(t, err)

_ = minioClient.MakeBucket(context.TODO(), s3BucketName, minio.MakeBucketOptions{Region: region})
createMinioBucket(t, ctx, s3EndPoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)
},
stagingFilePrefix: "testdata/upload-job-s3-datalake",
},
Expand All @@ -235,15 +187,13 @@ func TestIntegration(t *testing.T) {
conf: map[string]interface{}{
"bucketName": gcsBucketName,
"prefix": "",
"credentials": gcsCredentials,
"endPoint": gcsEndPoint,
"disableSSL": true,
"jsonReads": true,
"syncFrequency": "30",
},
prerequisite: func(t testing.TB) {
t.Helper()

if !isGCSTestCredentialsAvailable() {
t.Skipf("Skipping %s as %s is not set", t.Name(), gcsTestKey)
}
createGCSBucket(t, ctx, gcsEndPoint, gcsBucketName)
},
stagingFilePrefix: "testdata/upload-job-gcs-datalake",
},
Expand Down Expand Up @@ -316,18 +266,7 @@ func TestIntegration(t *testing.T) {
})

t.Run("S3 DataLake Validation", func(t *testing.T) {
const (
secure = false
region = "us-east-1"
)

minioClient, err := minio.New(s3EndPoint, &minio.Options{
Creds: credentials.NewStaticV4(s3AccessKeyID, s3AccessKey, ""),
Secure: secure,
})
require.NoError(t, err)

_ = minioClient.MakeBucket(context.Background(), s3BucketName, minio.MakeBucketOptions{Region: region})
createMinioBucket(t, ctx, s3EndPoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)

dest := backendconfig.DestinationT{
ID: s3DestinationID,
Expand Down Expand Up @@ -356,19 +295,16 @@ func TestIntegration(t *testing.T) {
})

t.Run("GCS DataLake Validation", func(t *testing.T) {
if !isGCSTestCredentialsAvailable() {
t.Skipf("Skipping %s as %s is not set", t.Name(), gcsTestKey)
}

credentials, err := getGCSTestCredentials()
require.NoError(t, err)
createGCSBucket(t, ctx, gcsEndPoint, gcsBucketName)

dest := backendconfig.DestinationT{
ID: gcsDestinationID,
Config: map[string]interface{}{
"bucketName": credentials.BucketName,
"bucketName": gcsBucketName,
"prefix": "",
"credentials": credentials.Credentials,
"endPoint": gcsEndPoint,
"disableSSL": true,
"jsonReads": true,
"syncFrequency": "30",
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Expand Down Expand Up @@ -408,3 +344,54 @@ func TestIntegration(t *testing.T) {
testhelper.VerifyConfigurationTest(t, dest)
})
}

// createGCSBucket makes sure that bucketName exists on the GCS server reachable
// at endpoint, creating it under the "test" project with location "US" when
// the server reports that it does not exist.
//
// The whole check-then-create sequence is handed to
// testhelper.WithConstantRetries, and the test fails via require.NoError if
// that helper ultimately returns an error.
func createGCSBucket(t testing.TB, ctx context.Context, endpoint, bucketName string) {
	t.Helper()

	require.NoError(t, testhelper.WithConstantRetries(func() error {
		client, err := storage.NewClient(ctx, option.WithEndpoint(endpoint))
		if err != nil {
			return fmt.Errorf("create GCS client: %w", err)
		}
		// A fresh client is built on every attempt, so it must be released
		// before the attempt returns; otherwise each retry leaks the client's
		// underlying connections. The Close error is deliberately ignored: it
		// says nothing about whether the bucket exists.
		defer func() { _ = client.Close() }()

		bucket := client.Bucket(bucketName)

		_, err = bucket.Attrs(ctx)
		if err == nil {
			// Bucket is already there; nothing to do.
			return nil
		}
		if !errors.Is(err, storage.ErrBucketNotExist) {
			return fmt.Errorf("bucket attrs: %w", err)
		}

		if err := bucket.Create(ctx, "test", &storage.BucketAttrs{
			Location: "US",
			Name:     bucketName,
		}); err != nil {
			return fmt.Errorf("create bucket: %w", err)
		}
		return nil
	}))
}

func createMinioBucket(t testing.TB, ctx context.Context, endpoint, accessKeyId, accessKey, bucketName, region string) {
t.Helper()

require.NoError(t, testhelper.WithConstantRetries(func() error {
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyId, accessKey, ""),
Secure: false,
})
if err != nil {
return fmt.Errorf("create minio client: %w", err)
}

exists, err := minioClient.BucketExists(ctx, bucketName)
if err != nil {
return fmt.Errorf("check if bucket exists: %w", err)
}

if exists {
return nil
}

return minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region})
}))
}
6 changes: 6 additions & 0 deletions warehouse/integrations/datalake/testdata/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ services:
test: nc -z localhost 10000 || exit 1
interval: 1s
retries: 25

gcs:
image: fsouza/fake-gcs-server:latest
ports:
- "4443"
command: [ "-scheme", "http", "-location", "us-east-1", "-backend", "memory" ]
6 changes: 4 additions & 2 deletions warehouse/integrations/datalake/testdata/template.json
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,10 @@
"config": {
"bucketName": "{{.gcsBucketName}}",
"prefix": "",
"credentials": "{{.gcsCredentials}}",
"syncFrequency": "30"
"endPoint": "{{.gcsEndPoint}}",
"syncFrequency": "30",
"disableSSL": true,
"jsonReads": true
},
"liveEventsConfig": {},
"secretConfig": {},
Expand Down

0 comments on commit c9e50a5

Please sign in to comment.