669 changes: 355 additions & 314 deletions Cargo.lock

Large diffs are not rendered by default.

54 changes: 21 additions & 33 deletions Cargo.toml
@@ -2,51 +2,40 @@
members = ["clade", "object_store_factory"]

[workspace.dependencies]
arrow = { version = "52.2.0", features = ["test_utils"] }
arrow-buffer = "52.2.0"
arrow-csv = "52.2.0"
arrow-flight = "52.2.0"
arrow = { version = "53.2.0", features = ["test_utils"] }
arrow-buffer = "53.2.0"
arrow-csv = "53.2.0"
arrow-flight = "53.2.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "52.2.0"
arrow-row = "52.2.0"
arrow-schema = "52.2.0"
arrow-integration-test = "53.2.0"
arrow-row = "53.2.0"
arrow-schema = "53.2.0"
async-trait = "0.1.83"

datafusion = { version = "41.0.0", features = ["backtrace"] }
datafusion-common = "41.0.0"
datafusion-expr = "41.0.0"
datafusion-functions-nested = "41.0.0"
datafusion = { version = "43.0.0", features = ["backtrace"] }
datafusion-common = "43.0.0"
datafusion-expr = "43.0.0"
datafusion-functions-nested = "43.0.0"

futures = "0.3"

itertools = ">=0.10.0"
object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
prost = "0.12.6"
object_store = { version = "0.11", features = ["aws", "azure", "gcp"] }
prost = "0.13"

serde = "1.0.213"
serde_json = "1.0.132"

tempfile = "3"
tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
tonic = "0.12"
tracing = { version = "0.1", features = ["log"] }
tracing-log = "0.2"
tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
url = "2.5"

[patch.crates-io]
arrow = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-array = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-buffer = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-csv = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-data = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-flight = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-integration-test = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-ipc = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-row = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }
arrow-schema = { git = "https://github.com/splitgraph/arrow-rs", branch = "backport-pr6729" }

[package]
name = "seafowl"
build = "build.rs"
@@ -95,8 +84,8 @@ clap = { version = "4.5.20", features = [ "derive" ] }
config = "0.14.0"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }

dashmap = "6.1.0"

@@ -107,8 +96,7 @@ datafusion-functions-nested = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

# pr-2975-backport, pick up https://github.com/delta-io/delta-rs/pull/2975
deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "pr-2975-with-arrow-pr-6729-backport", features = ["datafusion"] }
deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-decimal-stat-overflow", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
@@ -118,7 +106,7 @@ lazy_static = ">=1.4.0"
metrics = { version = "0.23.0" }
metrics-exporter-prometheus = { version = "0.15.3" }
moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
object_store = { workspace = true }
object_store_factory = { path = "object_store_factory" }
percent-encoding = "2.2.0"
prost = { workspace = true }
@@ -135,15 +123,15 @@ rustyline = "14.0"
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = ">=0.10.1"
sqlparser = { version = "0.49", features = ["visitor"] }
sqlparser = { version = "0.51", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
thiserror = "2"
tokio = { workspace = true }
tokio-graceful-shutdown = { version = "0.15" }
tonic = { version = "0.11.0", optional = true }
tonic = { workspace = true, optional = true }
tower = "0.5"
tracing = { workspace = true }
tracing-log = "0.2"
@@ -165,7 +153,7 @@ aws-credential-types = { version = "1.2.1", features = ["hardcoded-credentials"]
aws-sdk-sts = { version = "1.46.0", features = ["behavior-version-latest"] }
rstest = "*"
serial_test = "3"
tonic-reflection = "0.11"
tonic-reflection = "0.12"
wiremock = "0.6"

[build-dependencies]
4 changes: 2 additions & 2 deletions clade/Cargo.toml
@@ -7,7 +7,7 @@ license = "Apache-2.0"
[dependencies]
arrow-flight = { workspace = true }
prost = { workspace = true }
tonic = "0.11"
tonic = { workspace = true }

[build-dependencies]
tonic-build = "0.11"
tonic-build = "0.12"
2 changes: 1 addition & 1 deletion clade/build.rs
@@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.build_server(true)
.build_client(true)
.type_attribute("clade.sync.ColumnDescriptor", "#[derive(Eq, Hash)]")
.compile(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;
.compile_protos(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;

Ok(())
}
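
As context for the one-line change above: tonic-build 0.12 favours `compile_protos` over the old `compile` entry point. A minimal sketch of a build script against the 0.12 API, reusing clade's proto paths but otherwise a stock configuration rather than clade's exact one:

```rust
// build.rs — sketch of a tonic-build 0.12 invocation; the 0.11-era `compile`
// builder method is superseded by `compile_protos`, the rest is unchanged.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        .build_client(true)
        .compile_protos(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;
    Ok(())
}
```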
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-41-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-43-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/factory.rs
@@ -9,6 +9,7 @@ use std::ops::Deref;
use std::sync::Arc;

/// Factory for creating remote tables
#[derive(Debug)]
pub struct RemoteTableFactory {}

#[async_trait]
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/provider.rs
@@ -23,6 +23,7 @@ use tokio::task;
use tracing::debug;

// Implementation of a remote table, capable of querying Postgres, MySQL, SQLite, etc...
#[derive(Debug)]
pub struct RemoteTable {
// We manually escape the field names during scans, but expect the user to escape the table name
// appropriately in the remote table definition
2 changes: 1 addition & 1 deletion src/catalog/empty.rs
@@ -9,7 +9,7 @@ use clade::schema::ListSchemaResponse;

use super::CatalogError;

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct EmptyStore {}

#[async_trait]
2 changes: 1 addition & 1 deletion src/catalog/external.rs
@@ -8,7 +8,7 @@ use tonic::transport::{channel::Channel, Endpoint, Error};
use tonic::Request;

// An external store, facilitated via a remote clade server implementation
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct ExternalStore {
client: SchemaStoreServiceClient<Channel>,
}
2 changes: 1 addition & 1 deletion src/catalog/memory.rs
@@ -4,7 +4,7 @@ use crate::catalog::{
use crate::repository::interface::AllDatabaseFunctionsResult;
use clade::schema::ListSchemaResponse;

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct MemoryStore {
pub schemas: ListSchemaResponse,
}
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
@@ -34,7 +34,7 @@ type LocationAndOptions = (String, HashMap<String, String>);
// This is the main entrypoint to all individual catalogs for various objects types.
// The intention is to make it extensible and de-coupled from the underlying metastore
// persistence mechanism (such as the presently used `Repository`).
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Metastore {
pub catalogs: Arc<dyn CatalogStore>,
pub schemas: Arc<dyn SchemaStore>,
9 changes: 5 additions & 4 deletions src/catalog/mod.rs
@@ -8,6 +8,7 @@ use arrow_schema::Schema;
use async_trait::async_trait;
use clade::schema::ListSchemaResponse;
use datafusion_common::DataFusionError;
use std::fmt::Debug;
use tonic::Status;
use uuid::Uuid;

@@ -149,7 +150,7 @@ impl From<serde_json::Error> for CreateFunctionError {
pub type CatalogResult<T> = Result<T, CatalogError>;

#[async_trait]
pub trait CatalogStore: Sync + Send {
pub trait CatalogStore: Debug + Sync + Send {
async fn create(&self, _name: &str) -> CatalogResult<()> {
not_impl()
}
@@ -164,7 +165,7 @@ pub trait CatalogStore: Sync + Send {
}

#[async_trait]
pub trait SchemaStore: Sync + Send {
pub trait SchemaStore: Debug + Sync + Send {
async fn create(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
not_impl()
}
@@ -187,7 +188,7 @@ pub trait SchemaStore: Sync + Send {
}

#[async_trait]
pub trait TableStore: Sync + Send {
pub trait TableStore: Debug + Sync + Send {
async fn create(
&self,
_catalog_name: &str,
@@ -275,7 +276,7 @@ pub trait TableStore: Sync + Send {
}

#[async_trait]
pub trait FunctionStore: Sync + Send {
pub trait FunctionStore: Debug + Sync + Send {
async fn create(
&self,
_catalog_name: &str,
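
The `Debug` derives on the individual stores and the new `Debug` supertrait bound on the catalog traits go hand in hand: `Metastore` holds `Arc<dyn CatalogStore>` (and the other store trait objects), so deriving `Debug` on it only works if the trait objects themselves are debuggable. A minimal sketch of the pattern, using hypothetical `Store`/`InMemory`/`Catalog` names rather than Seafowl's types:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Without `Debug` as a supertrait, `#[derive(Debug)]` on `Catalog` would not
// compile: `Arc<dyn Store>` is only `Debug` if `dyn Store` itself is.
trait Store: Debug + Send + Sync {}

#[derive(Debug)]
struct InMemory;
impl Store for InMemory {}

#[derive(Debug)]
struct Catalog {
    store: Arc<dyn Store>,
}

fn main() {
    let catalog = Catalog { store: Arc::new(InMemory) };
    println!("{catalog:?}"); // prints: Catalog { store: InMemory }
}
```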
1 change: 1 addition & 0 deletions src/catalog/repository.rs
@@ -21,6 +21,7 @@ use crate::repository::interface::{
use crate::wasm_udf::data_types::CreateFunctionDetails;

// The native catalog implementation for Seafowl.
#[derive(Debug)]
pub struct RepositoryStore {
pub repository: Arc<dyn Repository>,
}
2 changes: 1 addition & 1 deletion src/config/context.rs
@@ -151,7 +151,7 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_DB, DEFAULT_SCHEMA);

let runtime_env = RuntimeEnv::new(runtime_config)?;
let runtime_env = RuntimeEnv::try_new(runtime_config)?;
let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
let context = SessionContext::new_with_state(state);

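
The constructor swap reflects DataFusion 43, where the fallible `RuntimeEnv` constructor is now spelled `try_new`. A stripped-down sketch of the setup above, with `SessionStateBuilder` standing in for Seafowl's `build_state_with_table_factories` and placeholder catalog/schema names instead of `DEFAULT_DB`/`DEFAULT_SCHEMA`:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// DataFusion 43 renames `RuntimeEnv::new` to `RuntimeEnv::try_new`; the
// signature stays fallible, only the name changes.
fn build_context() -> Result<SessionContext> {
    let session_config = SessionConfig::new()
        .with_information_schema(true)
        .with_default_catalog_and_schema("default", "public");

    let runtime_env = RuntimeEnv::try_new(RuntimeConfig::new())?;

    let state = SessionStateBuilder::new()
        .with_config(session_config)
        .with_runtime_env(Arc::new(runtime_env))
        .with_default_features()
        .build();

    Ok(SessionContext::new_with_state(state))
}
```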
4 changes: 2 additions & 2 deletions src/context/delta.rs
@@ -582,7 +582,7 @@ mod tests {
vec![
(
PART_0_FILE_NAME.to_string(),
1298,
1164,
true,
true,
json!({
@@ -606,7 +606,7 @@
),
(
PART_1_FILE_NAME.to_string(),
1313,
1176,
true,
true,
json!({
8 changes: 4 additions & 4 deletions src/context/logical.rs
@@ -246,9 +246,9 @@ impl SeafowlContext {
})),
}))
},
Statement::Truncate { table: false, table_name, partitions, .. } => {
Statement::Truncate { table: false, table_names, partitions, .. } => {
let table_name = if partitions.is_none() {
Some(table_name.to_string())
Some(table_names[0].to_string())
} else {
None
};
@@ -268,10 +268,10 @@
})),
}))
}
Statement::Truncate { table: true, table_name, .. } => {
Statement::Truncate { table: true, table_names, .. } => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::Truncate(Truncate {
table_name: table_name.to_string(),
table_name: table_names[0].to_string(),
output_schema: Arc::new(DFSchema::empty())
})),
}))
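
The TRUNCATE arms change because sqlparser 0.51 lets one statement target several tables: the AST field is now the plural `table_names`, and Seafowl keeps its single-table behaviour by reading the first entry. A small sketch of the new AST shape, assuming the generic dialect accepts the multi-table form:

```rust
use sqlparser::ast::Statement;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};

fn main() -> Result<(), ParserError> {
    let statements = Parser::parse_sql(&GenericDialect {}, "TRUNCATE TABLE t1, t2")?;

    // sqlparser 0.51: `table_names` is a list; each element displays as the
    // bare table name ("t1", "t2"), which is what `table_names[0].to_string()`
    // relies on in the planner above.
    if let Statement::Truncate { table_names, .. } = &statements[0] {
        for name in table_names {
            println!("{name}");
        }
    }
    Ok(())
}
```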
10 changes: 8 additions & 2 deletions src/context/physical.rs
@@ -37,6 +37,7 @@ use datafusion::{
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
sql::TableReference,
};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column as ColumnExpr, ResolvedTableReference, SchemaReference};
use datafusion_expr::logical_plan::{
@@ -190,7 +191,7 @@ impl SeafowlContext {
}
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertInto,
op: WriteOp::Insert(_),
input,
..
}) => {
@@ -941,7 +942,12 @@ impl SeafowlContext {
let table_path = ListingTableUrl::parse(file_path)?;
let file_format: Arc<dyn FileFormat> = match file_type {
"csv" => Arc::new(CsvFormat::default().with_has_header(has_header)),
"parquet" => Arc::new(ParquetFormat::default()),
"parquet" => {
// TODO: We can remove this once delta-rs supports Utf8View
let mut options = TableParquetOptions::default();
options.global.schema_force_view_types = false;
Arc::new(ParquetFormat::default().with_options(options))
}
_ => {
return Err(Error::Plan(format!(
"File type {file_type:?} not supported!"
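
The `TableParquetOptions` override exists because DataFusion 43's Parquet schema inference defaults to the new view types (`Utf8View`/`BinaryView`), which the pinned delta-rs build cannot ingest yet (hence the TODO). A rough, self-contained sketch of building a Parquet `ListingTable` with view types disabled; the path and function name are illustrative, not Seafowl code:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_common::config::TableParquetOptions;

async fn parquet_listing_table(ctx: &SessionContext) -> Result<Arc<ListingTable>> {
    // Keep plain Utf8/Binary in the inferred schema instead of Utf8View/BinaryView.
    let mut options = TableParquetOptions::default();
    options.global.schema_force_view_types = false;

    let format = ParquetFormat::default().with_options(options);
    let listing_options = ListingOptions::new(Arc::new(format));

    let table_path = ListingTableUrl::parse("file:///tmp/data/")?;
    let state = ctx.state();
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .infer_schema(&state)
        .await?;

    Ok(Arc::new(ListingTable::try_new(config)?))
}
```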