Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use latest influxdb3_core changes #24982

Merged
merged 6 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1,735 changes: 749 additions & 986 deletions Cargo.lock

Large diffs are not rendered by default.

105 changes: 53 additions & 52 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ license = "MIT OR Apache-2.0"

[workspace.dependencies]
anyhow = "1.0"
arrow = { version = "50.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-json = "50.0.0"
arrow-schema = "50.0.0"
arrow = { version = "51.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
arrow-json = "51.0.0"
arrow-schema = "51.0.0"
assert_cmd = "2.0.14"
async-trait = "0.1"
backtrace = "0.3"
Expand All @@ -49,8 +49,8 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "581e74785b876615d6a63db8c2e5ba372bf78828" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "581e74785b876615d6a63db8c2e5ba372bf78828" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "e0245296792eabdc35e83e8c5872345ff38c1fdf" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "e0245296792eabdc35e83e8c5872345ff38c1fdf" }
csv = "1.3.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
Expand All @@ -67,7 +67,7 @@ num_cpus = "1.16.0"
object_store = "0.9.1"
once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12.1"
parquet = { version = "50.0.0", features = ["object_store"] }
parquet = { version = "51.0.0", features = ["object_store"] }
pbjson = "0.6.0"
pbjson-build = "0.6.2"
pbjson-types = "0.6.0"
Expand All @@ -80,7 +80,6 @@ rand = "0.8.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_arrow = { version = "0.10", features = ["arrow-50"] }
serde_json = "1.0"
serde_urlencoded = "0.7.0"
sha2 = "0.10.8"
Expand All @@ -90,48 +89,49 @@ sysinfo = "0.30.8"
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }
tokio-util = "0.7.9"
tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
tonic-build = "0.10.2"
tonic-health = "0.10.2"
tonic-reflection = "0.10.2"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
tonic-health = "0.11.0"
tonic-reflection = "0.11.0"
tower = "0.4.13"
unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }

# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a", default-features = true, features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e", default-features = true, features = ["clap"] }

[workspace.lints.rust]
rust_2018_idioms = "deny"
Expand Down Expand Up @@ -175,9 +175,10 @@ opt-level = 3
# patch arrow-flight crate to allow for prepared statement parameters
# see related arrow-rs PR https://github.com/apache/arrow-rs/pull/5433
[patch.crates-io]
arrow-array = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-schema = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-data = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-buffer = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-ipc = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-flight = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-array = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-schema = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-data = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-buffer = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-ipc = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-flight = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
parquet = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
62 changes: 41 additions & 21 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
use datafusion_util::config::register_iox_object_store;
use influxdb3_process::{
Expand All @@ -17,7 +18,7 @@ use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
Expand Down Expand Up @@ -50,6 +51,9 @@ pub enum Error {
#[error("Tracing config error: {0}")]
TracingConfig(#[from] trace_exporters::Error),

#[error("Error initializing tokio runtime: {0}")]
TokioRuntime(#[source] std::io::Error),

#[error("Server error: {0}")]
Server(#[from] influxdb3_server::Error),

Expand All @@ -67,6 +71,22 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, clap::Parser)]
pub struct Config {
/// object store options
#[clap(flatten)]
object_store_config: ObjectStoreConfig,

/// logging options
#[clap(flatten)]
pub(crate) logging_config: LoggingConfig,

/// tracing options
#[clap(flatten)]
pub(crate) tracing_config: TracingConfig,

/// tokio datafusion config
#[clap(flatten)]
pub(crate) tokio_datafusion_config: TokioDatafusionConfig,

/// Maximum size of HTTP requests.
#[clap(
long = "max-http-request-size",
Expand All @@ -76,9 +96,6 @@ pub struct Config {
)]
pub max_http_request_size: usize,

#[clap(flatten)]
object_store_config: ObjectStoreConfig,

/// The directory to store the write ahead log
///
/// If not specified, defaults to INFLUXDB3_DB_DIR/wal
Expand Down Expand Up @@ -116,14 +133,6 @@ pub struct Config {
)]
pub exec_mem_pool_bytes: MemorySize,

/// logging options
#[clap(flatten)]
pub(crate) logging_config: LoggingConfig,

/// tracing options
#[clap(flatten)]
pub(crate) tracing_config: TracingConfig,

/// DataFusion config.
#[clap(
long = "datafusion-config",
Expand Down Expand Up @@ -194,25 +203,36 @@ pub async fn command(config: Config) -> Result<()> {

let trace_exporter = config.tracing_config.build()?;

// TODO: make this a parameter
let num_threads =
NonZeroUsize::new(num_cpus::get()).unwrap_or_else(|| NonZeroUsize::new(1).unwrap());

info!(%num_threads, "Creating shared query executor");
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let exec = Arc::new(Executor::new_with_config(
"datafusion",

let mut tokio_datafusion_config = config.tokio_datafusion_config;
tokio_datafusion_config.num_threads = tokio_datafusion_config
.num_threads
.or_else(|| NonZeroUsize::new(num_cpus::get()))
.or_else(|| NonZeroUsize::new(1));
info!(
num_threads = tokio_datafusion_config.num_threads.map(|n| n.get()),
"Creating shared query executor"
);

let exec = Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.bytes(),
},
DedicatedExecutor::new(
"datafusion",
tokio_datafusion_config
.builder()
.map_err(Error::TokioRuntime)?,
Arc::clone(&metrics),
),
));
let runtime_env = exec.new_context().inner().runtime_env();
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
Expand Down
1 change: 0 additions & 1 deletion influxdb3_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ parking_lot.workspace = true
pin-project-lite.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_arrow.workspace = true
serde_json.workspace = true
serde_urlencoded.workspace = true
sha2.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use authz::Authorizer;
use iox_query::QueryNamespaceProvider;
use iox_query::QueryDatabase;

pub(crate) fn make_flight_server<Q: QueryNamespaceProvider>(
pub(crate) fn make_flight_server<Q: QueryDatabase>(
server: Arc<Q>,
authz: Option<Arc<dyn Authorizer>>,
) -> FlightServer<impl Flight> {
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ async fn record_batch_stream_to_body(
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let batches: Vec<&RecordBatch> = batches.iter().collect();
// See https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
Ok(Bytes::from(serde_json::to_string(
&arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
)?))
Expand Down
4 changes: 4 additions & 0 deletions influxdb3_server/src/http/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use arrow::{
compute::{cast_with_options, CastOptions},
record_batch::RecordBatch,
};
// Note: see https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
use arrow_json::writer::record_batches_to_json_rows;

use arrow_schema::DataType;
Expand Down Expand Up @@ -335,6 +337,8 @@ impl QueryResponseStream {
}))
.context("failed to cast batch time column with `epoch` parameter specified")?;
}
// See https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
let json_rows = record_batches_to_json_rows(&[&batch])
.context("failed to convert RecordBatch to JSON rows")?;
for json_row in json_rows {
Expand Down