# Database Engine Simulator (Demo)

This notebook demonstrates a small database engine simulation: a fixed-size-record
binary table file, a B-tree primary index on product ID, and B+ tree secondary indexes.


In [None]:
from __future__ import annotations

from dataclasses import dataclass
from bisect import bisect_left, bisect_right
import os
import struct
from typing import Any, Iterable, Optional


# -----------------------------
# Binary table (on-disk storage)
# -----------------------------

@dataclass(frozen=True)
class Product:
    """In-memory representation of a product row.

    Instances are immutable (``frozen=True``) and compare equal when all
    five fields are equal (dataclass default ``eq``).
    """

    product_id: int  # row identifier
    name: str  # product name
    category: str  # product category
    price: float  # product price
    stock: int  # stock count


class ProductTableFile:
    """Fixed-size binary records with stable byte-offset pointers.

    Record layout (little-endian, no padding):
      active: 1 byte (0/1)
      product_id: uint32
      name: 64 bytes (utf-8, null-padded)
      category: 32 bytes (utf-8, null-padded)
      price: float64
      stock: uint32

    Total: 1 + 4 + 64 + 32 + 8 + 4 = 113 bytes/record.

    Deletion model:
      - Tombstone deletion: overwrite only the `active` byte with 0.
      - This keeps all other record bytes intact and keeps offsets stable.

    Text that does not fit its field is truncated on a character boundary,
    so a stored string never ends in a partial UTF-8 sequence.
    """

    _NAME_BYTES = 64
    _CAT_BYTES = 32
    _STRUCT = struct.Struct(f"<BI{_NAME_BYTES}s{_CAT_BYTES}sdI")
    RECORD_SIZE = _STRUCT.size

    def __init__(self, path: str):
        """Remember the table file location; the file itself is not touched."""
        self.path = path

    def _encode_fixed(self, s: str, size: int) -> bytes:
        """Encode `s` as UTF-8 into exactly `size` bytes (truncate, then null-pad)."""
        raw = s.encode("utf-8")
        if len(raw) > size:
            # A plain byte slice can cut a multi-byte character in half; decoding
            # with errors="ignore" drops that incomplete tail so the field only
            # ever holds whole characters.
            raw = raw[:size].decode("utf-8", errors="ignore").encode("utf-8")
        return raw.ljust(size, b"\x00")

    def _decode_fixed(self, b: bytes) -> str:
        """Decode a null-padded UTF-8 field; everything from the first NUL on is padding."""
        return b.split(b"\x00", 1)[0].decode("utf-8", errors="replace")

    def _decode_record(self, raw: bytes) -> Optional[Product]:
        """Unpack one full record; return None when its `active` byte is 0."""
        active, product_id, name_b, cat_b, price, stock = self._STRUCT.unpack(raw)
        if not active:
            return None
        return Product(
            product_id=int(product_id),
            name=self._decode_fixed(name_b),
            category=self._decode_fixed(cat_b),
            price=float(price),
            stock=int(stock),
        )

    def create_empty(self, overwrite: bool = True) -> None:
        """Create an empty table file.

        With ``overwrite=True`` an existing file is truncated to zero bytes;
        with ``overwrite=False`` an existing file is left untouched.
        """
        if overwrite or not os.path.exists(self.path):
            # "wb" both creates a missing file and truncates an existing one.
            with open(self.path, "wb"):
                pass

    def append(self, p: Product) -> int:
        """Append product, return byte offset pointer.

        Raises:
            struct.error: if `product_id` or `stock` does not fit in a uint32.
        """
        rec = self._STRUCT.pack(
            1,
            int(p.product_id),
            self._encode_fixed(p.name, self._NAME_BYTES),
            self._encode_fixed(p.category, self._CAT_BYTES),
            float(p.price),
            int(p.stock),
        )
        with open(self.path, "ab") as f:
            offset = f.tell()
            f.write(rec)
            return offset

    def read_at(self, offset: int) -> Optional[Product]:
        """Read a record at a byte offset; returns None if deleted/out of bounds.

        A negative offset, or one with fewer than RECORD_SIZE bytes after it,
        counts as out of bounds.
        """
        if offset < 0:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)
            raw = f.read(self.RECORD_SIZE)
        if len(raw) != self.RECORD_SIZE:
            return None
        return self._decode_record(raw)

    def mark_deleted(self, offset: int) -> None:
        """Tombstone delete: mark record inactive at the given offset.

        Raises:
            ValueError: if `offset` is not the start of a complete record.
                Writing there would corrupt a neighbouring record or extend
                the file with a stray byte.
        """
        with open(self.path, "r+b") as f:
            end = f.seek(0, os.SEEK_END)
            if offset < 0 or offset % self.RECORD_SIZE or offset + self.RECORD_SIZE > end:
                raise ValueError(f"offset {offset} does not address a record")
            f.seek(offset)
            f.write(b"\x00")

    def iter_active(self) -> Iterable[tuple[int, Product]]:
        """Iterate (offset, Product) for active records only.

        Yields nothing when the file does not exist. A trailing partial record
        (fewer than RECORD_SIZE bytes) ends the scan.
        """
        if not os.path.exists(self.path):
            return
        with open(self.path, "rb") as f:
            offset = 0
            while True:
                raw = f.read(self.RECORD_SIZE)
                if len(raw) != self.RECORD_SIZE:
                    break  # clean EOF or truncated tail
                product = self._decode_record(raw)
                if product is not None:
                    yield offset, product
                offset += self.RECORD_SIZE


# -----------------------------
# B-Tree (primary index by ID)
# -----------------------------

class _BTreeNode:
    """One B-tree node: sorted keys, parallel values and, if internal, children."""

    def __init__(self, t: int, leaf: bool):
        self.t = t
        self.leaf = leaf
        self.keys: list[int] = []
        self.values: list[int] = []  # pointers (offsets), aligned with keys
        self.children: list[_BTreeNode] = []  # stays empty for leaves


class BTree:
    """Minimal B-tree supporting insert/search/delete by unique key.

    Stores:
      - key: product_id
      - value: file offset pointer

    Every node except the root holds between ``t - 1`` and ``2t - 1`` keys;
    both insert and delete maintain that bound.
    """

    def __init__(self, t: int = 3):
        """Create an empty tree with minimum degree `t` (must be >= 2)."""
        if t < 2:
            raise ValueError("BTree minimum degree t must be >= 2")
        self.t = t
        self.root = _BTreeNode(t, leaf=True)

    def search(self, key: int) -> Optional[int]:
        """Return value for key, or None if not present."""
        return self._search(self.root, key)

    def _search(self, node: _BTreeNode, key: int) -> Optional[int]:
        """Look for `key` in the subtree rooted at `node`."""
        i = bisect_left(node.keys, key)
        if i < len(node.keys) and node.keys[i] == key:
            return node.values[i]
        if node.leaf:
            return None
        return self._search(node.children[i], key)

    def insert(self, key: int, value: int) -> None:
        """Insert a new (unique) key -> value mapping.

        Raises:
            ValueError: if `key` is already present.
        """
        if self.search(key) is not None:
            raise ValueError(f"duplicate key {key}")
        r = self.root
        if len(r.keys) == 2 * self.t - 1:
            # A full root is split first so the tree grows upward by one level.
            s = _BTreeNode(self.t, leaf=False)
            s.children.append(r)
            self._split_child(s, 0)
            self.root = s
            self._insert_nonfull(s, key, value)
        else:
            self._insert_nonfull(r, key, value)

    def _split_child(self, parent: _BTreeNode, i: int) -> None:
        """Split the full child `parent.children[i]`, moving its median up."""
        t = self.t
        y = parent.children[i]
        z = _BTreeNode(t, leaf=y.leaf)

        mid_key = y.keys[t - 1]
        mid_val = y.values[t - 1]

        z.keys = y.keys[t:]
        z.values = y.values[t:]
        y.keys = y.keys[: t - 1]
        y.values = y.values[: t - 1]

        if not y.leaf:
            z.children = y.children[t:]
            y.children = y.children[:t]

        parent.keys.insert(i, mid_key)
        parent.values.insert(i, mid_val)
        parent.children.insert(i + 1, z)

    def _insert_nonfull(self, node: _BTreeNode, key: int, value: int) -> None:
        """Insert into the subtree at `node`, which must not be full."""
        if node.leaf:
            j = bisect_left(node.keys, key)
            node.keys.insert(j, key)
            node.values.insert(j, value)
            return

        j = bisect_right(node.keys, key)
        child = node.children[j]
        if len(child.keys) == 2 * self.t - 1:
            self._split_child(node, j)
            # The promoted median now sits at index j; pick the correct side.
            if key > node.keys[j]:
                j += 1
        self._insert_nonfull(node.children[j], key, value)

    def delete(self, key: int) -> Optional[int]:
        """Delete key and return its value (offset), or None if not found."""
        val = self.search(key)
        if val is None:
            return None
        self._delete(self.root, key)
        # A merge can leave the root with no keys and a single child.
        if not self.root.leaf and len(self.root.keys) == 0:
            self.root = self.root.children[0]
        return val

    def _delete(self, node: _BTreeNode, key: int) -> None:
        """Delete `key` from the subtree at `node`.

        Precondition: `node` is the root or holds at least `t` keys, so removing
        one key from it can never underflow. Each recursive call re-establishes
        that precondition for the child it descends into.
        """
        t = self.t
        i = bisect_left(node.keys, key)

        if i < len(node.keys) and node.keys[i] == key:
            if node.leaf:
                node.keys.pop(i)
                node.values.pop(i)
                return

            left = node.children[i]
            right = node.children[i + 1]
            if len(left.keys) >= t:
                # Left subtree can spare a key: replace with the predecessor.
                node.keys[i], node.values[i] = self._edge_entry(left, -1)
                self._delete(left, node.keys[i])
            elif len(right.keys) >= t:
                # Otherwise take the successor from the right subtree.
                node.keys[i], node.values[i] = self._edge_entry(right, 0)
                self._delete(right, node.keys[i])
            else:
                # Both neighbours are minimal: pull the key down into a merged
                # child (2t - 1 keys) and delete it from there.
                self._merge_children(node, i)
                self._delete(left, key)
            return

        if node.leaf:
            return

        child = node.children[i]
        if len(child.keys) < t:
            child = self._fill_child(node, i)
        self._delete(child, key)

    @staticmethod
    def _edge_entry(node: _BTreeNode, end: int) -> tuple[int, int]:
        """Return the (key, value) at the far left (end=0) or right (end=-1) of a subtree."""
        while not node.leaf:
            node = node.children[end]
        return node.keys[end], node.values[end]

    def _merge_children(self, node: _BTreeNode, i: int) -> None:
        """Merge children[i], separator key i and children[i + 1] into children[i]."""
        left = node.children[i]
        right = node.children.pop(i + 1)
        left.keys.append(node.keys.pop(i))
        left.values.append(node.values.pop(i))
        left.keys.extend(right.keys)
        left.values.extend(right.values)
        left.children.extend(right.children)  # no-op for leaves

    def _fill_child(self, node: _BTreeNode, i: int) -> _BTreeNode:
        """Give the minimal child `node.children[i]` at least `t` keys.

        Borrows through the parent from a sibling that can spare a key,
        otherwise merges with a sibling. Returns the node that now covers the
        key range of the original child.
        """
        t = self.t
        child = node.children[i]
        last = len(node.children) - 1

        if i > 0 and len(node.children[i - 1].keys) >= t:
            left = node.children[i - 1]
            child.keys.insert(0, node.keys[i - 1])
            child.values.insert(0, node.values[i - 1])
            if not left.leaf:
                child.children.insert(0, left.children.pop())
            node.keys[i - 1] = left.keys.pop()
            node.values[i - 1] = left.values.pop()
            return child

        if i < last and len(node.children[i + 1].keys) >= t:
            right = node.children[i + 1]
            child.keys.append(node.keys[i])
            child.values.append(node.values[i])
            if not right.leaf:
                child.children.append(right.children.pop(0))
            node.keys[i] = right.keys.pop(0)
            node.values[i] = right.values.pop(0)
            return child

        if i < last:
            self._merge_children(node, i)
            return child

        # Rightmost child: fold it into its left sibling instead.
        self._merge_children(node, i - 1)
        return node.children[i - 1]


# -----------------------------
# B+ Tree (secondary indexes)
# -----------------------------

class _BPlusNode:
    """B+ tree node; leaves carry offset buckets and a `next` link, internals carry children."""

    def __init__(self, leaf: bool):
        self.leaf = leaf
        self.keys: list[Any] = []
        if not leaf:
            self.children: list[_BPlusNode] = []
            return
        self.values: list[list[int]] = []  # one bucket of offsets per key
        self.next: Optional[_BPlusNode] = None  # right neighbour in the leaf chain


class BPlusTree:
    """B+ tree mapping a key to a list of offsets (duplicates allowed).

    Leaves are chained left-to-right, which is what range queries walk.
    Removing pointers never rebalances the tree; emptied keys are simply
    dropped from their leaf.
    """

    def __init__(self, order: int = 4):
        """Create an empty tree; a node holds at most ``order - 1`` keys."""
        if order < 3:
            raise ValueError("BPlusTree order must be >= 3")
        self.order = order
        self.root = _BPlusNode(leaf=True)

    def insert(self, key: Any, offset: int) -> None:
        """Add `offset` to the bucket of `key`, creating the key if needed."""
        old_root = self.root
        promoted = self._insert(old_root, key, offset)
        if promoted is None:
            return
        # The old root split: hang both halves under a fresh root.
        separator, sibling = promoted
        grown = _BPlusNode(leaf=False)
        grown.keys = [separator]
        grown.children = [old_root, sibling]
        self.root = grown

    def _insert(self, node: _BPlusNode, key: Any, offset: int) -> Optional[tuple[Any, _BPlusNode]]:
        """Insert below `node`; return (separator, new right sibling) if `node` split."""
        if not node.leaf:
            slot = bisect_right(node.keys, key)
            promoted = self._insert(node.children[slot], key, offset)
            if promoted is None:
                return None
            separator, sibling = promoted
            at = bisect_right(node.keys, separator)
            node.keys.insert(at, separator)
            node.children.insert(at + 1, sibling)
            return self._split_internal(node)

        at = bisect_left(node.keys, key)
        if at == len(node.keys) or node.keys[at] != key:
            node.keys.insert(at, key)
            node.values.insert(at, [offset])
        else:
            node.values[at].append(offset)
        return self._split_leaf(node)

    def _split_leaf(self, node: _BPlusNode) -> Optional[tuple[Any, _BPlusNode]]:
        """Split an over-full leaf in half; the right half's first key is the separator."""
        if len(node.keys) < self.order:
            return None
        cut = len(node.keys) // 2
        sibling = _BPlusNode(leaf=True)
        sibling.keys = node.keys[cut:]
        sibling.values = node.values[cut:]
        del node.keys[cut:]
        del node.values[cut:]
        # Splice the new leaf into the chain right after `node`.
        sibling.next = node.next
        node.next = sibling
        return sibling.keys[0], sibling

    def _split_internal(self, node: _BPlusNode) -> Optional[tuple[Any, _BPlusNode]]:
        """Split an over-full internal node; the middle key moves up, not right."""
        if len(node.keys) < self.order:
            return None
        cut = len(node.keys) // 2
        separator = node.keys[cut]
        sibling = _BPlusNode(leaf=False)
        sibling.keys = node.keys[cut + 1 :]
        sibling.children = node.children[cut + 1 :]
        del node.keys[cut:]
        del node.children[cut + 1 :]
        return separator, sibling

    def search(self, key: Any) -> list[int]:
        """Return a copy of the offsets stored under `key` (empty if absent)."""
        leaf = self._find_leaf(key)
        at = bisect_left(leaf.keys, key)
        if at == len(leaf.keys) or leaf.keys[at] != key:
            return []
        return list(leaf.values[at])

    def range_search(self, lo: Any, hi: Any) -> list[int]:
        """Return the offsets of every key in the closed interval [lo, hi], in key order."""
        if lo > hi:
            return []
        hits: list[int] = []
        leaf = self._find_leaf(lo)
        while leaf is not None:
            for idx, k in enumerate(leaf.keys):
                if k < lo:
                    continue
                if k > hi:
                    return hits
                hits += leaf.values[idx]
            leaf = leaf.next
        return hits

    def delete_pointer(self, key: Any, offset: int) -> None:
        """Drop one occurrence of `offset` from `key`; drop the key once its bucket is empty."""
        leaf = self._find_leaf(key)
        at = bisect_left(leaf.keys, key)
        if at < len(leaf.keys) and leaf.keys[at] == key:
            bucket = leaf.values[at]
            if offset in bucket:
                bucket.remove(offset)
                if not bucket:
                    del leaf.keys[at]
                    del leaf.values[at]

    def _find_leaf(self, key: Any) -> _BPlusNode:
        """Descend from the root to the leaf whose range covers `key`."""
        current = self.root
        while True:
            if current.leaf:
                return current
            current = current.children[bisect_right(current.keys, key)]


# -----------------------------
# Database facade + SQL-like operations
# -----------------------------

class ProductDB:
    """Facade that ties together file storage + primary/secondary indexes.

    Indexes live in memory only and always key on the values as they are
    stored in the table file, so an index entry can be located again from a
    row read back from disk.
    """

    def __init__(self, table_path: str = "products.dat"):
        """Bind to a table file path and start with empty indexes."""
        self.table = ProductTableFile(table_path)
        self.primary = BTree(t=3)          # product_id -> offset
        self.by_category = BPlusTree(4)    # category -> [offsets]
        self.by_price = BPlusTree(4)       # price -> [offsets]

    def load_from_file(self) -> None:
        """Rebuild indexes from existing file (scan active records)."""
        self.primary = BTree(t=3)
        self.by_category = BPlusTree(4)
        self.by_price = BPlusTree(4)
        for off, p in self.table.iter_active():
            self._index_row(p, off)

    def _index_row(self, p: Product, off: int) -> None:
        """Register the row stored at `off` in the primary and secondary indexes."""
        self.primary.insert(p.product_id, off)
        self.by_category.insert(p.category, off)
        self.by_price.insert(p.price, off)

    def _matching_offsets(
        self,
        product_id: Optional[int],
        price_between: Optional[tuple[float, float]],
    ) -> set[int]:
        """Resolve ``ID = ... OR price BETWEEN lo AND hi`` to a set of file offsets."""
        offsets: set[int] = set()
        if product_id is not None:
            off = self.primary.search(int(product_id))
            if off is not None:
                offsets.add(off)
        if price_between is not None:
            lo, hi = price_between
            offsets.update(self.by_price.range_search(float(lo), float(hi)))
        return offsets

    def insert(self, product_id: int, name: str, category: str, price: float, stock: int) -> int:
        """INSERT: append row to file and insert offsets into all indexes.

        Returns:
            The byte offset of the new row in the table file.

        Raises:
            ValueError: if `product_id` is already indexed.
        """
        p = Product(int(product_id), name, category, float(price), int(stock))
        if self.primary.search(p.product_id) is not None:
            raise ValueError(f"product_id {product_id} already exists")
        off = self.table.append(p)
        # Index what was actually written: the file truncates long text fields,
        # and delete() looks index entries up by the values it reads back.
        stored = self.table.read_at(off)
        if stored is None:
            stored = p
        self._index_row(stored, off)
        return off

    def select(self, *, product_id: Optional[int] = None, price_between: Optional[tuple[float, float]] = None) -> list[Product]:
        """SELECT * WHERE ID=... OR Price BETWEEN lo AND hi.

        Rows come back in file-offset order; with no predicate the result is empty.
        """
        out: list[Product] = []
        for off in sorted(self._matching_offsets(product_id, price_between)):
            p = self.table.read_at(off)
            if p is not None:
                out.append(p)
        return out

    def delete(self, *, product_id: Optional[int] = None, price_between: Optional[tuple[float, float]] = None) -> int:
        """DELETE WHERE ID=... OR Price BETWEEN lo AND hi.

        Returns:
            The number of rows that were tombstoned.
        """
        deleted = 0
        for off in sorted(self._matching_offsets(product_id, price_between)):
            p = self.table.read_at(off)
            if p is None:
                continue  # already tombstoned
            self.table.mark_deleted(off)
            self.primary.delete(p.product_id)
            self.by_category.delete_pointer(p.category, off)
            self.by_price.delete_pointer(p.price, off)
            deleted += 1
        return deleted


def as_rows(products: list[Product]) -> list[tuple[Any, ...]]:
    """Flatten products into (id, name, category, price, stock) tuples for display."""
    rows: list[tuple[Any, ...]] = []
    for item in products:
        row = (item.product_id, item.name, item.category, item.price, item.stock)
        rows.append(row)
    return rows


In [None]:
# Demo: load sample rows, then run INSERT / SELECT / DELETE through ProductDB.

# Sample rows as (product_id, name, category, price, stock) tuples, in the
# positional order that db.insert() expects.
INITIAL_RECORDS = [
    (1001, "Wireless Mouse", "Electronics", 25.99, 150),
    (1002, "Bluetooth Headphones", "Electronics", 79.99, 85),
    (1003, "Coffee Maker", "Home Appliances", 39.95, 45),
    (1004, "Yoga Mat", "Sports", 18.50, 200),
    (1005, "Digital Camera", "Photography", 299.99, 30),
    (1006, "Electric Toothbrush", "Personal Care", 49.99, 120),
    (1007, "LED Desk Lamp", "Office Supplies", 23.99, 60),
    (1008, "Gaming Keyboard", "Electronics", 89.99, 40),
    (1009, "Fitness Tracker", "Wearables", 129.99, 75),
    (1010, "Smartphone Case", "Accessories", 12.99, 350),
    (1011, "Water Bottle", "Sports", 15.00, 250),
    (1012, "Noise Cancelling Earbuds", "Electronics", 199.99, 50),
    (1013, "Blender", "Home Appliances", 99.95, 65),
    (1014, "Running Shoes", "Footwear", 120.00, 80),
    (1015, "Laptop Stand", "Office Supplies", 27.99, 90),
    (1016, "Electric Kettle", "Home Appliances", 29.95, 110),
    (1017, "Bluetooth Speaker", "Electronics", 59.99, 70),
    (1018, "Hair Dryer", "Personal Care", 39.99, 55),
    (1019, "Action Camera", "Photography", 199.99, 25),
    (1020, "Smartwatch", "Wearables", 249.99, 40),
]

# Relative path, so the table file is created in the current working directory.
DB_PATH = "products.dat"

db = ProductDB(DB_PATH)
# Start from an empty file so offsets and results are reproducible across runs.
db.table.create_empty(overwrite=True)
for rec in INITIAL_RECORDS:
    db.insert(*rec)

print("Loaded", len(INITIAL_RECORDS), "records into", DB_PATH)

# Example: INSERT one more row; insert() returns the row's byte offset in the file.
new_offset = db.insert(1021, "Smartphone", "Electronics", 499.99, 150)
print("Inserted product 1021 at offset", new_offset)

# Example: SELECT WHERE ID=1014 OR Price BETWEEN 300 AND 600
rows = as_rows(db.select(product_id=1014, price_between=(300, 600)))
print("SELECT result rows:")
for r in rows:
    print(r)

# Example: DELETE WHERE ID=1011 OR Price BETWEEN 300 AND 600
count = db.delete(product_id=1011, price_between=(300, 600))
print("Deleted", count, "rows")

# Re-run the DELETE's predicate as a SELECT: no matching rows should remain.
rows_after = as_rows(db.select(product_id=1011, price_between=(300, 600)))
print("SELECT after delete (should be empty):", rows_after)


Loaded 20 records into products.dat
Inserted product 1021 at offset 2260
SELECT result rows:
(1014, 'Running Shoes', 'Footwear', 120.0, 80)
(1021, 'Smartphone', 'Electronics', 499.99, 150)
Deleted 2 rows
SELECT after delete (should be empty): []
