Skip to content

Commit

Permalink
Separate chain and io transaction in DB storage (archethic-foundation…
Browse files Browse the repository at this point in the history
…#738)

* Write IO transaction in a new folder to separate logic
* Load IO transactions on node start
* Fix contract loader on start
  • Loading branch information
Neylix authored and tenmoves committed Dec 6, 2022
1 parent 1bfb06a commit 446ed5f
Show file tree
Hide file tree
Showing 25 changed files with 341 additions and 110 deletions.
4 changes: 1 addition & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Archethic do
alias __MODULE__.P2P
alias __MODULE__.P2P.Node

alias __MODULE__.DB

alias __MODULE__.P2P.Message.Balance
alias __MODULE__.P2P.Message.GetBalance
alias __MODULE__.P2P.Message.NewTransaction
Expand Down Expand Up @@ -275,7 +273,7 @@ defmodule Archethic do
try do
{local_chain, paging_address} =
with true <- paging_address != nil,
true <- DB.transaction_exists?(paging_address),
true <- TransactionChain.transaction_exists?(paging_address),
last_address when last_address != nil <-
TransactionChain.get_last_local_address(address),
true <- last_address != paging_address do
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/account.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ defmodule Archethic.Account do
@doc """
Load the transaction into the Account context filling the memory tables for ledgers
"""
@spec load_transaction(Transaction.t()) :: :ok
defdelegate load_transaction(transaction), to: MemTablesLoader
@spec load_transaction(Transaction.t(), boolean()) :: :ok
defdelegate load_transaction(transaction, io_transaction?), to: MemTablesLoader
end
47 changes: 28 additions & 19 deletions lib/archethic/account/mem_tables_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ defmodule Archethic.Account.MemTablesLoader do

@spec init(args :: list()) :: {:ok, []}
def init(_args) do
TransactionChain.list_io_transactions(@query_fields)
|> Stream.each(&load_transaction(&1, true))
|> Stream.run()

TransactionChain.list_all(@query_fields)
|> Stream.reject(&(&1.type in @excluded_types))
|> Stream.each(&load_transaction/1)
|> Stream.each(&load_transaction(&1, false))
|> Stream.run()

{:ok, []}
Expand All @@ -67,25 +71,30 @@ defmodule Archethic.Account.MemTablesLoader do
@doc """
Load the transaction into the memory tables
"""
@spec load_transaction(Transaction.t()) :: :ok
def load_transaction(%Transaction{
address: address,
type: tx_type,
previous_public_key: previous_public_key,
validation_stamp: %ValidationStamp{
timestamp: timestamp,
protocol_version: protocol_version,
ledger_operations: %LedgerOperations{
fee: fee,
unspent_outputs: unspent_outputs,
transaction_movements: transaction_movements
@spec load_transaction(Transaction.t(), boolean()) :: :ok
def load_transaction(
%Transaction{
address: address,
type: tx_type,
previous_public_key: previous_public_key,
validation_stamp: %ValidationStamp{
timestamp: timestamp,
protocol_version: protocol_version,
ledger_operations: %LedgerOperations{
fee: fee,
unspent_outputs: unspent_outputs,
transaction_movements: transaction_movements
}
}
}
}) do
previous_address = Crypto.derive_address(previous_public_key)

UCOLedger.spend_all_unspent_outputs(previous_address)
TokenLedger.spend_all_unspent_outputs(previous_address)
},
io_transaction?
) do
unless io_transaction? do
previous_address = Crypto.derive_address(previous_public_key)

UCOLedger.spend_all_unspent_outputs(previous_address)
TokenLedger.spend_all_unspent_outputs(previous_address)
end

burn_storage_nodes =
Election.storage_nodes(
Expand Down
22 changes: 12 additions & 10 deletions lib/archethic/contracts/loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ defmodule Archethic.Contracts.Loader do
def init(_opts) do
DB.list_last_transaction_addresses()
|> Stream.map(fn address ->
{:ok, tx} =
DB.get_transaction(address, [
:address,
:previous_public_key,
:data,
validation_stamp: [:timestamp]
])

tx
DB.get_transaction(address, [
:address,
:previous_public_key,
:data,
validation_stamp: [:timestamp]
])
end)
|> Stream.filter(&(&1.data.code != ""))
|> Stream.filter(fn
{:ok, %Transaction{data: %TransactionData{code: ""}}} -> false
{:error, _} -> false
_ -> true
end)
|> Stream.map(fn {:ok, tx} -> tx end)
|> Stream.each(&load_transaction(&1, true))
|> Stream.run()

Expand Down
7 changes: 5 additions & 2 deletions lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Archethic.DB do

use Knigge, otp_app: :archethic, default: EmbeddedImpl

@type storage_type() :: :chain | :io

@callback get_transaction(address :: binary(), fields :: list()) ::
{:ok, Transaction.t()} | {:error, :transaction_not_exists}
@callback get_beacon_summary(summary_address :: binary()) ::
Expand All @@ -24,12 +26,13 @@ defmodule Archethic.DB do
fields :: list(),
opts :: [paging_state: nil | binary(), after: DateTime.t()]
) :: Enumerable.t()
@callback write_transaction(Transaction.t()) :: :ok
@callback write_transaction(Transaction.t(), storage_type()) :: :ok
@callback write_beacon_summary(Summary.t()) :: :ok
@callback clear_beacon_summaries() :: :ok
@callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok
@callback write_transaction_chain(Enumerable.t()) :: :ok
@callback list_transactions(fields :: list()) :: Enumerable.t()
@callback list_io_transactions(fields :: list()) :: Enumerable.t()
@callback add_last_transaction_address(binary(), binary(), DateTime.t()) :: :ok
@callback list_last_transaction_addresses() :: Enumerable.t()

Expand Down Expand Up @@ -61,7 +64,7 @@ defmodule Archethic.DB do
@callback get_latest_burned_fees() :: non_neg_integer()
@callback get_nb_transactions() :: non_neg_integer()

@callback transaction_exists?(binary()) :: boolean()
@callback transaction_exists?(binary(), storage_type()) :: boolean()

@callback register_p2p_summary(list({Crypto.key(), boolean(), float(), DateTime.t()})) :: :ok

Expand Down
56 changes: 41 additions & 15 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Archethic.DB.EmbeddedImpl do

alias Archethic.Crypto

alias Archethic.DB

alias __MODULE__.BootstrapInfo
alias __MODULE__.ChainIndex
alias __MODULE__.ChainReader
Expand Down Expand Up @@ -74,38 +76,54 @@ defmodule Archethic.DB.EmbeddedImpl do
Enum.each(sorted_chain, fn tx ->
unless ChainIndex.transaction_exists?(tx.address, db_path()) do
ChainWriter.append_transaction(genesis_address, tx)

# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
end
end)
end

@doc """
Write a single transaction and append it to its chain
"""
@spec write_transaction(Transaction.t()) :: :ok
def write_transaction(tx = %Transaction{}) do
@spec write_transaction(Transaction.t(), DB.storage_type()) :: :ok
def write_transaction(tx, storage_type \\ :chain)

def write_transaction(tx = %Transaction{}, :chain) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
{:error, :transaction_already_exists}
else
previous_address = Transaction.previous_address(tx)

case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
do_write_transaction(genesis_address, tx)
genesis_address =
case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
genesis_address

{:error, :not_exists} ->
ChainWriter.append_transaction(previous_address, tx)
end
{:error, :not_exists} ->
previous_address
end

ChainWriter.append_transaction(genesis_address, tx)

# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
end
end

defp do_write_transaction(genesis_address, tx) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
def write_transaction(tx = %Transaction{}, :io) do
if ChainIndex.transaction_exists?(tx.address, :io, db_path()) do
{:error, :transaction_already_exists}
else
ChainWriter.append_transaction(genesis_address, tx)
ChainWriter.write_io_transaction(tx, db_path())
end
end

defp delete_io_transaction(address) do
ChainWriter.io_path(db_path(), address) |> File.rm()
:ok
end

@doc """
Write a beacon summary in DB
"""
Expand Down Expand Up @@ -137,9 +155,9 @@ defmodule Archethic.DB.EmbeddedImpl do
@doc """
Determine if the transaction exists or not
"""
@spec transaction_exists?(address :: binary()) :: boolean()
def transaction_exists?(address) when is_binary(address) do
ChainIndex.transaction_exists?(address, db_path())
@spec transaction_exists?(address :: binary(), storage_type :: DB.storage_type()) :: boolean()
def transaction_exists?(address, storage_type) when is_binary(address) do
ChainIndex.transaction_exists?(address, storage_type, db_path())
end

@doc """
Expand Down Expand Up @@ -278,14 +296,22 @@ defmodule Archethic.DB.EmbeddedImpl do
end

@doc """
List all the transactions
List all the transactions in chain storage
"""
@spec list_transactions(fields :: list()) :: Enumerable.t() | list(Transaction.t())
def list_transactions(fields \\ []) when is_list(fields) do
ChainIndex.list_genesis_addresses()
|> Stream.flat_map(&ChainReader.stream_scan_chain(&1, nil, fields, db_path()))
end

@doc """
List all the transactions in io storage
"""
@spec list_io_transactions(fields :: list()) :: Enumerable.t() | list(Transaction.t())
def list_io_transactions(fields \\ []) do
ChainReader.list_io_transactions(fields, db_path())
end

@doc """
List all the last transaction chain addresses
"""
Expand Down
11 changes: 8 additions & 3 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@vsn Mix.Project.config()[:version]

alias Archethic.Crypto
alias Archethic.DB
alias Archethic.DB.EmbeddedImpl.ChainWriter
alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -196,14 +197,18 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@doc """
Determine if a transaction exists
"""
@spec transaction_exists?(binary(), String.t()) :: boolean()
def transaction_exists?(address = <<_::8, _::8, _subset::8, _digest::binary>>, db_path) do
@spec transaction_exists?(binary(), DB.storage_type(), String.t()) :: boolean()
def transaction_exists?(address, storage_type \\ :chain, db_path)

def transaction_exists?(address, storage_type, db_path) do
case get_tx_entry(address, db_path) do
{:ok, _} ->
true

{:error, :not_exists} ->
false
if storage_type == :io,
do: ChainWriter.io_path(db_path, address) |> File.exists?(),
else: false
end
end

Expand Down
54 changes: 54 additions & 0 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,60 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

@doc """
List all the transactions in io storage
"""
@spec list_io_transactions(fields :: list(), db_path :: String.t()) ::
Enumerable.t() | list(Transaction.t())
def list_io_transactions(fields, db_path) do
io_transactions_path =
ChainWriter.base_io_path(db_path)
|> Path.join("*")
|> Path.wildcard()

Stream.resource(
fn -> io_transactions_path end,
fn
[filepath | rest] -> {[read_io_transaction(filepath, fields)], rest}
[] -> {:halt, nil}
end,
fn _ -> :ok end
)
end

defp read_io_transaction(filepath, fields) do
# Open the file as the position from the transaction in the chain file
fd = File.open!(filepath, [:binary, :read])

{:ok, <<size::32, version::32>>} = :file.read(fd, 8)
column_names = fields_to_column_names(fields)

# Ensure the validation stamp's protocol version is retrieved if we fetch validation stamp fields
has_validation_stamp_fields? =
Enum.any?(column_names, &String.starts_with?(&1, "validation_stamp."))

has_validation_stamp_protocol_field? =
Enum.any?(column_names, &(&1 == "validation_stamp.protocol_version"))

column_names =
if has_validation_stamp_fields? and !has_validation_stamp_protocol_field? do
["validation_stamp.protocol_version" | column_names]
else
column_names
end

# Read the transaction and extract requested columns from the fields arg
tx =
fd
|> read_transaction(column_names, size, 0)
|> Enum.into(%{})
|> decode_transaction_columns(version)

:file.close(fd)

tx
end

defp process_get_chain(fd, address, fields, opts, db_path) do
# Set the file cursor position to the paging state
case Keyword.get(opts, :paging_state) do
Expand Down
Loading

0 comments on commit 446ed5f

Please sign in to comment.