chore: added trino and spark test #3525
Merged
Changes from 10 commits
Commits (12)
4ff9587 chore: added trino test (achettyiitr)
1f6da05 chore: added spark test (achettyiitr)
843ba1a Merge branch 'master' into chore.trino-test (achettyiitr)
d8bc499 chore: restriction for resources for trino and spark (achettyiitr)
ac88374 chore: master pull (achettyiitr)
41f9ea9 chore: added trino test (achettyiitr)
a77108a chore: rebase master (achettyiitr)
72ca735 chore: restriction for resources for trino and spark (achettyiitr)
afafc48 Merge branch 'master' of github.com:rudderlabs/rudder-server into cho… (achettyiitr)
bc4d69b chore: using rudderstack hive-metastore image (achettyiitr)
0ac4a4b chore: master pull (achettyiitr)
c8d815b chore: review comments (achettyiitr)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package datalake_test | |
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
|
@@ -33,6 +34,8 @@ import ( | |
backendconfig "github.com/rudderlabs/rudder-server/backend-config" | ||
|
||
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" | ||
|
||
_ "github.com/trinodb/trino-go-client/trino" | ||
) | ||
|
||
type gcsTestCredentials struct { | ||
|
@@ -67,7 +70,13 @@ func TestIntegration(t *testing.T) { | |
t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.") | ||
} | ||
|
||
c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.yml", "../testdata/docker-compose.jobsdb.yml", "../testdata/docker-compose.minio.yml"})) | ||
c := testcompose.New(t, compose.FilePaths([]string{ | ||
"testdata/docker-compose.yml", | ||
"testdata/docker-compose.trino.yml", | ||
"testdata/docker-compose.spark.yml", | ||
"../testdata/docker-compose.jobsdb.yml", | ||
"../testdata/docker-compose.minio.yml", | ||
})) | ||
c.Start(context.Background()) | ||
|
||
misc.Init() | ||
|
@@ -78,6 +87,7 @@ func TestIntegration(t *testing.T) { | |
jobsDBPort := c.Port("jobsDb", 5432) | ||
minioPort := c.Port("minio", 9000) | ||
azurePort := c.Port("azure", 10000) | ||
trinoPort := c.Port("trino", 8080) | ||
|
||
httpPort, err := kithelper.GetFreePort() | ||
require.NoError(t, err) | ||
|
@@ -407,4 +417,145 @@ func TestIntegration(t *testing.T) { | |
} | ||
testhelper.VerifyConfigurationTest(t, dest) | ||
}) | ||
|
||
t.Run("Trino", func(t *testing.T) { | ||
dsn := fmt.Sprintf("http://user@localhost:%d?catalog=minio&schema=default&session_properties=minio.parquet_use_column_index=true", | ||
trinoPort, | ||
) | ||
db, err := sql.Open("trino", dsn) | ||
require.NoError(t, err) | ||
|
||
require.Eventually(t, func() bool { | ||
_, err := db.ExecContext(ctx, "SELECT 1") | ||
return err == nil | ||
}, 60*time.Second, 1*time.Second) | ||
|
||
require.NoError(t, testhelper.WithConstantRetries(func() error { | ||
_, err = db.ExecContext(ctx, ` | ||
CREATE SCHEMA IF NOT EXISTS minio.rudderstack WITH ( | ||
location = 's3a://`+s3BucketName+`/') | ||
`) | ||
return err | ||
})) | ||
|
||
require.NoError(t, testhelper.WithConstantRetries(func() error { | ||
_, err = db.ExecContext(ctx, ` | ||
CREATE TABLE IF NOT EXISTS minio.rudderstack.tracks ( | ||
"_timestamp" TIMESTAMP, | ||
context_destination_id VARCHAR, | ||
context_destination_type VARCHAR, | ||
context_ip VARCHAR, | ||
context_library_name VARCHAR, | ||
context_passed_ip VARCHAR, | ||
context_request_ip VARCHAR, | ||
context_source_id VARCHAR, | ||
context_source_type VARCHAR, | ||
event VARCHAR, | ||
event_text VARCHAR, | ||
id VARCHAR, | ||
original_timestamp TIMESTAMP, | ||
received_at TIMESTAMP, | ||
sent_at TIMESTAMP, | ||
"timestamp" TIMESTAMP, | ||
user_id VARCHAR, | ||
uuid_ts TIMESTAMP | ||
) | ||
WITH ( | ||
external_location = 's3a://`+s3BucketName+`/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/', | ||
format = 'PARQUET' | ||
) | ||
`) | ||
return err | ||
})) | ||
|
||
var count int64 | ||
|
||
require.NoError(t, testhelper.WithConstantRetries(func() error { | ||
||
return db.QueryRowContext(ctx, ` | ||
select | ||
count(*) | ||
from | ||
minio.rudderstack.tracks | ||
`).Scan(&count) | ||
})) | ||
require.Equal(t, int64(8), count) | ||
|
||
require.NoError(t, testhelper.WithConstantRetries(func() error { | ||
return db.QueryRowContext(ctx, ` | ||
select | ||
count(*) | ||
from | ||
minio.rudderstack.tracks | ||
where | ||
context_destination_id = '`+s3DestinationID+`' | ||
`).Scan(&count) | ||
})) | ||
require.Equal(t, int64(8), count) | ||
}) | ||
|
||
t.Run("Spark", func(t *testing.T) { | ||
_ = c.Exec(ctx, | ||
"spark-master", | ||
"spark-sql", | ||
"-e", | ||
` | ||
CREATE EXTERNAL TABLE tracks ( | ||
_timestamp timestamp, | ||
context_destination_id string, | ||
context_destination_type string, | ||
context_ip string, | ||
context_library_name string, | ||
context_passed_ip string, | ||
context_request_ip string, | ||
context_source_id string, | ||
context_source_type string, | ||
event string, | ||
event_text string, id string, | ||
original_timestamp timestamp, | ||
received_at timestamp, | ||
sent_at timestamp, | ||
timestamp timestamp, | ||
user_id string, | ||
uuid_ts timestamp | ||
) | ||
STORED AS PARQUET | ||
location "s3a://s3-datalake-test/some-prefix/rudder-datalake/s_3_datalake_integration/tracks/2023/05/12/04/"; | ||
`, | ||
"-S", | ||
) | ||
|
||
countOutput := c.Exec(ctx, | ||
"spark-master", | ||
"spark-sql", | ||
"-e", | ||
` | ||
select | ||
count(*) | ||
from | ||
tracks; | ||
`, | ||
"-S", | ||
) | ||
countOutput = strings.ReplaceAll(strings.ReplaceAll(countOutput, "\n", ""), "\r", "") // remove trailing newline | ||
require.NotEmpty(t, countOutput) | ||
require.Equal(t, string(countOutput[len(countOutput)-1]), "8", countOutput) // last character is the count | ||
|
||
filteredCountOutput := c.Exec(ctx, | ||
"spark-master", | ||
"spark-sql", | ||
"-e", | ||
` | ||
select | ||
count(*) | ||
from | ||
tracks | ||
where | ||
context_destination_id = '`+s3DestinationID+`'; | ||
`, | ||
"-S", | ||
) | ||
filteredCountOutput = strings.ReplaceAll(strings.ReplaceAll(filteredCountOutput, "\n", ""), "\r", "") // remove trailing newline | ||
require.NotEmpty(t, filteredCountOutput) | ||
require.Equal(t, string(filteredCountOutput[len(filteredCountOutput)-1]), "8", filteredCountOutput) // last character is the count | ||
}) | ||
} |
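The Spark assertions above rely on `spark-sql -e ... -S` printing the query result last, so after stripping newlines the final character is the expected count of 8. Below is a hedged sketch of that same parsing idea as a standalone helper; the package and function names are assumptions for illustration and are not part of the PR.

```go
package sparkhelper // hypothetical package, for illustration only

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSparkCount illustrates the parsing idea used in the test above:
// with spark-sql's -S (silent) flag the count is the last token printed,
// so take the final whitespace-separated field and parse it as an integer.
func parseSparkCount(output string) (int64, error) {
	fields := strings.Fields(output) // splits on spaces, tabs, \r and \n
	if len(fields) == 0 {
		return 0, fmt.Errorf("empty spark-sql output")
	}
	return strconv.ParseInt(fields[len(fields)-1], 10, 64)
}
```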
46 changes: 46 additions & 0 deletions
warehouse/integrations/datalake/testdata/conf/core-site.xml
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
<configuration> | ||
<property> | ||
<name>hive.metastore.uris</name> | ||
<value>thrift://hive-metastore:9083</value> | ||
</property> | ||
<property> | ||
<name>fs.s3.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>fs.s3n.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>s3.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>s3a.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>s3n.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.endpoint</name> | ||
<value>http://minio:9000</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.path.style.access</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.access.key</name> | ||
<value>MYACCESSKEY</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.secret.key</name> | ||
<value>MYSECRETKEY</value> | ||
</property> | ||
</configuration> |
46 changes: 46 additions & 0 deletions
warehouse/integrations/datalake/testdata/conf/hive-site.xml
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
<configuration> | ||
<property> | ||
<name>hive.metastore.uris</name> | ||
<value>thrift://hive-metastore:9083</value> | ||
</property> | ||
<property> | ||
<name>fs.s3.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>fs.s3n.impl</name> | ||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | ||
</property> | ||
<property> | ||
<name>s3.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>s3a.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>s3n.impl.disable.cache</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.endpoint</name> | ||
<value>http://minio:9000</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.path.style.access</name> | ||
<value>true</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.access.key</name> | ||
<value>MYACCESSKEY</value> | ||
</property> | ||
<property> | ||
<name>fs.s3a.secret.key</name> | ||
<value>MYSECRETKEY</value> | ||
</property> | ||
</configuration> |
Review comment:
We could try more often; we don't even have log pollution here. Every 100ms, for example, could work.
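A minimal sketch of what polling every 100ms could look like for the Trino count check, assuming the same `t`, `ctx`, `db`, and expected row count from the subtest above; using testify's `require.Eventually` here instead of `testhelper.WithConstantRetries` is an assumption, and the 60s budget simply mirrors the earlier connectivity check.

```go
// Sketch: poll the Trino count query every 100ms (up to 60s) and stop as
// soon as all 8 rows are visible, instead of a fixed number of retries.
var count int64
require.Eventually(t, func() bool {
	err := db.QueryRowContext(ctx, `select count(*) from minio.rudderstack.tracks`).Scan(&count)
	return err == nil && count == 8
}, 60*time.Second, 100*time.Millisecond)
```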