
feat: Implement index for buffer #24954

Merged: 3 commits, May 6, 2024
226 changes: 121 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions influxdb3_server/src/query_executor.rs
@@ -13,6 +13,7 @@ use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
@@ -468,6 +469,13 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
TableType::Base
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
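
For context on the mode chosen here: Inexact tells DataFusion that the scan applies these filters on a best-effort basis, so the planner keeps a FilterExec above the scan to re-apply each predicate exactly; Exact would let the planner drop that filter, and Unsupported pushes nothing down. Purely as an illustration (not part of this PR, and can_use_index is an assumed helper), a more selective policy could look like this:

    use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

    // Hypothetical sketch: advertise pushdown only for filters the buffer's
    // index could evaluate, leaving the rest entirely to DataFusion.
    fn pushdown_policy(
        filters: &[&Expr],
        can_use_index: impl Fn(&Expr) -> bool,
    ) -> Vec<TableProviderFilterPushDown> {
        filters
            .iter()
            .map(|&expr| {
                if can_use_index(expr) {
                    // Best-effort pruning; DataFusion still re-filters exactly.
                    TableProviderFilterPushDown::Inexact
                } else {
                    TableProviderFilterPushDown::Unsupported
                }
            })
            .collect()
    }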

async fn scan(
&self,
ctx: &SessionState,
13 changes: 13 additions & 0 deletions influxdb3_write/src/catalog.rs
@@ -319,6 +319,19 @@ impl TableDefinition {
self.schema = schema;
}

pub(crate) fn index_columns(&self) -> Vec<String> {
self.columns
.iter()
.filter_map(|(name, column_type)| {
if *column_type == ColumnType::Tag as i16 {
Some(name.clone())
} else {
None
}
})
.collect()
}

Member Author:
I'm not sure that we'll want to continue to default to having all tag columns be indexed. We'll need a new API & CLI for users to set explicitly which columns they want indexed. We may find that by default we don't want to index anything.
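
To make the current default concrete, here is a minimal, self-contained sketch of the rule index_columns encodes, with simplified stand-in types (the real catalog stores column types as i16 values, and the column names here are invented): every tag column, and only tag columns, becomes an index column.

    use std::collections::BTreeMap;

    // Simplified stand-ins for the catalog types.
    #[derive(Clone, Copy, PartialEq)]
    enum ColumnType {
        Tag,
        Field,
        Time,
    }

    struct TableDefinition {
        columns: BTreeMap<String, ColumnType>,
    }

    impl TableDefinition {
        // Mirrors index_columns() above: keep the name of every tag column.
        fn index_columns(&self) -> Vec<String> {
            self.columns
                .iter()
                .filter_map(|(name, column_type)| {
                    (*column_type == ColumnType::Tag).then(|| name.clone())
                })
                .collect()
        }
    }

    fn main() {
        let table = TableDefinition {
            columns: BTreeMap::from([
                ("host".to_string(), ColumnType::Tag),
                ("region".to_string(), ColumnType::Tag),
                ("usage".to_string(), ColumnType::Field),
                ("time".to_string(), ColumnType::Time),
            ]),
        };
        // BTreeMap iterates in key order, so the output is deterministic.
        assert_eq!(table.index_columns(), vec!["host", "region"]);
    }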

#[allow(dead_code)]
pub(crate) fn schema(&self) -> &Schema {
&self.schema
128 changes: 94 additions & 34 deletions influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -2,12 +2,11 @@
//! single WAL segment. Only one segment should be open for writes in the write buffer at any
//! given time.

use crate::catalog::Catalog;
use crate::catalog::{Catalog, DatabaseSchema};
use crate::chunk::BufferChunk;
use crate::paths::ParquetFilePath;
use crate::write_buffer::flusher::BufferedWriteResult;
use crate::write_buffer::table_buffer::Builder;
use crate::write_buffer::table_buffer::TableBuffer;
use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
use crate::write_buffer::{
parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
};
@@ -16,19 +15,20 @@ use crate::{
Persister, Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber,
TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use data_types::ChunkId;
use data_types::ChunkOrder;
use data_types::TableId;
use data_types::TransitionPartitionId;
use data_types::{NamespaceName, PartitionKey};
use datafusion::logical_expr::Expr;
use datafusion_util::stream_from_batches;
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use iox_time::Time;
use schema::sort::SortKey;
use schema::Schema;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -43,6 +43,7 @@ pub struct OpenBufferSegment {
segment_key: PartitionKey,
buffered_data: BufferedData,
segment_open_time: Time,
catalog: Arc<Catalog>,
#[allow(dead_code)]
starting_catalog_sequence_number: SequenceNumber,
// TODO: This is temporarily just the number of rows in the segment. When the buffer gets refactored to use
@@ -52,6 +53,7 @@ }

impl OpenBufferSegment {
pub fn new(
catalog: Arc<Catalog>,
segment_id: SegmentId,
segment_range: SegmentRange,
segment_open_time: Time,
Expand All @@ -64,6 +66,7 @@ impl OpenBufferSegment {
let segment_duration = SegmentDuration::from_range(segment_range);

Self {
catalog,
segment_writer,
segment_id,
segment_range,
@@ -105,7 +108,16 @@ impl OpenBufferSegment {
.buffered_data
.database_buffers
.entry(db_name.to_string())
.or_default();
.or_insert_with(|| {
let db_schema = self
.catalog
.db_schema(&db_name)
.expect("db schema should exist");
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema,
}
});

for (table_name, table_batch) in db_batch.table_batches {
// TODO: for now we'll just have the number of rows represent the segment size. The entire
@@ -120,14 +132,15 @@ impl OpenBufferSegment {
}

/// Returns the table data as record batches
pub(crate) fn table_record_batches(
pub(crate) fn table_record_batch(
&self,
db_name: &str,
table_name: &str,
schema: &Schema,
) -> Option<Vec<RecordBatch>> {
schema: SchemaRef,
filter: &[Expr],
) -> Option<TableBufferResult<RecordBatch>> {
self.buffered_data
.table_record_batches(db_name, table_name, schema)
.table_record_batches(db_name, table_name, schema, filter)
}

/// Returns true if the segment should be persisted. A segment should be persisted if both of
@@ -186,10 +199,18 @@ pub(crate) fn load_buffer_from_segment(
Precision::Nanosecond,
)?;

let db_buffer = buffered_data
.database_buffers
.entry(write.db_name)
.or_default();
let db_name = &write.db_name;
if !buffered_data.database_buffers.contains_key(db_name) {
let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
buffered_data.database_buffers.insert(
db_name.clone(),
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema,
},
);
}
let db_buffer = buffered_data.database_buffers.get_mut(db_name).unwrap();
Comment on lines +203 to +213
Contributor:
I think you can keep the initial form using the entry API like so, without borrow-check issues:

Suggested change
if !buffered_data.database_buffers.contains_key(db_name) {
let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
buffered_data.database_buffers.insert(
db_name.clone(),
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema,
},
);
}
let db_buffer = buffered_data.database_buffers.get_mut(db_name).unwrap();
let db_buffer = buffered_data
.database_buffers
.entry(db_name)
.or_insert_with(|| {
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema: catalog
.db_schema(db_name)
.expect("db schema should exist");
}
})

Contributor:
Looking at the PR it seems you had done something similar earlier. Was there a reason you didn't do so here?

Member Author:
That doesn't work because db_name is a borrowed string, and the entry API needs an owned String key, which would force a clone on every call even when the entry already exists. So I did it this way to avoid a string clone, rather than use the entry API.
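
For illustration, a minimal sketch of that trade-off with toy types (assumed names, not this crate's API): entry() on a HashMap<String, V> must take an owned key, so it allocates on every call even when the key is already present, while the check-then-insert form allocates only on a miss, at the cost of hashing the key on each map call.

    use std::collections::HashMap;

    // entry() takes the key by value, so the to_string() allocation happens
    // unconditionally, even when db_name is already in the map.
    fn via_entry<'a>(
        buffers: &'a mut HashMap<String, Vec<u64>>,
        db_name: &str,
    ) -> &'a mut Vec<u64> {
        buffers.entry(db_name.to_string()).or_default()
    }

    // The allocation is deferred to the miss path; repeat lookups stay
    // allocation-free, but the key is hashed once per map call.
    fn via_check_then_insert<'a>(
        buffers: &'a mut HashMap<String, Vec<u64>>,
        db_name: &str,
    ) -> &'a mut Vec<u64> {
        if !buffers.contains_key(db_name) {
            buffers.insert(db_name.to_string(), Vec::new());
        }
        buffers.get_mut(db_name).unwrap()
    }

(hashbrown's entry_ref, or the nightly raw_entry API in std, would avoid both costs; on stable std the two-step form is a common workaround.)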


// there should only ever be data for a single segment as this is all read
// from one segment file
@@ -261,12 +282,13 @@ impl BufferedData {
&self,
db_name: &str,
table_name: &str,
schema: &Schema,
) -> Option<Vec<RecordBatch>> {
schema: SchemaRef,
filter: &[Expr],
) -> Option<TableBufferResult<RecordBatch>> {
self.database_buffers
.get(db_name)
.and_then(|db_buffer| db_buffer.table_buffers.get(table_name))
.map(|table_buffer| table_buffer.record_batches(schema))
.map(|table_buffer| table_buffer.record_batch(schema, filter))
}

/// Verifies that the passed in buffer has the same data as this buffer
@@ -282,18 +304,21 @@ impl BufferedData {
let other_table_buffer = other_db_buffer.table_buffers.get(table_name).unwrap();
let schema = db_schema.get_table_schema(table_name).unwrap();

let table_data = table_buffer.record_batches(schema);
let other_table_data = other_table_buffer.record_batches(schema);
let table_data = table_buffer.record_batch(schema.as_arrow(), &[]).unwrap();
let other_table_data = other_table_buffer
.record_batch(schema.as_arrow(), &[])
.unwrap();

assert_eq!(table_data, other_table_data);
}
}
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct DatabaseBuffer {
table_buffers: HashMap<String, TableBuffer>,
db_schema: Arc<DatabaseSchema>,
}

impl DatabaseBuffer {
@@ -303,10 +328,22 @@ impl DatabaseBuffer {
segment_key: &PartitionKey,
table_batch: TableBatch,
) {
if !self.table_buffers.contains_key(&table_name) {
// TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
// and we've gotten here, it means we're dropping a write.
if let Some(table) = self.db_schema.get_table(&table_name) {
self.table_buffers.insert(
table_name.clone(),
TableBuffer::new(segment_key.clone(), &table.index_columns()),
);
} else {
return;
}
}
let table_buffer = self
.table_buffers
.entry(table_name)
.or_insert_with(|| TableBuffer::new(segment_key.clone()));
.get_mut(&table_name)
.expect("table buffer should exist");

table_buffer.add_rows(table_batch.rows);
}

Member Author:
I created #24955 to follow up and check what's going on here.

Contributor:
Does this happen without your changes at all?

Member Author:
Without my changes, the limit test passes, but that's because the buffer doesn't look for the table in the db_schema. So the limit check is hit, but the write still gets buffered, which succeeds on the old code path because it never consults the catalog. That would be a bug: the buffered write would make it into the WAL and into the in-memory buffer without a corresponding catalog update. At least, that's what I think is going on here.
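
A schematic sketch of the failure mode described in this thread, with toy types standing in for the catalog and buffer (assumed names, not this crate's API): once the table limit rejects a catalog update, the write must also be dropped on the buffer path, or the WAL and in-memory buffer drift out of sync with the catalog.

    use std::collections::{HashMap, HashSet};

    struct Catalog {
        tables: HashSet<String>,
        table_limit: usize,
    }

    impl Catalog {
        // Stand-in for the catalog update path: refuse new tables past the limit.
        fn register_table(&mut self, name: &str) -> bool {
            if self.tables.contains(name) {
                return true;
            }
            if self.tables.len() >= self.table_limit {
                return false; // limit hit: the write should be dropped
            }
            self.tables.insert(name.to_string());
            true
        }
    }

    fn main() {
        let mut catalog = Catalog { tables: HashSet::new(), table_limit: 1 };
        let mut buffers: HashMap<String, Vec<&str>> = HashMap::new();

        for (table, row) in [("cpu", "cpu,host=a v=1"), ("mem", "mem,host=a v=2")] {
            let accepted = catalog.register_table(table);
            // Old path: buffer unconditionally, so "mem" would reach the WAL and
            // the in-memory buffer with no catalog entry. New path: buffer only
            // when the catalog accepted the table.
            if accepted {
                buffers.entry(table.to_string()).or_default().push(row);
            }
        }
        assert!(buffers.contains_key("cpu"));
        assert!(!buffers.contains_key("mem")); // dropped; buffer stays consistent
    }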
@@ -389,8 +426,8 @@ impl ClosedBufferSegment {

// All of the record batches for this table that we will
// want to dedupe
let batches = table_buffer.record_batches(table.schema());
let row_count = batches.iter().map(|b| b.num_rows()).sum();
let batch = table_buffer.record_batch(table.schema().as_arrow(), &[])?;
let row_count = batch.num_rows();

// Dedupe and sort using the COMPACT query built into
// iox_query
Expand All @@ -406,7 +443,7 @@ impl ClosedBufferSegment {
);

chunks.push(Arc::new(BufferChunk {
batches,
batches: vec![batch],
schema: schema.clone(),
stats: Arc::new(chunk_stats),
partition_id: TransitionPartitionId::new(
@@ -536,7 +573,9 @@ pub(crate) mod tests {

#[test]
fn buffers_rows() {
let catalog = Arc::new(Catalog::new());
let mut open_segment = OpenBufferSegment::new(
Arc::clone(&catalog),
SegmentId::new(0),
SegmentRange::test_range(),
Time::from_timestamp_nanos(0),
Expand All @@ -546,7 +585,6 @@ pub(crate) mod tests {
);

let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
let catalog = Catalog::new();

let batches = lp_to_table_batches(
&catalog,
@@ -565,7 +603,13 @@

let db_schema = catalog.db_schema("db1").unwrap();
let cpu_table = open_segment
.table_record_batches("db1", "cpu", db_schema.get_table_schema("cpu").unwrap())
.table_record_batch(
"db1",
"cpu",
db_schema.get_table_schema("cpu").unwrap().as_arrow(),
&[],
)
.unwrap()
.unwrap();
let expected_cpu_table = vec![
"+------------------------------------------------------------------+-----+----------+--------------------------------+",
@@ -575,10 +619,16 @@
"| 505f9f5fc3347ac9d6ba45f2b2c94ad53a313e456e86e61db85ba1935369b238 | 2.0 | cupcakes | 1970-01-01T00:00:00.000000030Z |",
"+------------------------------------------------------------------+-----+----------+--------------------------------+",
];
assert_batches_eq!(&expected_cpu_table, &cpu_table);
assert_batches_eq!(&expected_cpu_table, &[cpu_table]);

let mem_table = open_segment
.table_record_batches("db1", "mem", db_schema.get_table_schema("mem").unwrap())
.table_record_batch(
"db1",
"mem",
db_schema.get_table_schema("mem").unwrap().as_arrow(),
&[],
)
.unwrap()
.unwrap();
let expected_mem_table = vec![
"+------------------------------------------------------------------+-----+--------+--------------------------------+",
@@ -587,12 +637,14 @@
"| 5ae2bb295e8b0dec713daf0da555ecd3f2899a8967f18db799e26557029198f3 | 2.0 | snakes | 1970-01-01T00:00:00.000000020Z |",
"+------------------------------------------------------------------+-----+--------+--------------------------------+",
];
assert_batches_eq!(&expected_mem_table, &mem_table);
assert_batches_eq!(&expected_mem_table, &[mem_table]);
}

#[tokio::test]
async fn buffers_schema_update() {
let catalog = Arc::new(Catalog::new());
let mut open_segment = OpenBufferSegment::new(
Arc::clone(&catalog),
SegmentId::new(0),
SegmentRange::test_range(),
Time::from_timestamp_nanos(0),
Expand All @@ -602,7 +654,6 @@ pub(crate) mod tests {
);

let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
let catalog = Catalog::new();

let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag1=cupcakes bar=1 10", 10);
let mut write_batch = WriteBatch::default();
@@ -628,7 +679,13 @@
let db_schema = catalog.db_schema("db1").unwrap();
println!("{:?}", db_schema);
let cpu_table = open_segment
.table_record_batches("db1", "cpu", db_schema.get_table_schema("cpu").unwrap())
.table_record_batch(
"db1",
"cpu",
db_schema.get_table_schema("cpu").unwrap().as_arrow(),
&[],
)
.unwrap()
.unwrap();
let expected_cpu_table = vec![
"+------------------------------------------------------------------+-----+------+------+----------+------+--------------------------------+",
@@ -641,7 +698,7 @@
"| e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 | | 2.1 | | | | 1970-01-01T00:00:00.000000040Z |",
"+------------------------------------------------------------------+-----+------+------+----------+------+--------------------------------+",
];
assert_batches_eq!(&expected_cpu_table, &cpu_table);
assert_batches_eq!(&expected_cpu_table, &[cpu_table]);
}

#[tokio::test]
@@ -650,7 +707,9 @@

let segment_id = SegmentId::new(4);
let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
let catalog = Arc::new(Catalog::new());
let mut open_segment = OpenBufferSegment::new(
Arc::clone(&catalog),
segment_id,
SegmentRange::test_range(),
Time::from_timestamp_nanos(0),
Expand All @@ -659,8 +718,6 @@ pub(crate) mod tests {
None,
);

let catalog = Catalog::new();

// When we persist the data all of these duplicates should be removed
let lp = "cpu,tag1=cupcakes bar=1 10\n\
cpu,tag1=cupcakes bar=1 10\n\
@@ -747,7 +804,9 @@

#[test]
fn should_persist() {
let catalog = Arc::new(Catalog::new());
let segment = OpenBufferSegment::new(
Arc::clone(&catalog),
SegmentId::new(0),
SegmentRange::from_time_and_duration(
Time::from_timestamp_nanos(0),
@@ -770,6 +829,7 @@
assert!(segment.should_persist(Time::from_timestamp(61 + 30, 0).unwrap()));

let segment = OpenBufferSegment::new(
Arc::clone(&catalog),
SegmentId::new(0),
SegmentRange::from_time_and_duration(
Time::from_timestamp_nanos(0),