1,735 changes: 1,207 additions & 528 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 16 additions & 13 deletions Cargo.toml
@@ -9,21 +9,22 @@ build = "build.rs"

[dependencies]
# Arrow and DataFusion ecosystem
arrow = "54.0.0"
arrow-array = "54.0.0"
arrow-flight = { version = "54.0.0", features = ["tls"] }
arrow-ipc = { version = "54.0.0", features = ["zstd"] }
arrow-json = "54.0.0"
arrow-schema = { version = "54.0.0", features = ["serde"] }
arrow-select = "54.0.0"
datafusion = "45.0.0"
object_store = { version = "0.11.2", features = [
arrow = "57.0.0"
arrow-array = "57.0.0"
arrow-flight = { version = "57.0.0", features = ["tls-aws-lc","tls-native-roots"] }
arrow-ipc = { version = "57.0.0", features = ["zstd"] }
arrow-json = "57.0.0"
arrow-schema = { version = "57.0.0", features = ["serde"] }
arrow-select = "57.0.0"
# datafusion = "50.3.0"
datafusion = {git="https://github.com/apache/datafusion"}
object_store = { version = "0.12.4", features = [
"cloud",
"aws",
"azure",
"gcp",
] }
parquet = "54.0.0"
parquet = "57.0.0"

# Web server and HTTP-related
actix-cors = "0.7.0"
@@ -33,8 +34,9 @@ actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
http = "0.2.7"
http-auth-basic = "0.3.3"
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd", "prost"] }
tonic-web = "0.12.3"
tonic = { version = "0.14.1", features = ["tls-aws-lc", "tls-native-roots", "transport", "gzip", "zstd"] }
tonic-prost = "0.14.1"
tonic-web = "0.14.1"
tower-http = { version = "0.6.1", features = ["cors"] }
url = "2.4.0"

@@ -126,12 +128,13 @@ itertools = "0.14"
once_cell = "1.20"
rayon = "1.8"
rand = "0.8.5"
regex = "1.7.3"
regex = "1.12.2"
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
"json",
"gzip",
"brotli",
"stream"
] } # cannot update because rustls is not the latest (see `rustls`)
semver = "1.0"
static-files = "0.2"
22 changes: 20 additions & 2 deletions src/catalog/column.rs
@@ -156,8 +156,8 @@ impl TryFrom<&Statistics> for TypedStatistics {
max: *stats.max_opt().expect("Int64 stats max not set"),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
min: int96_to_i64_nanos(stats.min_opt().expect("Int96 stats min not set")),
max: int96_to_i64_nanos(stats.max_opt().expect("Int96 stats max not set")),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min_opt().expect("Float32 stats min not set") as f64,
@@ -196,3 +196,21 @@ impl TryFrom<&Statistics> for TypedStatistics {
Ok(res)
}
}

// Int96 is a deprecated timestamp format used by legacy Impala files
// Convert to i64 nanoseconds since Unix epoch for statistics
fn int96_to_i64_nanos(int96: &parquet::data_type::Int96) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; // Julian day for 1970-01-01
const SECONDS_PER_DAY: i64 = 86_400;
const NANOS_PER_SECOND: i64 = 1_000_000_000;

// Extract nanoseconds from first 8 bytes (little-endian)
let nanos_of_day = int96.data()[0] as i64 | ((int96.data()[1] as i64) << 32);

// Extract Julian day from last 4 bytes
let julian_day = int96.data()[2] as i64;

// Convert to nanoseconds since Unix epoch
let days_since_epoch = julian_day - JULIAN_DAY_OF_EPOCH;
days_since_epoch * SECONDS_PER_DAY * NANOS_PER_SECOND + nanos_of_day
}
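
For readers unfamiliar with the Int96 layout, here is a small sanity-check sketch of the conversion above (not part of the diff). It assumes the parquet crate's `Int96::new`/`set_data` helpers, where the first two `u32` words hold the little-endian nanoseconds-of-day and the third holds the Julian day:

#[cfg(test)]
mod int96_conversion_checks {
    use super::int96_to_i64_nanos;
    use parquet::data_type::Int96;

    #[test]
    fn one_second_past_unix_epoch() {
        // Julian day 2_440_588 is 1970-01-01, so one second past midnight
        // on that day should convert to exactly 1_000_000_000 ns.
        let mut ts = Int96::new();
        ts.set_data(1_000_000_000, 0, 2_440_588);
        assert_eq!(int96_to_i64_nanos(&ts), 1_000_000_000);
    }
}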
9 changes: 5 additions & 4 deletions src/catalog/manifest.rs
@@ -19,7 +19,10 @@
use std::collections::HashMap;

use itertools::Itertools;
use parquet::{file::reader::FileReader, format::SortingColumn};
use parquet::file::{
metadata::{RowGroupMetaData, SortingColumn},
reader::FileReader,
};

use crate::metastore::metastore_traits::MetastoreObject;

@@ -170,9 +173,7 @@ fn sort_order(
sort_orders
}

fn column_statistics(
row_groups: &[parquet::file::metadata::RowGroupMetaData],
) -> HashMap<String, Column> {
fn column_statistics(row_groups: &[RowGroupMetaData]) -> HashMap<String, Column> {
let mut columns: HashMap<String, Column> = HashMap::new();
for row_group in row_groups {
for col in row_group.columns() {
11 changes: 9 additions & 2 deletions src/parseable/staging/reader.rs
@@ -342,7 +342,8 @@ mod tests {
types::Int64Type,
};
use arrow_ipc::writer::{
DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter, write_message,
CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
write_message,
};
use arrow_schema::{DataType, Field, Schema};
use chrono::Utc;
@@ -433,6 +434,7 @@
let options = IpcWriteOptions::default();
let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
let data_gen = IpcDataGenerator {};
let mut compression_context = CompressionContext::default();

let mut buf = Vec::new();
let rb1 = rb(1);
@@ -446,7 +448,12 @@

for i in (1..=3).cycle().skip(1).take(10000) {
let (_, encoded_message) = data_gen
.encoded_batch(&rb(i), &mut dictionary_tracker, &options)
.encode(
&rb(i),
&mut dictionary_tracker,
&options,
&mut compression_context,
)
.unwrap();
write_message(&mut buf, encoded_message, &options).unwrap();
}
3 changes: 1 addition & 2 deletions src/parseable/streams.rs
@@ -35,10 +35,9 @@ use parquet::{
arrow::ArrowWriter,
basic::Encoding,
file::{
FOOTER_SIZE, properties::WriterProperties, reader::FileReader,
FOOTER_SIZE, metadata::SortingColumn, properties::WriterProperties, reader::FileReader,
serialized_reader::SerializedFileReader,
},
format::SortingColumn,
schema::types::ColumnPath,
};
use relative_path::RelativePathBuf;
7 changes: 4 additions & 3 deletions src/query/mod.rs
@@ -24,16 +24,16 @@ use actix_web::Either;
use chrono::NaiveDateTime;
use chrono::{DateTime, Duration, Utc};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::resolve_table_references;
use datafusion::common::tree_node::Transformed;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::disk_manager::DiskManager;
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
};
use datafusion::prelude::*;
use datafusion::sql::parser::DFParser;
use datafusion::sql::resolve::resolve_table_references;
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use itertools::Itertools;
use once_cell::sync::Lazy;
@@ -120,7 +120,7 @@ impl Query {
fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
let runtime_config = storage
.get_datafusion_runtime()
.with_disk_manager(DiskManagerConfig::NewOs);
.with_disk_manager_builder(DiskManager::builder());
let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size {
Some(size) => (size, 1.),
None => {
@@ -231,6 +231,7 @@ impl Query {
self.time_range.end.naive_utc(),
);
LogicalPlan::Explain(Explain {
explain_format: plan.explain_format,
verbose: plan.verbose,
stringified_plans: vec![
transformed