Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Logging and persist async operation for Sapling migration #4002

Merged
merged 3 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions qa/rpc-tests/sprout_sapling_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ def setup_nodes(self):
# Add migration parameters to nodes[0]
extra_args[0] = extra_args[0] + [
'-migration',
'-migrationdestaddress=' + SAPLING_ADDR
'-migrationdestaddress=' + SAPLING_ADDR,
'-debug=zrpcunsafe'
]
assert_equal(4, len(extra_args[0]))
assert_equal(5, len(extra_args[0]))
assert_equal(2, len(extra_args[1]))
return start_nodes(4, self.options.tmpdir, extra_args)

Expand Down Expand Up @@ -87,6 +88,8 @@ def run_migration_test(self, node, sproutAddr, saplingAddr, target_height):
assert_equal('saplingmigration', result['method'])
assert_equal(target_height, result['target_height'])
assert_equal(1, result['result']['num_tx_created'])
assert_equal(1, len(result['result']['migration_txids']))
assert_true(result['result']['amount_migrated'] > Decimal('0'))

assert_equal(0, len(node.getrawmempool()), "mempool size at 495 % 500")

Expand Down
2 changes: 1 addition & 1 deletion src/wallet/asyncrpcoperation_mergetoaddress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ bool AsyncRPCOperation_mergetoaddress::main_impl()
wtxHeight = mapBlockIndex[wtx.hashBlock]->nHeight;
wtxDepth = wtx.GetDepthInMainChain();
}
LogPrint("zrpcunsafe", "%s: spending note (txid=%s, vjoinsplit=%d, ciphertext=%d, amount=%s, height=%d, confirmations=%d)\n",
LogPrint("zrpcunsafe", "%s: spending note (txid=%s, vjoinsplit=%d, jsoutindex=%d, amount=%s, height=%d, confirmations=%d)\n",
getId(),
jso.hash.ToString().substr(0, 10),
jso.js,
Expand Down
31 changes: 27 additions & 4 deletions src/wallet/asyncrpcoperation_saplingmigration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void AsyncRPCOperation_saplingmigration::main() {
}

bool AsyncRPCOperation_saplingmigration::main_impl() {
LogPrint("zrpcunsafe", "%s: Beginning AsyncRPCOperation_saplingmigration.\n", getId());
std::vector<CSproutNotePlaintextEntry> sproutEntries;
std::vector<SaplingNoteEntry> saplingEntries;
{
Expand All @@ -83,7 +84,9 @@ bool AsyncRPCOperation_saplingmigration::main_impl() {
}
// If the remaining amount to be migrated is less than 0.01 ZEC, end the migration.
if (availableFunds < CENT) {
setMigrationResult(0, 0);
LogPrint("zrpcunsafe", "%s: Available Sprout balance (%s) less than required minimum (%s). Stopping.\n",
getId(), FormatMoney(availableFunds), FormatMoney(CENT));
setMigrationResult(0, 0, std::vector<std::string>());
return true;
}

Expand All @@ -95,12 +98,14 @@ bool AsyncRPCOperation_saplingmigration::main_impl() {
// Up to the limit of 5, as many transactions are sent as are needed to migrate the remaining funds
int numTxCreated = 0;
CAmount amountMigrated = 0;
std::vector<std::string> migrationTxIds;
int noteIndex = 0;
CCoinsViewCache coinsView(pcoinsTip);
do {
CAmount amountToSend = chooseAmount(availableFunds);
auto builder = TransactionBuilder(consensusParams, targetHeight_, MIGRATION_EXPIRY_DELTA, pwalletMain, pzcashParams,
&coinsView, &cs_main);
LogPrint("zrpcunsafe", "%s: Beginning creating transaction with Sapling output amount=%s\n", getId(), FormatMoney(amountToSend - FEE));
std::vector<CSproutNotePlaintextEntry> fromNotes;
CAmount fromNoteAmount = 0;
while (fromNoteAmount < amountToSend) {
Expand All @@ -110,6 +115,15 @@ bool AsyncRPCOperation_saplingmigration::main_impl() {
}
availableFunds -= fromNoteAmount;
for (const CSproutNotePlaintextEntry& sproutEntry : fromNotes) {
std::string data(sproutEntry.plaintext.memo().begin(), sproutEntry.plaintext.memo().end());
LogPrint("zrpcunsafe", "%s: Adding Sprout note input (txid=%s, vjoinsplit=%d, jsoutindex=%d, amount=%s, memo=%s)\n",
getId(),
sproutEntry.jsop.hash.ToString().substr(0, 10),
sproutEntry.jsop.js,
int(sproutEntry.jsop.n), // uint8_t
FormatMoney(sproutEntry.plaintext.value()),
HexStr(data).substr(0, 10)
);
libzcash::SproutNote sproutNote = sproutEntry.plaintext.note(sproutEntry.address);
libzcash::SproutSpendingKey sproutSk;
pwalletMain->GetSproutSpendingKey(sproutEntry.address, sproutSk);
Expand All @@ -128,21 +142,30 @@ bool AsyncRPCOperation_saplingmigration::main_impl() {
builder.AddSaplingOutput(ovkForShieldingFromTaddr(seed), migrationDestAddress, amountToSend - FEE);
CTransaction tx = builder.Build().GetTxOrThrow();
if (isCancelled()) {
LogPrint("zrpcunsafe", "%s: Canceled. Stopping.\n", getId());
break;
}
pwalletMain->AddPendingSaplingMigrationTx(tx);
LogPrint("zrpcunsafe", "%s: Added pending migration transaction with txid=%s\n", getId(), tx.GetHash().ToString());
++numTxCreated;
amountMigrated += amountToSend;
amountMigrated += amountToSend - FEE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the FEE excluded previously, but now included?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to prevent confusion between this value and the amount_migrated returned by z_getmigrationstatus. It probably should have excluded the FEE originally.

migrationTxIds.push_back(tx.GetHash().ToString());
} while (numTxCreated < 5 && availableFunds > CENT);

setMigrationResult(numTxCreated, amountMigrated);
LogPrint("zrpcunsafe", "%s: Created %d transactions with total Sapling output amount=%s\n", getId(), numTxCreated, FormatMoney(amountMigrated));
setMigrationResult(numTxCreated, amountMigrated, migrationTxIds);
return true;
}

void AsyncRPCOperation_saplingmigration::setMigrationResult(int numTxCreated, CAmount amountMigrated) {
void AsyncRPCOperation_saplingmigration::setMigrationResult(int numTxCreated, const CAmount& amountMigrated, const std::vector<std::string>& migrationTxIds) {
UniValue res(UniValue::VOBJ);
res.push_back(Pair("num_tx_created", numTxCreated));
res.push_back(Pair("amount_migrated", FormatMoney(amountMigrated)));
UniValue txIds(UniValue::VARR);
for (const std::string& txId : migrationTxIds) {
txIds.push_back(txId);
}
res.push_back(Pair("migration_txids", txIds));
set_result(res);
}

Expand Down
2 changes: 1 addition & 1 deletion src/wallet/asyncrpcoperation_saplingmigration.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AsyncRPCOperation_saplingmigration : public AsyncRPCOperation

bool main_impl();

void setMigrationResult(int numTxCreated, CAmount amountMigrated);
void setMigrationResult(int numTxCreated, const CAmount& amountMigrated, const std::vector<std::string>& migrationTxIds);

CAmount chooseAmount(const CAmount& availableFunds);
};
4 changes: 2 additions & 2 deletions src/wallet/asyncrpcoperation_sendmany.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ bool AsyncRPCOperation_sendmany::main_impl() {
wtxHeight = mapBlockIndex[wtx.hashBlock]->nHeight;
wtxDepth = wtx.GetDepthInMainChain();
}
LogPrint("zrpcunsafe", "%s: spending note (txid=%s, vjoinsplit=%d, ciphertext=%d, amount=%s, height=%d, confirmations=%d)\n",
LogPrint("zrpcunsafe", "%s: spending note (txid=%s, vjoinsplit=%d, jsoutindex=%d, amount=%s, height=%d, confirmations=%d)\n",
getId(),
jso.hash.ToString().substr(0, 10),
jso.js,
Expand Down Expand Up @@ -1048,7 +1048,7 @@ bool AsyncRPCOperation_sendmany::find_unspent_notes() {
for (CSproutNotePlaintextEntry & entry : sproutEntries) {
z_sprout_inputs_.push_back(SendManyInputJSOP(entry.jsop, entry.plaintext.note(boost::get<libzcash::SproutPaymentAddress>(frompaymentaddress_)), CAmount(entry.plaintext.value())));
std::string data(entry.plaintext.memo().begin(), entry.plaintext.memo().end());
LogPrint("zrpcunsafe", "%s: found unspent Sprout note (txid=%s, vjoinsplit=%d, ciphertext=%d, amount=%s, memo=%s)\n",
LogPrint("zrpcunsafe", "%s: found unspent Sprout note (txid=%s, vjoinsplit=%d, jsoutindex=%d, amount=%s, memo=%s)\n",
getId(),
entry.jsop.hash.ToString().substr(0, 10),
entry.jsop.js,
Expand Down
4 changes: 2 additions & 2 deletions src/wallet/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ void CWallet::RunSaplingMigration(int blockHeight) {
// height N-5
if (blockHeight % 500 == 495) {
std::shared_ptr<AsyncRPCQueue> q = getAsyncRPCQueue();
std::shared_ptr<AsyncRPCOperation> lastOperation = q->popOperationForId(saplingMigrationOperationId);
std::shared_ptr<AsyncRPCOperation> lastOperation = q->getOperationForId(saplingMigrationOperationId);
if (lastOperation != nullptr) {
lastOperation->cancel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now the base class cancel() will be invoked. Is that the intended behaviour? Will the cancel state be set correctly i.e. is the operation currently in READY state to transition to CANCEL state?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that the operation will still exist in an internal job map used by AsyncRPCQueue. That means, the queue will still try and process the operation. If the cancel state has been set correctly, this is fine. If not, the operation will be executed. Popping the operation removes the operation from the internal job map, which means the queue will have nothing to execute.

/**
 * Return the operation for a given operation id and then remove the operation from internal storage.
 */
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) {
    std::shared_ptr<AsyncRPCOperation> ptr = getOperationForId(id);
    if (ptr) {
        std::lock_guard<std::mutex> guard(lock_);
        // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker
        // there will no operation in the map to execute, so nothing will happen.
        operation_map_.erase(id);
    }
    return ptr;
}

...
void AsyncRPCQueue::run(size_t workerId) {
   ...
        if (!operation) {
            // cannot find operation in map, may have been removed
        } else if (operation->isCancelled()) {
            // skip cancelled operation
        } else {
            operation->main();
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now the base class cancel() will be invoked. Is that the intended behaviour? Will the cancel state be set correctly i.e. is the operation currently in READY state to transition to CANCEL state?

The intent of this is to cancel the operation if it is already running, this however does not work, and the fix does not seem to be simple (see #3988). What will currently happen is if a node is not finished generating migration transactions by the time we try to send them, then we will send what we have, but it will continue to generate transactions after the fact. These extra transactions will then be discarded in the next round of migrations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that the operation will still exist in an internal job map used by AsyncRPCQueue. That means, the queue will still try and process the operation. If the cancel state has been set correctly, this is fine. If not, the operation will be executed. Popping the operation removes the operation from the internal job map, which means the queue will have nothing to execute.

This has been changed from pop to get

}
Expand All @@ -620,7 +620,7 @@ void CWallet::RunSaplingMigration(int blockHeight) {
q->addOperation(operation);
} else if (blockHeight % 500 == 499) {
std::shared_ptr<AsyncRPCQueue> q = getAsyncRPCQueue();
std::shared_ptr<AsyncRPCOperation> lastOperation = q->popOperationForId(saplingMigrationOperationId);
std::shared_ptr<AsyncRPCOperation> lastOperation = q->getOperationForId(saplingMigrationOperationId);
if (lastOperation != nullptr) {
lastOperation->cancel();
}
Expand Down