From bd4964d6dac2c167b56e2230dd977c0c081df95b Mon Sep 17 00:00:00 2001 From: Kawamura Shintaro Date: Sun, 12 Nov 2023 18:02:39 +0900 Subject: [PATCH] Split a table leaf page into 3 pages There can be a case when a new cell can't fit in both splitted pages. In that case, we need to split the page into 3 pages. This only happens for a table leaf page because table leaf cells can be more than half of page size while table interior cells is less than or equal to 13 bytes and index cells has at most about quarter of usable size. --- src/cursor.rs | 366 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 307 insertions(+), 59 deletions(-) diff --git a/src/cursor.rs b/src/cursor.rs index 0a95526..72ec3dc 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -611,6 +611,8 @@ impl<'a> BtreeCursor<'a> { let mut depth = self.parent_pages.len(); let mut interior_cell_buf = self.pager.allocate_tmp_page(); + let mut sub_interiror_cell_buf = [0; 13]; + let mut sub_interior_cell_len = None; loop { let current_page = if depth == self.parent_pages.len() { @@ -796,9 +798,16 @@ impl<'a> BtreeCursor<'a> { overflow_page_id, ); } else { + current_page.idx_cell += 1; buffer[allocated_offset..allocated_offset + new_cell_size as usize] .copy_from_slice(&interior_cell_buf[..new_cell_size as usize]); } + if let Some(sub_interior_cell_len) = sub_interior_cell_len.take() { + new_cell_size = sub_interior_cell_len; + interior_cell_buf[..new_cell_size as usize] + .copy_from_slice(&sub_interiror_cell_buf[..new_cell_size as usize]); + continue; + } return Ok(()); } @@ -853,10 +862,6 @@ impl<'a> BtreeCursor<'a> { depth += 1; } else { let header_size = current_page.page_type.header_size(); - let (new_page_id, new_page) = - self.pager.allocate_page().map_err(Error::AllocatePage)?; - // make_page_mut() must succeed for allocated pages. - let mut new_buffer = self.pager.make_page_mut(&new_page).unwrap(); const CELL_POINTER_SIZE: u32 = BTREE_PAGE_CELL_POINTER_SIZE as u32; // total_size is u32 because it may overflow u16 size. @@ -893,34 +898,45 @@ impl<'a> BtreeCursor<'a> { break; } } - let (move_to_right, idx_cells) = if let Some(i_right) = i_right { + let (move_to_right, split_into_3, idx_cells) = if let Some(i_right) = i_right { // Move right half of cells and the new cell to a new page. - (true, i_right..current_page.n_cells) + (true, false, i_right..current_page.n_cells) } else { let capacity = self.btree_ctx.usable_size - header_size as u32; left_size += new_cell_size as u32 + CELL_POINTER_SIZE; - if left_size > capacity { - if right_size > capacity { - todo!("split into 3 pages"); - } else { - // Move right half of cells and the new cell to a new page. - (true, current_page.idx_cell..current_page.n_cells) - } + if left_size > capacity && right_size <= capacity { + // If idx_cell is zero, left_size is the same as the new cell size. But + // the new cell size must not exceeds the capacity. So idx_cell must be + // more than zero. + assert_ne!(current_page.idx_cell, 0); + // Move right half of cells and the new cell to a new page. + (true, false, current_page.idx_cell..current_page.n_cells) } else { - right_size -= new_cell_size as u32 + CELL_POINTER_SIZE; - let mut i_right = current_page.n_cells; - for i in current_page.idx_cell..current_page.n_cells { - if left_size >= right_size { - i_right = i; - break; - } - left_size += cells[i as usize].1 as u32 + CELL_POINTER_SIZE; - if left_size > capacity { - i_right = i; - break; + let (split_into_3, i_right) = if left_size > capacity { + // Split into 3 pages. + // The page must not be a table interior page because a cell is at most + // 13 bytes. + // The page must not be a index pages because a cell is at most about + // quarter of usable size. + assert!(current_page.page_type.is_table_leaf()); + (true, current_page.idx_cell) + } else { + right_size -= new_cell_size as u32 + CELL_POINTER_SIZE; + let mut i_right = current_page.n_cells; + for i in current_page.idx_cell..current_page.n_cells { + if left_size >= right_size { + i_right = i; + break; + } + left_size += cells[i as usize].1 as u32 + CELL_POINTER_SIZE; + if left_size > capacity { + i_right = i; + break; + } + right_size -= cells[i as usize].1 as u32 + CELL_POINTER_SIZE; } - right_size -= cells[i as usize].1 as u32 + CELL_POINTER_SIZE; - } + (false, i_right) + }; // Move remaining cell pointers buffer.copy_within( @@ -933,7 +949,7 @@ impl<'a> BtreeCursor<'a> { cell_pointer_offset(¤t_page.mem, 0, header_size), ); // Move left half of cells to a new page. - (false, 0..i_right) + (false, split_into_3, 0..i_right) } }; assert!(current_page.idx_cell >= idx_cells.start); @@ -942,39 +958,24 @@ impl<'a> BtreeCursor<'a> { let mut i_new = 0; let n_moved_cells = idx_cells.len() as u16; + let n_new_cells = n_moved_cells + + (!split_into_3) as u16 + + (!split_into_3 && sub_interior_cell_len.is_some()) as u16; let mut first_freeblock_offset = BtreePageHeader::from_page_mut(¤t_page.mem, &buffer) .first_freeblock_offset(); // TODO: Does this assertion avoid boundary check of cells[i as usize]? assert!(idx_cells.end as usize <= cells.len()); - // Insert the new cell. - let mut cell_content_area_offset = allocate_from_unallocated_space( - &new_page, - &mut new_buffer, - header_size, - self.btree_ctx.usable_size as usize, - current_page.idx_cell - idx_cells.start, - new_cell_size, - ); - if current_page.page_type.is_leaf() { - write_leaf_cell( - &mut new_buffer, - cell_content_area_offset, - cell_header, - payload, - n_local, - overflow_page_id, - ); - } else { - new_buffer[cell_content_area_offset..self.btree_ctx.usable_size as usize] - .copy_from_slice(&interior_cell_buf[..new_cell_size as usize]); - } - + let (new_page_id, new_page) = + self.pager.allocate_page().map_err(Error::AllocatePage)?; + // make_page_mut() must succeed for allocated pages. + let mut new_buffer = self.pager.make_page_mut(&new_page).unwrap(); + let mut cell_content_area_offset = self.btree_ctx.usable_size as usize; // Move cells to the new page. for i_current in idx_cells.clone() { if i_current == current_page.idx_cell { - i_new += 1; + i_new += 1 + sub_interior_cell_len.is_some() as u16; } let (offset, cell_size) = cells[i_current as usize]; let new_cell_content_area_offset = allocate_from_unallocated_space( @@ -997,12 +998,105 @@ impl<'a> BtreeCursor<'a> { first_freeblock_offset = offset as u16; } + // Insert the new cell. + if split_into_3 { + let (new_page_id, new_page) = + self.pager.allocate_page().map_err(Error::AllocatePage)?; + let mut new_buffer = self.pager.make_page_mut(&new_page).unwrap(); + + let cell_content_area_offset = allocate_from_unallocated_space( + &new_page, + &mut new_buffer, + header_size, + self.btree_ctx.usable_size as usize, + 0, + new_cell_size, + ); + assert!(current_page.page_type.is_leaf()); + write_leaf_cell( + &mut new_buffer, + cell_content_area_offset, + cell_header, + payload, + n_local, + overflow_page_id, + ); + + let mut new_page_header = + BtreePageHeaderMut::from_page(&new_page, &mut new_buffer); + new_page_header.set_page_type(current_page.page_type); + new_page_header.set_first_freeblock_offset(0); + new_page_header.set_n_cells(1); + // At least 1 cell is written. cell_content_area_offset must fit in u16. + new_page_header.set_cell_content_area_offset(cell_content_area_offset as u16); + new_page_header.clear_fragmented_free_bytes(); + + sub_interiror_cell_buf[..4].copy_from_slice(&new_page_id.get().to_be_bytes()); + let payload_size_len = len_varint_buffer( + &new_buffer[cell_content_area_offset..], + ) + .ok_or_else(|| Error::FileCorrupt { + page_id: new_page_id, + e: FileCorrupt::new("payload size len"), + })?; + let key_offset = cell_content_area_offset + payload_size_len; + let key_len = + len_varint_buffer(&new_buffer[key_offset..]).ok_or_else(|| { + Error::FileCorrupt { + page_id: new_page_id, + e: FileCorrupt::new("key length"), + } + })?; + sub_interiror_cell_buf[4..4 + key_len] + .copy_from_slice(&new_buffer[key_offset..key_offset + key_len]); + sub_interior_cell_len = Some(4 + key_len as u16); + + drop(new_buffer); + } else { + cell_content_area_offset = allocate_from_unallocated_space( + &new_page, + &mut new_buffer, + header_size, + cell_content_area_offset, + current_page.idx_cell - idx_cells.start, + new_cell_size, + ); + if current_page.page_type.is_leaf() { + write_leaf_cell( + &mut new_buffer, + cell_content_area_offset, + cell_header, + payload, + n_local, + overflow_page_id, + ); + } else { + new_buffer[cell_content_area_offset + ..cell_content_area_offset + new_cell_size as usize] + .copy_from_slice(&interior_cell_buf[..new_cell_size as usize]); + if let Some(sub_interior_cell_len) = sub_interior_cell_len.take() { + cell_content_area_offset = allocate_from_unallocated_space( + &new_page, + &mut new_buffer, + header_size, + cell_content_area_offset, + current_page.idx_cell + 1 - idx_cells.start, + new_cell_size, + ); + new_buffer[cell_content_area_offset + ..cell_content_area_offset + sub_interior_cell_len as usize] + .copy_from_slice( + &sub_interiror_cell_buf[..sub_interior_cell_len as usize], + ); + } + } + }; + let mut page_header = BtreePageHeaderMut::from_page(¤t_page.mem, &mut buffer); page_header.set_first_freeblock_offset(first_freeblock_offset); let n_current_cells = current_page.n_cells - n_moved_cells; page_header.set_n_cells(n_current_cells); - let n_new_cells = n_moved_cells + 1; let mut new_page_header = BtreePageHeaderMut::from_page(&new_page, &mut new_buffer); new_page_header.set_page_type(current_page.page_type); new_page_header.set_first_freeblock_offset(0); @@ -3420,10 +3514,7 @@ mod tests { for i in 1..1000 { cursor - .table_insert( - key + i, - &SlicePayload::new(&[(i % 256) as u8; 200]).unwrap(), - ) + .table_insert(key + i, &SlicePayload::new(&[(i % 256) as u8; 50]).unwrap()) .unwrap(); } @@ -3437,7 +3528,7 @@ mod tests { cursor.move_next().unwrap(); let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); assert_eq!(key, i + 1); - assert_eq!(payload.buf(), &[(i % 256) as u8; 200]); + assert_eq!(payload.buf(), &[(i % 256) as u8; 50]); } cursor.move_next().unwrap(); assert!(cursor.get_table_payload().unwrap().is_none()); @@ -3456,7 +3547,7 @@ mod tests { // is bigger than left page. for i in 0..1000 { cursor - .table_insert(i, &SlicePayload::new(&[(i % 256) as u8; 400]).unwrap()) + .table_insert(i, &SlicePayload::new(&[(i % 256) as u8; 450]).unwrap()) .unwrap(); } @@ -3464,10 +3555,167 @@ mod tests { for i in 0..1000 { let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); assert_eq!(key, i); - assert_eq!(payload.buf(), &[(i % 256) as u8; 400]); + assert_eq!(payload.buf(), &[(i % 256) as u8; 450]); + drop(payload); + cursor.move_next().unwrap(); + } + assert!(cursor.get_table_payload().unwrap().is_none()); + } + + #[test] + fn test_insert_table_split_into_3pages() { + let file = + create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let page_id = find_table_page_id("example", file.path()); + + let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap(); + cursor + .table_insert(1, &SlicePayload::new(&[1; 200]).unwrap()) + .unwrap(); + cursor + .table_insert(3, &SlicePayload::new(&[3; 200]).unwrap()) + .unwrap(); + cursor + .table_insert(2, &SlicePayload::new(&[2; 400]).unwrap()) + .unwrap(); + + cursor.move_to_first().unwrap(); + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 1); + assert_eq!(payload.buf(), &[1; 200]); + drop(payload); + + cursor.move_next().unwrap(); + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 2); + assert_eq!(payload.buf(), &[2; 400]); + drop(payload); + + cursor.move_next().unwrap(); + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 3); + assert_eq!(payload.buf(), &[3; 200]); + drop(payload); + + cursor.move_next().unwrap(); + assert!(cursor.get_table_payload().unwrap().is_none()); + } + + #[test] + fn test_insert_table_split_into_3pages_split_interior_again() { + let file = + create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let page_id = find_table_page_id("example", file.path()); + + let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap(); + for i in 0..141 { + cursor + .table_insert(i, &SlicePayload::new(&[i as u8; 200]).unwrap()) + .unwrap(); + } + cursor + .table_insert(142, &SlicePayload::new(&[142; 200]).unwrap()) + .unwrap(); + let root_page = pager.get_page(page_id).unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert!(!page_header.page_type().is_leaf()); + assert_eq!(page_header.n_cells(), 70); + drop(buffer); + // The interiror page has no room for 1 cell. + cursor + .table_insert(141, &SlicePayload::new(&[141; 400]).unwrap()) + .unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert!(!page_header.page_type().is_leaf()); + assert_eq!(page_header.n_cells(), 1); + drop(buffer); + + cursor.move_to_first().unwrap(); + + for i in 0..141 { + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, i); + assert_eq!(payload.buf(), &[i as u8; 200]); drop(payload); cursor.move_next().unwrap(); } + + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 141); + assert_eq!(payload.buf(), &[141; 400]); + drop(payload); + + cursor.move_next().unwrap(); + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 142); + assert_eq!(payload.buf(), &[142; 200]); + drop(payload); + + cursor.move_next().unwrap(); + assert!(cursor.get_table_payload().unwrap().is_none()); + } + + #[test] + fn test_insert_table_split_into_3pages_split_interior_again2() { + let file = + create_sqlite_database(&["PRAGMA page_size = 512;", "CREATE TABLE example(col);"]); + let pager = create_pager(file.as_file().try_clone().unwrap()).unwrap(); + let bctx = load_btree_context(file.as_file()).unwrap(); + let page_id = find_table_page_id("example", file.path()); + + let mut cursor = BtreeCursor::new(page_id, &pager, &bctx).unwrap(); + for i in 0..139 { + cursor + .table_insert(i, &SlicePayload::new(&[i as u8; 200]).unwrap()) + .unwrap(); + } + cursor + .table_insert(140, &SlicePayload::new(&[140; 200]).unwrap()) + .unwrap(); + let root_page = pager.get_page(page_id).unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert!(!page_header.page_type().is_leaf()); + assert_eq!(page_header.n_cells(), 69); + drop(buffer); + // The interiror page has room for 1 cell. + cursor + .table_insert(139, &SlicePayload::new(&[139; 400]).unwrap()) + .unwrap(); + let buffer = root_page.buffer(); + let page_header = BtreePageHeader::from_page(&root_page, &buffer); + assert!(!page_header.page_type().is_leaf()); + assert_eq!(page_header.n_cells(), 1); + drop(buffer); + + cursor.move_to_first().unwrap(); + + for i in 0..139 { + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, i); + assert_eq!(payload.buf(), &[i as u8; 200]); + drop(payload); + cursor.move_next().unwrap(); + } + + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 139); + assert_eq!(payload.buf(), &[139; 400]); + drop(payload); + + cursor.move_next().unwrap(); + let (key, payload) = cursor.get_table_payload().unwrap().unwrap(); + assert_eq!(key, 140); + assert_eq!(payload.buf(), &[140; 200]); + drop(payload); + + cursor.move_next().unwrap(); assert!(cursor.get_table_payload().unwrap().is_none()); }