diff --git a/src/btree.rs b/src/btree.rs
index cb92dc8..fe714cd 100644
--- a/src/btree.rs
+++ b/src/btree.rs
@@ -19,6 +19,7 @@ use crate::pager::MemPage;
 use crate::pager::PageBuffer;
 use crate::pager::PageBufferMut;
 use crate::pager::PageId;
+use crate::utils::len_varint_buffer;
 use crate::utils::parse_varint;
 use crate::utils::u64_to_i64;
 
@@ -28,6 +29,7 @@ type ParseResult<T> = std::result::Result<T, &'static str>;
 pub const BTREE_PAGE_INTERIOR_HEADER_SIZE: usize = 12;
 pub const BTREE_PAGE_LEAF_HEADER_SIZE: usize = 8;
 pub const BTREE_PAGE_HEADER_MAX_SIZE: usize = BTREE_PAGE_INTERIOR_HEADER_SIZE;
+pub const BTREE_PAGE_CELL_POINTER_SIZE: usize = 2;
 
 const LEAF_FLAG: u8 = 0x08;
 const INDEX_FLAG: u8 = 0x02;
@@ -45,6 +47,15 @@ fn parse_non_zero_u16(buf: [u8; 2]) -> NonZeroU32 {
     NonZeroU32::new(v).unwrap()
 }
 
+/// TODO: Find a branchless way to do this conversion.
+pub fn non_zero_to_u16(v: u32) -> u16 {
+    if v == 65536 {
+        0
+    } else {
+        v as u16
+    }
+}
+
 #[inline(always)]
 pub fn set_u16(buf: &mut [u8], offset: usize, value: u16) {
     buf[offset..offset + 2].copy_from_slice(&value.to_be_bytes());
 }
 
@@ -56,6 +67,11 @@ pub const BTREE_OVERFLOW_PAGE_ID_BYTES: usize = 4;
 
 pub struct BtreePageType(u8);
 
 impl BtreePageType {
+    #[inline]
+    pub fn interior_type(&self) -> Self {
+        Self(self.0 & !LEAF_FLAG)
+    }
+
     #[inline]
     pub fn is_leaf(&self) -> bool {
         self.0 & LEAF_FLAG != 0
@@ -84,6 +100,17 @@ impl BtreePageType {
         let additional_size = is_interior >> 1;
         8 + additional_size
     }
+
+    pub fn compute_cell_size_fn(&self) -> fn(&BtreeContext, &[u8], usize) -> ParseResult<u16> {
+        if self.is_index() {
+            todo!("index cell size");
+        }
+        if self.is_leaf() {
+            compute_table_leaf_cell_size
+        } else {
+            compute_table_interior_cell_size
+        }
+    }
 }
 
 pub struct BtreePageHeader<'page>(&'page [u8; BTREE_PAGE_HEADER_MAX_SIZE]);
@@ -187,6 +214,10 @@ impl<'a> BtreePageHeaderMut<'a> {
     pub fn clear_fragmented_free_bytes(&mut self) {
         self.0[7] = 0;
     }
+
+    pub fn set_right_page_id(&mut self, page_id: PageId) {
+        self.0[8..12].copy_from_slice(&page_id.to_be_bytes());
+    }
 }
 
 pub struct FreeblockIterator<'a> {
@@ -322,6 +353,33 @@ pub fn get_cell_offset(
     Ok(cell_offset)
 }
 
+fn compute_table_leaf_cell_size(
+    ctx: &BtreeContext,
+    // TODO: How to accept both PageBufferMut and TemporaryPage?
+    buffer: &[u8],
+    offset: usize,
+) -> ParseResult<u16> {
+    let (payload_size, payload_size_length) =
+        parse_varint(&buffer[offset..]).ok_or("parse payload size")?;
+    let key_length = len_varint_buffer(&buffer[offset + payload_size_length..]);
+    let n_local = if payload_size <= ctx.max_local(true) as u64 {
+        payload_size as u16
+    } else {
+        ctx.n_local(true, payload_size as i32)
+    };
+    Ok(payload_size_length as u16 + key_length as u16 + n_local)
+}
+
+fn compute_table_interior_cell_size(
+    _ctx: &BtreeContext,
+    // TODO: How to accept both PageBufferMut and TemporaryPage?
+    buffer: &[u8],
+    offset: usize,
+) -> ParseResult<u16> {
+    let key_length = len_varint_buffer(&buffer[offset + 4..]);
+    Ok(4 + key_length as u16)
+}
+
 /// Context containing constant values to parse btree page.
 pub struct BtreeContext {
     /// Maximum local payload size. The first is for index pages, the second is
@@ -598,6 +656,26 @@ pub fn allocate_from_unallocated_space(
     new_cell_content_area_offset
 }
 
+/// Write a table leaf cell at the specified offset.
+pub fn write_table_leaf_cell(
+    buffer: &mut PageBufferMut,
+    offset: usize,
+    cell_header: &[u8],
+    local_payload: &[u8],
+    overflow_page_id: Option<PageId>,
+) {
+    // Copy payload to the btree page.
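+    // A table leaf cell is laid out as: the payload-size varint and the rowid
+    // varint (already serialized into cell_header), the in-page part of the
+    // payload, and, only when the payload spills to overflow pages, the 4-byte
+    // big-endian page id of the first overflow page.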
+    let payload_offset = offset + cell_header.len();
+    buffer[offset..payload_offset].copy_from_slice(cell_header);
+    let payload_tail_offset = payload_offset + local_payload.len();
+    buffer[payload_offset..payload_tail_offset].copy_from_slice(local_payload);
+    if let Some(overflow_page_id) = overflow_page_id {
+        let overflow_page_id = overflow_page_id.to_be_bytes();
+        buffer[payload_tail_offset..payload_tail_offset + overflow_page_id.len()]
+            .copy_from_slice(&overflow_page_id);
+    }
+}
+
 /// Compute the free size of the page.
 ///
 /// n_cells is an argument because this is cached in cursor.
@@ -629,6 +707,32 @@ pub fn compute_free_size(page: &MemPage, buffer: &PageBufferMut, n_cells: u16) -> ParseResult<u16> {
     Ok(free_size)
 }
 
+/// This is a standalone function rather than a method due to the Rust borrow
+/// checker.
+pub fn copy_key_for_split(
+    buf: &mut [u8],
+    is_leaf: bool,
+    left_page: &MemPage,
+    left_buffer: &mut PageBufferMut,
+    n_cells: u16,
+    header_size: u8,
+) -> ParseResult<u16> {
+    let cell_offset = get_cell_offset(left_page, left_buffer, n_cells - 1, header_size)?;
+    let key_offset_in_cell = if is_leaf {
+        // payload_size_length
+        len_varint_buffer(&left_buffer[cell_offset..])
+    } else {
+        left_buffer.copy_within(cell_offset..cell_offset + 4, left_page.header_offset + 8);
+        // Remove the cell at the tail.
+        // TODO: add the cell to freeblocks list or try not to copy the cell.
+        BtreePageHeaderMut::from_page(left_page, left_buffer).set_n_cells(n_cells - 1);
+        4
+    };
+    let key_offset = cell_offset + key_offset_in_cell;
+    let key_length = len_varint_buffer(&left_buffer[key_offset..]);
+    buf[..key_length].copy_from_slice(&left_buffer[key_offset..key_offset + key_length]);
+    Ok(key_length as u16)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/cursor.rs b/src/cursor.rs
index 44a3f51..e612cdc 100644
--- a/src/cursor.rs
+++ b/src/cursor.rs
@@ -20,10 +20,13 @@ use crate::btree::allocate_from_freeblocks;
 use crate::btree::allocate_from_unallocated_space;
 use crate::btree::cell_pointer_offset;
 use crate::btree::compute_free_size;
+use crate::btree::copy_key_for_split;
 use crate::btree::get_cell_offset;
+use crate::btree::non_zero_to_u16;
 use crate::btree::parse_btree_interior_cell_page_id;
 use crate::btree::parse_btree_leaf_table_cell;
 use crate::btree::set_u16;
+use crate::btree::write_table_leaf_cell;
 use crate::btree::BtreeContext;
 use crate::btree::BtreePageHeader;
 use crate::btree::BtreePageHeaderMut;
@@ -32,14 +35,14 @@ use crate::btree::IndexCellKeyParser;
 use crate::btree::PayloadInfo;
 use crate::btree::TableCellKeyParser;
 use crate::btree::BTREE_OVERFLOW_PAGE_ID_BYTES;
+use crate::btree::BTREE_PAGE_CELL_POINTER_SIZE;
+use crate::pager::swap_page_buffer;
 use crate::pager::MemPage;
 use crate::pager::PageBuffer;
 use crate::pager::PageId;
 use crate::pager::Pager;
 use crate::record::compare_record;
 use crate::utils::i64_to_u64;
-use crate::utils::len_varint_buffer;
-use crate::utils::parse_varint;
 use crate::utils::put_varint;
 use crate::value::ValueCmp;
 
@@ -402,20 +405,21 @@ impl<'a> BtreeCursor<'a> {
     pub fn insert(&mut self, key: i64, payload: &[u8]) -> anyhow::Result<()> {
         let current_cell_key = self.table_move_to(key)?;
 
-        let mut cell_header = [0; 18];
+        let mut cell_header_buf = [0; 18];
         let payload_size: i32 = payload.len().try_into()?;
         let mut cell_header_size =
-            put_varint(cell_header.as_mut_slice(), payload_size as u64) as u16;
+            put_varint(cell_header_buf.as_mut_slice(), payload_size as u64) as u16;
         cell_header_size += put_varint(
-            &mut cell_header[cell_header_size as usize..],
+            &mut cell_header_buf[cell_header_size as usize..],
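+            // The rowid is varint-encoded immediately after the payload-size varint.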
             i64_to_u64(key),
         ) as u16;
+        let cell_header = &cell_header_buf[..cell_header_size as usize];
 
         let is_table = true;
-        let (cell_size, n_local, overflow_page_id) = if payload_size
+        let (new_cell_size, local_payload, overflow_page_id) = if payload_size
             <= self.btree_ctx.max_local(is_table) as i32
         {
             let payload_size = payload_size as u16;
-            (cell_header_size + payload_size, payload_size, None)
+            (cell_header_size + payload_size, payload, None)
         } else {
             // Split the payload into local and overflow pages.
             let n_local = self.btree_ctx.n_local(is_table, payload_size);
@@ -445,56 +449,61 @@ impl<'a> BtreeCursor<'a> {
             (
                 cell_header_size + n_local + BTREE_OVERFLOW_PAGE_ID_BYTES as u16,
-                n_local,
-                Some(first_overflow_page_id.to_be_bytes()),
+                &payload[..n_local as usize],
+                Some(first_overflow_page_id),
             )
         };
-        // Allocate 4 bytes or more to be compatible to allocateSpace() of SQLite.
-        // Otherwise freeSpace() of SQLite corrupts the database file (e.g. on updating
-        // or deleting records). 4 bytes allocation is introduced in order to make
-        // spaces easier to reused as freeblock. 1 ~ 3 bytes spaces are counted as
-        // fragmented and never reused. cell_size can be less than 4 on index pages.
-        let cell_size = if cell_size < 4 { 4 } else { cell_size };
 
         match current_cell_key {
             Some(current_cell_key) if current_cell_key == key => {
                 // TODO: Update the payload
                 todo!("update the payload");
             }
-            _ => {
-                assert!(self.current_page.page_type.is_table());
-                assert!(self.current_page.page_type.is_leaf());
+            _ => {}
+        }
 
-                // Upgrading should be success because there must be no buffer reference of the
-                // page. We can guarantee it because:
-                //
-                // * This cursor is the only cursor handling the btree containing the page and
-                // * Only the possible reference is the returned payload from
-                //   get_table_payload(). However the payload is dropped before calling insert()
-                //   which is mutable method.
-                let mut buffer = self.pager.make_page_mut(&self.current_page.mem)?;
-
-                // TODO: Cache free size.
-                let free_size =
-                    compute_free_size(&self.current_page.mem, &buffer, self.current_page.n_cells)
-                        .map_err(|e| anyhow::anyhow!("compute free size: {:?}", e))?;
-
-                if free_size < cell_size + 2 {
-                    // TODO: balance the btree.
-                    todo!("balance the btree");
-                }
+        assert!(self.current_page.page_type.is_table());
+        assert!(self.current_page.page_type.is_leaf());
+
+        let mut depth = self.parent_pages.len();
+        let mut interior_cell_buf = [0; 13];
+
+        // Allocate 4 bytes or more to be compatible with allocateSpace() of SQLite.
+        // Otherwise freeSpace() of SQLite corrupts the database file (e.g. on updating
+        // or deleting records). The 4-byte allocation is introduced in order to make
+        // spaces easier to reuse as freeblocks. 1-3 byte spaces are counted as
+        // fragmented and never reused. new_cell_size can be less than 4 on index pages.
+        let mut new_cell_size = if new_cell_size < 4 { 4 } else { new_cell_size };
+
+        loop {
+            let current_page = if depth == self.parent_pages.len() {
+                &mut self.current_page
+            } else {
+                &mut self.parent_pages[depth]
+            };
 
-                let page_header = BtreePageHeader::from_page_mut(&self.current_page.mem, &buffer);
-                let page_type = self.current_page.page_type;
+            // make_page_mut() should succeed because there must be no buffer reference to
+            // the page. We can guarantee it because:
+            //
+            // * This cursor is the only cursor handling the btree containing the page and
+            // * Only the possible reference is the returned payload from
+            //   get_table_payload().
+            //   However the payload is dropped before calling insert(), which is a
+            //   mutable method.
+            let mut buffer = self.pager.make_page_mut(&current_page.mem)?;
+
+            // TODO: Cache free size.
+            let free_size = compute_free_size(&current_page.mem, &buffer, current_page.n_cells)
+                .map_err(|e| anyhow::anyhow!("compute free size: {:?}", e))?;
+
+            if free_size >= new_cell_size + 2 {
+                let page_header = BtreePageHeader::from_page_mut(&current_page.mem, &buffer);
+                let page_type = current_page.page_type;
                 let header_size = page_type.header_size();
                 let first_freeblock_offset = page_header.first_freeblock_offset();
                 let cell_content_area_offset =
                     page_header.cell_content_area_offset().get() as usize;
-                let unallocated_space_offset = cell_pointer_offset(
-                    &self.current_page.mem,
-                    self.current_page.n_cells,
-                    header_size,
-                );
+                let unallocated_space_offset =
+                    cell_pointer_offset(&current_page.mem, current_page.n_cells, header_size);
                 let fragmented_free_bytes = page_header.fragmented_free_bytes();
                 // unallocated_size must fit in u16 because cell_content_area_offset is at most
                 // 65536 and unallocated_space_offset must be bigger than 0.
@@ -516,10 +525,10 @@
                     && fragmented_free_bytes <= 57
                 {
                     allocate_from_freeblocks(
-                        &self.current_page.mem,
+                        &current_page.mem,
                         &mut buffer,
                         first_freeblock_offset,
-                        cell_size,
+                        new_cell_size,
                     )
                 } else {
                     None
@@ -531,54 +540,47 @@
                 } else {
                     // 2. Defragmentation if needed.
                     let mut cell_content_area_offset = cell_content_area_offset;
-                    if unallocated_size < cell_size + 2 {
+                    if unallocated_size < new_cell_size + 2 {
                         // TODO: Optimization: Move cell block of the size of a freeblock if
                         // there is only a few freeblocks. See defragmentPage() of SQLite.
                         let mut tmp_page = self.pager.allocate_tmp_page();
-                        buffer.swap(&mut tmp_page);
+                        buffer.swap_tmp(&mut tmp_page);
                         // Copy database header if the page is page 1.
-                        buffer[..self.current_page.mem.header_offset]
-                            .copy_from_slice(&tmp_page[..self.current_page.mem.header_offset]);
+                        buffer[..current_page.mem.header_offset]
+                            .copy_from_slice(&tmp_page[..current_page.mem.header_offset]);
                         let mut page_header =
-                            BtreePageHeaderMut::from_page(&self.current_page.mem, &mut buffer);
-                        page_header.set_page_type(page_type);
+                            BtreePageHeaderMut::from_page(&current_page.mem, &mut buffer);
+                        page_header.set_page_type(current_page.page_type);
                         page_header.set_first_freeblock_offset(0);
                         page_header.clear_fragmented_free_bytes();
+                        if !current_page.page_type.is_leaf() {
+                            let right_page_id_offset = current_page.mem.header_offset + 8;
+                            buffer[right_page_id_offset..right_page_id_offset + 4]
+                                .copy_from_slice(
+                                    &tmp_page[right_page_id_offset..right_page_id_offset + 4],
+                                );
+                        }
                         // cell_content_area_offset and n_cells will be set later.
                         cell_content_area_offset = self.btree_ctx.usable_size as usize;
                         // Copy reserved area.
                         buffer[cell_content_area_offset..]
                            .copy_from_slice(&tmp_page[cell_content_area_offset..]);
-                        for i in 0..self.current_page.n_cells {
-                            let offset = get_cell_offset(
-                                &self.current_page.mem,
-                                &tmp_page,
-                                i,
-                                header_size,
-                            )
-                            .map_err(|e| {
-                                anyhow::anyhow!("failed to get cell offset: {:?}", e)
-                            })?;
-                            // Compute cell size.
-                            let (payload_size, payload_size_length) =
-                                parse_varint(&tmp_page[offset..])
-                                    .ok_or_else(|| anyhow::anyhow!("parse payload size"))?;
-                            let key_length =
-                                len_varint_buffer(&tmp_page[offset + payload_size_length..])
-                                    as u16;
-                            let n_local =
-                                if payload_size <= self.btree_ctx.max_local(is_table) as u64 {
-                                    payload_size as u16
-                                } else {
-                                    self.btree_ctx.n_local(is_table, payload_size as i32)
-                                };
-                            let cell_size = payload_size_length as u16 + key_length + n_local;
-
+                        let compute_cell_size = page_type.compute_cell_size_fn();
+                        for i in 0..current_page.n_cells {
+                            let offset =
+                                get_cell_offset(&current_page.mem, &tmp_page, i, header_size)
+                                    .map_err(|e| {
+                                        anyhow::anyhow!("failed to get cell offset: {:?}", e)
+                                    })?;
+                            let cell_size =
+                                compute_cell_size(self.btree_ctx, &tmp_page, offset).map_err(
+                                    |e| anyhow::anyhow!("failed to compute cell size: {:?}", e),
+                                )?;
                             // There must be enough size of unallocated space before
                             // cell_content_area_offset because all cells have fit in the page
                             // before defragmentation.
                             let new_cell_content_area_offset = allocate_from_unallocated_space(
-                                &self.current_page.mem,
+                                &current_page.mem,
                                 &mut buffer,
                                 header_size,
                                 cell_content_area_offset,
@@ -595,16 +597,16 @@
                             // changed.
                             assert!(
                                 unallocated_space_offset + 2
-                                    <= cell_content_area_offset - cell_size as usize
+                                    <= cell_content_area_offset - new_cell_size as usize
                             );
                             // Clear the unallocated space.
                             buffer[unallocated_space_offset + 2
-                                ..cell_content_area_offset - cell_size as usize]
+                                ..cell_content_area_offset - new_cell_size as usize]
                                 .fill(0);
                         }
 
                         // 3. Allocate space from unallocated space.
-                        let allocated_offset = cell_content_area_offset - cell_size as usize;
+                        let allocated_offset = cell_content_area_offset - new_cell_size as usize;
                         // cell_content_area_offset is less than or equal to 65536. data is not
                         // empty. The offset must be less than 65536 and
                         // safe to cast into u16.
@@ -613,24 +615,21 @@
                         // New cell content area offset must not be less than the tail of cell
                        // pointers.
                         assert!(allocated_offset >= unallocated_space_offset + 2);
-                        BtreePageHeaderMut::from_page(&self.current_page.mem, &mut buffer)
+                        BtreePageHeaderMut::from_page(&current_page.mem, &mut buffer)
                             .set_cell_content_area_offset(allocated_offset as u16);
                         allocated_offset
                     }
                 };
 
-                self.current_page.n_cells += 1;
-                BtreePageHeaderMut::from_page(&self.current_page.mem, &mut buffer)
-                    .set_n_cells(self.current_page.n_cells);
+                current_page.n_cells += 1;
+                BtreePageHeaderMut::from_page(&current_page.mem, &mut buffer)
+                    .set_n_cells(current_page.n_cells);
 
                 // Update cell pointer.
                 let cell_pointer_offset = if current_cell_key.is_some() {
                     // Insert the new cell between cells.
-                    let cell_pointer_offset = cell_pointer_offset(
-                        &self.current_page.mem,
-                        self.current_page.idx_cell,
-                        header_size,
-                    );
+                    let cell_pointer_offset =
+                        cell_pointer_offset(&current_page.mem, current_page.idx_cell, header_size);
                     buffer.copy_within(
                         cell_pointer_offset..unallocated_space_offset,
                         cell_pointer_offset + 2,
@@ -642,21 +641,246 @@
                 };
                 set_u16(&mut buffer, cell_pointer_offset, allocated_offset as u16);
 
-                // Copy payload to the btree page.
-                let payload_offset = allocated_offset + cell_header_size as usize;
-                buffer[allocated_offset..payload_offset]
-                    .copy_from_slice(&cell_header[..cell_header_size as usize]);
-                let payload_tail_offset = payload_offset + n_local as usize;
-                buffer[payload_offset..payload_tail_offset]
-                    .copy_from_slice(&payload[..n_local as usize]);
-                if let Some(overflow_page_id) = overflow_page_id {
-                    assert_eq!(overflow_page_id.len(), 4);
-                    buffer[payload_tail_offset..payload_tail_offset + overflow_page_id.len()]
-                        .copy_from_slice(&overflow_page_id);
+                if page_type.is_leaf() {
+                    write_table_leaf_cell(
+                        &mut buffer,
+                        allocated_offset,
+                        cell_header,
+                        local_payload,
+                        overflow_page_id,
+                    );
+                } else {
+                    buffer[allocated_offset..allocated_offset + new_cell_size as usize]
+                        .copy_from_slice(&interior_cell_buf[..new_cell_size as usize]);
                 }
+                return Ok(());
+            }
+
+            // Split the overflowing page.
+            if depth == 0 {
+                let (page_id, new_page) = self.pager.allocate_page()?;
+                let mut new_buffer = self.pager.make_page_mut(&new_page)?;
+                swap_page_buffer(&mut buffer, &mut new_buffer);
+                // Build the btree page header of the root page.
+                let mut page_header = BtreePageHeaderMut::from_page(&current_page.mem, &mut buffer);
+                let root_page_type = current_page.page_type.interior_type();
+                page_header.set_page_type(root_page_type);
+                page_header.set_first_freeblock_offset(0);
+                page_header.set_n_cells(0);
+                page_header
+                    .set_cell_content_area_offset(non_zero_to_u16(self.btree_ctx.usable_size));
+                page_header.clear_fragmented_free_bytes();
+                page_header.set_right_page_id(page_id);
+                if current_page.mem.header_offset > 0 {
+                    // Move the database header back to page 1.
+                    buffer[..current_page.mem.header_offset]
+                        .copy_from_slice(&new_buffer[..current_page.mem.header_offset]);
+                    // Move the page header and the cell pointers, which start after the
+                    // database header, to the head of the new page. The values do not
+                    // change because they are independent of the page header position.
+                    new_buffer.copy_within(
+                        current_page.mem.header_offset
+                            ..current_page.mem.header_offset
+                                + current_page.page_type.header_size() as usize
+                                + 2 * current_page.n_cells as usize,
+                        0,
+                    );
+                    // TODO: Increase the cached free size by the size of the
+                    // database header.
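+                    // Note: only page 1 has a non-zero header_offset; it carries the
+                    // 100-byte database file header in front of the btree page header.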
+                }
+                drop(buffer);
+                drop(new_buffer);
+                let mut new_page = new_page;
+                std::mem::swap(&mut current_page.mem, &mut new_page);
+                self.parent_pages.insert(
+                    0,
+                    CursorPage {
+                        mem: new_page,
+                        idx_cell: 0,
+                        n_cells: 0,
+                        page_type: root_page_type,
+                    },
+                );
+                depth += 1;
+            } else {
+                let header_size = current_page.page_type.header_size();
+                let (new_page_id, new_page) = self.pager.allocate_page()?;
+                let mut new_buffer = self.pager.make_page_mut(&new_page)?;
+
+                const CELL_POINTER_SIZE: u16 = BTREE_PAGE_CELL_POINTER_SIZE as u16;
+                let mut total_size = new_cell_size + CELL_POINTER_SIZE * (current_page.n_cells + 1);
+                let mut cells = Vec::with_capacity(current_page.n_cells as usize);
+                let compute_cell_size = current_page.page_type.compute_cell_size_fn();
+                for i in 0..current_page.n_cells {
+                    let offset = get_cell_offset(&current_page.mem, &buffer, i, header_size)
+                        .map_err(|e| anyhow::anyhow!("failed to get cell offset: {:?}", e))?;
+                    let cell_size = compute_cell_size(self.btree_ctx, &buffer, offset)
+                        .map_err(|e| anyhow::anyhow!("failed to compute cell size: {:?}", e))?;
+                    cells.push((offset, cell_size));
+                    total_size += cell_size;
+                }
+                let mut left_size = 0;
+                let mut right_size = total_size;
+                let mut i_right = None;
+                for i in 0..current_page.idx_cell {
+                    left_size += cells[i as usize].1 + CELL_POINTER_SIZE;
+                    right_size -= cells[i as usize].1 + CELL_POINTER_SIZE;
+                    if left_size >= right_size {
+                        i_right = Some(i + 1);
+                        break;
+                    }
+                }
+                let (move_to_right, idx_cells) = if let Some(i_right) = i_right {
+                    // Move the right half of the cells to a new page.
+                    (true, i_right..current_page.n_cells)
+                } else {
+                    left_size += new_cell_size + CELL_POINTER_SIZE;
+                    right_size -= new_cell_size + CELL_POINTER_SIZE;
+                    let mut i_right = current_page.n_cells;
+                    for i in current_page.idx_cell..current_page.n_cells {
+                        if left_size >= right_size {
+                            i_right = i;
+                            break;
+                        }
+                        left_size += cells[i as usize].1 + CELL_POINTER_SIZE;
+                        right_size -= cells[i as usize].1 + CELL_POINTER_SIZE;
+                    }
+
+                    // Move the remaining cell pointers.
+                    buffer.copy_within(
+                        cell_pointer_offset(&current_page.mem, i_right, header_size)
+                            ..cell_pointer_offset(
+                                &current_page.mem,
+                                current_page.n_cells,
+                                header_size,
+                            ),
+                        cell_pointer_offset(&current_page.mem, 0, header_size),
+                    );
+                    // Move the left half of the cells to a new page.
+                    (false, 0..i_right)
+                };
+                assert!(current_page.idx_cell >= idx_cells.start);
+                assert!(current_page.idx_cell <= idx_cells.end);
+                let new_idx_cell = current_page.idx_cell - idx_cells.start;
+
+                let mut i_new = 0;
+                let n_moved_cells = idx_cells.len() as u16;
+                let mut first_freeblock_offset =
+                    BtreePageHeader::from_page_mut(&current_page.mem, &buffer)
+                        .first_freeblock_offset();
+                // TODO: Does this assertion avoid the bounds check of cells[i as usize]?
+                assert!(idx_cells.end as usize <= cells.len());
+
+                // Insert the new cell.
+                let mut cell_content_area_offset = allocate_from_unallocated_space(
+                    &new_page,
+                    &mut new_buffer,
+                    header_size,
+                    self.btree_ctx.usable_size as usize,
+                    current_page.idx_cell - idx_cells.start,
+                    new_cell_size,
+                );
+                if current_page.page_type.is_leaf() {
+                    write_table_leaf_cell(
+                        &mut new_buffer,
+                        cell_content_area_offset,
+                        &cell_header[..cell_header_size as usize],
+                        local_payload,
+                        overflow_page_id,
+                    );
+                } else {
+                    new_buffer[cell_content_area_offset..self.btree_ctx.usable_size as usize]
+                        .copy_from_slice(&interior_cell_buf[..new_cell_size as usize]);
+                }
+
+                // Move cells to the new page.
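+                // Each moved cell is re-allocated from the top of the new page's
+                // unallocated space, and the slot it vacates in the source page is
+                // chained onto the source page's freeblock list (the set_u16() calls
+                // below).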
+                for i_current in idx_cells.clone() {
+                    if i_current == current_page.idx_cell {
+                        i_new += 1;
+                    }
+                    let (offset, cell_size) = cells[i_current as usize];
+                    let new_cell_content_area_offset = allocate_from_unallocated_space(
+                        &new_page,
+                        &mut new_buffer,
+                        header_size,
+                        cell_content_area_offset,
+                        i_new,
+                        cell_size,
+                    );
+                    new_buffer[new_cell_content_area_offset..cell_content_area_offset]
+                        .copy_from_slice(&buffer[offset..offset + cell_size as usize]);
+                    cell_content_area_offset = new_cell_content_area_offset;
+                    i_new += 1;
+
+                    // TODO: Merge the freeblock into unallocated space if possible.
+                    // Put the cell's space into the freeblock list.
+                    set_u16(&mut buffer, offset, first_freeblock_offset);
+                    set_u16(&mut buffer, offset + 2, cell_size);
+                    first_freeblock_offset = offset as u16;
+                }
+
+                let mut page_header = BtreePageHeaderMut::from_page(&current_page.mem, &mut buffer);
+                page_header.set_first_freeblock_offset(first_freeblock_offset);
+                let n_current_cells = current_page.n_cells - n_moved_cells;
+                page_header.set_n_cells(n_current_cells);
+
+                let n_new_cells = n_moved_cells + 1;
+                let mut new_page_header = BtreePageHeaderMut::from_page(&new_page, &mut new_buffer);
+                new_page_header.set_page_type(current_page.page_type);
+                new_page_header.set_first_freeblock_offset(0);
+                new_page_header.set_n_cells(n_new_cells);
+                // At least 1 cell is written. cell_content_area_offset must fit in u16.
+                new_page_header.set_cell_content_area_offset(cell_content_area_offset as u16);
+                new_page_header.clear_fragmented_free_bytes();
+
+                interior_cell_buf[..4].copy_from_slice(&new_page_id.to_be_bytes());
+                // Fill the interior cell with the new key.
+                let key_length = {
+                    if move_to_right {
+                        if !current_page.page_type.is_leaf() {
+                            // Copy the right page id from the left page to the right page.
+                            // No header offset calibration is needed because neither page
+                            // can be page 1: they are not the root page.
+                            new_buffer[8..12].copy_from_slice(&buffer[8..12]);
+                        }
+                        copy_key_for_split(
+                            &mut interior_cell_buf[4..13],
+                            current_page.page_type.is_leaf(),
+                            &current_page.mem,
+                            &mut buffer,
+                            n_current_cells,
+                            header_size,
+                        )
+                        .map_err(|e| anyhow::anyhow!("failed to copy_key_for_split: {:?}", e))?
+                    } else {
+                        copy_key_for_split(
+                            &mut interior_cell_buf[4..13],
+                            current_page.page_type.is_leaf(),
+                            &new_page,
+                            &mut new_buffer,
+                            n_new_cells,
+                            header_size,
+                        )
+                        .map_err(|e| anyhow::anyhow!("failed to copy_key_for_split: {:?}", e))?
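+                        // In both branches, copy_key_for_split() writes the varint key of
+                        // the donor page's last cell into interior_cell_buf[4..] and
+                        // returns the length of that varint.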
+                    }
+                };
+                new_cell_size = 4 + key_length;
+
+                if move_to_right {
+                    swap_page_buffer(&mut buffer, &mut new_buffer);
+                    drop(buffer);
+                    drop(new_buffer);
+                    current_page.mem = new_page;
+                    current_page.n_cells = n_new_cells;
+                } else {
+                    current_page.n_cells = n_current_cells;
+                }
+                current_page.n_cells -= (!current_page.page_type.is_leaf()) as u16;
+                current_page.idx_cell = new_idx_cell;
+
+                depth -= 1;
+            }
         }
-        Ok(())
     }
 
     pub fn get_table_key(&self) -> anyhow::Result<Option<i64>> {
@@ -2563,4 +2787,117 @@
             ],
         );
     }
+
+    #[test]
+    fn test_insert_split() {
+        let file =
+            create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]);
+        let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap();
+        let bctx = load_btree_context(file.as_file()).unwrap();
+        let page_id = find_table_page_id("example", file.path());
+
+        let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap();
+        for i in 0..1000 {
+            cursor.insert(i, &[(i % 256) as u8; 200]).unwrap();
+        }
+
+        cursor.move_to_first().unwrap();
+        for i in 0..1000 {
+            let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+            assert_eq!(key, i);
+            assert_eq!(payload.buf(), &[(i % 256) as u8; 200]);
+            drop(payload);
+            cursor.move_next().unwrap();
+        }
+        assert!(cursor.get_table_payload().unwrap().is_none());
+    }
+
+    #[test]
+    fn test_insert_split_reversed() {
+        let file =
+            create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]);
+        let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap();
+        let bctx = load_btree_context(file.as_file()).unwrap();
+        let page_id = find_table_page_id("example", file.path());
+
+        let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap();
+        for i in 0..1000 {
+            cursor
+                .insert(i64::MAX - i, &[(i % 256) as u8; 200])
+                .unwrap();
+        }
+
+        cursor.move_to_first().unwrap();
+        for i in 0..1000 {
+            let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+            assert_eq!(key, i64::MAX - 999 + i);
+            assert_eq!(payload.buf(), &[((999 - i) % 256) as u8; 200]);
+            drop(payload);
+            cursor.move_next().unwrap();
+        }
+        assert!(cursor.get_table_payload().unwrap().is_none());
+    }
+
+    #[test]
+    fn test_insert_split_holes() {
+        let file =
+            create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]);
+        let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap();
+        let bctx = load_btree_context(file.as_file()).unwrap();
+        let page_id = find_table_page_id("example", file.path());
+
+        let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap();
+        for i in 0..4 {
+            for j in 0..1000 {
+                cursor
+                    .insert(4 * j + i, &[((4 * j + i) % 256) as u8; 200])
+                    .unwrap();
+            }
+        }
+
+        cursor.move_to_first().unwrap();
+        for i in 0..4000 {
+            let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+            assert_eq!(key, i);
+            assert_eq!(payload.buf(), &[(i % 256) as u8; 200]);
+            drop(payload);
+            cursor.move_next().unwrap();
+        }
+        assert!(cursor.get_table_payload().unwrap().is_none());
+    }
+
+    #[test]
+    fn test_insert_split_page1() {
+        let file =
+            create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]);
+        let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap();
+        let bctx = load_btree_context(file.as_file()).unwrap();
+
+        let mut cursor = BtreeCursor::new(1, &pager, &bctx).unwrap();
+        cursor.move_to_first().unwrap();
+        let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+        assert_eq!(key, 1);
+        assert_eq!(payload.buf().len(), payload.size() as usize);
+        let payload1 = payload.buf().to_vec();
+        drop(payload);
+
+        for i in 1..1000 {
+            cursor.insert(key + i, &[(i % 256) as u8; 200]).unwrap();
+        }
+
+        cursor.move_to_first().unwrap();
+        let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+        assert_eq!(key, 1);
+        assert_eq!(payload.buf(), &payload1);
+        drop(payload);
+
+        for i in 1..1000 {
+            cursor.move_next().unwrap();
+            let (key, payload) = cursor.get_table_payload().unwrap().unwrap();
+            assert_eq!(key, i + 1);
+            assert_eq!(payload.buf(), &[(i % 256) as u8; 200]);
+        }
+        cursor.move_next().unwrap();
+        assert!(cursor.get_table_payload().unwrap().is_none());
+    }
 }
diff --git a/src/pager.rs b/src/pager.rs
index d888186..ad919f4 100644
--- a/src/pager.rs
+++ b/src/pager.rs
@@ -61,7 +61,7 @@ impl<'a> Deref for PageBuffer<'a> {
 pub struct PageBufferMut<'a>(RefMut<'a, RawPage>);
 
 impl PageBufferMut<'_> {
-    pub fn swap(&mut self, buffer: &mut TemporaryPage) {
+    pub fn swap_tmp(&mut self, buffer: &mut TemporaryPage) {
         std::mem::swap(&mut self.0.buf, &mut buffer.0);
     }
 }
@@ -80,6 +80,10 @@ impl<'a> DerefMut for PageBufferMut<'a> {
     }
 }
 
+pub fn swap_page_buffer(a: &mut PageBufferMut, b: &mut PageBufferMut) {
+    std::mem::swap(&mut a.0.buf, &mut b.0.buf);
+}
+
 pub const ROOT_PAGE_ID: PageId = 1;
 
 pub struct Pager {
diff --git a/tests/insert.rs b/tests/insert.rs
index 46285ac..15b8cca 100644
--- a/tests/insert.rs
+++ b/tests/insert.rs
@@ -379,3 +379,46 @@ fn test_insert_freeblock() {
         &conn,
     )
 }
+
+#[test]
+fn test_insert_split() {
+    let file = create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]);
+    let conn = Connection::open(file.path()).unwrap();
+    let magic_v = "abc".repeat(50);
+
+    for i in 0..5 {
+        for j in 0..3 {
+            for k in 0..500 {
+                let rowid = 15 * k + 5 * j + i;
+                conn.prepare(&format!(
+                    "INSERT INTO example (rowid, col) VALUES ({}, '{}{}{}');",
+                    rowid, rowid, &magic_v, rowid
+                ))
+                .unwrap()
+                .execute()
+                .unwrap();
+            }
+        }
+    }
+
+    let test_conn = rusqlite::Connection::open(file.path()).unwrap();
+    let sql = "SELECT rowid, col FROM example;";
+    let mut test_stmt = test_conn.prepare(sql).unwrap();
+    let mut test_rows = test_stmt.query([]).unwrap();
+    let stmt = conn.prepare(sql).unwrap();
+    let mut rows = stmt.query().unwrap();
+    for i in 0..3 * 5 * 500 {
+        let expected = format!("{}{}{}", i, &magic_v, i);
+        let test_row = test_rows.next().unwrap().unwrap();
+        assert_eq!(test_row.get::<_, i64>(0).unwrap(), i);
+        assert_eq!(&test_row.get::<_, String>(1).unwrap(), &expected);
+
+        let row = rows.next_row().unwrap().unwrap();
+        let columns = row.parse().unwrap();
+        assert_eq!(columns.len(), 2);
+        assert_eq!(columns.get(0), &Value::Integer(i));
+        assert_eq!(columns.get(1), &Value::Text(expected.as_bytes().into()));
+    }
+    assert!(test_rows.next().unwrap().is_none());
+    assert!(rows.next_row().unwrap().is_none());
+}