Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch the operations callback to use an indexing strategy. #140

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ namespace scitt

register_service_endpoints(context, *this);
register_operations_endpoints(
context, *this, is_tx_committed, authn_policy, post_entry_continuation);
context, *this, authn_policy, post_entry_continuation);

#ifdef ENABLE_PREFIX_TREE
PrefixTreeFrontend::init_handlers(context, *this);
Expand Down
131 changes: 109 additions & 22 deletions app/src/operations_endpoints.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,101 @@ namespace scitt
}
}

/**
* Get the context digest for a currently running operation.
*
* This is used in the operation callback to check the integrity of the
* context.
*
* Throws an HTTPError if the operation's state cannot be retrieved. If the
* operation state is likely to be available at a later point, a
* ServiceUnavailableError is thrown, allowing the caller to try again
* later. Otherwise a BadRequestError or NotFoundError is thrown.
*/
crypto::Sha256Hash get_context_digest(const ccf::TxID& operation_id) const
{
std::lock_guard guard(lock);

// Before we can look up the operation ID in the operations map, we must
// check whether this is a valid committed transaction or not. Otherwise
// we risk confusing sequence numbers across different views.
ccf::TxStatus tx_status;
auto result =
get_status_for_txid(operation_id.view, operation_id.seqno, tx_status);
if (result != ccf::ApiResult::OK)
{
throw InternalError(fmt::format(
"Failed to get transaction status: {}",
ccf::api_result_to_str(result)));
}

std::string tx_str = operation_id.to_str();
switch (tx_status)
{
case ccf::TxStatus::Unknown:
case ccf::TxStatus::Pending:
// This can happen when the transaction hasn't been globally
// committed to the ledger yet. No point looking up in the map yet,
// but we throw a transient error since eventually the transaction ID
// could be valid.
throw ServiceUnavailableError(
ccf::errors::TransactionPendingOrUnknown,
fmt::format("Transaction {} is not available.", tx_str));

case ccf::TxStatus::Invalid:
// Either the client passed in a garbage TX ID, or it was a real
// transaction that got rolled back. Either way, there's no point
// retrying it since it will never become valid in the future.
throw NotFoundError(
ccf::errors::TransactionInvalid,
fmt::format("Transaction {} is invalid.", tx_str));

case ccf::TxStatus::Committed:
break;
}

if (operation_id.seqno < lower_bound)
{
throw NotFoundError(
errors::OperationExpired,
fmt::format("Transaction {} is too old", tx_str));
}
else if (operation_id.seqno >= upper_bound)
{
throw ServiceUnavailableError(
errors::IndexingInProgressRetryLater,
fmt::format("Transaction {} is not available.", tx_str));
}
else if (auto it = operations_.find(operation_id.seqno);
it != operations_.end())
{
CCF_ASSERT(
operation_id.view == it->second.view,
"Operation ID has inconsistent view");
if (it->second.status == OperationStatus::Running)
{
CCF_ASSERT_FMT(
it->second.context_digest.has_value(),
"No context digest for operation {}",
tx_str);
return it->second.context_digest.value();
}
else
{
throw BadRequestError(
errors::InvalidInput,
fmt::format(
"Operation is in an invalid state: {}",
nlohmann::json(it->second.status).dump()));
}
}
else
{
// For a well-behaved client, this shouldn't ever happen.
throw NotFoundError(errors::NotFound, "Invalid operation ID");
}
}

protected:
void visit_entry(const ccf::TxID& tx_id, const OperationLog& log) override
{
Expand Down Expand Up @@ -216,6 +311,7 @@ namespace scitt
switch (log.status)
{
case OperationStatus::Running:
it->second.context_digest = log.context_digest;
break;
case OperationStatus::Failed:
it->second.error = log.error;
Expand Down Expand Up @@ -367,6 +463,7 @@ namespace scitt
time_t created_at;
std::optional<ccf::TxID> completion_tx;
std::optional<ODataError> error;
std::optional<crypto::Sha256Hash> context_digest;
};

// It might be worth replacing this with a deque. Entries are only ever
Expand Down Expand Up @@ -403,8 +500,8 @@ namespace scitt
ccf::endpoints::EndpointContext& ctx,
nlohmann::json&& params)
{
auto id = historical::get_tx_id_from_request_path(ctx);
return index->lookup(id);
auto txid = historical::get_tx_id_from_request_path(ctx);
return index->lookup(txid);
}

/**
Expand All @@ -425,25 +522,16 @@ namespace scitt
* returned to future clients polling for the operation's status.
*/
static auto post_operation_callback(
const std::shared_ptr<OperationsIndexingStrategy>& index,
OperationCallback& callback,
ccf::endpoints::EndpointContext& ctx,
ccf::historical::StatePtr state,
nlohmann::json&& params)
{
auto txid = state->transaction_id;
auto tx = state->store->create_read_only_tx();
auto operation = tx.ro<OperationsTable>(OPERATIONS_TABLE)->get();
if (!operation.has_value())
{
throw BadRequestError(
errors::InvalidInput,
fmt::format(
"Transaction ID {} does not correspond to an operation.",
txid.to_str()));
}
auto txid = historical::get_tx_id_from_request_path(ctx);
auto expected_context_digest = index->get_context_digest(txid);

auto input = params.get<PostOperationCallback::In>();
if (crypto::Sha256Hash(input.context) != operation->context_digest)
if (crypto::Sha256Hash(input.context) != expected_context_digest)
{
throw BadRequestError(errors::InvalidInput, "Invalid context");
}
Expand Down Expand Up @@ -498,14 +586,11 @@ namespace scitt
static void register_operations_endpoints(
ccfapp::AbstractNodeContext& context,
ccf::BaseEndpointRegistry& registry,
ccf::historical::CheckHistoricalTxStatus is_tx_committed,
const ccf::AuthnPolicies& authn_policy,
OperationCallback callback)
{
using namespace std::placeholders;

auto& state_cache = context.get_historical_state();

auto operations_index =
std::make_shared<OperationsIndexingStrategy>(registry);
context.get_indexing_strategies().install_strategy(operations_index);
Expand Down Expand Up @@ -540,10 +625,12 @@ namespace scitt
.make_endpoint(
"/operations/{txid}/callback",
HTTP_POST,
scitt::historical::json_adapter(
std::bind(endpoints::post_operation_callback, callback, _1, _2, _3),
state_cache,
is_tx_committed),
ccf::json_adapter(std::bind(
endpoints::post_operation_callback,
operations_index,
callback,
_1,
_2)),
no_authn_policy)
.install();
}
Expand Down