Skip to content

Commit

Permalink
Upgrade to DataFusion 39
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jun 19, 2024
1 parent 48262cb commit ea91a22
Show file tree
Hide file tree
Showing 19 changed files with 961 additions and 619 deletions.
1,045 changes: 691 additions & 354 deletions Cargo.lock

Large diffs are not rendered by default.

42 changes: 15 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
members = ["clade"]

[workspace.dependencies]
arrow = { version = "51.0.0", features = ["test_utils"] }
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-flight = "51.0.0"
arrow = { version = "52.0.0", features = ["test_utils"] }
arrow-buffer = "52.0.0"
arrow-csv = "52.0.0"
arrow-flight = "52.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "51.0.0"
arrow-row = "51.0.0"
arrow-schema = "51.0.0"
arrow-integration-test = "52.0.0"
arrow-row = "52.0.0"
arrow-schema = "52.0.0"
async-trait = "0.1.64"

datafusion = "37.1.0"
datafusion-common = "37.1.0"
datafusion-expr = "37.1.0"
datafusion = "39.0.0"
datafusion-common = "39.0.0"
datafusion-expr = "39.0.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
Expand Down Expand Up @@ -51,18 +51,6 @@ object-store-gcs = ["object_store/gcp"]
object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[patch.crates-io]
# Pick up backport of https://github.com/apache/arrow-datafusion/pull/10114
datafusion = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-common = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-execution = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-optimizer = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-physical-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-physical-plan = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-proto = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }
datafusion-sql = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "avg-acc-fix" }

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
Expand All @@ -84,8 +72,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-39-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-39-upgrade", optional = true }

dashmap = "5.4.0"

Expand All @@ -95,7 +83,7 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "5972aab07723fe11243c017f1938b96b70d45810", features = ["datafusion"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "6205f006f444113c4e5cedf06856c4b5c93a565c", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
Expand All @@ -105,7 +93,7 @@ lazy_static = ">=1.4.0"
metrics = { version = "0.22.1" }
metrics-exporter-prometheus = { version = "0.13.1" }
moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.9.1"
object_store = "0.10.1"
percent-encoding = "2.2.0"
prost = "0.12.1"

Expand All @@ -121,7 +109,7 @@ rustyline = "13.0"
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = { version = "0.44", features = ["visitor"] }
sqlparser = { version = "0.47", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
Expand Down
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-37.1-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-39-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion_remote_tables/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl TableProviderFactory for RemoteTableFactory {
) -> Result<Arc<dyn TableProvider>> {
let table = RemoteTable::new(
cmd.options
.get("name")
.get("format.name")
.ok_or(DataFusionError::Execution(
"Missing 'name' option".to_string(),
))?
Expand Down
2 changes: 1 addition & 1 deletion datafusion_remote_tables/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub trait FilterPushdownConverter {
}
}

impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
impl<T: FilterPushdownConverter> TreeNodeVisitor<'_> for FilterPushdownVisitor<T> {
type Node = Expr;

fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
Expand Down
14 changes: 8 additions & 6 deletions src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ impl S3 {
bucket: String,
map: &mut HashMap<String, String>,
) -> Result<Self, ConfigError> {
// TODO: This is a bit of a hack. We should probably use a pre-defined config struct.
Ok(S3 {
region: map.get("region").cloned(),
access_key_id: map.remove("access_key_id"),
secret_access_key: map.remove("secret_access_key"),
session_token: map.remove("session_token"),
endpoint: map.remove("endpoint"),
region: map.remove("format.region"),
access_key_id: map.remove("format.access_key_id"),
secret_access_key: map.remove("format.secret_access_key"),
session_token: map.remove("format.session_token"),
endpoint: map.remove("format.endpoint"),
bucket,
prefix: None,
})
Expand All @@ -165,7 +166,8 @@ impl GCS {
GCS {
bucket,
prefix: None,
google_application_credentials: map.remove("google_application_credentials"),
google_application_credentials: map
.remove("format.google_application_credentials"),
}
}
}
Expand Down
51 changes: 28 additions & 23 deletions src/context/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::sync::Arc;
use tempfile::{NamedTempFile, TempPath};

use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};
use uuid::Uuid;
Expand Down Expand Up @@ -196,10 +196,9 @@ pub async fn plan_to_object_store(
let mut part_buffer =
BytesMut::with_capacity(PARTITION_FILE_MIN_PART_SIZE);

let (multipart_id, mut writer) =
store.put_multipart(&file_name).await?;
let mut multipart_upload = store.put_multipart(&file_name).await?;

let error: std::io::Error;
let error: object_store::Error;
let mut eof_counter = 0;
loop {
match reader.read_buf(&mut part_buffer).await {
Expand All @@ -226,28 +225,35 @@ pub async fn plan_to_object_store(
continue;
}
Ok(_) => {
let part_size = part_buffer.len();
debug!("Uploading part with {} bytes", part_size);
match writer.write_all(&part_buffer[..part_size]).await {
let payload = part_buffer.freeze();
debug!("Uploading part with {} bytes", payload.len());
match multipart_upload.put_part(payload.into()).await {
Ok(_) => {
part_buffer.clear();
part_buffer = BytesMut::with_capacity(
PARTITION_FILE_MIN_PART_SIZE,
);
continue;
}
Err(err) => error = err,
}
}
Err(err) => error = err,
Err(err) => {
error = object_store::Error::Generic {
store: "multipart_upload",
source: Box::new(err),
}
}
}

warn!(
"Aborting multipart partition upload due to an error: {:?}",
error
);
store.abort_multipart(&file_name, &multipart_id).await.ok();
return Err(DataFusionError::IoError(error));
multipart_upload.abort().await.ok();
return Err(DataFusionError::ObjectStore(error));
}

writer.shutdown().await?;
multipart_upload.complete().await?;
}

// Create the corresponding `Add` action; currently we don't support partition columns
Expand All @@ -257,6 +263,8 @@ pub async fn plan_to_object_store(
file_name.to_string(),
size,
&metadata,
-1, // collect stats for all columns
&None::<Vec<String>>,
)?;

Ok(add)
Expand All @@ -277,9 +285,9 @@ pub enum CreateDeltaTableDetails {
}

impl SeafowlContext {
pub async fn create_delta_table<'a>(
&'a self,
name: impl Into<TableReference<'a>>,
pub async fn create_delta_table(
&self,
name: impl Into<TableReference>,
details: CreateDeltaTableDetails,
) -> Result<Arc<DeltaTable>> {
let resolved_ref = self.resolve_table_ref(name);
Expand Down Expand Up @@ -312,7 +320,7 @@ impl SeafowlContext {
let table = CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_columns(delta_schema.fields().cloned())
.with_comment(format!(
"Created by Seafowl {}",
env!("CARGO_PKG_VERSION")
Expand Down Expand Up @@ -385,9 +393,9 @@ impl SeafowlContext {
}

/// Generate the Delta table builder and execute the write
pub async fn plan_to_delta_table<'a>(
pub async fn plan_to_delta_table(
&self,
name: impl Into<TableReference<'a>>,
name: impl Into<TableReference>,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<DeltaTable> {
let table_uuid = self.get_table_uuid(name).await?;
Expand Down Expand Up @@ -439,17 +447,14 @@ impl SeafowlContext {
Ok(CommitBuilder::default()
.with_actions(actions)
.build(Some(table.snapshot()?), table.log_store(), op)
.map_err(|e| {
DataFusionError::Execution(format!("Transaction commit failed: {e}"))
})?
.await?
.version)
}

// Cleanup the table objects in the storage
pub async fn delete_delta_table<'a>(
&self,
table_name: impl Into<TableReference<'a>>,
table_name: impl Into<TableReference>,
) -> Result<()> {
let table = self.try_get_delta_table(table_name).await?;
let store = table.object_store();
Expand Down Expand Up @@ -702,7 +707,7 @@ mod tests {
.await
.unwrap();

assert_eq!(adds.len(), output_partitions.len(),);
assert_eq!(adds.len(), output_partitions.len());

for i in 0..output_partitions.len() {
assert_eq!(
Expand Down
15 changes: 7 additions & 8 deletions src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use deltalake::DeltaTable;
use itertools::Itertools;
use sqlparser::ast::{
AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, FunctionDefinition,
ObjectType, Query, Statement, TableFactor, TableWithJoins, VisitMut,
AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, Expr, Insert, ObjectType,
Query, Statement, TableFactor, TableWithJoins, Value, VisitMut,
};
use std::borrow::Cow;
use std::sync::Arc;
use tracing::debug;

Expand Down Expand Up @@ -114,7 +113,7 @@ impl SeafowlContext {
| Statement::CreateSchema { .. }
| Statement::CreateView { .. }
| Statement::CreateDatabase { .. } => self.inner.state().statement_to_plan(stmt).await,
Statement::Insert{ source: Some(ref mut source), .. } => {
Statement::Insert(Insert{ source: Some(ref mut source), .. }) => {
let state = self.rewrite_time_travel_query(source).await?;
let plan = state.statement_to_plan(stmt).await?;
state.optimize(&plan)
Expand All @@ -134,7 +133,7 @@ impl SeafowlContext {
// We also need to do a analyze round beforehand for type coercion.
let analyzer = Analyzer::new();
let plan = analyzer.execute_and_check(
&plan,
plan,
self.inner.copied_config().options(),
|_, _| {},
)?;
Expand All @@ -144,7 +143,7 @@ impl SeafowlContext {
]
);
let config = OptimizerContext::default();
optimizer.optimize(&plan, &config, |plan: &LogicalPlan, rule: &dyn OptimizerRule| {
optimizer.optimize(plan, &config, |plan: &LogicalPlan, rule: &dyn OptimizerRule| {
debug!(
"After applying rule '{}':\n{}\n",
rule.name(),
Expand Down Expand Up @@ -225,7 +224,7 @@ impl SeafowlContext {
or_replace,
temporary: false,
name,
params: CreateFunctionBody { as_: Some( FunctionDefinition::SingleQuotedDef(details) ), .. },
function_body: Some(CreateFunctionBody::AsBeforeOptions(Expr::Value(Value::SingleQuotedString(details)))),
..
} => {
// We abuse the fact that in CREATE FUNCTION AS [class_name], class_name can be an arbitrary string
Expand Down Expand Up @@ -366,7 +365,7 @@ impl SeafowlContext {
delta_table.load_with_datetime(datetime).await?;
let table_provider_for_version = Arc::from(delta_table);

resolved_ref.table = Cow::Borrowed(name_with_version.as_str());
resolved_ref.table = Arc::from(name_with_version.as_str());

if !session_ctx.table_exist(resolved_ref.clone())? {
session_ctx.register_table(resolved_ref, table_provider_for_version)?;
Expand Down
Loading

0 comments on commit ea91a22

Please sign in to comment.