Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract storage engine as an option #46

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 65 additions & 54 deletions lib/cubdb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ defmodule CubDB do

alias CubDB.Btree
alias CubDB.CatchUp
alias CubDB.CleanUp
alias CubDB.Compactor
alias CubDB.Reader
alias CubDB.Store
Expand Down Expand Up @@ -410,13 +409,21 @@ defmodule CubDB do
reduce: fn n, sum -> sum + n end # reduce to the sum of selected values
)
"""

def select(db, options \\ []) when is_list(options) do
timeout = Keyword.get(options, :timeout, :infinity)
perform_read(db, {:select, options}, timeout)
end

@spec size(GenServer.server()) :: pos_integer

@doc """
Returns the root location of the current store.
"""
def root_loc(db) do
GenServer.call(db, :root_loc, :infinity)
end

@doc """
Returns the number of entries present in the database.
"""
Expand Down Expand Up @@ -835,23 +842,43 @@ defmodule CubDB do
auto_compact = parse_auto_compact!(Keyword.get(options, :auto_compact, true))
auto_file_sync = Keyword.get(options, :auto_file_sync, true)

with file_name when is_binary(file_name) or is_nil(file_name) <- find_db_file(data_dir),
{:ok, store} <-
Store.File.create(Path.join(data_dir, file_name || "0#{@db_file_extension}")),
{:ok, clean_up} <- CleanUp.start_link(data_dir),
{:ok, task_supervisor} <- Task.Supervisor.start_link() do
{:ok,
%State{
btree: Btree.new(store),
task_supervisor: task_supervisor,
data_dir: data_dir,
clean_up: clean_up,
auto_compact: auto_compact,
auto_file_sync: auto_file_sync
}}
else
{:error, reason} ->
{:stop, reason}
case data_dir do
store when is_map(store) ->
with {:ok, clean_up} <- Store.start_cleanup(store),
{:ok, task_supervisor} <- Task.Supervisor.start_link() do
{:ok,
%State{
btree: Btree.new(store),
task_supervisor: task_supervisor,
data_dir: data_dir,
clean_up: clean_up,
auto_compact: auto_compact,
auto_file_sync: auto_file_sync
}}
else
{:error, reason} ->
{:stop, reason}
end

_ ->
with file_name when is_binary(file_name) or is_nil(file_name) <- find_db_file(data_dir),
{:ok, store} <-
Store.File.create(Path.join(data_dir, file_name || "0#{@db_file_extension}")),
{:ok, clean_up} <- Store.start_cleanup(store),
{:ok, task_supervisor} <- Task.Supervisor.start_link() do
{:ok,
%State{
btree: Btree.new(store),
task_supervisor: task_supervisor,
data_dir: data_dir,
clean_up: clean_up,
auto_compact: auto_compact,
auto_file_sync: auto_file_sync
}}
else
{:error, reason} ->
{:stop, reason}
end
end
end

Expand All @@ -873,8 +900,13 @@ defmodule CubDB do
nil
end

%Btree{store: %Store.File{file_path: file_path}} = btree
{:noreply, %State{state | readers: Map.put(readers, ref, {file_path, timer})}}
%Btree{store: store} = btree
{:noreply, %State{state | readers: Map.put(readers, ref, {Store.identifier(store), timer})}}
end

def handle_call(:root_loc, _, state = %State{btree: btree}) do
%Btree{root_loc: loc, store: store} = btree
{:reply, {Store.identifier(store), loc}, state}
end

def handle_call(:size, _, state = %State{btree: btree}) do
Expand Down Expand Up @@ -1115,12 +1147,14 @@ defmodule CubDB do

@spec trigger_compaction(%State{}) :: {:ok, pid} | {:error, any}

defp trigger_compaction(state = %State{btree: btree, data_dir: data_dir, clean_up: clean_up}) do
defp trigger_compaction(state = %State{btree: btree, clean_up: clean_up}) do
%Btree{store: store} = btree

case compaction_running?(state) do
false ->
for pid <- state.subs, do: send(pid, :compaction_started)
{:ok, store} = new_compaction_store(data_dir)
CleanUp.clean_up_old_compaction_files(clean_up, store)
{:ok, store} = new_compaction_store(store)
Store.clean_up_old_compaction_files(store, clean_up)

with result <-
Task.Supervisor.start_child(state.task_supervisor, Compactor, :run, [
Expand Down Expand Up @@ -1181,22 +1215,10 @@ defmodule CubDB do
Btree.new(store)
end

@spec new_compaction_store(String.t()) :: {:ok, Store.t()} | {:error, any}

defp new_compaction_store(data_dir) do
with {:ok, file_names} <- File.ls(data_dir) do
new_filename =
file_names
|> Enum.filter(&cubdb_file?/1)
|> Enum.map(&file_name_to_n/1)
|> Enum.sort()
|> List.last()
|> (&(&1 + 1)).()
|> Integer.to_string(16)
|> (&(&1 <> @compaction_file_extension)).()
@spec new_compaction_store(Store.t()) :: {:ok, Store.t()} | {:error, any}

Store.File.create(Path.join(data_dir, new_filename))
end
defp new_compaction_store(store) do
Store.next_compaction_store(store)
end

@spec compaction_running?(%State{}) :: boolean
Expand Down Expand Up @@ -1236,11 +1258,13 @@ defmodule CubDB do
@spec clean_up_now(%State{}) :: %State{}

defp clean_up_now(state = %State{btree: btree, clean_up: clean_up}) do
%Btree{store: store} = btree

for old_btree <- state.old_btrees do
if Btree.alive?(old_btree), do: :ok = Btree.stop(old_btree)
end

:ok = CleanUp.clean_up(clean_up, btree)
:ok = Store.clean_up(store, clean_up, btree)
for pid <- state.subs, do: send(pid, :clean_up_started)
%State{state | clean_up_pending: false, old_btrees: []}
end
Expand Down Expand Up @@ -1319,26 +1343,13 @@ defmodule CubDB do
defp split_options(data_dir_or_options) do
case Keyword.pop(data_dir_or_options, :data_dir) do
{nil, data_dir_or_options} ->
try do
{:ok, {to_string(data_dir_or_options), [], []}}
rescue
ArgumentError ->
{:error, "Options must include :data_dir"}

Protocol.UndefinedError ->
{:error, "data_dir must be a string (or implement String.Chars)"}
end
{:ok, {data_dir_or_options, [], []}}

{data_dir, options} ->
{gen_server_opts, opts} =
Keyword.split(options, [:name, :timeout, :spawn_opt, :hibernate_after, :debug])

try do
{:ok, {to_string(data_dir), opts, gen_server_opts}}
rescue
Protocol.UndefinedError ->
{:error, "data_dir must be a string (or implement String.Chars)"}
end
{:ok, {data_dir, opts, gen_server_opts}}
end
end
end
2 changes: 1 addition & 1 deletion lib/cubdb/btree.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule CubDB.Btree do
@type val :: CubDB.value()
@type btree_size :: non_neg_integer
@type dirt :: non_neg_integer
@type location :: non_neg_integer
@type location :: any
@type capacity :: pos_integer
@type child_pointer :: {key, location}
@type leaf_node :: record(:leaf, children: [child_pointer])
Expand Down
15 changes: 15 additions & 0 deletions lib/cubdb/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ defprotocol CubDB.Store do

alias CubDB.Btree

@spec identifier(t) :: String.t()
def identifier(store)

@spec next_compaction_store(t) :: t
def next_compaction_store(store)

@spec start_cleanup(t) :: {:ok, pid} | {:error, String.t()}
def start_cleanup(t)

@spec clean_up_old_compaction_files(t, pid) :: :ok | {:error, String.t()}
def clean_up_old_compaction_files(t, cleanup)

@spec clean_up(t, pid, Btree.btree_node()) :: :ok | {:error, String.t()}
def clean_up(t, cleanup, btree)

@spec put_node(t, Btree.btree_node()) :: Btree.location()
def put_node(store, node)

Expand Down
41 changes: 40 additions & 1 deletion lib/cubdb/store/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule CubDB.Store.File do

defp init(file_path) do
ensure_exclusive_access!(file_path)
{:ok, file} = :file.open(file_path, [:read, :append, :raw, :binary])
{:ok, file} = :file.open(file_path, [:read, :append, :raw, :binary, :delayed_write])
{:ok, pos} = :file.position(file, :eof)

{file, pos}
Expand All @@ -49,6 +49,45 @@ end
defimpl CubDB.Store, for: CubDB.Store.File do
alias CubDB.Store
alias CubDB.Store.File.Blocks
alias CubDB.Store.File.CleanUp

@compaction_file_extension ".compact"

def identifier(%Store.File{file_path: file_path}) do
file_path
end

def clean_up(_store, cpid, btree) do
CleanUp.clean_up(cpid, btree)
end

def clean_up_old_compaction_files(store, pid) do
CleanUp.clean_up_old_compaction_files(pid, store)
end

def start_cleanup(%Store.File{file_path: file_path}) do
data_dir = Path.dirname(file_path)

CleanUp.start_link(data_dir)
end

def next_compaction_store(%Store.File{file_path: file_path}) do
data_dir = Path.dirname(file_path)

with {:ok, file_names} <- File.ls(data_dir) do
new_filename =
file_names
|> Enum.filter(&CubDB.cubdb_file?/1)
|> Enum.map(&CubDB.file_name_to_n/1)
|> Enum.sort()
|> List.last()
|> (&(&1 + 1)).()
|> Integer.to_string(16)
|> (&(&1 <> @compaction_file_extension)).()

Store.File.create(Path.join(data_dir, new_filename))
end
end

def put_node(%Store.File{pid: pid}, node) do
Agent.get_and_update(
Expand Down
2 changes: 1 addition & 1 deletion lib/cubdb/clean_up.ex → lib/cubdb/store/file/clean_up.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CubDB.CleanUp do
defmodule CubDB.Store.File.CleanUp do
@moduledoc false

# The `CubDB.CleanUp` module takes care of cleaning up obsolete files, like
Expand Down
20 changes: 20 additions & 0 deletions lib/cubdb/store/test_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@ end
defimpl CubDB.Store, for: CubDB.Store.TestStore do
alias CubDB.Store.TestStore

def identifier(%TestStore{agent: pid}) do
"#{pid}"
end

def clean_up(_store, cpid, btree) do
:ok
end

def clean_up_old_compaction_files(store, pid) do
:ok
end

def start_cleanup(%TestStore{}) do
{:ok, nil}
end

def next_compaction_store(%TestStore{}) do
Store.TestStore.create()
end

def put_node(%TestStore{agent: agent}, node) do
Agent.get_and_update(
agent,
Expand Down
2 changes: 1 addition & 1 deletion test/cubdb/clean_up_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule CubDB.Store.CleanUpTest do
use ExUnit.Case, async: true

alias CubDB.Btree
alias CubDB.CleanUp
alias CubDB.Store.File.CleanUp
alias CubDB.Store

setup do
Expand Down
2 changes: 2 additions & 0 deletions test/cubdb/store/file_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule CubDB.Store.FileTest do
CubDB.Store.put_header(store, good_header)

CubDB.Store.put_header(store, {0, 0, 0})
CubDB.Store.sync(store)

# corrupt the last header
{:ok, file} = :file.open(store.file_path, [:read, :write, :raw, :binary])
Expand All @@ -46,6 +47,7 @@ defmodule CubDB.Store.FileTest do
CubDB.Store.put_header(store, good_header)

CubDB.Store.put_header(store, {0, 0, 0})
CubDB.Store.sync(store)

# truncate the last header
{:ok, file} = :file.open(store.file_path, [:read, :write, :raw, :binary])
Expand Down
2 changes: 1 addition & 1 deletion test/cubdb_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ defmodule CubDBTest do
test "data_dir/1 returns the path to the data directory", %{tmp_dir: tmp_dir} do
{:ok, db} = CubDB.start_link(tmp_dir)
tmp_dir_string = to_string(tmp_dir)
assert ^tmp_dir_string = CubDB.data_dir(db)
assert ^tmp_dir_string = to_string(CubDB.data_dir(db))
end

test "current_db_file/1 returns the path to the current database file", %{tmp_dir: tmp_dir} do
Expand Down