Split overflowing pages
The original logic lives in balance() in SQLite's btree.c: it rebalances
3 sibling pages and allocates a new page only when required, aiming for
better disk utilization (about 3/4 full per page).

This commit implements a simpler logic that splits an overflowing page
into 2.

For now, only splitting table leaf/interior pages is supported.
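A rough sketch of the idea on a toy in-memory page (an illustration only; the commit operates on raw SQLite page buffers, not this struct): move the upper half of the cells to a new right sibling and hand the divider key up to the parent.

// Toy model: each page holds sorted (rowid, payload) cells.
struct ToyPage { cells: Vec<(i64, Vec<u8>)> }

fn split_into_two(left: &mut ToyPage) -> (i64, ToyPage) {
    let mid = left.cells.len() / 2;
    // The upper half moves to a freshly allocated right sibling.
    let right = ToyPage { cells: left.cells.split_off(mid) };
    // The largest key kept on the left becomes the divider in the parent.
    let divider = left.cells.last().expect("left half non-empty").0;
    (divider, right)
}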
kawasin73 committed Oct 28, 2023
1 parent d0ca6b8 commit 8563681
Showing 4 changed files with 590 additions and 102 deletions.
104 changes: 104 additions & 0 deletions src/btree.rs
@@ -19,6 +19,7 @@ use crate::pager::MemPage;
use crate::pager::PageBuffer;
use crate::pager::PageBufferMut;
use crate::pager::PageId;
use crate::utils::len_varint_buffer;
use crate::utils::parse_varint;
use crate::utils::u64_to_i64;

@@ -28,6 +29,7 @@ type ParseResult<T> = std::result::Result<T, ParseError>;
pub const BTREE_PAGE_INTERIOR_HEADER_SIZE: usize = 12;
pub const BTREE_PAGE_LEAF_HEADER_SIZE: usize = 8;
pub const BTREE_PAGE_HEADER_MAX_SIZE: usize = BTREE_PAGE_INTERIOR_HEADER_SIZE;
pub const BTREE_PAGE_CELL_POINTER_SIZE: usize = 2;

const LEAF_FLAG: u8 = 0x08;
const INDEX_FLAG: u8 = 0x02;
@@ -45,6 +47,15 @@ fn parse_non_zero_u16(buf: [u8; 2]) -> NonZeroU32 {
    NonZeroU32::new(v).unwrap()
}

/// TODO: Find a way to do this without a conditional branch.
pub fn non_zero_to_u16(v: u32) -> u16 {
    if v == 65536 {
        0
    } else {
        v as u16
    }
}
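An aside on the TODO above (an observation, not part of the commit): a u32-to-u16 `as` cast in Rust truncates to the low 16 bits, and 65536 is 0x10000, so the cast alone already maps 65536 to 0 and leaves 1..=65535 unchanged. A branch-free equivalent would be:

// Branch-free equivalent: the truncating cast maps 0x10000 to 0.
pub fn non_zero_to_u16(v: u32) -> u16 {
    v as u16
}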

#[inline(always)]
pub fn set_u16(buf: &mut [u8], offset: usize, value: u16) {
    buf[offset..offset + 2].copy_from_slice(&value.to_be_bytes());
@@ -56,6 +67,11 @@ pub const BTREE_OVERFLOW_PAGE_ID_BYTES: usize = 4;
pub struct BtreePageType(u8);

impl BtreePageType {
    #[inline]
    pub fn interior_type(&self) -> Self {
        Self(self.0 & !LEAF_FLAG)
    }

    #[inline]
    pub fn is_leaf(&self) -> bool {
        self.0 & LEAF_FLAG != 0
@@ -84,6 +100,17 @@ impl BtreePageType {
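        // Annotation (not in the commit): `is_interior` is presumably
        // LEAF_FLAG (0x08) for interior pages and 0 for leaves, so `>> 1`
        // yields the 4 extra bytes of the 12-byte interior header over the
        // 8-byte leaf header.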
        let additional_size = is_interior >> 1;
        8 + additional_size
    }

    pub fn compute_cell_size_fn(&self) -> fn(&BtreeContext, &[u8], usize) -> ParseResult<u16> {
        if self.is_index() {
            todo!("index cell size");
        }
        if self.is_leaf() {
            compute_table_leaf_cell_size
        } else {
            compute_table_interior_cell_size
        }
    }
}
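The function-pointer indirection lets a caller pick the cell-size routine once per page type and reuse it for every cell on the page. A hedged usage sketch (page_type, ctx, buffer, and cell_offset stand in for values a cursor would already hold):

let cell_size_fn = page_type.compute_cell_size_fn();
let cell_size = cell_size_fn(&ctx, &buffer, cell_offset)?;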

pub struct BtreePageHeader<'page>(&'page [u8; BTREE_PAGE_HEADER_MAX_SIZE]);
@@ -187,6 +214,10 @@ impl<'a> BtreePageHeaderMut<'a> {
    pub fn clear_fragmented_free_bytes(&mut self) {
        self.0[7] = 0;
    }

    /// Set the right-most child page id (bytes 8..12 of an interior page header).
    pub fn set_right_page_id(&mut self, page_id: PageId) {
        self.0[8..12].copy_from_slice(&page_id.to_be_bytes());
    }
}

pub struct FreeblockIterator<'a> {
@@ -322,6 +353,33 @@ pub fn get_cell_offset(
    Ok(cell_offset)
}

fn compute_table_leaf_cell_size(
    ctx: &BtreeContext,
    // TODO: How to accept both PageBufferMut and TemporaryPage?
    buffer: &[u8],
    offset: usize,
) -> ParseResult<u16> {
    let (payload_size, payload_size_length) =
        parse_varint(&buffer[offset..]).ok_or("parse payload size")?;
    let key_length = len_varint_buffer(&buffer[offset + payload_size_length..]);
    let n_local = if payload_size <= ctx.max_local(true) as u64 {
        payload_size as u16
    } else {
        ctx.n_local(true, payload_size as i32)
    };
    Ok(payload_size_length as u16 + key_length as u16 + n_local)
}
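For reference, a SQLite table leaf cell is laid out as below; the max_local/n_local branch above decides how much of the payload stays on the page before spilling to overflow pages:

// varint payload_size | varint rowid (key) | local payload | [u32 first overflow page id]
// The trailing overflow page id is present only when the payload exceeds the
// page's local-payload limit.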

fn compute_table_interior_cell_size(
    _ctx: &BtreeContext,
    // TODO: How to accept both PageBufferMut and TemporaryPage?
    buffer: &[u8],
    offset: usize,
) -> ParseResult<u16> {
    let key_length = len_varint_buffer(&buffer[offset + 4..]);
    Ok(4 + key_length as u16)
}
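A table interior cell is simpler, which is why its size is a fixed 4 bytes plus the key varint starting at offset + 4:

// u32 left child page id | varint rowid (key)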

/// Context containing constant values used to parse btree pages.
pub struct BtreeContext {
    /// Maximum local payload size. The first is for index pages, the second is
@@ -598,6 +656,26 @@ pub fn allocate_from_unallocated_space(
    new_cell_content_area_offset
}

/// Write a table leaf cell to the specified offset.
pub fn write_table_leaf_cell(
    buffer: &mut PageBufferMut,
    offset: usize,
    cell_header: &[u8],
    local_payload: &[u8],
    overflow_page_id: Option<PageId>,
) {
    // Copy the cell header (payload size and key varints) to the btree page.
    let payload_offset = offset + cell_header.len();
    buffer[offset..payload_offset].copy_from_slice(cell_header);
    // Copy the local part of the payload.
    let payload_tail_offset = payload_offset + local_payload.len();
    buffer[payload_offset..payload_tail_offset].copy_from_slice(local_payload);
    // If the payload spills, append the first overflow page id after it.
    if let Some(overflow_page_id) = overflow_page_id {
        let overflow_page_id = overflow_page_id.to_be_bytes();
        buffer[payload_tail_offset..payload_tail_offset + overflow_page_id.len()]
            .copy_from_slice(&overflow_page_id);
    }
}
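A caller is expected to have already encoded the cell header and chosen the local/overflow split. A hedged usage sketch, where encode_varint and the surrounding variables are assumptions for illustration, not APIs shown in this diff:

// Build the cell header: payload size varint followed by the rowid varint.
let mut cell_header = Vec::new();
encode_varint(&mut cell_header, payload.len() as u64);
encode_varint(&mut cell_header, rowid);
// Write the cell; everything past n_local goes to an overflow chain.
write_table_leaf_cell(&mut buffer, cell_offset, &cell_header, &payload[..n_local], first_overflow_page_id);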

/// Compute the free size of the page.
///
/// n_cells is passed as an argument because it is cached in the cursor.
@@ -629,6 +707,32 @@ pub fn compute_free_size(page: &MemPage, buffer: &PageBufferMut, n_cells: u16) -
    Ok(free_size)
}

/// Copy the key of the last cell on the left page into `buf` for use as the
/// divider in a page split.
///
/// For a leaf page, the key is the rowid varint that follows the
/// payload-size varint of the last cell. For an interior page, the last cell
/// is consumed: its 4-byte child pointer becomes the left page's new
/// right-most pointer (header bytes 8..12) and its key becomes the divider.
///
/// This is a free function due to Rust borrow checker constraints.
pub fn copy_key_for_split(
    buf: &mut [u8],
    is_leaf: bool,
    left_page: &MemPage,
    left_buffer: &mut PageBufferMut,
    n_cells: u16,
    header_size: u8,
) -> ParseResult<u16> {
    let cell_offset = get_cell_offset(left_page, left_buffer, n_cells - 1, header_size)?;
    let key_offset_in_cell = if is_leaf {
        // Skip the payload-size varint; the key (rowid) varint follows it.
        len_varint_buffer(&left_buffer[cell_offset..])
    } else {
        // Promote the last cell's child pointer to the right-most pointer.
        left_buffer.copy_within(cell_offset..cell_offset + 4, left_page.header_offset + 8);
        // Remove the cell at the tail.
        // TODO: add the cell to the freeblock list or avoid copying the cell.
        BtreePageHeaderMut::from_page(left_page, left_buffer).set_n_cells(n_cells - 1);
        4
    };
    let key_offset = cell_offset + key_offset_in_cell;
    let key_length = len_varint_buffer(&left_buffer[key_offset..]);
    buf[..key_length].copy_from_slice(&left_buffer[key_offset..key_offset + key_length]);
    Ok(key_length as u16)
}

#[cfg(test)]
mod tests {
    use super::*;