Skip to content

Commit

Permalink
Add inline flight query tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jun 18, 2024
1 parent 3da259c commit 563adec
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 79 deletions.
69 changes: 3 additions & 66 deletions tests/clade/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use crate::fixtures::fake_gcs_creds;
use crate::fixtures::schemas;
use clade::schema::{
schema_store_service_server::{SchemaStoreService, SchemaStoreServiceServer},
ListSchemaRequest, ListSchemaResponse, SchemaObject, StorageLocation, TableObject,
FILE_DESCRIPTOR_SET,
ListSchemaRequest, ListSchemaResponse, FILE_DESCRIPTOR_SET,
};
use datafusion_common::assert_batches_eq;
use object_store::aws::AmazonS3ConfigKey;
use object_store::gcp::GoogleConfigKey;
use rstest::rstest;
use seafowl::catalog::DEFAULT_DB;
use seafowl::config::context::build_context;
use seafowl::config::schema::load_config_from_string;
use seafowl::context::SeafowlContext;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -78,66 +74,7 @@ async fn run_clade_server(addr: SocketAddr) {
// Setup a test metastore with some fake tables in test object stores.
let metastore = TestCladeMetastore {
catalog: DEFAULT_DB.to_string(),
schemas: ListSchemaResponse {
schemas: vec![
SchemaObject {
name: "local".to_string(),
tables: vec![TableObject {
name: "file".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
location: None,
}],
},
SchemaObject {
name: "s3".to_string(),
tables: vec![TableObject {
name: "minio".to_string(),
path: "test-data/delta-0.8.0-partitioned".to_string(),
location: Some("s3://seafowl-test-bucket".to_string()),
}],
},
SchemaObject {
name: "gcs".to_string(),
tables: vec![TableObject {
name: "fake".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
location: Some("gs://test-data".to_string()),
}],
},
],
stores: vec![
StorageLocation {
location: "s3://seafowl-test-bucket".to_string(),
options: HashMap::from([
(
AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
"http://127.0.0.1:9000".to_string(),
),
(
AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
"minioadmin".to_string(),
),
(
AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
"minioadmin".to_string(),
),
(
// This has been removed from the config enum, but it can
// still be picked up via `AmazonS3ConfigKey::from_str`
"aws_allow_http".to_string(),
"true".to_string(),
),
]),
},
StorageLocation {
location: "gs://test-data".to_string(),
options: HashMap::from([(
GoogleConfigKey::ServiceAccount.as_ref().to_string(),
fake_gcs_creds(),
)]),
},
],
},
schemas: schemas(),
};

let svc = SchemaStoreServiceServer::new(metastore);
Expand Down
68 changes: 68 additions & 0 deletions tests/fixtures.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use clade::schema::{ListSchemaResponse, SchemaObject, StorageLocation, TableObject};
use object_store::aws::AmazonS3ConfigKey;
use object_store::gcp::GoogleConfigKey;
use serde_json::json;
use std::collections::HashMap;
use std::path::Path;

pub const FAKE_GCS_CREDS_PATH: &str = "/tmp/fake-gcs-server.json";
Expand All @@ -15,3 +19,67 @@ pub fn fake_gcs_creds() -> String {

google_application_credentials_path.display().to_string()
}

// Return a list of schemas with actual tables and object store configs that are used in testing
pub fn schemas() -> ListSchemaResponse {
ListSchemaResponse {
schemas: vec![
SchemaObject {
name: "local".to_string(),
tables: vec![TableObject {
name: "file".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
location: None,
}],
},
SchemaObject {
name: "s3".to_string(),
tables: vec![TableObject {
name: "minio".to_string(),
path: "test-data/delta-0.8.0-partitioned".to_string(),
location: Some("s3://seafowl-test-bucket".to_string()),
}],
},
SchemaObject {
name: "gcs".to_string(),
tables: vec![TableObject {
name: "fake".to_string(),
path: "delta-0.8.0-partitioned".to_string(),
location: Some("gs://test-data".to_string()),
}],
},
],
stores: vec![
StorageLocation {
location: "s3://seafowl-test-bucket".to_string(),
options: HashMap::from([
(
AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
"http://127.0.0.1:9000".to_string(),
),
(
AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
"minioadmin".to_string(),
),
(
AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
"minioadmin".to_string(),
),
(
// This has been removed from the config enum, but it can
// still be picked up via `AmazonS3ConfigKey::from_str`
"aws_allow_http".to_string(),
"true".to_string(),
),
]),
},
StorageLocation {
location: "gs://test-data".to_string(),
options: HashMap::from([(
GoogleConfigKey::ServiceAccount.as_ref().to_string(),
fake_gcs_creds(),
)]),
},
],
}
}
4 changes: 2 additions & 2 deletions tests/flight/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::flight::*;

#[tokio::test]
async fn test_basic_queries() -> Result<()> {
let (context, mut client) = flight_server().await;
let (context, mut client) = flight_server(false).await;
create_table_and_insert(context.as_ref(), "flight_table").await;

// Test the handshake works
Expand All @@ -28,7 +28,7 @@ async fn test_basic_queries() -> Result<()> {

#[tokio::test]
async fn test_ddl_types_roundtrip() -> Result<()> {
let (_context, mut client) = flight_server().await;
let (_context, mut client) = flight_server(false).await;

let all_types_query = r#"
SELECT
Expand Down
35 changes: 35 additions & 0 deletions tests/flight/inline_metastore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::flight::*;

#[rstest]
#[should_panic(expected = "External error: Not a Delta table: no log files")]
#[case("local.file", false)]
#[case("local.file", true)]
#[case("s3.minio", true)]
#[case("gcs.fake", true)]
#[tokio::test]
async fn test_inline_query(#[case] table: &str, #[case] local_store: bool) -> () {
let (_context, mut client) = flight_server(local_store).await;

let batches = get_flight_batches_inlined(
&mut client,
format!("SELECT * FROM {table} ORDER BY value"),
schemas(),
)
.await
.unwrap();

let expected = [
"+-------+------+-------+-----+",
"| value | year | month | day |",
"+-------+------+-------+-----+",
"| 1 | 2020 | 1 | 1 |",
"| 2 | 2020 | 2 | 3 |",
"| 3 | 2020 | 2 | 5 |",
"| 4 | 2021 | 4 | 5 |",
"| 5 | 2021 | 12 | 4 |",
"| 6 | 2021 | 12 | 20 |",
"| 7 | 2021 | 12 | 20 |",
"+-------+------+-------+-----+",
];
assert_batches_eq!(expected, &batches);
}
46 changes: 38 additions & 8 deletions tests/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arrow::array::{BooleanArray, Float64Array, Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_flight::encode::{FlightDataEncoder, FlightDataEncoderBuilder};
use arrow_flight::error::{FlightError, Result};
use arrow_flight::sql::{CommandStatementQuery, ProstMessageExt};
use arrow_flight::sql::{Any, CommandStatementQuery, ProstMessageExt};
use arrow_flight::{FlightClient, FlightDescriptor};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
Expand All @@ -20,8 +20,10 @@ use tonic::transport::Channel;
use uuid::Uuid;
use warp::hyper::Client;

use clade::schema::{InlineMetastoreCommandStatementQuery, ListSchemaResponse};
use clade::sync::{DataSyncCommand, DataSyncResult};

use crate::fixtures::schemas;
use crate::http::{get_metrics, response_text};
use crate::statements::create_table_and_insert;
use crate::{test_seafowl, TestSeafowl};
Expand All @@ -33,19 +35,28 @@ use seafowl::frontend::flight::run_flight_server;

mod client;
mod e2e;
mod inline_metastore;
mod search_path;
mod sync;
mod sync_fail;

async fn make_test_context() -> Arc<SeafowlContext> {
async fn make_test_context(local_store: bool) -> Arc<SeafowlContext> {
// let OS choose a free port
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let object_store_section = if local_store {
r#"[object_store]
type = "local"
data_dir = "tests/data""#
} else {
r#"[object_store]
type = "memory""#
};

let config_text = format!(
r#"
[object_store]
type = "memory"
{object_store_section}
[catalog]
type = "sqlite"
Expand All @@ -61,13 +72,13 @@ max_replication_lag_s = 1"#,
addr.port()
);

let config = load_config_from_string(&config_text, false, None).unwrap();
let config = load_config_from_string(&config_text, true, None).unwrap();

Arc::from(build_context(config).await.unwrap())
}

async fn flight_server() -> (Arc<SeafowlContext>, FlightClient) {
let context = make_test_context().await;
async fn flight_server(local_store: bool) -> (Arc<SeafowlContext>, FlightClient) {
let context = make_test_context(local_store).await;

let flight_cfg = context
.config
Expand Down Expand Up @@ -100,7 +111,26 @@ async fn get_flight_batches(
query,
transaction_id: None,
};
let request = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
get_flight_batches_inner(client, cmd.as_any()).await
}

async fn get_flight_batches_inlined(
client: &mut FlightClient,
query: String,
schemas: ListSchemaResponse,
) -> Result<Vec<RecordBatch>> {
let cmd = InlineMetastoreCommandStatementQuery {
query,
schemas: Some(schemas),
};
get_flight_batches_inner(client, cmd.as_any()).await
}

async fn get_flight_batches_inner(
client: &mut FlightClient,
message: Any,
) -> Result<Vec<RecordBatch>> {
let request = FlightDescriptor::new_cmd(message.encode_to_vec());
let response = client.get_flight_info(request).await?;

// Get the returned ticket
Expand Down
2 changes: 1 addition & 1 deletion tests/flight/search_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flight::*;
#[tokio::test]
async fn test_default_schema_override(
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let (context, mut client) = flight_server().await;
let (context, mut client) = flight_server(false).await;

context.plan_query("CREATE SCHEMA some_schema").await?;
create_table_and_insert(context.as_ref(), "some_schema.flight_table").await;
Expand Down
2 changes: 1 addition & 1 deletion tests/flight/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn do_put_sync(

#[tokio::test]
async fn test_sync_happy_path() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (ctx, mut client) = flight_server().await;
let (ctx, mut client) = flight_server(false).await;

//
// Sync #1 that creates the table, and dictates the full schema for following syncs
Expand Down
2 changes: 1 addition & 1 deletion tests/flight/sync_fail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn assert_sync_error(

#[tokio::test]
async fn test_sync_errors() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_ctx, mut client) = flight_server().await;
let (_ctx, mut client) = flight_server(false).await;

let schema = Arc::new(Schema::new(vec![Field::new(
"old_c1",
Expand Down

0 comments on commit 563adec

Please sign in to comment.