Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SystemSchemaProvider to QueryExecutor #24990

Merged
merged 8 commits into from
May 17, 2024
Merged
92 changes: 67 additions & 25 deletions influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct QueryExecutorImpl<W> {
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
query_log: Arc<QueryLog>,
}

impl<W: WriteBuffer> QueryExecutorImpl<W> {
Expand All @@ -72,12 +73,19 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
));
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(concurrent_query_limit));
// TODO Fine tune this number or make configurable
const QUERY_LOG_LIMIT: usize = 1_000;
let query_log = Arc::new(QueryLog::new(
QUERY_LOG_LIMIT,
Arc::new(iox_time::SystemProvider::new()),
));
Self {
catalog,
write_buffer,
exec,
datafusion_config,
query_execution_semaphore,
query_log,
}
}
}
Expand Down Expand Up @@ -282,7 +290,7 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
&self,
name: &str,
span: Option<Span>,
_include_debug_info_tables: bool,
include_debug_info_tables: bool,
) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
let _span_recorder = SpanRecorder::new(span);

Expand All @@ -297,6 +305,8 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
Arc::clone(&self.write_buffer) as _,
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
Arc::clone(&self.query_log),
include_debug_info_tables,
))))
}

Expand All @@ -312,13 +322,14 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Database<B> {
db_schema: Arc<DatabaseSchema>,
write_buffer: Arc<B>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
query_log: Arc<QueryLog>,
system_schema_provider: Arc<SystemSchemaProvider>,
}

impl<B: WriteBuffer> Database<B> {
Expand All @@ -327,20 +338,32 @@ impl<B: WriteBuffer> Database<B> {
write_buffer: Arc<B>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
query_log: Arc<QueryLog>,
include_debug_info_tables: bool,
) -> Self {
// TODO Fine tune this number
const QUERY_LOG_LIMIT: usize = 10;

let query_log = Arc::new(QueryLog::new(
QUERY_LOG_LIMIT,
Arc::new(iox_time::SystemProvider::new()),
let system_schema_provider = Arc::new(SystemSchemaProvider::new(
write_buffer.catalog(),
Arc::clone(&query_log),
include_debug_info_tables,
));
Self {
db_schema,
write_buffer,
exec,
datafusion_config,
query_log,
system_schema_provider,
}
}

fn from_namespace(db: &Self) -> Self {
Self {
db_schema: Arc::clone(&db.db_schema),
write_buffer: Arc::clone(&db.write_buffer),
exec: Arc::clone(&db.exec),
datafusion_config: Arc::clone(&db.datafusion_config),
query_log: Arc::clone(&db.query_log),
system_schema_provider: Arc::clone(&db.system_schema_provider),
}
}

Expand Down Expand Up @@ -404,17 +427,10 @@ impl<B: WriteBuffer> QueryNamespace for Database<B> {
span_ctx: Option<SpanContext>,
_config: Option<&QueryConfig>,
) -> IOxSessionContext {
let qdb = Self::new(
Arc::clone(&self.db_schema),
Arc::clone(&self.write_buffer),
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
);

let mut cfg = self
.exec
.new_session_config()
.with_default_catalog(Arc::new(qdb))
.with_default_catalog(Arc::new(Self::from_namespace(self)))
.with_span_context(span_ctx);

for (k, v) in self.datafusion_config.as_ref() {
Expand All @@ -437,15 +453,8 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
info!("CatalogProvider schema {}", name);
let qdb = Self::new(
Arc::clone(&self.db_schema),
Arc::clone(&self.write_buffer),
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
);

match name {
DEFAULT_SCHEMA => Some(Arc::new(qdb)),
DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
_ => None,
}
}
Expand Down Expand Up @@ -486,7 +495,6 @@ impl<B: WriteBuffer> QueryTable<B> {
filters: &[Expr],
_limit: Option<usize>,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
// TODO - this is only pulling from write buffer, and not parquet?
Copy link
Contributor Author

@hiltontj hiltontj May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems irrelevant, so I removed it.

self.write_buffer.get_table_chunks(
&self.db_schema.name,
self.name.as_ref(),
Expand Down Expand Up @@ -545,3 +553,37 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
provider.scan(ctx, projection, &filters, limit).await
}
}

const _QUERIES_TABLE: &str = "queries";
const _PARQUET_FILES_TABLE: &str = "parquet_files";

struct SystemSchemaProvider {
tables: HashMap<&'static str, Arc<dyn TableProvider>>,
}

impl std::fmt::Debug for SystemSchemaProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut keys = self.tables.keys().copied().collect::<Vec<_>>();
keys.sort_unstable();

f.debug_struct("SystemSchemaProvider")
.field("tables", &keys.join(", "))
.finish()
}
}

impl SystemSchemaProvider {
fn new(_catalog: Arc<Catalog>, _query_log: Arc<QueryLog>, include_debug_info: bool) -> Self {
let tables = HashMap::new();
if include_debug_info {
// Using todo!() here causes gRPC integration tests to fail, likely because they
// enable debug mode by default, thus entering this if block. So, just leaving this
// here in lieu of todo!().
//
// Eventually, we will implement the queries and parquet_files tables and they will
// be injected to the provider's table hashmap here...
info!("TODO - gather system tables");
}
Self { tables }
}
}