diff --git a/src/cursor.rs b/src/cursor.rs index 86532b9..ababd68 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -228,17 +228,22 @@ impl CellPayload for LeafCellPayload<'_, P> { } } -struct PageCellPayload { +struct PageCellPayload<'a> { + header: &'a [u8], payload: PagePayload, } -impl CellPayload for PageCellPayload { +impl CellPayload for PageCellPayload<'_> { fn size(&self) -> u16 { - CopiablePayload::size(&self.payload).get() as u16 + self.header.len() as u16 + CopiablePayload::size(&self.payload).get() as u16 } fn copy_all(&self, buf: &mut [u8]) { - assert_eq!(self.payload.copy(0, buf), self.size() as usize); + buf[..self.header.len()].copy_from_slice(self.header); + assert_eq!( + self.payload.copy(0, &mut buf[self.header.len()..]), + CopiablePayload::size(&self.payload).get() as usize + ); } } @@ -545,12 +550,15 @@ impl<'a> BtreeCursor<'a> { let (cell_header, n_local, overflow_page_id) = self.pack_cell(cell_header_buf.as_mut_slice(), payload, None)?; - self.insert_cell(&LeafCellPayload { - cell_header, - payload, - n_local, - overflow_page_id, - }) + self.insert_cell( + self.parent_pages.len(), + &LeafCellPayload { + cell_header, + payload, + n_local, + overflow_page_id, + }, + ) } /// Insert or update a new item to the table. @@ -575,12 +583,15 @@ impl<'a> BtreeCursor<'a> { let (cell_header, n_local, overflow_page_id) = self.pack_cell(cell_header_buf.as_mut_slice(), payload, Some(key))?; - self.insert_cell(&LeafCellPayload { - cell_header, - payload, - n_local, - overflow_page_id, - }) + self.insert_cell( + self.parent_pages.len(), + &LeafCellPayload { + cell_header, + payload, + n_local, + overflow_page_id, + }, + ) } fn pack_cell<'b, P: CopiablePayload>( @@ -647,7 +658,8 @@ impl<'a> BtreeCursor<'a> { } } - fn insert_cell(&mut self, cell_payload: &dyn CellPayload) -> Result<()> { + fn insert_cell(&mut self, mut depth: usize, cell_payload: &dyn CellPayload) -> Result<()> { + assert!(depth <= self.parent_pages.len()); let new_cell_size = cell_payload.size(); // Allocate 4 bytes or more to be compatible to allocateSpace() of SQLite. // Otherwise freeSpace() of SQLite corrupts the database file (e.g. on updating @@ -656,7 +668,7 @@ impl<'a> BtreeCursor<'a> { // fragmented and never reused. cell_size can be less than 4 on index pages. let mut new_cell_size = if new_cell_size < 4 { 4 } else { new_cell_size }; - let mut depth = self.parent_pages.len(); + let mut cell_payload = Some(cell_payload); let mut interior_cell_buf = self.pager.allocate_tmp_page(); // sub interior cell is for a special case when table leaf page is splitted into // 3 pages. In that case, new 2 interior cells are inserted to the @@ -841,7 +853,7 @@ impl<'a> BtreeCursor<'a> { ); set_u16(&mut buffer, cell_pointer_offset, allocated_offset as u16); - if page_type.is_leaf() { + if let Some(cell_payload) = cell_payload.take() { cell_payload.copy_all(&mut buffer[allocated_offset..]); } else { current_page.idx_cell += new_cell_in_right_page as u16; @@ -1063,7 +1075,10 @@ impl<'a> BtreeCursor<'a> { new_cell_size, ); assert!(current_page.page_type.is_leaf()); - cell_payload.copy_all(&mut new_buffer[cell_content_area_offset..]); + cell_payload + .take() + .unwrap() + .copy_all(&mut new_buffer[cell_content_area_offset..]); let mut new_page_header = BtreePageHeaderMut::from_page(&new_page, &mut new_buffer); @@ -1104,7 +1119,7 @@ impl<'a> BtreeCursor<'a> { current_page.idx_cell - idx_cells.start, new_cell_size, ); - if current_page.page_type.is_leaf() { + if let Some(cell_payload) = cell_payload.take() { cell_payload.copy_all(&mut new_buffer[cell_content_area_offset..]); } else { new_buffer[cell_content_area_offset @@ -1371,13 +1386,104 @@ impl<'a> BtreeCursor<'a> { drop(buffer); } - if !self.current_page.page_type.is_leaf() { + let delete_interior_cell = !self.current_page.page_type.is_leaf(); + if delete_interior_cell { assert!(self.current_page.page_type.is_index()); + assert!(self.current_page.idx_cell < self.current_page.n_cells); + + let depth_interior_page = self.parent_pages.len(); - // TODO: Delete cell in interior page. - todo!("delete cell in interior page"); + self.move_to_right_most()?; + self.current_page.idx_cell -= 1; - // Fill the interior cell with the right most cell in child. + // Remove the cell from the interior page. + let page_id_buf = { + let current_page = &mut self.parent_pages[depth_interior_page]; + let header_size = current_page.page_type.header_size(); + let mut buffer = + self.pager + .make_page_mut(¤t_page.mem) + .map_err(|e| Error::Pager { + page_id: current_page.page_id, + e, + })?; + let cell_offset = get_cell_offset( + ¤t_page.mem, + &buffer, + current_page.idx_cell, + header_size, + ) + .map_err(|e| Error::FileCorrupt { + page_id: current_page.page_id, + e, + })?; + let cell_size = current_page.page_type.compute_cell_size_fn()( + self.btree_ctx, + &buffer, + cell_offset, + ) + .map_err(|e| Error::FileCorrupt { + page_id: current_page.page_id, + e, + })?; + let cell_pointers_tail_offset = + cell_pointer_offset(¤t_page.mem, current_page.n_cells, header_size); + let cell_pointer_offset = + cell_pointer_offset(¤t_page.mem, current_page.idx_cell, header_size); + // Move the cell pointers. + buffer.copy_within( + cell_pointer_offset + 2..cell_pointers_tail_offset, + cell_pointer_offset, + ); + let page_id_buf: [u8; 4] = buffer[cell_offset..cell_offset + 4].try_into().unwrap(); + Self::free_cell(¤t_page.mem, &mut buffer, cell_offset, cell_size); + + // Reduce the n_cells for insert_cell(). The header value is updated in + // insert_cell(). + current_page.n_cells -= 1; + + page_id_buf + }; + + // Copy the right most cell in the child page to the interior page. + { + let buffer = self.current_page.mem.buffer(); + let cell_offset = get_cell_offset( + &self.current_page.mem, + &buffer, + self.current_page.idx_cell, + self.current_page.page_type.header_size(), + ) + .map_err(|e| Error::FileCorrupt { + page_id: self.current_page.page_id, + e, + })?; + let cell_size = self.current_page.page_type.compute_cell_size_fn()( + self.btree_ctx, + &buffer, + cell_offset, + ) + .map_err(|e| Error::FileCorrupt { + page_id: self.current_page.page_id, + e, + })?; + assert!(cell_size > 0); + let payload = PagePayload::new( + &self.current_page.mem, + cell_offset, + (cell_size as u64).try_into().unwrap(), + ); + drop(buffer); + self.insert_cell( + depth_interior_page, + &PageCellPayload { + header: &page_id_buf, + payload, + }, + )?; + } + + // Remove the moved right most cell in the child page. } loop { @@ -1504,7 +1610,13 @@ impl<'a> BtreeCursor<'a> { assert!(self.move_to_left_most()?); } // Insert the payload of the removed cell to child page. - self.insert_cell(&PageCellPayload { payload })?; + self.insert_cell( + self.parent_pages.len(), + &PageCellPayload { + header: &[], + payload, + }, + )?; } else { assert!(self.move_to_left_most()?); } @@ -1550,6 +1662,10 @@ impl<'a> BtreeCursor<'a> { } } } + + if delete_interior_cell { + self.move_next()?; + } Ok(()) } @@ -1681,18 +1797,8 @@ impl<'a> BtreeCursor<'a> { if !self.move_to_current_child()? { return Ok(false); } - loop { - if self.current_page.page_type.is_leaf() { - break; - } - let buffer = self.current_page.mem.buffer(); - let page_id = parse_btree_interior_cell_page_id(&self.current_page.mem, &buffer, 0) - .map_err(|e| Error::FileCorrupt { - page_id: self.current_page.page_id, - e, - })?; - drop(buffer); - self.move_to_child(page_id)?; + while !self.current_page.page_type.is_leaf() { + assert!(self.move_to_current_child()?); } Ok(true) } @@ -5230,4 +5336,142 @@ mod tests { assert_eq!(pager.num_pages() - pager.num_free_pages(), initial_pages); } + + #[test] + fn test_delete_index_interior() { + // Deleting entries from interior pages. + let file = create_sqlite_database(&[ + "PRAGMA page_size = 512;", + "CREATE TABLE example(col);", + "CREATE INDEX index1 ON example(col);", + ]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let index_page_id = find_index_page_id("index1", file.path()); + + let initial_pages = pager.num_pages(); + assert_eq!(initial_pages, 3); + + let mut cursor = BtreeCursor::new(index_page_id, &pager, &bctx).unwrap(); + + // At least 3 levels. + let mut entries = Vec::new(); + let mut payloads = Vec::new(); + for i in 0..24 { + let values = [ + Some(Value::Blob(vec![i as u8; 97].into())), + Some(Value::Integer(i)), + ]; + let payload = build_record( + values + .iter() + .map(Option::as_ref) + .collect::>() + .as_slice(), + ); + cursor + .index_insert( + &build_comparators(values.as_slice()), + &SlicePayload::new(&payload).unwrap(), + ) + .unwrap(); + entries.push(values); + payloads.push(payload); + } + + // Delete cell in root page. + cursor.move_to_first().unwrap(); + assert!(cursor.parent_pages.len() >= 2); + while !cursor.parent_pages.is_empty() { + cursor.move_next().unwrap(); + } + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[8]); + drop(payload); + + cursor.delete().unwrap(); + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[9]); + drop(payload); + + cursor.move_to_first().unwrap(); + for expected in payloads.iter().take(8) { + assert_eq!(cursor.get_index_payload().unwrap().unwrap().buf(), expected); + cursor.move_next().unwrap(); + } + for expected in payloads.iter().take(24).skip(9) { + assert_eq!(cursor.get_index_payload().unwrap().unwrap().buf(), expected); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + // Delete cell in the second level interior page. + cursor.move_to_first().unwrap(); + assert!(cursor.parent_pages.len() >= 2); + while cursor.parent_pages.len() != 1 { + cursor.move_next().unwrap(); + } + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[2]); + drop(payload); + + cursor.delete().unwrap(); + let payload = cursor.get_index_payload().unwrap().unwrap(); + assert_eq!(payload.buf(), &payloads[3]); + drop(payload); + + cursor.move_to_first().unwrap(); + for expected in payloads.iter().take(2) { + assert_eq!(cursor.get_index_payload().unwrap().unwrap().buf(), expected); + cursor.move_next().unwrap(); + } + for expected in payloads.iter().take(8).skip(3) { + assert_eq!(cursor.get_index_payload().unwrap().unwrap().buf(), expected); + cursor.move_next().unwrap(); + } + for expected in payloads.iter().take(24).skip(9) { + assert_eq!(cursor.get_index_payload().unwrap().unwrap().buf(), expected); + cursor.move_next().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + // Delete cell in the root page. + for _ in 0..6 { + cursor.move_to_first().unwrap(); + assert!(cursor.parent_pages.len() >= 2); + while !cursor.parent_pages.is_empty() { + cursor.move_next().unwrap(); + } + cursor.delete().unwrap(); + } + cursor.move_to_first().unwrap(); + // Root page is empty. + assert_eq!(cursor.parent_pages[0].n_cells, 0); + + // Delete cell in the second level page. + for _ in 0..12 { + cursor.move_to_first().unwrap(); + assert!(cursor.parent_pages.len() >= 2); + while cursor.parent_pages.len() != 1 { + cursor.move_next().unwrap(); + } + cursor.delete().unwrap(); + } + + cursor.move_to_first().unwrap(); + assert_eq!(cursor.parent_pages[0].n_cells, 0); + // Second level page is empty. + assert_eq!(cursor.parent_pages[1].n_cells, 0); + + for _ in 0..4 { + cursor.move_to_first().unwrap(); + cursor.delete().unwrap(); + } + assert!(cursor.get_index_payload().unwrap().is_none()); + + cursor.move_to_first().unwrap(); + assert!(cursor.get_index_payload().unwrap().is_none()); + + assert_eq!(pager.num_pages() - pager.num_free_pages(), initial_pages); + } }