fix: make DB Buffer use the up to date schema
Alternate Title: The DB Schema only ever has one table

This is a story of subtle bugs, gnashing of teeth, and hair pulling.
Gather round as I tell you the tale of an Arc that pointed to an
outdated schema.

In #24954 we introduced an Index for the database, which allows us
to perform faster queries. As part of that change, this check was added:

```rust
if !self.table_buffers.contains_key(&table_name) {
    // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
    // and we've gotten here, it means we're dropping a write.
    if let Some(table) = self.db_schema.get_table(&table_name) {
        self.table_buffers.insert(
            table_name.clone(),
            TableBuffer::new(segment_key.clone(), &table.index_columns()),
        );
    } else {
        return;
    }
}
```

Adding the return there let us continue on with our day and make the
tests pass. However, just because the tests passed didn't mean the
code was correct, as I would soon find out. With follow-up ticket
#24955 created, we merged the changes and I began to debug the issue.

Note that we assumed a single write was being dropped due to limits,
because the limits test is what failed. What followed was a chase of a
few days to prove that the limits weren't the problem. It took a while,
but the conclusion was that the limits weren't causing the failure. It
did, however, expose the fact that a Database only ever had one table,
which was weird.

I then began to dig into this a bit more. Why would there only be one
table? We weren't just dropping one write; we were dropping all but
*one* write, or so it seemed. Many printlns and hours later it became
clear that we *were* actually updating the schema! The updated schema
existed in the Catalog, but not behind the schema pointer held by the
DatabaseBuffer struct. So what gives?
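
To see why that matters, here's a minimal standalone sketch of how an
`Arc` clone goes stale when the map entry it came from is replaced. The
`Schema` type and the values are made up for illustration; this isn't
the actual influxdb code.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct Schema {
    tables: Vec<String>,
}

fn main() {
    let mut catalog: HashMap<String, Arc<Schema>> = HashMap::new();
    catalog.insert("db".to_string(), Arc::new(Schema { tables: vec!["cpu".into()] }));

    // The buffer takes its own handle to the schema...
    let buffer_schema = Arc::clone(&catalog["db"]);

    // ...and later the catalog entry is *replaced* with a brand new Arc.
    catalog.insert(
        "db".to_string(),
        Arc::new(Schema { tables: vec!["cpu".into(), "mem".into()] }),
    );

    // The buffer's Arc still points at the old allocation, so it only ever
    // sees the first table.
    assert_eq!(buffer_schema.tables, ["cpu"]);
    assert_eq!(catalog["db"].tables.len(), 2);
}
```

That's exactly the situation the DatabaseBuffer found itself in: its
`Arc<DatabaseSchema>` was taken once and never refreshed.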

Well, we need to look at [another piece of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L540-L541).

In the `validate_or_insert_schema_and_partitions` function for the
WriteBuffer we have this bit of code:

```rust
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
```

Here we pass in a reference to the schema held in the catalog. However, when we
[go a bit further down](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L565-L568)
we see this code:

```rust
    let schema = match schema {
        Cow::Owned(s) => Some(s),
        Cow::Borrowed(_) => None,
    };
```
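
As a quick aside, here's a minimal standalone sketch of the
clone-on-write behaviour `Cow` gives us; the values and the `add_table`
function are invented for illustration and aren't the actual influxdb
code.

```rust
use std::borrow::Cow;

fn add_table(schema: &str) -> Option<String> {
    // Start out borrowing, just like `Cow::Borrowed(schema)` above.
    let mut schema: Cow<'_, str> = Cow::Borrowed(schema);

    // The first mutation clones the borrowed data; from here on we edit the clone.
    schema.to_mut().push_str(",mem");

    // Only an owned (cloned-and-modified) value is handed back to the caller;
    // the data we borrowed from is never touched.
    match schema {
        Cow::Owned(s) => Some(s),
        Cow::Borrowed(_) => None,
    }
}

fn main() {
    let original = String::from("cpu");
    let updated = add_table(&original);
    assert_eq!(original, "cpu"); // the original is unchanged
    assert_eq!(updated.as_deref(), Some("cpu,mem"));
}
```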

What this means is that if we make a change, we clone the original and
update the clone; we *aren't* modifying the original schema in the
catalog. When we go back up the call stack we get to [this bit of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L456-L460):

```rust
    if let Some(schema) = result.schema.take() {
        debug!("replacing schema for {:?}", schema);

        catalog.replace_database(sequence, Arc::new(schema))?;
    }
```

We are updating the catalog with the new schema, but how does that work?

```rust
        inner.databases.insert(db.name.clone(), db);
```

Oh. Oh no. We're just overwriting it. Which means that the
DatabaseBuffer has an Arc to the *old* schema, not the *new* one. Which
means that the buffer gets the first copy of the schema, containing the
first new table, but *none* of the later ones. The solution is to give
the buffer a pointer to the Catalog instead, so that the DatabaseBuffer
always sees the most up-to-date schema and only the Catalog handles the
schema itself. This commit makes those changes.
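
To make the shape of the fix concrete, here's a minimal sketch of a
buffer that holds an `Arc<Catalog>` and asks it for the schema whenever
it needs one, instead of caching its own `Arc<DatabaseSchema>`. The
types below are simplified stand-ins for illustration, not the actual
influxdb structs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Debug)]
struct DatabaseSchema {
    tables: Vec<String>,
}

// A toy stand-in for the Catalog: the single source of truth for schemas.
#[derive(Default)]
struct Catalog {
    inner: RwLock<HashMap<String, Arc<DatabaseSchema>>>,
}

impl Catalog {
    fn db_schema(&self, db: &str) -> Option<Arc<DatabaseSchema>> {
        self.inner.read().unwrap().get(db).cloned()
    }

    fn replace_database(&self, db: &str, schema: Arc<DatabaseSchema>) {
        self.inner.write().unwrap().insert(db.to_string(), schema);
    }
}

// The buffer keeps a handle to the Catalog, *not* a snapshot of a DatabaseSchema.
struct DatabaseBuffer {
    catalog: Arc<Catalog>,
}

impl DatabaseBuffer {
    fn table_count(&self, db: &str) -> usize {
        // Ask the catalog every time, so newly added tables are always visible.
        self.catalog.db_schema(db).map(|s| s.tables.len()).unwrap_or(0)
    }
}

fn main() {
    let catalog = Arc::new(Catalog::default());
    catalog.replace_database("db", Arc::new(DatabaseSchema { tables: vec!["cpu".into()] }));

    let buffer = DatabaseBuffer { catalog: Arc::clone(&catalog) };
    assert_eq!(buffer.table_count("db"), 1);

    // The catalog swaps in an updated schema; the buffer sees it immediately.
    catalog.replace_database(
        "db",
        Arc::new(DatabaseSchema { tables: vec!["cpu".into(), "mem".into()] }),
    );
    assert_eq!(buffer.table_count("db"), 2);
}
```

In the actual commit the buffer no longer stores the schema at all: it
fetches it from the catalog for each batch of writes and passes it into
`buffer_table_batch`, as the diff below shows.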

This was a very subtle mutability/pointer bug, given the intersection of
valid borrow checking and some writes still making it in, but luckily we
caught it. It does mean, though, that until this fix lands, the changes
between the Index PR and now should be considered subtly broken and
shouldn't be used for anything beyond writing to a single table per DB.

TL;DR: we should ask the Catalog what the schema is, since it holds the
up-to-date version.

Closes #24955
mgattozzi committed May 16, 2024
1 parent 7a2867b commit d8708df
Showing 3 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion influxdb3_write/src/catalog.rs
@@ -123,13 +123,13 @@ impl Catalog {
db
}
None => {
info!("return new db {}", db_name);
let mut inner = self.inner.write();

if inner.databases.len() >= Self::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}

info!("return new db {}", db_name);
let db = Arc::new(DatabaseSchema::new(db_name));
inner.databases.insert(db.name.clone(), Arc::clone(&db));
db
3 changes: 2 additions & 1 deletion influxdb3_write/src/wal.rs
@@ -663,6 +663,7 @@ mod tests {
use crate::Catalog;
use crate::LpWriteOp;
use crate::Precision;
use std::sync::Arc;

#[test]
fn segment_writer_reader() {
@@ -808,7 +809,7 @@

// Reopen the wal and make sure it loads the precision via
// `load_buffer_from_segment`
let catalog = Catalog::default();
let catalog = Arc::new(Catalog::default());
let wal = WalImpl::new(dir).unwrap();
let schema = schema::SchemaBuilder::new()
.tag("host")
44 changes: 24 additions & 20 deletions influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -2,11 +2,12 @@
//! single WAL segment. Only one segment should be open for writes in the write buffer at any
//! given time.

use crate::catalog::{Catalog, DatabaseSchema};
use crate::catalog::Catalog;
use crate::chunk::BufferChunk;
use crate::paths::ParquetFilePath;
use crate::write_buffer::flusher::BufferedWriteResult;
use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
use crate::write_buffer::DatabaseSchema;
use crate::write_buffer::{
parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
};
@@ -108,23 +109,20 @@ impl OpenBufferSegment {
.buffered_data
.database_buffers
.entry(db_name.to_string())
.or_insert_with(|| {
let db_schema = self
.catalog
.db_schema(&db_name)
.expect("db schema should exist");
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema,
}
.or_insert_with(|| DatabaseBuffer {
table_buffers: HashMap::new(),
});

let schema = self
.catalog
.db_schema(&db_name)
.expect("database should exist in schema");
for (table_name, table_batch) in db_batch.table_batches {
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
self.segment_size += table_batch.rows.len();

db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch);
db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch, &schema);
}
}

@@ -177,7 +175,7 @@ impl OpenBufferSegment {
}

pub(crate) fn load_buffer_from_segment(
catalog: &Catalog,
catalog: &Arc<Catalog>,
mut segment_reader: Box<dyn WalSegmentReader>,
) -> Result<(BufferedData, usize)> {
let mut segment_size = 0;
@@ -201,12 +199,10 @@ pub(crate) fn load_buffer_from_segment(

let db_name = &write.db_name;
if !buffered_data.database_buffers.contains_key(db_name) {
let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
buffered_data.database_buffers.insert(
db_name.clone(),
DatabaseBuffer {
table_buffers: HashMap::new(),
db_schema,
},
);
}
@@ -221,12 +217,20 @@
}
let segment_data = validated_write.valid_segmented_data.pop().unwrap();

let schema = catalog
.db_schema(&db_name)
.expect("database exists in schema");
for (table_name, table_batch) in segment_data.table_batches {
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
segment_size += table_batch.rows.len();

db_buffer.buffer_table_batch(table_name, &segment_key, table_batch);
db_buffer.buffer_table_batch(
table_name,
&segment_key,
table_batch,
&schema,
);
}
}
}
@@ -318,7 +322,6 @@ impl BufferedData {
#[derive(Debug)]
struct DatabaseBuffer {
table_buffers: HashMap<String, TableBuffer>,
db_schema: Arc<DatabaseSchema>,
}

impl DatabaseBuffer {
@@ -327,19 +330,20 @@ impl DatabaseBuffer {
table_name: String,
segment_key: &PartitionKey,
table_batch: TableBatch,
schema: &Arc<DatabaseSchema>,
) {
if !self.table_buffers.contains_key(&table_name) {
// TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
// and we've gotten here, it means we're dropping a write.
if let Some(table) = self.db_schema.get_table(&table_name) {
if let Some(table) = schema.get_table(&table_name) {
self.table_buffers.insert(
table_name.clone(),
TableBuffer::new(segment_key.clone(), &table.index_columns()),
);
} else {
return;
// Sanity check panic in case this isn't true
unreachable!("table should exist in schema");
}
}

let table_buffer = self
.table_buffers
.get_mut(&table_name)