Commit d8bc499
chore: restriction for resources for trino and spark
achettyiitr committed Jun 26, 2023
1 parent 843ba1a commit d8bc499
Showing 4 changed files with 153 additions and 58 deletions.
161 changes: 113 additions & 48 deletions warehouse/integrations/datalake/datalake_test.go
@@ -423,67 +423,132 @@ func TestIntegration(t *testing.T) {
return err == nil
}, 60*time.Second, 1*time.Second)

_, err = db.ExecContext(ctx, `
CREATE SCHEMA IF NOT EXISTS minio.rudderstack WITH (
location = 's3a://`+s3BucketName+`/')
`)
require.NoError(t, err)

_, err = db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS minio.rudderstack.tracks (
"_timestamp" TIMESTAMP,
context_destination_id VARCHAR,
context_destination_type VARCHAR,
context_ip VARCHAR,
context_library_name VARCHAR,
context_passed_ip VARCHAR,
context_request_ip VARCHAR,
context_source_id VARCHAR,
context_source_type VARCHAR,
event VARCHAR,
event_text VARCHAR,
id VARCHAR,
original_timestamp TIMESTAMP,
received_at TIMESTAMP,
sent_at TIMESTAMP,
"timestamp" TIMESTAMP,
user_id VARCHAR,
uuid_ts TIMESTAMP
)
WITH (
external_location = 's3a://`+s3BucketName+`/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/',
format = 'PARQUET'
)
`)
require.NoError(t, err)
require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err = db.ExecContext(ctx, `
CREATE SCHEMA IF NOT EXISTS minio.rudderstack WITH (
location = 's3a://`+s3BucketName+`/')
`)
return err
}))

require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err = db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS minio.rudderstack.tracks (
"_timestamp" TIMESTAMP,
context_destination_id VARCHAR,
context_destination_type VARCHAR,
context_ip VARCHAR,
context_library_name VARCHAR,
context_passed_ip VARCHAR,
context_request_ip VARCHAR,
context_source_id VARCHAR,
context_source_type VARCHAR,
event VARCHAR,
event_text VARCHAR,
id VARCHAR,
original_timestamp TIMESTAMP,
received_at TIMESTAMP,
sent_at TIMESTAMP,
"timestamp" TIMESTAMP,
user_id VARCHAR,
uuid_ts TIMESTAMP
)
WITH (
external_location = 's3a://`+s3BucketName+`/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/',
format = 'PARQUET'
)
`)
return err
}))

var count int64

err = db.QueryRowContext(ctx, `
select count(*) from minio.rudderstack.tracks
`).Scan(&count)
require.NoError(t, err)
require.NoError(t, testhelper.WithConstantRetries(func() error {
return db.QueryRowContext(ctx, `
select
count(*)
from
minio.rudderstack.tracks
`).Scan(&count)
}))
require.Equal(t, int64(8), count)

err = db.QueryRowContext(ctx, `
select count(*) from minio.rudderstack.tracks where context_destination_id = '`+s3DestinationID+`'
`).Scan(&count)
require.NoError(t, err)
require.NoError(t, testhelper.WithConstantRetries(func() error {
return db.QueryRowContext(ctx, `
select
count(*)
from
minio.rudderstack.tracks
where
context_destination_id = '`+s3DestinationID+`'
`).Scan(&count)
}))
require.Equal(t, int64(8), count)
})

t.Run("Spark", func(t *testing.T) {
_ = c.Exec(ctx, "spark-master", "spark-sql", "-e", `CREATE EXTERNAL TABLE tracks ( _timestamp timestamp, context_destination_id string, context_destination_type string, context_ip string, context_library_name string, context_passed_ip string, context_request_ip string, context_source_id string, context_source_type string, event string, event_text string, id string, original_timestamp timestamp, received_at timestamp, sent_at timestamp, timestamp timestamp, user_id string, uuid_ts timestamp ) STORED AS PARQUET location "s3a://s3-datalake-test/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/";`, "-S")
_ = c.Exec(ctx,
"spark-master",
"spark-sql",
"-e",
`
CREATE EXTERNAL TABLE tracks (
_timestamp timestamp,
context_destination_id string,
context_destination_type string,
context_ip string,
context_library_name string,
context_passed_ip string,
context_request_ip string,
context_source_id string,
context_source_type string,
event string,
event_text string,
id string,
original_timestamp timestamp,
received_at timestamp,
sent_at timestamp,
timestamp timestamp,
user_id string,
uuid_ts timestamp
)
STORED AS PARQUET
location "s3a://s3-datalake-test/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/";
`,
"-S",
)

countOutput := c.Exec(ctx, "spark-master", "spark-sql", "-e", `select count(*) from tracks;`, "-S")
countOutput := c.Exec(ctx,
"spark-master",
"spark-sql",
"-e",
`
select
count(*)
from
tracks;
`,
"-S",
)
countOutput = strings.ReplaceAll(strings.ReplaceAll(countOutput, "\n", ""), "\r", "") // strip all newlines and carriage returns
require.NotEmpty(t, countOutput)
require.Equal(t, string(countOutput[len(countOutput)-1]), "8") // last character is the count

strings.Contains(countOutput, "8")
filteredCountOutput := c.Exec(ctx, "spark-master", "spark-sql", "-e", `select count(*) from tracks where context_destination_id = '`+s3DestinationID+`';`, "-S")
require.Equal(t, string(countOutput[len(countOutput)-1]), "8", countOutput) // last character is the count

filteredCountOutput := c.Exec(ctx,
"spark-master",
"spark-sql",
"-e",
`
select
count(*)
from
tracks
where
context_destination_id = '`+s3DestinationID+`';
`,
"-S",
)
filteredCountOutput = strings.ReplaceAll(strings.ReplaceAll(filteredCountOutput, "\n", ""), "\r", "") // strip all newlines and carriage returns
require.NotEmpty(t, filteredCountOutput)
require.Equal(t, string(filteredCountOutput[len(filteredCountOutput)-1]), "8") // last character is the count
require.Equal(t, string(filteredCountOutput[len(filteredCountOutput)-1]), "8", filteredCountOutput) // last character is the count
})
}
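The retry wrappers above reference testhelper.WithConstantRetries, whose implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming it simply re-invokes the callback a fixed number of times at a constant interval (both values here are assumptions, not the actual ones):

package testhelper

import "time"

// WithConstantRetries runs fn until it succeeds or the attempt budget is
// exhausted, sleeping a constant interval between attempts and returning
// the last error. Attempt count and interval are illustrative assumptions.
func WithConstantRetries(fn func() error) error {
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}

Wrapping the Trino DDL and count queries this way keeps the test from flaking while the freshly started Trino, Hive metastore, and MinIO containers settle under the new resource limits.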
@@ -1 +1,4 @@
spark.master.rest.enabled true
spark.sql.parquet.aggregatePushdown true
spark.sql.parquet.recordLevelFilter.enabled true
spark.sql.parquet.fieldId.read.enabled true
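The three spark.sql.parquet settings added here tune Spark's Parquet reader for precisely the queries the Spark subtest issues, and spark.sql.parquet.fieldId.read.enabled additionally lets the reader resolve columns by Parquet field ID where IDs are present. As a hedged illustration of the first two (whether pushdown actually applies depends on the Spark version and query plan; the predicate value is a placeholder):

-- With spark.sql.parquet.aggregatePushdown=true, a bare COUNT(*) over a
-- Parquet table can be answered from file footer metadata instead of a
-- row scan.
select count(*) from tracks;

-- With spark.sql.parquet.recordLevelFilter.enabled=true (on top of the
-- default filter pushdown), the equality predicate is evaluated record
-- by record inside the Parquet reader.
select count(*) from tracks where context_destination_id = '<destination-id>';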
10 changes: 10 additions & 0 deletions warehouse/integrations/datalake/testdata/docker-compose.spark.yml
@@ -24,6 +24,11 @@ services:
ports:
- 8080
- 7077
deploy:
resources:
limits:
cpus: '1'
memory: 1G

spark-worker:
image: bitnami/spark:3.2.4
@@ -44,3 +49,8 @@ services:
- 8081
depends_on:
- spark-master
deploy:
resources:
limits:
cpus: '1'
memory: 1G
37 changes: 27 additions & 10 deletions warehouse/integrations/datalake/testdata/docker-compose.trino.yml
@@ -6,16 +6,13 @@ services:
- 8080
volumes:
- ./etc:/etc/trino/:ro

mariadb:
image: mariadb:10.5.8
ports:
- 3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: admin
MYSQL_PASSWORD: admin
MYSQL_DATABASE: metastore_db
depends_on:
- hive-metastore
deploy:
resources:
limits:
cpus: '2.5'
memory: 1G

hive-metastore:
image: bitsondatadev/hive-metastore:latest
Expand All @@ -27,3 +24,23 @@ services:
METASTORE_DB_HOSTNAME: mariadb
depends_on:
- mariadb
deploy:
resources:
limits:
cpus: '1'
memory: 1G

mariadb:
image: mariadb:10.5.8
ports:
- 3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: admin
MYSQL_PASSWORD: admin
MYSQL_DATABASE: metastore_db
deploy:
resources:
limits:
cpus: '1'
memory: 1G
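One caveat on the deploy.resources.limits blocks added to both compose files: deploy comes from the Compose deploy specification, so classic docker-compose v1 ignores it unless invoked with --compatibility, while Docker Compose v2 applies these CPU and memory limits on a plain docker compose up. Under the older v2 file syntax the same caps would be expressed directly on the service, roughly like this (a sketch for comparison, not part of this commit):

# Equivalent limits in Compose file v2 syntax (illustrative only)
services:
  mariadb:
    image: mariadb:10.5.8
    cpus: 1.0
    mem_limit: 1g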
