In [14]:
from pathlib import Path
import pickle

In [9]:
# Append-only data file: write_database appends length-prefixed UTF-8 records
# to it. Despite the .pkl suffix, its contents are raw bytes, not a pickle.
database_file = Path("database.pkl")
# Pickled index object (see load_index / write_index); write_database stores
# each key's record offset within database_file in it.
index_file = Path("index.pkl")

In [17]:
def load_index():
    """Return the index persisted in ``index_file``.

    Returns:
        The unpickled index object, or a new empty dict when the index file
        does not exist yet.
    """
    if not index_file.exists():
        return {}
    # NOTE: unpickling can execute arbitrary code; only load index files that
    # were produced by write_index.
    with index_file.open("rb") as fh:
        return pickle.load(fh)

def write_index(index):
    """Pickle ``index`` into ``index_file``, replacing any previous contents.

    Args:
        index: Object to persist; it is read back by load_index.
    """
    with open(index_file, mode="wb") as fh:
        pickle.Pickler(fh).dump(index)

In [15]:
from typing import List, Dict

def read_database(keys: List[str]):
    """Fetch the stored values for ``keys`` from the database file.

    Each record is read as a 4-byte big-endian unsigned length followed by
    that many bytes of UTF-8 text, starting at the offset the index holds for
    the key.

    Args:
        keys: Keys to look up.

    Returns:
        Dict mapping each requested key to its decoded string value.

    Raises:
        FileNotFoundError: If the database file does not exist.
        KeyError: If a key is not present in the index.
        EOFError: If the file ends before a record's length prefix or payload
            has been read in full (for example, a stale index offset or a
            truncated file).
    """
    # Cheap guard first: no point unpickling the index when there is no data.
    if not database_file.exists():
        raise FileNotFoundError("Database file not found, please write some data first")

    index = load_index()
    header_len = 4
    to_return = {}
    with open(database_file, "rb") as f:
        for key in keys:
            # Seek to the record start recorded in the index.
            offset = index[key]
            f.seek(offset, 0)

            # The first 4 bytes hold the payload size. A short read means the
            # offset points at or past the end of the file; without this check
            # the size would silently decode as 0 and yield an empty string.
            size_b = f.read(header_len)
            if len(size_b) != header_len:
                raise EOFError(
                    f"Record for key {key!r} at offset {offset} has an incomplete length prefix"
                )
            size = int.from_bytes(size_b, byteorder="big", signed=False)

            # Read the payload and make sure none of it is missing.
            data_b = f.read(size)
            if len(data_b) != size:
                raise EOFError(
                    f"Record for key {key!r} at offset {offset} is truncated: "
                    f"expected {size} bytes, got {len(data_b)}"
                )
            to_return[key] = data_b.decode("utf-8")

    return to_return

def write_database(data: Dict[str, str]):
    """Append every ``key -> value`` pair in ``data`` to the database file.

    Each value is stored as a 4-byte big-endian length prefix followed by its
    UTF-8 bytes. The index entry for the key is set to the position where that
    record starts, so a key that already existed now resolves to the newly
    appended record. The index is persisted once, after all records have been
    written.

    Args:
        data: Mapping of keys to the string values to store.
    """
    offsets = load_index()
    with open(database_file, "ab") as db:
        for name, text in data.items():
            payload = text.encode("utf-8")
            size_prefix = len(payload).to_bytes(4, byteorder="big")
            # Remember where this record's length prefix begins.
            offsets[name] = db.tell()
            # Length prefix first, then the payload itself.
            db.writelines((size_prefix, payload))
    write_index(offsets)

In [18]:
def clear_all():
    """Delete the database file and the index file.

    Files that are already absent are ignored, so the call is safe to repeat.
    """
    for stale in (database_file, index_file):
        stale.unlink(missing_ok=True)

In [19]:
# Smoke test: start from empty files, store three records, then read them
# back. The final expression is the cell's displayed output.
clear_all()
sample_records = {"key1": "value1", "key2": "value2", "key3": "value3"}
write_database(sample_records)
read_database(list(sample_records))

In [None]:
import timeit
import statistics as stats

def get_write_stmt(count: int):
    """Build the source text of a statement that writes ``count`` records.

    The returned string is meant to be handed to ``timeit``; when executed it
    calls ``write_database`` with keys ``key0..key{count-1}`` mapped to
    ``value0..value{count-1}``.

    Args:
        count: Number of records the generated statement writes.

    Returns:
        The statement as a string.
    """
    records_expr = "{f'key{i}': f'value{i}' for i in range(" + format(count) + ")}"
    return "write_database(" + records_expr + ")"

def get_read_stmt(count: int, limit: int):
    """Build the source text of a statement that reads ``count`` random records.

    The returned string is meant to be handed to ``timeit``; when executed it
    calls ``read_database`` with ``count`` distinct keys drawn via
    ``random.sample`` from ``key0..key{limit-1}``. The name ``random`` must be
    available wherever the statement runs.

    Args:
        count: Number of keys the generated statement reads.
        limit: Exclusive upper bound of the key numbers to sample from.

    Returns:
        The statement as a string.
    """
    sample_expr = "random.sample(range(" + format(limit) + "), " + format(count) + ")"
    return "read_database([f'key{i}' for i in " + sample_expr + "])"

# Write benchmark: the setup statement clears both files before each of the
# three repetitions, and each repetition times one bulk write of `count` records.
for count in [10_000, 100_000, 1_000_000]:
    times = timeit.repeat(
        stmt=get_write_stmt(count),
        setup='clear_all()',
        repeat=3,
        number=1,
        globals=globals(),
    )
    print(f'Time to write {count} records: {stats.mean(times)}s')

# Read benchmark: the setup statement rebuilds a database of `limit` records
# (untimed) before each repetition, which then times one read of 1000 randomly
# sampled keys.
for limit in [10_000, 100_000, 1_000_000]:
    times = timeit.repeat(
        stmt=get_read_stmt(1000, limit=limit),
        setup=f'import random;clear_all();{get_write_stmt(count=limit)}',
        repeat=3,
        number=1,
        globals=globals(),
    )
    print(f'Time to read 1000 random records from {limit} records: {stats.mean(times)}s')