Skip to content

Commit

Permalink
Switch the operations callback to use an indexing strategy.
Browse files Browse the repository at this point in the history
The callback endpoint when completing an operation needs to access the
KV to check the integrity of the operation context that is being passed
by the caller.

We previously used a historical query to look this up, but this almost
always ends up returning a Service Unavailable error while the
historical transaction is being fetched, and the callback needs to be
retried after a short delay.

This replaces the historical query by caching the expected digest in the
existing operations indexing strategy. This increases memory usage a
little, but given that the indexing strategy is periodically purged,
this increase should be bounded. If the operation completes too quickly,
there is still a chance the strategy won't have indexed the original
transaction yet, and the existing retry mechanism will be used.
  • Loading branch information
plietar committed Mar 17, 2023
1 parent 41a6cef commit 22585e4
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 23 deletions.
2 changes: 1 addition & 1 deletion app/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ namespace scitt

register_service_endpoints(context, *this);
register_operations_endpoints(
context, *this, is_tx_committed, authn_policy, post_entry_continuation);
context, *this, authn_policy, post_entry_continuation);

#ifdef ENABLE_PREFIX_TREE
PrefixTreeFrontend::init_handlers(context, *this);
Expand Down
131 changes: 109 additions & 22 deletions app/src/operations_endpoints.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,101 @@ namespace scitt
}
}

/**
 * Look up the context digest recorded for an operation that is still
 * running.
 *
 * The operation callback endpoint uses this to verify the integrity of the
 * context supplied by its caller.
 *
 * `lock` is held for the whole call.
 *
 * Failures are reported by throwing:
 *  - InternalError when the transaction status cannot be queried.
 *  - ServiceUnavailableError when the transaction is pending/unknown, or its
 *    seqno is at or beyond `upper_bound`. Both are transient, so the caller
 *    may try again later.
 *  - NotFoundError when the transaction is invalid, its seqno is below
 *    `lower_bound`, or no operation is recorded at that seqno.
 *  - BadRequestError when the operation exists but is not Running.
 */
crypto::Sha256Hash get_context_digest(const ccf::TxID& operation_id) const
{
  std::lock_guard hold(lock);

  // The operations map is keyed by seqno only, so the (view, seqno) pair has
  // to be confirmed as a committed transaction first. Skipping this step
  // risks confusing sequence numbers across different views.
  ccf::TxStatus tx_status;
  const auto rc =
    get_status_for_txid(operation_id.view, operation_id.seqno, tx_status);
  if (rc != ccf::ApiResult::OK)
  {
    throw InternalError(fmt::format(
      "Failed to get transaction status: {}", ccf::api_result_to_str(rc)));
  }

  const std::string tx_str = operation_id.to_str();

  if (
    tx_status == ccf::TxStatus::Unknown ||
    tx_status == ccf::TxStatus::Pending)
  {
    // The transaction hasn't been globally committed to the ledger yet.
    // No point looking it up in the map, but the transaction ID could
    // still become valid, so report a transient error.
    throw ServiceUnavailableError(
      ccf::errors::TransactionPendingOrUnknown,
      fmt::format("Transaction {} is not available.", tx_str));
  }

  if (tx_status == ccf::TxStatus::Invalid)
  {
    // Either a garbage TX ID, or a real transaction that got rolled back.
    // It will never become valid, so retrying is pointless.
    throw NotFoundError(
      ccf::errors::TransactionInvalid,
      fmt::format("Transaction {} is invalid.", tx_str));
  }

  // From here on the transaction is committed.

  if (operation_id.seqno < lower_bound)
  {
    throw NotFoundError(
      errors::OperationExpired,
      fmt::format("Transaction {} is too old", tx_str));
  }

  if (operation_id.seqno >= upper_bound)
  {
    throw ServiceUnavailableError(
      errors::IndexingInProgressRetryLater,
      fmt::format("Transaction {} is not available.", tx_str));
  }

  auto it = operations_.find(operation_id.seqno);
  if (it == operations_.end())
  {
    // For a well-behaved client, this shouldn't ever happen.
    throw NotFoundError(errors::NotFound, "Invalid operation ID");
  }

  CCF_ASSERT(
    operation_id.view == it->second.view,
    "Operation ID has inconsistent view");

  if (it->second.status != OperationStatus::Running)
  {
    throw BadRequestError(
      errors::InvalidInput,
      fmt::format(
        "Operation is in an invalid state: {}",
        nlohmann::json(it->second.status).dump()));
  }

  CCF_ASSERT_FMT(
    it->second.context_digest.has_value(),
    "No context digest for operation {}",
    tx_str);
  return it->second.context_digest.value();
}

protected:
void visit_entry(const ccf::TxID& tx_id, const OperationLog& log) override
{
Expand Down Expand Up @@ -216,6 +311,7 @@ namespace scitt
switch (log.status)
{
case OperationStatus::Running:
it->second.context_digest = log.context_digest;
break;
case OperationStatus::Failed:
it->second.error = log.error;
Expand Down Expand Up @@ -367,6 +463,7 @@ namespace scitt
time_t created_at;
std::optional<ccf::TxID> completion_tx;
std::optional<ODataError> error;
std::optional<crypto::Sha256Hash> context_digest;
};

// It might be worth replacing this with a deque. Entries are only ever
Expand Down Expand Up @@ -403,8 +500,8 @@ namespace scitt
ccf::endpoints::EndpointContext& ctx,
nlohmann::json&& params)
{
auto id = historical::get_tx_id_from_request_path(ctx);
return index->lookup(id);
auto txid = historical::get_tx_id_from_request_path(ctx);
return index->lookup(txid);
}

/**
Expand All @@ -425,25 +522,16 @@ namespace scitt
* returned to future clients polling for the operation's status.
*/
static auto post_operation_callback(
const std::shared_ptr<OperationsIndexingStrategy>& index,
OperationCallback& callback,
ccf::endpoints::EndpointContext& ctx,
ccf::historical::StatePtr state,
nlohmann::json&& params)
{
auto txid = state->transaction_id;
auto tx = state->store->create_read_only_tx();
auto operation = tx.ro<OperationsTable>(OPERATIONS_TABLE)->get();
if (!operation.has_value())
{
throw BadRequestError(
errors::InvalidInput,
fmt::format(
"Transaction ID {} does not correspond to an operation.",
txid.to_str()));
}
auto txid = historical::get_tx_id_from_request_path(ctx);
auto expected_context_digest = index->get_context_digest(txid);

auto input = params.get<PostOperationCallback::In>();
if (crypto::Sha256Hash(input.context) != operation->context_digest)
if (crypto::Sha256Hash(input.context) != expected_context_digest)
{
throw BadRequestError(errors::InvalidInput, "Invalid context");
}
Expand Down Expand Up @@ -498,14 +586,11 @@ namespace scitt
static void register_operations_endpoints(
ccfapp::AbstractNodeContext& context,
ccf::BaseEndpointRegistry& registry,
ccf::historical::CheckHistoricalTxStatus is_tx_committed,
const ccf::AuthnPolicies& authn_policy,
OperationCallback callback)
{
using namespace std::placeholders;

auto& state_cache = context.get_historical_state();

auto operations_index =
std::make_shared<OperationsIndexingStrategy>(registry);
context.get_indexing_strategies().install_strategy(operations_index);
Expand Down Expand Up @@ -540,10 +625,12 @@ namespace scitt
.make_endpoint(
"/operations/{txid}/callback",
HTTP_POST,
scitt::historical::json_adapter(
std::bind(endpoints::post_operation_callback, callback, _1, _2, _3),
state_cache,
is_tx_committed),
ccf::json_adapter(std::bind(
endpoints::post_operation_callback,
operations_index,
callback,
_1,
_2)),
no_authn_policy)
.install();
}
Expand Down

0 comments on commit 22585e4

Please sign in to comment.