diff --git a/src/btree.rs b/src/btree.rs index 99e4554..60f7dd6 100644 --- a/src/btree.rs +++ b/src/btree.rs @@ -173,6 +173,12 @@ impl BtreePageType { self.0 & (MASK) == MASK } + #[inline] + pub fn is_index_interior(&self) -> bool { + const MASK: u8 = LEAF_FLAG | INDEX_FLAG; + self.0 & (MASK) == INDEX_FLAG + } + /// The btree page header size. /// /// * Returns 8 if this is a leaf page. @@ -811,11 +817,11 @@ pub fn allocate_from_unallocated_space( } /// Write a table leaf cell to the specified offset. -pub fn write_leaf_cell>( +pub fn write_leaf_cell( buffer: &mut PageBufferMut, offset: usize, cell_header: &[u8], - payload: &P, + payload: &dyn Payload<()>, n_local: u16, overflow_page_id: Option, ) { diff --git a/src/cursor.rs b/src/cursor.rs index b546437..f6a20ad 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -44,10 +44,12 @@ use crate::pager::MemPage; use crate::pager::PageBuffer; use crate::pager::PageBufferMut; use crate::pager::PageId; +use crate::pager::PagePayload; use crate::pager::Pager; use crate::payload::LocalPayload; use crate::payload::Payload; use crate::payload::PayloadSize; +use crate::payload::SlicePayload; use crate::record::compare_record; use crate::utils::i64_to_u64; use crate::utils::len_varint_buffer; @@ -597,10 +599,10 @@ impl<'a> BtreeCursor<'a> { } } - fn insert_cell>( + fn insert_cell( &mut self, cell_header: &[u8], - payload: &P, + payload: &dyn Payload<()>, n_local: u16, overflow_page_id: Option, ) -> Result<()> { @@ -1324,8 +1326,6 @@ impl<'a> BtreeCursor<'a> { return Err(Error::NotInitialized); } else if self.current_page.idx_cell >= self.current_page.n_cells { return Err(Error::NoEntry); - } else if self.current_page.page_type.is_index() { - todo!("delete index page"); } assert!(self.current_page.n_cells > 0); assert!(self.current_page.idx_cell < self.current_page.n_cells); @@ -1438,10 +1438,33 @@ impl<'a> BtreeCursor<'a> { BtreePageHeaderMut::from_page(&self.current_page.mem, &mut buffer); page_header.set_n_cells(self.current_page.n_cells); drop(buffer); - if self.current_page.page_type.is_table() - && !self.current_page.page_type.is_leaf() - { - self.move_to_left_most()?; + if !self.current_page.page_type.is_leaf() { + if self.current_page.page_type.is_index() { + let payload_size = (cell_size as u64 - 4).try_into().map_err(|_| { + Error::FileCorrupt { + page_id: self.current_page.page_id, + e: FileCorrupt::new("cell size"), + } + })?; + // The payload of the freed cell is not broken because freeblocks only + // overwrites the first 4 bytes of the cell. + // Using PagePayload pointing the freed cell for insert_cell() is safe. + // insert_cell() first copies the payload to the leaf page. Even if the + // new cell is inserted into the parent interior page as the divider, it + // is copied into interior_cell_buf. + // TODO: This can save memory copy. But it is not safe because the freed + // cell may be overwritten by future change for insert_cell(). + let payload = PagePayload::new( + &self.current_page.mem, + cell_offset + 4, + payload_size, + ); + self.move_to_left_most()?; + // Insert the payload of the removed cell to child page. + self.insert_cell(&[], &payload, payload.size().get() as u16, None)?; + } else { + self.move_to_left_most()?; + } } break; } else { @@ -1460,8 +1483,6 @@ impl<'a> BtreeCursor<'a> { // TODO: Optimization: shrink the page level by // replacing the interior page with the right child // page. - } else if self.current_page.page_type.is_index() { - todo!("delete index page"); } } else { // Initialize the root page. @@ -1492,6 +1513,8 @@ impl<'a> BtreeCursor<'a> { assert!(self.current_page.page_type.is_index()); // TODO: Delete cell in interior page. + + // Fill the interior cell with the right most cell in child. } Ok(()) } @@ -1527,6 +1550,7 @@ impl<'a> BtreeCursor<'a> { } fn free_cell(page: &MemPage, buffer: &mut PageBufferMut, offset: usize, cell_size: u16) { + let cell_size = if cell_size < 4 { 4 } else { cell_size }; buffer.copy_within(BTREE_FIRST_FREEBLOCK_OFFSET, offset); set_u16(buffer, offset + 2, cell_size); let mut page_header = BtreePageHeaderMut::from_page(page, buffer); @@ -1726,6 +1750,7 @@ mod tests { use crate::pager::PAGE_ID_1; use crate::payload::SlicePayload; use crate::record::parse_record; + use crate::record::RecordPayload; use crate::test_utils::*; use crate::value::Collation; use crate::value::Value; @@ -4812,4 +4837,304 @@ mod tests { assert_eq!(pager.num_pages() - pager.num_free_pages(), initial_pages); } + + #[test] + fn test_delete_single_index_leaf_page() { + let file = create_sqlite_database(&[ + "PRAGMA page_size = 512;", + "CREATE TABLE example(col);", + "CREATE INDEX index1 ON example(col);", + ]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let index_page_id = find_index_page_id("index1", file.path()); + + let mut cursor = BtreeCursor::new(index_page_id, &pager, &bctx).unwrap(); + + let mut entries = Vec::new(); + let mut payloads = Vec::new(); + for i in 0..10 { + let values = [Some(Value::Integer(i)), Some(Value::Integer(i))]; + let payload = build_record( + values + .iter() + .map(Option::as_ref) + .collect::>() + .as_slice(), + ); + cursor + .index_insert( + &build_comparators(values.as_slice()), + &SlicePayload::new(&payload).unwrap(), + ) + .unwrap(); + entries.push(values); + payloads.push(payload); + } + + cursor.move_to_last().unwrap(); + cursor.move_next().unwrap(); + assert!(matches!(cursor.delete(), Err(Error::NoEntry))); + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor + .index_move_to(&build_comparators(&entries[9])) + .unwrap(); + cursor.delete().unwrap(); + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor.move_to_first().unwrap(); + for i in 0..9 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor + .index_move_to(&build_comparators(&entries[0])) + .unwrap(); + cursor.delete().unwrap(); + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[1]); + drop(payload); + + cursor.move_to_first().unwrap(); + for i in 1..9 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor + .index_move_to(&build_comparators(&entries[2])) + .unwrap(); + cursor.delete().unwrap(); + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[3]); + drop(payload); + + cursor.move_to_first().unwrap(); + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[1]); + drop(payload); + cursor.move_next().unwrap(); + for i in 3..9 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor.move_to_first().unwrap(); + for _ in 0..7 { + cursor.delete().unwrap(); + } + assert!(matches!(cursor.delete(), Err(Error::NoEntry))); + + cursor.move_to_first().unwrap(); + assert!(cursor.get_index_payload().unwrap().is_none()); + + // The root page is cleared + let root_page = pager.get_page(index_page_id).unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert!(page_header.page_type().is_leaf()); + assert_eq!(page_header.n_cells(), 0); + assert_eq!(page_header.first_freeblock_offset(), 0); + assert_eq!(page_header.fragmented_free_bytes(), 0); + assert_eq!( + page_header.cell_content_area_offset().get(), + bctx.usable_size + ); + } + + #[test] + fn test_delete_index_leaf_page_freeblock() { + let file = create_sqlite_database(&[ + "PRAGMA page_size = 512;", + "CREATE TABLE example(col);", + "CREATE INDEX index1 ON example(col);", + ]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let index_page_id = find_index_page_id("index1", file.path()); + + let mut cursor = BtreeCursor::new(index_page_id, &pager, &bctx).unwrap(); + + let mut entries = Vec::new(); + let mut payloads = Vec::new(); + for i in 0..4 { + let values = [ + Some(Value::Blob(vec![i as u8; 97].into())), + Some(Value::Integer(i)), + ]; + let payload = build_record( + values + .iter() + .map(Option::as_ref) + .collect::>() + .as_slice(), + ); + cursor + .index_insert( + &build_comparators(values.as_slice()), + &SlicePayload::new(&payload).unwrap(), + ) + .unwrap(); + entries.push(values); + payloads.push(payload); + } + + let root_page = pager.get_page(index_page_id).unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert_eq!(page_header.first_freeblock_offset(), 0); + drop(buffer); + + cursor.move_to_first().unwrap(); + + // Delete 2 cells. + cursor.delete().unwrap(); + cursor.move_next().unwrap(); + cursor.delete().unwrap(); + + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert_ne!(page_header.first_freeblock_offset(), 0); + drop(buffer); + + let values = [ + Some(Value::Blob(vec![4; 97].into())), + Some(Value::Integer(4)), + ]; + let payload = build_record( + values + .iter() + .map(Option::as_ref) + .collect::>() + .as_slice(), + ); + cursor + .index_insert( + &build_comparators(values.as_slice()), + &SlicePayload::new(&payload).unwrap(), + ) + .unwrap(); + entries.push(values); + payloads.push(payload); + + // No defragment but use the freeblock. + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert_ne!(page_header.first_freeblock_offset(), 0); + drop(buffer); + + cursor.move_to_first().unwrap(); + for i in [1, 3, 4] { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + } + + #[test] + fn test_delete_index() { + let file = create_sqlite_database(&[ + "PRAGMA page_size = 512;", + "CREATE TABLE example(col);", + "CREATE INDEX index1 ON example(col);", + ]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let index_page_id = find_index_page_id("index1", file.path()); + + let initial_pages = pager.num_pages(); + assert_eq!(initial_pages, 3); + + let mut cursor = BtreeCursor::new(index_page_id, &pager, &bctx).unwrap(); + + // At least 3 leaf pages. + let mut entries = Vec::new(); + let mut payloads = Vec::new(); + for i in 0..16 { + let values = [ + Some(Value::Blob(vec![i as u8; 97].into())), + Some(Value::Integer(i)), + ]; + let payload = build_record( + values + .iter() + .map(Option::as_ref) + .collect::>() + .as_slice(), + ); + cursor + .index_insert( + &build_comparators(values.as_slice()), + &SlicePayload::new(&payload).unwrap(), + ) + .unwrap(); + entries.push(values); + payloads.push(payload); + } + + cursor.move_to_first().unwrap(); + for i in 0..16 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + // Delete last 4 entries. At least, the last leaf page is deleted. + for i in 0..4 { + cursor + .index_move_to(&build_comparators(&entries[15 - i])) + .unwrap(); + cursor.delete().unwrap(); + } + + cursor.move_to_first().unwrap(); + for i in 0..12 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + // Delete the first leaf page. + for i in 0..4 { + cursor + .index_move_to(&build_comparators(&entries[i])) + .unwrap(); + cursor.delete().unwrap(); + } + + for i in 4..12 { + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[i]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor.move_to_first().unwrap(); + for _ in 0..8 { + cursor.delete().unwrap(); + } + + cursor.move_to_first().unwrap(); + assert!(cursor.get_index_payload().unwrap().is_none()); + + assert_eq!(pager.num_pages() - pager.num_free_pages(), initial_pages); + } } diff --git a/src/pager.rs b/src/pager.rs index 2110fd1..f880fb2 100644 --- a/src/pager.rs +++ b/src/pager.rs @@ -32,6 +32,8 @@ use std::rc::Rc; use crate::header::DatabaseHeader; use crate::header::DatabaseHeaderMut; use crate::header::DATABASE_HEADER_SIZE; +use crate::payload::Payload; +use crate::payload::PayloadSize; /// Page 1 is special: /// @@ -479,6 +481,36 @@ impl MemPage { } } +pub struct PagePayload { + page: MemPage, + offset: usize, + size: PayloadSize, +} + +impl PagePayload { + pub fn new(page: &MemPage, offset: usize, size: PayloadSize) -> Self { + let page = MemPage { + page: page.page.clone(), + header_offset: page.header_offset, + }; + Self { page, offset, size } + } +} + +impl Payload<()> for PagePayload { + fn size(&self) -> PayloadSize { + self.size + } + + fn load(&self, offset: usize, buf: &mut [u8]) -> std::result::Result { + assert!(offset <= self.size.get() as usize); + let n = buf.len().min(self.size.get() as usize - offset); + buf[..n] + .copy_from_slice(&self.page.buffer()[self.offset + offset..self.offset + offset + n]); + Ok(n) + } +} + struct RawPage { buf: Vec, is_dirty: bool, diff --git a/src/payload.rs b/src/payload.rs index 17e8cee..0568011 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -28,7 +28,6 @@ pub struct SlicePayload<'a> { } impl<'a> SlicePayload<'a> { - #[allow(dead_code)] pub fn new(buf: &'a [u8]) -> anyhow::Result { let size = PayloadSize::try_from(buf.len() as u64) .map_err(|_| anyhow::anyhow!("payload size too large"))?; diff --git a/src/test_utils.rs b/src/test_utils.rs index 5fffa12..3b9c498 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -25,7 +25,9 @@ use crate::pager::Pager; use crate::payload::Payload; use crate::record::RecordPayload; use crate::schema::Schema; +use crate::value::Collation; use crate::value::Value; +use crate::value::ValueCmp; use crate::Connection; use crate::DatabaseHeader; use crate::Expression; @@ -125,3 +127,10 @@ pub fn build_record(record: &[Option<&Value>]) -> Vec { ); buf } + +pub fn build_comparators<'a>(values: &'a [Option]) -> Vec>> { + values + .iter() + .map(|v| v.as_ref().map(|v| ValueCmp::new(v, &Collation::Binary))) + .collect::>() +}