Large diffs are not rendered by default.

@@ -0,0 +1,178 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! The following document is a minimalist version of Libra Wallet. Note that this Wallet does
//! not promote security as the mnemonic is stored in unencrypted form. In future iterations,
//! we will be releasing more robust Wallet implementations. It is our intention to present a
//! foundation that is simple to understand and incrementally improve the LibraWallet
//! implementation and its security guarantees throughout testnet. For a more robust wallet
//! reference, the authors suggest to audit the file of the same name in the rust-wallet crate.
//! That file can be found here:
//!
//! https://github.com/rust-bitcoin/rust-wallet/blob/master/wallet/src/walletlibrary.rs

use crate::{
error::*,
io_utils,
key_factory::{ChildNumber, KeyFactory, Seed},
mnemonic::Mnemonic,
};
use libra_crypto::hash::CryptoHash;
use proto_conv::{FromProto, IntoProto};
use protobuf::Message;
use rand::{rngs::EntropyRng, Rng};
use std::{collections::HashMap, path::Path};
use types::{
account_address::AccountAddress,
proto::transaction::SignedTransaction as ProtoSignedTransaction,
transaction::{RawTransaction, RawTransactionBytes, SignedTransaction},
};

/// WalletLibrary contains all the information needed to recreate a particular wallet
pub struct WalletLibrary {
    // Seed phrase the whole wallet is derived from (stored unencrypted — see module docs).
    mnemonic: Mnemonic,
    // Derives child private keys from the mnemonic-derived seed.
    key_factory: KeyFactory,
    // Maps each derived account address back to the child number it was derived at.
    addr_map: HashMap<AccountAddress, ChildNumber>,
    // Next child number to use when a new address is generated.
    key_leaf: ChildNumber,
}

impl WalletLibrary {
    /// Constructor that generates a Mnemonic from OS randomness and subsequently instantiates an
    /// empty WalletLibrary from that Mnemonic
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut rng = EntropyRng::new();
        let data: [u8; 32] = rng.gen();
        let mnemonic = Mnemonic::mnemonic(&data).unwrap();
        Self::new_from_mnemonic(mnemonic)
    }

    /// Constructor that instantiates a new WalletLibrary from Mnemonic
    pub fn new_from_mnemonic(mnemonic: Mnemonic) -> Self {
        let seed = Seed::new(&mnemonic, "LIBRA");
        WalletLibrary {
            mnemonic,
            key_factory: KeyFactory::new(&seed).unwrap(),
            addr_map: HashMap::new(),
            key_leaf: ChildNumber(0),
        }
    }

    /// Function that returns the string representation of the WalletLibrary Mnemonic
    /// NOTE: This is not secure, and in general the mnemonic should be stored in encrypted format
    pub fn mnemonic(&self) -> String {
        self.mnemonic.to_string()
    }

    /// Function that writes the wallet Mnemonic to file
    /// NOTE: This is not secure, and in general the Mnemonic would need to be decrypted before it
    /// can be written to file; otherwise the encrypted Mnemonic should be written to file
    pub fn write_recovery(&self, output_file_path: &Path) -> Result<()> {
        io_utils::write_recovery(&self, &output_file_path)?;
        Ok(())
    }

    /// Recover wallet from input_file_path
    pub fn recover(input_file_path: &Path) -> Result<WalletLibrary> {
        let wallet = io_utils::recover(&input_file_path)?;
        Ok(wallet)
    }

    /// Get the current ChildNumber in u64 format
    pub fn key_leaf(&self) -> u64 {
        self.key_leaf.0
    }

    /// Function that iterates from the current key_leaf until the supplied depth.
    /// Returns an error if addresses were already generated past `depth`, or if any
    /// key derivation along the way fails.
    pub fn generate_addresses(&mut self, depth: u64) -> Result<()> {
        let current = self.key_leaf.0;
        if current > depth {
            return Err(WalletError::LibraWalletGeneric(
                "Addresses already generated up to the supplied depth".to_string(),
            ));
        }
        while self.key_leaf != ChildNumber(depth) {
            // Propagate derivation failures instead of silently discarding them
            // (previously written as `let _ = self.new_address();`, which hid errors).
            self.new_address()?;
        }
        Ok(())
    }

    /// Function that allows to get the address of a particular key at a certain ChildNumber
    pub fn new_address_at_child_number(
        &mut self,
        child_number: ChildNumber,
    ) -> Result<AccountAddress> {
        let child = self.key_factory.private_child(child_number)?;
        child.get_address()
    }

    /// Function that generates a new key and adds it to the addr_map and subsequently returns the
    /// AccountAddress associated to the PrivateKey, along with its ChildNumber
    pub fn new_address(&mut self) -> Result<(AccountAddress, ChildNumber)> {
        let child = self.key_factory.private_child(self.key_leaf)?;
        let address = child.get_address()?;
        let child = self.key_leaf;
        // The leaf is advanced even on the duplicate-address error path below, so a
        // subsequent call will not re-derive the same child.
        self.key_leaf.increment();
        match self.addr_map.insert(address, child) {
            Some(_) => Err(WalletError::LibraWalletGeneric(
                "This address is already in your wallet".to_string(),
            )),
            None => Ok((address, child)),
        }
    }

    /// Returns a list of all addresses controlled by this wallet that are currently held by the
    /// addr_map, ordered by their ChildNumber (0..len). Errors if the map has a gap in child
    /// numbers, which would indicate internal inconsistency.
    pub fn get_addresses(&self) -> Result<Vec<AccountAddress>> {
        let mut ret = Vec::with_capacity(self.addr_map.len());
        // Invert addr_map so addresses can be emitted in child-number order.
        let rev_map = self
            .addr_map
            .iter()
            .map(|(&k, &v)| (v.as_ref().to_owned(), k.to_owned()))
            .collect::<HashMap<_, _>>();
        for i in 0..self.addr_map.len() as u64 {
            match rev_map.get(&i) {
                Some(account_address) => {
                    ret.push(*account_address);
                }
                None => {
                    return Err(WalletError::LibraWalletGeneric(format!(
                        "Child num {} not exist while depth is {}",
                        i,
                        self.addr_map.len()
                    )))
                }
            }
        }
        Ok(ret)
    }

    /// Simple public function that allows to sign a Libra RawTransaction with the PrivateKey
    /// associated to a particular AccountAddress. If the PrivateKey associated to an
    /// AccountAddress is not contained in the addr_map, then this function will return an Error
    pub fn sign_txn(
        &self,
        addr: &AccountAddress,
        txn: RawTransaction,
    ) -> Result<SignedTransaction> {
        if let Some(child) = self.addr_map.get(addr) {
            // Hash the canonical proto bytes of the raw transaction; this is the value signed.
            let raw_bytes = txn.into_proto().write_to_bytes()?;
            let txn_hashvalue = RawTransactionBytes(&raw_bytes).hash();

            // ChildNumber is Copy (see new_address above), so deref instead of clone().
            let child_key = self.key_factory.private_child(*child)?;
            let signature = child_key.sign(txn_hashvalue);
            let public_key = child_key.get_public();

            let mut signed_txn = ProtoSignedTransaction::new();
            signed_txn.set_raw_txn_bytes(raw_bytes.to_vec());
            signed_txn.set_sender_public_key(public_key.to_bytes().to_vec());
            signed_txn.set_sender_signature(signature.to_bytes().to_vec());

            Ok(SignedTransaction::from_proto(signed_txn)?)
        } else {
            Err(WalletError::LibraWalletGeneric(
                "Well, that address is nowhere to be found... This is awkward".to_string(),
            ))
        }
    }
}
@@ -0,0 +1,148 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{client_proxy::ClientProxy, commands::*};

/// Major command for account related operations.
pub struct AccountCommand {}

impl Command for AccountCommand {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["account", "a"]
    }
    fn get_description(&self) -> &'static str {
        "Account operations"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        // params[0] selects the sub-command; the rest are forwarded to it.
        let subcommands: Vec<Box<dyn Command>> = vec![
            Box::new(AccountCommandCreate {}),
            Box::new(AccountCommandListAccounts {}),
            Box::new(AccountCommandRecoverWallet {}),
            Box::new(AccountCommandWriteRecovery {}),
            Box::new(AccountCommandMint {}),
        ];
        subcommand_execute(&params[0], subcommands, client, &params[1..]);
    }
}

/// Sub command to create a random account. The account will not be saved on chain.
pub struct AccountCommandCreate {}

impl Command for AccountCommandCreate {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["create", "c"]
    }
    fn get_description(&self) -> &'static str {
        "Create an account. Returns reference ID to use in other operations"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Creating/retrieving next account from wallet");
        // Report either the freshly derived account or the error.
        match client.create_next_account(&params) {
            Err(e) => report_error("Error creating account", e),
            Ok(account_data) => println!(
                "Created/retrieved account #{} address {}",
                account_data.index,
                hex::encode(account_data.address)
            ),
        }
    }
}

/// Sub command to recover wallet from the file specified.
pub struct AccountCommandRecoverWallet {}

impl Command for AccountCommandRecoverWallet {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["recover", "r"]
    }
    fn get_params_help(&self) -> &'static str {
        "<file_path>"
    }
    fn get_description(&self) -> &'static str {
        "Recover Libra wallet from the file path"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Recovering Wallet");
        // On success, print a summary line followed by one line per derived account.
        match client.recover_wallet_accounts(&params) {
            Err(e) => report_error("Error recovering Libra wallet", e),
            Ok(accounts) => {
                println!(
                    "Wallet recovered and the first {} child accounts were derived",
                    accounts.len()
                );
                for data in accounts {
                    println!("#{} address {}", data.index, hex::encode(data.address));
                }
            }
        }
    }
}

/// Sub command to backup wallet to the file specified.
pub struct AccountCommandWriteRecovery {}

impl Command for AccountCommandWriteRecovery {
fn get_aliases(&self) -> Vec<&'static str> {
vec!["write", "w"]
}
fn get_params_help(&self) -> &'static str {
"<file_path>"
}
fn get_description(&self) -> &'static str {
"Save Libra wallet mnemonic recovery seed to disk"
}
fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
println!(">> Saving Libra wallet mnemonic recovery seed to disk");
match client.write_recovery(&params) {
Ok(_) => println!("Saved mnemonic seed to disk"),
Err(e) => report_error("Error writing mnemonic recovery seed to file", e),
}
}
}

/// Sub command to list all accounts information.
pub struct AccountCommandListAccounts {}

impl Command for AccountCommandListAccounts {
    fn get_description(&self) -> &'static str {
        "Print all accounts that were created or loaded"
    }
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["list", "la"]
    }
    // Listing takes no arguments; printing is delegated entirely to the proxy.
    fn execute(&self, client: &mut ClientProxy, _params: &[&str]) {
        client.print_all_accounts();
    }
}

/// Sub command to mint account.
pub struct AccountCommandMint {}

impl Command for AccountCommandMint {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["mint", "mintb", "m", "mb"]
    }
    fn get_params_help(&self) -> &'static str {
        "<receiver_account_ref_id>|<receiver_account_address> <number_of_coins>"
    }
    fn get_description(&self) -> &'static str {
        "Mint coins to the account. Suffix 'b' is for blocking"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Minting coins");
        // A trailing 'b' on the alias requests blocking until the mint commits.
        let is_blocking = blocking_cmd(params[0]);
        match client.mint_coins(&params, is_blocking) {
            Ok(_) if is_blocking => println!("Finished minting!"),
            Ok(_) => {
                // If this value is updated, it must also be changed in
                // setup_scripts/docker/mint/server.py
                println!("Mint request submitted");
            }
            Err(e) => report_error("Error minting coins", e),
        }
    }
}

Large diffs are not rendered by default.

@@ -0,0 +1,138 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{
account_commands::AccountCommand, client_proxy::ClientProxy, query_commands::QueryCommand,
transfer_commands::TransferCommand,
};

use failure::prelude::*;
use metrics::counters::*;
use std::{collections::HashMap, sync::Arc};
use types::account_address::ADDRESS_LENGTH;

/// Print the error and bump up error counter.
pub fn report_error(msg: &str, e: Error) {
    let details = pretty_format_error(e);
    println!("[ERROR] {}: {}", msg, details);
    COUNTER_CLIENT_ERRORS.inc();
}

/// Format an error for display, translating connectivity-related gRPC failures
/// into a friendlier hint; falls back to the error's Display output.
fn pretty_format_error(e: Error) -> String {
    if let Some(grpc_error) = e.downcast_ref::<grpcio::Error>() {
        if let grpcio::Error::RpcFailure(grpc_rpc_failure) = grpc_error {
            match grpc_rpc_failure.status {
                grpcio::RpcStatusCode::Unavailable | grpcio::RpcStatusCode::DeadlineExceeded => {
                    return "Server unavailable, please retry and/or check \
                            if host passed to the client is running"
                        .to_string();
                }
                _ => {}
            }
        }
    }

    // Tail expression instead of the original's needless `return` statement.
    format!("{}", e)
}

/// Check whether a command is blocking (its alias ends with the letter 'b').
pub fn blocking_cmd(cmd: &str) -> bool {
    cmd.chars().last() == Some('b')
}

/// Check whether a command is a debugging command (its alias ends with '?').
pub fn debug_format_cmd(cmd: &str) -> bool {
    cmd.chars().rev().next() == Some('?')
}

/// Check whether the input string is a valid libra address: it must be valid hex
/// and decode to exactly ADDRESS_LENGTH bytes.
pub fn is_address(data: &str) -> bool {
    hex::decode(data)
        .map(|bytes| bytes.len() == ADDRESS_LENGTH)
        .unwrap_or(false)
}

/// Returns all the commands available, as well as the reverse index from the aliases to the
/// commands.
pub fn get_commands() -> (
    Vec<Arc<dyn Command>>,
    HashMap<&'static str, Arc<dyn Command>>,
) {
    let commands: Vec<Arc<dyn Command>> = vec![
        Arc::new(AccountCommand {}),
        Arc::new(QueryCommand {}),
        Arc::new(TransferCommand {}),
    ];
    // Build the alias index by flattening each command's alias list into
    // (alias, command) pairs.
    let alias_to_cmd = commands
        .iter()
        .flat_map(|command| {
            command
                .get_aliases()
                .into_iter()
                .map(move |alias| (alias, Arc::clone(command)))
        })
        .collect::<HashMap<_, _>>();
    (commands, alias_to_cmd)
}

/// Parse a cmd string, the first element in the returned vector is the command to run.
/// Splits on single spaces (consecutive spaces yield empty tokens) after trimming
/// the whole line; each token is additionally trimmed.
pub fn parse_cmd(cmd_str: &str) -> Vec<&str> {
    // The original bound an unused `input` re-slice first; operate on cmd_str directly.
    cmd_str.trim().split(' ').map(str::trim).collect()
}

/// Print the help message for all sub commands.
pub fn print_subcommand_help(parent_command: &str, commands: &[Box<dyn Command>]) {
    println!(
        "usage: {} <arg>\n\nUse the following args for this command:\n",
        parent_command
    );
    // One entry per sub-command: "aliases params \n\t description".
    for cmd in commands.iter() {
        let aliases = cmd.get_aliases().join(" | ");
        println!("{} {}\n\t{}", aliases, cmd.get_params_help(), cmd.get_description());
    }
    println!("\n");
}

/// Execute sub command: look up `params[0]` in the aliases of `commands` and run the
/// match with the remaining params; print the sub-command help when no params are
/// given or the alias is unknown.
///
/// Panics if two sub-commands declare the same alias (a programming error).
// TODO: Convert subcommands arrays to lazy statics
pub fn subcommand_execute(
    parent_command_name: &str,
    commands: Vec<Box<dyn Command>>,
    client: &mut ClientProxy,
    params: &[&str],
) {
    // Build alias -> index map, rejecting duplicate aliases eagerly.
    let mut commands_map = HashMap::new();
    for (i, cmd) in commands.iter().enumerate() {
        for alias in cmd.get_aliases() {
            // `.is_some()` replaces the original's non-idiomatic `!= None` comparison.
            if commands_map.insert(alias, i).is_some() {
                panic!("Duplicate alias {}", alias);
            }
        }
    }

    if params.is_empty() {
        print_subcommand_help(parent_command_name, &commands);
        return;
    }

    match commands_map.get(&params[0]) {
        Some(&idx) => commands[idx].execute(client, &params),
        _ => print_subcommand_help(parent_command_name, &commands),
    }
}

/// Trait to perform client operations.
pub trait Command {
    /// All commands and aliases this command supports.
    fn get_aliases(&self) -> Vec<&'static str>;
    /// String that describes params; empty by default for parameterless commands.
    fn get_params_help(&self) -> &'static str {
        ""
    }
    /// String that describes what the command does.
    fn get_description(&self) -> &'static str;
    /// Code to execute.
    fn execute(&self, client: &mut ClientProxy, params: &[&str]);
}

Large diffs are not rendered by default.

@@ -0,0 +1,47 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![feature(duration_float)]
#![deny(missing_docs)]
//! Libra Client
//!
//! Client (binary) is the CLI tool to interact with Libra validator.
//! It supports all public APIs.
use crypto::signing::KeyPair;
use serde::{Deserialize, Serialize};
use types::account_address::AccountAddress;

pub(crate) mod account_commands;
/// Main instance of client holding corresponding information, e.g. account address.
pub mod client_proxy;
/// Command struct to interact with client.
pub mod commands;
/// gRPC client wrapper to connect to validator.
pub(crate) mod grpc_client;
pub(crate) mod query_commands;
pub(crate) mod transfer_commands;

/// Struct used to store data for each created account. We track the sequence number
/// so we can create new transactions easily
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct AccountData {
    /// Address of the account.
    pub address: AccountAddress,
    /// (private_key, public_key) pair if the account is not managed by wallet
    pub key_pair: Option<KeyPair>,
    /// Latest sequence number maintained by client, it can be different from validator.
    pub sequence_number: u64,
}

impl AccountData {
    /// Serialize account keypair if exists: returns the hex-encoded
    /// (private_key, public_key) strings, or None when the account is wallet-managed.
    pub fn keypair_as_string(&self) -> Option<(String, String)> {
        // `.as_ref().map(...)` replaces the original's manual Some/None match
        // (clippy::manual_map).
        self.key_pair.as_ref().map(|key_pair| {
            (
                crypto::utils::encode_to_string(&key_pair.private_key()),
                crypto::utils::encode_to_string(&key_pair.public_key()),
            )
        })
    }
}
@@ -0,0 +1,137 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use client::{client_proxy::ClientProxy, commands::*};
use logger::set_default_global_logger;
use rustyline::{config::CompletionType, error::ReadlineError, Config, Editor};
use structopt::StructOpt;

/// Command-line arguments for the Libra client binary, parsed by structopt.
#[derive(Debug, StructOpt)]
#[structopt(
    name = "Libra Client",
    author = "The Libra Association",
    about = "Libra client to connect to a specific validator"
)]
struct Args {
    /// Admission Control port to connect to.
    #[structopt(short = "p", long = "port", default_value = "30307")]
    pub port: String,
    /// Host address/name to connect to.
    #[structopt(short = "a", long = "host")]
    pub host: String,
    /// Path to the generated keypair for the faucet account. The faucet account can be used to
    /// mint coins. If not passed, a new keypair will be generated for
    /// you and placed in a temporary directory.
    /// To manually generate a keypair, use generate_keypair:
    /// `cargo run -p generate_keypair -- -o <output_file_path>`
    #[structopt(short = "m", long = "faucet_key_file_path")]
    pub faucet_account_file: Option<String>,
    /// Host that operates a faucet service
    /// If not passed, will be derived from host parameter
    #[structopt(short = "f", long = "faucet_server")]
    pub faucet_server: Option<String>,
    /// File location from which to load mnemonic word for user account address/key generation.
    /// If not passed, a new mnemonic file will be generated by libra_wallet in the current
    /// directory.
    #[structopt(short = "n", long = "mnemonic_file")]
    pub mnemonic_file: Option<String>,
    /// File location from which to load config of trusted validators. It is used to verify
    /// validator signatures in validator query response. The file should at least include public
    /// key of all validators trusted by the client - which should typically be all validators on
    /// the network. To connect to testnet, use 'libra/scripts/cli/trusted_peers.config.toml'.
    /// Can be generated by libra-config for local testing:
    /// `cargo run --bin libra-config`
    /// But the preferred method is to simply use libra-swarm to run local networks
    #[structopt(short = "s", long = "validator_set_file")]
    pub validator_set_file: String,
}

/// Entry point: parse CLI args, connect a ClientProxy to the validator, then run
/// an interactive REPL that dispatches lines to commands via the alias map.
fn main() -> std::io::Result<()> {
    // Install logging and the panic handler before doing any real work.
    let _logger = set_default_global_logger(false /* async */, None);
    crash_handler::setup_panic_handler();

    let (commands, alias_to_cmd) = get_commands();

    let args = Args::from_args();
    // An empty path signals that no faucet key file was supplied.
    let faucet_account_file = args.faucet_account_file.unwrap_or_else(|| "".to_string());

    // Any proxy construction error is converted into an io::Error so main can return it.
    let mut client_proxy = ClientProxy::new(
        &args.host,
        &args.port,
        &args.validator_set_file,
        &faucet_account_file,
        args.faucet_server,
        args.mnemonic_file,
    )
    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, &format!("{}", e)[..]))?;

    // Test connection to validator
    let test_ret = client_proxy.test_validator_connection();

    if let Err(e) = test_ret {
        println!(
            "Not able to connect to validator at {}:{}, error {:?}",
            args.host, args.port, e
        );
        // Exit cleanly (status 0) on connection failure after printing the reason.
        return Ok(());
    }
    let cli_info = format!("Connected to validator at: {}:{}", args.host, args.port);
    print_help(&cli_info, &commands);
    println!("Please, input commands: \n");

    // rustyline editor: list-style completion, auto history, skip lines starting with space.
    let config = Config::builder()
        .history_ignore_space(true)
        .completion_type(CompletionType::List)
        .auto_add_history(true)
        .build();
    let mut rl = Editor::<()>::with_config(config);
    // REPL loop: read a line, look up the first token in the alias map, otherwise
    // handle the built-in help/quit keywords.
    loop {
        let readline = rl.readline("libra% ");
        match readline {
            Ok(line) => {
                let params = parse_cmd(&line);
                match alias_to_cmd.get(params[0]) {
                    Some(cmd) => cmd.execute(&mut client_proxy, &params),
                    None => match params[0] {
                        "quit" | "q!" => break,
                        "help" | "h" => print_help(&cli_info, &commands),
                        "" => continue,
                        x => println!("Unknown command: {:?}", x),
                    },
                }
            }
            // Ctrl-C and Ctrl-D both terminate the REPL cleanly.
            Err(ReadlineError::Interrupted) => {
                println!("CTRL-C");
                break;
            }
            Err(ReadlineError::Eof) => {
                println!("CTRL-D");
                break;
            }
            Err(err) => {
                println!("Error: {:?}", err);
                break;
            }
        }
    }

    Ok(())
}

/// Print the help message for the client and underlying command.
fn print_help(client_info: &str, commands: &[std::sync::Arc<dyn Command>]) {
    println!("{}", client_info);
    println!("usage: <command> <args>\n\nUse the following commands:\n");
    // One entry per command, then the two built-in REPL keywords.
    commands.iter().for_each(|cmd| {
        println!(
            "{} {}\n\t{}",
            cmd.get_aliases().join(" | "),
            cmd.get_params_help(),
            cmd.get_description()
        )
    });

    println!("help | h \n\tPrints this help");
    println!("quit | q! \n\tExit this client");
    println!("\n");
}
@@ -0,0 +1,231 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{client_proxy::ClientProxy, commands::*};
use types::account_config::get_account_resource_or_default;
use vm_genesis::get_transaction_name;

/// Major command for query operations.
pub struct QueryCommand {}

impl Command for QueryCommand {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["query", "q"]
    }
    fn get_description(&self) -> &'static str {
        "Query operations"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        // params[0] names the sub-command; the remainder are its arguments.
        let subcommands: Vec<Box<dyn Command>> = vec![
            Box::new(QueryCommandGetBalance {}),
            Box::new(QueryCommandGetSeqNum {}),
            Box::new(QueryCommandGetLatestAccountState {}),
            Box::new(QueryCommandGetTxnByAccountSeq {}),
            Box::new(QueryCommandGetTxnByRange {}),
            Box::new(QueryCommandGetEvent {}),
        ];
        subcommand_execute(&params[0], subcommands, client, &params[1..]);
    }
}

/// Sub commands to query balance for the account specified.
pub struct QueryCommandGetBalance {}

impl Command for QueryCommandGetBalance {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["balance", "b"]
    }
    fn get_params_help(&self) -> &'static str {
        "<account_ref_id>|<account_address>"
    }
    fn get_description(&self) -> &'static str {
        "Get the current balance of an account"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        // Exactly one argument (plus the sub-command itself) is expected.
        if params.len() != 2 {
            println!("Invalid number of arguments for balance query");
            return;
        }
        match client.get_balance(&params) {
            Err(e) => report_error("Failed to get balance", e),
            Ok(balance) => println!("Balance is: {}", balance),
        }
    }
}

/// Sub command to get the latest sequence number from validator for the account specified.
pub struct QueryCommandGetSeqNum {}

impl Command for QueryCommandGetSeqNum {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["sequence", "s"]
    }
    fn get_params_help(&self) -> &'static str {
        "<account_ref_id>|<account_address> [reset_sequence_number=true|false]"
    }
    fn get_description(&self) -> &'static str {
        "Get the current sequence number for an account, \
         and reset current sequence number in CLI (optional, default is false)"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Getting current sequence number");
        match client.get_sequence_number(&params) {
            Err(e) => report_error("Error getting sequence number", e),
            Ok(sn) => println!("Sequence number is: {}", sn),
        }
    }
}

/// Command to query latest account state from validator.
pub struct QueryCommandGetLatestAccountState {}

impl Command for QueryCommandGetLatestAccountState {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["account_state", "as"]
    }
    fn get_params_help(&self) -> &'static str {
        "<account_ref_id>|<account_address>"
    }
    fn get_description(&self) -> &'static str {
        "Get the latest state for an account"
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Getting latest account state");
        match client.get_latest_account_state(&params) {
            // The resource conversion is only used as a validity check; the raw
            // blob `acc` is what gets printed.
            Ok((acc, version)) => match get_account_resource_or_default(&acc) {
                Ok(_) => println!(
                    "Latest account state is: \n \
                     Account: {:#?}\n \
                     State: {:#?}\n \
                     Blockchain Version: {}\n",
                    // NOTE(review): re-parses params[1] and panics via expect() if it is
                    // malformed, even though the query above already succeeded — confirm
                    // this ordering is intended.
                    client
                        .get_account_address_from_parameter(params[1])
                        .expect("Unable to parse account parameter"),
                    acc,
                    version,
                ),
                Err(e) => report_error("Error converting account blob to account resource", e),
            },
            Err(e) => report_error("Error getting latest account state", e),
        }
    }
}

/// Sub command to get transaction by account and sequence number from validator.
pub struct QueryCommandGetTxnByAccountSeq {}

impl Command for QueryCommandGetTxnByAccountSeq {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["txn_acc_seq", "ts"]
    }
    fn get_params_help(&self) -> &'static str {
        "<account_ref_id>|<account_address> <sequence_number> <fetch_events=true|false>"
    }
    fn get_description(&self) -> &'static str {
        "Get the committed transaction by account and sequence number. \
         Optionally also fetch events emitted by this transaction."
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Getting committed transaction by account and sequence number");
        // Flattened match: Ok(Some(..)) / Ok(None) / Err handled as three arms.
        match client.get_committed_txn_by_acc_seq(&params) {
            Ok(Some((comm_txn, events))) => {
                println!(
                    "Committed transaction: {}",
                    comm_txn.format_for_client(get_transaction_name)
                );
                if let Some(events_inner) = &events {
                    println!("Events: ");
                    for event in events_inner {
                        println!("{}", event);
                    }
                }
            }
            Ok(None) => println!("Transaction not available"),
            Err(e) => report_error(
                "Error getting committed transaction by account and sequence number",
                e,
            ),
        }
    }
}

/// Sub command to query transactions by range from validator.
pub struct QueryCommandGetTxnByRange {}

impl Command for QueryCommandGetTxnByRange {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["txn_range", "tr"]
    }
    fn get_params_help(&self) -> &'static str {
        "<start_version> <limit> <fetch_events=true|false>"
    }
    fn get_description(&self) -> &'static str {
        "Get the committed transactions by version range. \
         Optionally also fetch events emitted by these transactions."
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Getting committed transaction by range");
        match client.get_committed_txn_by_range(&params) {
            Ok(comm_txns_and_events) => {
                // Note that this should never panic because we shouldn't return items
                // if the version wasn't able to be parsed in the first place
                let mut cur_version = params[1].parse::<u64>().expect("Unable to parse version");
                for (txn, opt_events) in comm_txns_and_events {
                    println!(
                        "Transaction at version {}: {}",
                        cur_version,
                        txn.format_for_client(get_transaction_name)
                    );
                    // `if let` replaces the original is_some()/unwrap() pair
                    // (clippy::unnecessary_unwrap).
                    if let Some(events) = opt_events {
                        if events.is_empty() {
                            println!("No events returned");
                        } else {
                            for event in events {
                                println!("{}", event);
                            }
                        }
                    }
                    cur_version += 1;
                }
            }
            Err(e) => report_error("Error getting committed transactions by range", e),
        }
    }
}

/// Sub command to query events from validator.
pub struct QueryCommandGetEvent {}

impl Command for QueryCommandGetEvent {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["event", "ev"]
    }
    fn get_params_help(&self) -> &'static str {
        "<account_ref_id>|<account_address> <sent|received> <start_sequence_number> <ascending=true|false> <limit>"
    }
    fn get_description(&self) -> &'static str {
        "Get events by account and event type (sent|received)."
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        println!(">> Getting events by account and event type.");
        match client.get_events_by_account_and_type(&params) {
            Err(e) => report_error("Error getting events by access path", e),
            Ok((events, last_event_state)) => {
                // Print each event (or a placeholder), then the trailing event state.
                if events.is_empty() {
                    println!("No events returned");
                } else {
                    events.iter().for_each(|event| println!("{}", event));
                }
                println!("Last event state: {:#?}", last_event_state);
            }
        }
    }
}
@@ -0,0 +1,51 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{client_proxy::ClientProxy, commands::*};

/// Command to transfer coins between two accounts.
pub struct TransferCommand {}

impl Command for TransferCommand {
    fn get_aliases(&self) -> Vec<&'static str> {
        vec!["transfer", "transferb", "t", "tb"]
    }
    fn get_params_help(&self) -> &'static str {
        "\n\t<sender_account_address>|<sender_account_ref_id> \
         <receiver_account_address>|<receiver_account_ref_id> <number_of_coins> \
         [gas_unit_price (default=0)] [max_gas_amount (default 10000)] \
         Suffix 'b' is for blocking. "
    }
    fn get_description(&self) -> &'static str {
        "Transfer coins from account to another."
    }
    fn execute(&self, client: &mut ClientProxy, params: &[&str]) {
        // Guard: sub-command plus 3 required and up to 2 optional arguments.
        if params.len() < 4 || params.len() > 6 {
            println!("Invalid number of arguments for transfer");
            println!(
                "{} {}",
                self.get_aliases().join(" | "),
                self.get_params_help()
            );
            return;
        }

        println!(">> Transferring");
        let is_blocking = blocking_cmd(&params[0]);
        match client.transfer_coins(&params, is_blocking) {
            Err(e) => report_error("Failed to perform transaction", e),
            Ok(index_and_seq) => {
                let status = if is_blocking {
                    "Finished transaction!"
                } else {
                    "Transaction submitted to validator"
                };
                println!("{}", status);
                println!(
                    "To query for transaction status, run: query txn_acc_seq {} {} \
                     <fetch_events=true|false>",
                    index_and_seq.account_index, index_and_seq.sequence_number
                );
            }
        }
    }
}
@@ -0,0 +1,6 @@
# cyclomatic complexity is not always useful
cognitive-complexity-threshold = 100
# types are used for safety encoding
type-complexity-threshold = 10000
# manipulating complex state machines in consensus
too-many-arguments-threshold = 15
@@ -0,0 +1,13 @@
[package]
name = "build_helpers"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
protoc-grpcio = "0.3.1"
walkdir = "2.2.0"

grpcio-client = {path = "../grpcio-client"}
@@ -0,0 +1,81 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

/// Contains helpers for build.rs files. Includes helpers for proto compilation
use std::path::{Path, PathBuf};

use walkdir::WalkDir;

// Compiles all proto files under proto root and dependent roots.
// For example, if there is a file `src/a/b/c.proto`, it will generate `src/a/b/c.rs` and
// `src/a/b/c_grpc.rs`.
pub fn compile_proto(proto_root: &str, dependent_roots: Vec<&str>, generate_client_code: bool) {
let mut additional_includes = vec![];
for dependent_root in dependent_roots {
// First compile dependent directories
compile_dir(
&dependent_root,
vec![], /* additional_includes */
false, /* generate_client_code */
);
additional_includes.push(Path::new(dependent_root).to_path_buf());
}
// Now compile this directory
compile_dir(&proto_root, additional_includes, generate_client_code);
}

// Compile all of the proto files in proto_root directory and use the additional
// includes when compiling.
pub fn compile_dir(
    proto_root: &str,
    additional_includes: Vec<PathBuf>,
    generate_client_code: bool,
) {
    for entry in WalkDir::new(proto_root) {
        let dir_entry = entry.unwrap();
        if dir_entry.file_type().is_dir() {
            continue;
        }

        let path = dir_entry.path();
        // Only files with a `.proto` extension are compiled; everything else
        // (including extensionless files) is skipped.
        let is_proto = path.extension().map(|ext| ext == "proto").unwrap_or(false);
        if !is_proto {
            continue;
        }
        println!("cargo:rerun-if-changed={}", path.display());
        compile(path, &additional_includes, generate_client_code);
    }
}

// Compile a single .proto file: generated server/message code goes next to the
// source file (its parent directory); client stubs are generated on request.
fn compile(path: &Path, additional_includes: &[PathBuf], generate_client_code: bool) {
    let parent = path.parent().unwrap();
    // The original also built an unused `src_path` (parent/"src") here; removed as dead code.

    // Include paths: caller-supplied roots plus the file's own directory.
    let mut includes = additional_includes.to_owned();
    includes.push(parent.to_path_buf());

    ::protoc_grpcio::compile_grpc_protos(&[path], includes.as_slice(), parent)
        .unwrap_or_else(|_| panic!("Failed to compile protobuf input: {:?}", path));

    if generate_client_code {
        let file_string = path
            .file_name()
            .expect("unable to get filename")
            .to_str()
            .unwrap();
        let includes_strings = includes
            .iter()
            .map(|x| x.to_str().unwrap())
            .collect::<Vec<&str>>();

        // generate client code
        grpcio_client::client_stub_gen(
            &[file_string],
            includes_strings.as_slice(),
            parent.to_str().unwrap(),
        )
        .expect("Unable to generate client stub");
    }
}
@@ -0,0 +1,4 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

pub mod build_helpers;
@@ -0,0 +1,15 @@
[package]
name = "canonical_serialization"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
byteorder = "1.3.1"

failure = { path = "../failure_ext", package = "failure_ext" }

[dev-dependencies]
hex = "0.3"
@@ -0,0 +1,321 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//https://rust-lang.github.io/rust-clippy/master/index.html#blacklisted_name
//disable it in test so that we can use variable names such as 'foo' and 'bar'
#![allow(clippy::blacklisted_name)]
#![allow(clippy::many_single_char_names)]

use super::*;
use byteorder::WriteBytesExt;
use failure::Result;
use std::u32;

// Do not change the test vectors. Please read the comment below.
// NOTE(review): presumably this hex blob is the canonical serialization of a fixed
// Foo value checked by a test elsewhere in this file — the bytes must stay stable
// to catch serialization-format regressions.
const TEST_VECTOR_1: &str = "ffffffffffffffff060000006463584d4237640000000000000009000000000102\
03040506070805050505050505050505050505050505050505050505050505050505\
05050505630000000103000000010000000103000000161543030000000038150300\
0000160a05040000001415596903000000c9175a";

// Why do we need test vectors?
//
// 1. Sometimes it help to catch common bugs between serialization and
// deserialization functions that would have been missed by a simple round trip test.
// For example, if there's a bug in a shared procedure that serialize and
// deserialize both calls then roundtrip might miss it.
//
// 2. It helps to catch code changes that inadvertently introduce breaking changes
// in the serialization format that is incompatible with what generated in the
// past which would be missed by roundtrip tests, or changes that are not backward
// compatible in the sense that it may fail to deserialize bytes generated in the past.

/// Fixed 32-byte address used by the serialization test fixtures.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Addr(pub [u8; 32]);

impl Addr {
    /// Wraps a raw 32-byte array in an `Addr`.
    fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}

/// Top-level test struct exercising an integer, variable-length bytes, a nested
/// struct, a bool, and a map — one of each encoder method used below.
#[derive(Clone, Debug, Eq, PartialEq)]
struct Foo {
    a: u64,
    b: Vec<u8>,
    c: Bar,
    d: bool,
    e: BTreeMap<Vec<u8>, Vec<u8>>,
}

/// Nested test struct; `c` is serialized as 32 raw bytes (no length prefix),
/// unlike `b` which gets a length prefix.
#[derive(Clone, Debug, Eq, PartialEq)]
struct Bar {
    a: u64,
    b: Vec<u8>,
    c: Addr,
    d: u32,
}

impl CanonicalSerialize for Foo {
    /// Encodes the fields in declaration order: a, b, c, d, e.
    fn serialize(&self, serializer: &mut impl CanonicalSerializer) -> Result<()> {
        serializer.encode_u64(self.a)?;
        serializer.encode_variable_length_bytes(&self.b)?;
        serializer.encode_struct(&self.c)?;
        serializer.encode_bool(self.d)?;
        serializer.encode_btreemap(&self.e)?;
        Ok(())
    }
}

impl CanonicalSerialize for Bar {
    /// Encodes the fields in declaration order; the address is written as raw
    /// bytes with no length prefix.
    fn serialize(&self, serializer: &mut impl CanonicalSerializer) -> Result<()> {
        serializer.encode_u64(self.a)?;
        serializer.encode_variable_length_bytes(&self.b)?;
        serializer.encode_raw_bytes(&self.c.0)?;
        serializer.encode_u32(self.d)?;
        Ok(())
    }
}

impl CanonicalDeserialize for Foo {
fn deserialize(deserializer: &mut impl CanonicalDeserializer) -> Result<Self> {
let a = deserializer.decode_u64()?;
let b = deserializer.decode_variable_length_bytes()?;
let c: Bar = deserializer.decode_struct::<Bar>()?;
let d: bool = deserializer.decode_bool()?;
let e: BTreeMap<Vec<u8>, Vec<u8>> = deserializer.decode_btreemap()?;
Ok(Foo { a, b, c, d, e })
}
}

impl CanonicalDeserialize for Bar {
    /// Decodes fields in serialization order; the address is read back as
    /// exactly 32 raw bytes.
    fn deserialize(deserializer: &mut impl CanonicalDeserializer) -> Result<Self> {
        let a = deserializer.decode_u64()?;
        let b = deserializer.decode_variable_length_bytes()?;

        // Rebuild the fixed-size address array from the decoded slice.
        let raw_addr = deserializer.decode_bytes_with_len(32)?;
        let mut addr_bytes = [0u8; 32];
        addr_bytes.copy_from_slice(raw_addr.as_slice());

        let d = deserializer.decode_u32()?;
        Ok(Bar {
            a,
            b,
            c: Addr::new(addr_bytes),
            d,
        })
    }
}

#[test]
fn test_btreemap_encode() {
    // Entries must come back ordered by their *serialized* key encoding
    // (length prefix included), not by the raw key bytes — hence key2 ([0, 6])
    // sorts last despite starting with 0.
    let mut map = BTreeMap::new();
    let value = vec![54, 20, 21, 200];
    let key1 = vec![0]; // after serialization: [1, 0]
    let key2 = vec![0, 6]; // after serialization: [2, 0, 6]
    let key3 = vec![1]; // after serialization: [1, 1]
    let key4 = vec![2]; // after serialization: [1, 2]
    map.insert(key1.clone(), value.clone());
    map.insert(key2.clone(), value.clone());
    map.insert(key3.clone(), value.clone());
    map.insert(key4.clone(), value.clone());

    let serialized_bytes = SimpleSerializer::<Vec<u8>>::serialize(&map).unwrap();

    let mut deserializer = SimpleDeserializer::new(&serialized_bytes);

    // ensure the order was encoded in lexicographic order
    assert_eq!(deserializer.raw_bytes.read_u32::<Endianness>().unwrap(), 4);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), key1);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), value);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), key3);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), value);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), key4);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), value);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), key2);
    assert_eq!(deserializer.decode_variable_length_bytes().unwrap(), value);
}

#[test]
fn test_serialization_roundtrip() {
    // End-to-end round trip of a nested struct including a map and a bool.
    let bar = Bar {
        a: 50,
        b: vec![10u8; 100],
        c: Addr::new([3u8; 32]),
        d: 12,
    };

    let mut map = BTreeMap::new();
    map.insert(vec![0, 56, 21], vec![22, 10, 5]);
    map.insert(vec![1], vec![22, 21, 67]);
    map.insert(vec![20, 21, 89, 105], vec![201, 23, 90]);

    let foo = Foo {
        a: 1,
        b: vec![32, 41, 190, 200, 2, 5, 90, 100, 123, 234, 159, 159, 101],
        c: bar,
        d: false,
        e: map,
    };

    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    foo.serialize(&mut serializer).unwrap();
    let serialized_bytes = serializer.get_output();

    let mut deserializer = SimpleDeserializer::new(&serialized_bytes);
    let deserialized_foo = Foo::deserialize(&mut deserializer).unwrap();
    assert_eq!(foo, deserialized_foo);
    // Deserialization must consume every byte serialization produced.
    assert_eq!(
        deserializer.raw_bytes.position(),
        deserializer.raw_bytes.get_ref().len() as u64
    );
}

#[test]
fn test_encode_vec() {
    // Round-trip a Vec<Bar> through both the explicit `encode_vec` API and the
    // blanket Vec<T> CanonicalSerialize implementation (via `encode_struct`).
    let bar1 = Bar {
        a: 55,
        b: vec![10u8; 100],
        c: Addr::new([3u8; 32]),
        d: 77,
    };
    let bar2 = Bar {
        a: 123,
        b: vec![1, 5, 20],
        c: Addr::new([8u8; 32]),
        d: 127,
    };
    let bars = vec![bar1.clone(), bar2.clone()];

    // Explicit encode_vec path.
    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    serializer.encode_vec(&bars).unwrap();
    let serialized_bytes = serializer.get_output();
    let de_vec: Vec<Bar> = SimpleDeserializer::deserialize(&serialized_bytes).unwrap();
    assert_eq!(vec![bar1.clone(), bar2.clone()], de_vec);

    // test Vec<T> implementation
    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    serializer.encode_struct(&bars).unwrap();
    let serialized_bytes = serializer.get_output();
    let de_vec: Vec<Bar> = SimpleDeserializer::deserialize(&serialized_bytes).unwrap();
    assert_eq!(vec![bar1, bar2], de_vec);
}

#[test]
fn test_vec_impl() {
    // Vec<i32> must round-trip through the blanket Vec<T> implementation,
    // including both i32 extremes.
    let values: Vec<i32> = vec![std::i32::MIN, std::i32::MAX, 100];

    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    serializer.encode_struct(&values).unwrap();
    let serialized_bytes = serializer.get_output();
    let de_vec: Vec<i32> = SimpleDeserializer::deserialize(&serialized_bytes).unwrap();
    assert_eq!(values, de_vec);
}

#[test]
fn test_vectors_1() {
    // Pins the wire format: this struct must serialize to exactly
    // TEST_VECTOR_1 forever. Do NOT regenerate the vector if this fails —
    // a mismatch means the serialization format changed incompatibly.
    let bar = Bar {
        a: 100,
        b: vec![0, 1, 2, 3, 4, 5, 6, 7, 8],
        c: Addr::new([5u8; 32]),
        d: 99,
    };

    let mut map = BTreeMap::new();
    map.insert(vec![0, 56, 21], vec![22, 10, 5]);
    map.insert(vec![1], vec![22, 21, 67]);
    map.insert(vec![20, 21, 89, 105], vec![201, 23, 90]);

    let foo = Foo {
        a: u64::max_value(),
        b: vec![100, 99, 88, 77, 66, 55],
        c: bar,
        d: true,
        e: map,
    };

    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    foo.serialize(&mut serializer).unwrap();
    let serialized_bytes = serializer.get_output();

    // make sure we serialize into exact same bytes as before
    assert_eq!(TEST_VECTOR_1, hex::encode(serialized_bytes));

    // make sure we can deserialize the test vector into expected struct
    let test_vector_bytes = hex::decode(TEST_VECTOR_1).unwrap();
    let deserialized_foo: Foo = SimpleDeserializer::deserialize(&test_vector_bytes).unwrap();
    assert_eq!(foo, deserialized_foo);
}

#[test]
fn test_serialization_failure_cases() {
    // a vec longer than representable range should result in failure
    // NOTE: this allocates i32::MAX + 1 (~2 GiB) of zeroed bytes to cross the
    // length limit by exactly one.
    let bar = Bar {
        a: 100,
        b: vec![0; i32::max_value() as usize + 1],
        c: Addr::new([0u8; 32]),
        d: 222,
    };

    let mut serializer = SimpleSerializer::<Vec<u8>>::new();
    assert!(bar.serialize(&mut serializer).is_err());
}

#[test]
fn test_deserialization_failure_cases() {
    // Each section below feeds deliberately malformed bytes to a fresh clone of
    // the deserializer and expects an Err rather than a panic or bogus value.

    // invalid length prefix should fail on all decoding methods
    let bytes_len_2 = vec![0; 2];
    let mut deserializer = SimpleDeserializer::new(&bytes_len_2);
    assert!(deserializer.clone().decode_u64().is_err());
    assert!(deserializer.clone().decode_bytes_with_len(32).is_err());
    assert!(deserializer.clone().decode_variable_length_bytes().is_err());
    assert!(deserializer.clone().decode_struct::<Foo>().is_err());
    assert!(Foo::deserialize(&mut deserializer.clone()).is_err());

    // a length prefix longer than maximum allowed should fail
    let mut long_bytes = Vec::new();
    long_bytes
        .write_u32::<Endianness>(ARRAY_MAX_LENGTH as u32 + 1)
        .unwrap();
    deserializer = SimpleDeserializer::new(&long_bytes);
    assert!(deserializer.clone().decode_variable_length_bytes().is_err());

    // vec not long enough should fail
    let mut bytes_len_10 = Vec::new();
    bytes_len_10.write_u32::<Endianness>(32).unwrap();
    deserializer = SimpleDeserializer::new(&bytes_len_10);
    assert!(deserializer.clone().decode_variable_length_bytes().is_err());
    assert!(deserializer.clone().decode_bytes_with_len(32).is_err());

    // malformed struct should fail
    let mut some_bytes = Vec::new();
    some_bytes.write_u64::<Endianness>(10).unwrap();
    some_bytes.write_u32::<Endianness>(50).unwrap();
    deserializer = SimpleDeserializer::new(&some_bytes);
    assert!(deserializer.clone().decode_struct::<Foo>().is_err());

    // malformed encoded bytes with length prefix larger than real
    let mut evil_bytes = Vec::new();
    evil_bytes.write_u32::<Endianness>(500).unwrap();
    evil_bytes.resize_with(4 + 499, Default::default);
    deserializer = SimpleDeserializer::new(&evil_bytes);
    assert!(deserializer.clone().decode_variable_length_bytes().is_err());

    // malformed encoded bool with value not 0 or 1
    let mut bool_bytes = Vec::new();
    bool_bytes.write_u8(2).unwrap();
    deserializer = SimpleDeserializer::new(&bool_bytes);
    assert!(deserializer.clone().decode_bool().is_err());
}

Large diffs are not rendered by default.

@@ -0,0 +1,16 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{CanonicalDeserialize, CanonicalSerialize, SimpleDeserializer, SimpleSerializer};
use std::fmt::Debug;

/// Asserts that `object` survives a serialize → deserialize round trip
/// unchanged, panicking (and failing the enclosing test) otherwise.
pub fn assert_canonical_encode_decode<T>(object: &T)
where
    T: CanonicalSerialize + CanonicalDeserialize + Debug + Eq,
{
    let bytes: Vec<u8> = SimpleSerializer::serialize(object).expect("Serialization should work");
    let round_tripped: T =
        SimpleDeserializer::deserialize(&bytes).expect("Deserialization should work");
    assert_eq!(*object, round_tripped);
}
@@ -0,0 +1,15 @@
[package]
name = "channel"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
futures = { version = "=0.3.0-alpha.16", package = "futures-preview" }
lazy_static = "1.3.0"
metrics = { path = "../metrics" }

[dev-dependencies]
rusty-fork = "0.2.1"
@@ -0,0 +1,157 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Provides an mpsc (multi-producer single-consumer) channel wrapped in an
//! [`IntGauge`](metrics::IntGauge)
//!
//! The original futures mpsc channel has the behavior that each cloned sender gets a guaranteed
//! slot. There are cases in our codebase where senders need to be cloned to work with combinators
//! like `buffer_unordered`. The bounded mpsc channels turn out to be unbounded in this way. There
//! are great discussions in this [PR](https://github.com/rust-lang-nursery/futures-rs/pull/984). The
//! argument of the current behavior is to have only local limit on each sender, and relies on
//! global coordination for the number of cloned senders. However, this isn't really feasible in
//! some cases. One solution that came up from the discussion is to have poll_flush call poll_ready
//! (instead of a noop) to make sure the current sender task isn't parked. For the case that a new
//! cloned sender tries to send a message to a full channel, send executes poll_ready, start_send
//! and poll_flush. The first poll_ready would return Ready because maybe_parked initiated as
//! false. start_send then pushes the message to the internal message queue and parks the sender
//! task. poll_flush calls poll_ready again, and this time, it would return Pending because the
//! sender task is parked. So the send will block until the receiver removes more messages from the
//! queue and that sender's task is unparked.
//! [This PR](https://github.com/rust-lang-nursery/futures-rs/pull/1671) is supposed to fix this in
//! futures 0.3. It'll be consistent once it's merged.
//!
//! This change does have some implications though.
//! 1. When the channel size is 0, it becomes synchronous. `send` won't finish until the item is
//! taken from the receiver.
//! 2. `send` may fail if the receiver drops after receiving the item.
//!
//! let (tx, rx) = channel::new_test(1);
//! let f_tx = async move {
//! block_on(tx.send(1)).unwrap();
//! };
//! let f_rx = async move {
//! let item = block_on(rx.next()).unwrap();
//! assert_eq!(item, 1);
//! };
//! block_on(join(f_tx, f_rx));
//!
//! For the example above, `tx.send` could fail. Because send has three steps - poll_ready,
//! start_send and poll_flush. After start_send, the rx can receive the item, but if rx gets
//! dropped before poll_flush, it'll trigger disconnected send error. That's why the disconnected
//! error is converted to an Ok in poll_flush.

use futures::{
channel::mpsc,
sink::Sink,
stream::{FusedStream, Stream},
task::{Context, Poll},
};
use metrics::IntGauge;
use std::pin::Pin;

#[cfg(test)]
mod test;

/// Wrapper around a value with an `IntGauge`
/// It is used to gauge the number of elements in a `mpsc::channel`
#[derive(Clone)]
pub struct WithGauge<T> {
    // Tracks how many queued elements are currently in flight in the channel.
    gauge: IntGauge,
    // The wrapped sender or receiver half.
    value: T,
}

/// Similar to `mpsc::Sender`, but with an `IntGauge`
pub type Sender<T> = WithGauge<mpsc::Sender<T>>;
/// Similar to `mpsc::Receiver`, but with an `IntGauge`
pub type Receiver<T> = WithGauge<mpsc::Receiver<T>>;

/// `Sender` implements `Sink` in the same way as `mpsc::Sender`, but it increments the
/// associated `IntGauge` when it sends a message successfully.
impl<T> Sink<T> for Sender<T> {
    type SinkError = mpsc::SendError;

    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::SinkError>> {
        // Readiness is delegated directly to the inner mpsc sender.
        (*self).value.poll_ready(cx)
    }

    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> {
        // Count the element optimistically; roll the gauge back if the inner
        // send fails so the count stays consistent with the queue.
        self.gauge.inc();
        (*self).value.start_send(msg).map_err(|e| {
            self.gauge.dec();
            e
        })?;
        Ok(())
    }

    // `poll_flush` would block if `poll_ready` returns pending, which means the channel is at
    // capacity and the sender task is parked.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::SinkError>> {
        match (*self).value.poll_ready(cx) {
            Poll::Ready(Err(ref e)) if e.is_disconnected() => {
                // If the receiver disconnected, we consider the sink to be flushed.
                Poll::Ready(Ok(()))
            }
            x => x,
        }
    }

    fn poll_close(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Result<(), Self::SinkError>> {
        // Closing just disconnects the inner sender; nothing is buffered here.
        self.value.disconnect();
        Poll::Ready(Ok(()))
    }
}

/// Termination state is delegated to the inner `mpsc::Receiver`.
impl<T> FusedStream for Receiver<T> {
    fn is_terminated(&self) -> bool {
        self.value.is_terminated()
    }
}

/// `Receiver` implements `Stream` in the same way as `mpsc::Stream`, but it decrements the
/// associated `IntGauge` when it gets polled successfully.
impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match Pin::new(&mut self.value).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // An element left the queue: reflect that in the gauge.
                self.gauge.dec();
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}

/// Similar to `mpsc::channel`, `new` creates a pair of `Sender` and `Receiver`
pub fn new<T>(size: usize, gauge: &IntGauge) -> (Sender<T>, Receiver<T>) {
    // Start this channel's element count from zero.
    gauge.set(0);
    let (tx, rx) = mpsc::channel(size);
    let sender = WithGauge {
        gauge: gauge.clone(),
        value: tx,
    };
    let receiver = WithGauge {
        gauge: gauge.clone(),
        value: rx,
    };
    (sender, receiver)
}

// Global gauge backing `new_test` channels. Tests asserting on its value are
// forked into separate processes (see test.rs) to avoid cross-test interference.
lazy_static::lazy_static! {
    pub static ref TEST_COUNTER: IntGauge =
        IntGauge::new("TEST_COUNTER", "Counter of network tests").unwrap();
}

/// Creates a gauged channel wired to the shared `TEST_COUNTER`, for use in tests.
pub fn new_test<T>(size: usize) -> (Sender<T>, Receiver<T>) {
    new(size, &TEST_COUNTER)
}
@@ -0,0 +1,67 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{new_test, TEST_COUNTER};
use futures::{
executor::block_on,
task::{noop_waker, Context, Poll},
FutureExt, SinkExt, StreamExt,
};
use rusty_fork::{rusty_fork_id, rusty_fork_test, rusty_fork_test_name};

#[test]
fn test_send() {
    // The gauge should track queue occupancy: 0 -> 1 after send, back to 0
    // after the receiver takes the item.
    let (mut tx, mut rx) = new_test(8);
    assert_eq!(TEST_COUNTER.get(), 0);
    let item = 42;
    block_on(tx.send(item)).unwrap();
    assert_eq!(TEST_COUNTER.get(), 1);
    let received_item = block_on(rx.next()).unwrap();
    assert_eq!(received_item, item);
    assert_eq!(TEST_COUNTER.get(), 0);
}

// Fork the unit tests into separate processes to avoid the conflict that these tests executed in
// multiple threads may manipulate TEST_COUNTER at the same time.
rusty_fork_test! {
    #[test]
    fn test_send_backpressure() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Fill the single-slot channel.
        let (mut tx, mut rx) = new_test(1);
        assert_eq!(TEST_COUNTER.get(), 0);
        block_on(tx.send(1)).unwrap();
        assert_eq!(TEST_COUNTER.get(), 1);

        // A second send must stay Pending until the receiver drains an element.
        let mut task = tx.send(2);
        assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
        let item = block_on(rx.next()).unwrap();
        assert_eq!(item, 1);
        // Gauge stays at 1: one item out, the parked send's item now queued.
        assert_eq!(TEST_COUNTER.get(), 1);
        assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
    }
}

// Fork the unit tests into separate processes to avoid the conflict that these tests executed in
// multiple threads may manipulate TEST_COUNTER at the same time.
rusty_fork_test! {
    #[test]
    fn test_send_backpressure_multi_senders() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Fill the single-slot channel from the first sender.
        let (mut tx1, mut rx) = new_test(1);
        assert_eq!(TEST_COUNTER.get(), 0);
        block_on(tx1.send(1)).unwrap();
        assert_eq!(TEST_COUNTER.get(), 1);

        // A freshly cloned sender must also see backpressure (no free
        // guaranteed slot), staying Pending until the receiver drains.
        let mut tx2 = tx1.clone();
        let mut task = tx2.send(2);
        assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
        let item = block_on(rx.next()).unwrap();
        assert_eq!(item, 1);
        assert_eq!(TEST_COUNTER.get(), 1);
        assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
    }
}
@@ -0,0 +1,14 @@
[package]
name = "crash_handler"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
backtrace = "0.3.9"
toml = "0.4.7"

logger = { path = "../logger" }
serde = { version = "1.0.89", features = ["derive"] }
@@ -0,0 +1,75 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![feature(panic_info_message)]

use backtrace::Backtrace;
use logger::prelude::*;
use serde::Serialize;
use std::{
panic::{self, PanicInfo},
process, thread, time,
};

/// Snapshot of a panic, serialized to TOML and logged by `handle_panic`.
#[derive(Debug, Serialize)]
pub struct CrashInfo {
    // Human-readable panic message (or "Unknown Reason" when unavailable).
    reason: String,
    // Panic payload plus file/line source location.
    details: String,
    // Formatted backtrace captured at the panic site.
    backtrace: String,
}

/// Installs `handle_panic` as the process-wide panic hook, unless
/// `RUST_BACKTRACE` is set to a non-"0" value (in which case the default
/// backtrace-printing behavior is kept).
pub fn setup_panic_handler() {
    // If RUST_BACKTRACE variable isn't present or RUST_BACKTRACE=0, we setup panic handler
    let backtrace_enabled = match std::env::var_os("RUST_BACKTRACE") {
        Some(val) => val != "0",
        None => false,
    };

    if backtrace_enabled {
        info!("Skip panic handler setup because RUST_BACKTRACE is set");
    } else {
        panic::set_hook(Box::new(move |pi: &PanicInfo<'_>| handle_panic(pi)));
    }
}

// formats and logs panic information
//
// Builds a CrashInfo (message, payload + source location, backtrace), logs it
// as pretty TOML at `crit!` level, sleeps briefly so the async logger can
// flush to disk, then terminates the process with exit code 12.
// NOTE: `panic_info.message()` relies on the unstable `panic_info_message`
// feature enabled at the top of this file.
fn handle_panic(panic_info: &PanicInfo<'_>) {
    let reason = match panic_info.message() {
        Some(m) => format!("{}", m),
        None => "Unknown Reason".into(),
    };

    let mut details = String::new();

    // The payload is only downcast as &str; String payloads fall through to
    // the "[no extra details]" branch.
    let payload = match panic_info.payload().downcast_ref::<&str>() {
        Some(pld) => format!("Details: {}. ", pld),
        None => "[no extra details]. ".into(),
    };
    details.push_str(&payload);

    let location = match panic_info.location() {
        Some(loc) => format!(
            "Thread panicked at file '{}' at line {}",
            loc.file(),
            loc.line()
        ),
        None => "[no location details].".into(),
    };
    details.push_str(&location);

    let backtrace = format!("{:#?}", Backtrace::new());

    let info = CrashInfo {
        reason,
        details,
        backtrace,
    };
    crit!("{}", toml::to_string_pretty(&info).unwrap());

    // allow to save on disk
    thread::sleep(time::Duration::from_millis(100));

    // kill the process
    process::exit(12);
}
@@ -0,0 +1,20 @@
[package]
name = "debug_interface"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
grpcio = "0.4.3"
futures = "0.1.23"
protobuf = "2.6"

failure = { package = "failure_ext", path = "../failure_ext" }
jemalloc = { path = "../jemalloc" }
logger = { path = "../logger" }
metrics = { path = "../metrics" }

[build-dependencies]
build_helpers = { path = "../build_helpers" }
@@ -0,0 +1,17 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! This compiles all the `.proto` files under `src/` directory.
//!
//! For example, if there is a file `src/a/b/c.proto`, it will generate `src/a/b/c.rs` and
//! `src/a/b/c_grpc.rs`.

fn main() {
    let proto_root = "src/proto";

    // Compile everything under src/proto. The debug interface needs no extra
    // include roots and no grpcio_client stub generation.
    build_helpers::build_helpers::compile_proto(
        proto_root,
        vec![], /* dependent roots */
        false,  /* generate_client_stub */
    );
}
@@ -0,0 +1,79 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::proto::{
node_debug_interface::{DumpJemallocHeapProfileRequest, GetNodeDetailsRequest},
node_debug_interface_grpc::NodeDebugInterfaceClient,
};
use failure::prelude::*;
use grpcio::{ChannelBuilder, EnvBuilder};
use std::{collections::HashMap, sync::Arc};

// Generated
pub mod proto;

pub mod node_debug_helpers;
pub mod node_debug_service;

/// Implement default utility client for NodeDebugInterface
pub struct NodeDebugClient {
    // Generated gRPC client for the node_debug_interface proto service.
    client: NodeDebugInterfaceClient,
    // Host this client was connected to (as passed to `new`).
    address: String,
    // Port this client was connected to.
    port: u16,
}

impl NodeDebugClient {
    /// Connects a new debug client to `address:port`.
    pub fn new<A: AsRef<str>>(address: A, port: u16) -> Self {
        let env = Arc::new(EnvBuilder::new().name_prefix("grpc-debug-").build());
        let endpoint = format!("{}:{}", address.as_ref(), port);
        let channel = ChannelBuilder::new(env).connect(&endpoint);

        Self {
            client: NodeDebugInterfaceClient::new(channel),
            address: address.as_ref().to_owned(),
            port,
        }
    }

    /// Host this client was created with.
    pub fn get_address(&self) -> &str {
        &self.address
    }

    /// Port this client was created with.
    pub fn get_port(&self) -> u16 {
        self.port
    }

    /// Fetches a single metric by name; `None` if the node doesn't report it.
    pub fn get_node_metric<S: AsRef<str>>(&self, metric: S) -> Result<Option<i64>> {
        Ok(self.get_node_metrics()?.get(metric.as_ref()).cloned())
    }

    /// Fetches all node metrics, parsing each reported value as an i64.
    /// Fails if the RPC errors or any value fails to parse.
    pub fn get_node_metrics(&self) -> Result<HashMap<String, i64>> {
        let response = self
            .client
            .get_node_details(&GetNodeDetailsRequest::new())
            .context("Unable to query Node metrics")?;

        let mut metrics = HashMap::new();
        for (name, raw) in response.stats.into_iter() {
            match raw.parse::<i64>() {
                Ok(parsed) => {
                    metrics.insert(name, parsed);
                }
                Err(_) => {
                    return Err(format_err!(
                        "Failed to parse stat value to i64 {}: {}",
                        &name,
                        &raw
                    ));
                }
            }
        }
        Ok(metrics)
    }

    /// Requests a jemalloc heap-profile dump; returns the reported status code.
    pub fn dump_heap_profile(&self) -> Result<i32> {
        let response = self
            .client
            .dump_jemalloc_heap_profile(&DumpJemallocHeapProfileRequest::new())
            .context("Unable to request heap dump")?;

        Ok(response.status_code)
    }
}
@@ -0,0 +1,38 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Helper functions for debug interface.

use crate::proto::node_debug_interface_grpc::NodeDebugInterfaceClient;
use grpcio::{ChannelBuilder, EnvBuilder};
use logger::prelude::*;
use std::{sync::Arc, thread, time};

/// Builds a `NodeDebugInterfaceClient` connected to `localhost:debug_port`.
pub fn create_debug_client(debug_port: u16) -> NodeDebugInterfaceClient {
    let env = Arc::new(EnvBuilder::new().name_prefix("grpc-debug-").build());
    let channel = ChannelBuilder::new(env).connect(&format!("localhost:{}", debug_port));
    NodeDebugInterfaceClient::new(channel)
}

/// Blocks until the node answers a GetNodeDetails request, retrying every
/// 100 ms. Panics after the retry budget (200 retries) is exhausted.
pub fn check_node_up(client: &NodeDebugInterfaceClient) {
    let get_details_req = crate::proto::node_debug_interface::GetNodeDetailsRequest::new();

    let mut remaining_attempts = 200;
    loop {
        if let Err(e) = client.get_node_details(&get_details_req) {
            if remaining_attempts == 0 {
                panic!("Node is not up after many attempts: {}", e);
            }
            remaining_attempts -= 1;
            thread::sleep(time::Duration::from_millis(100));
        } else {
            info!("Node is up");
            break;
        }
    }
}
@@ -0,0 +1,60 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Debug interface to access information in a specific node.

use crate::proto::{
node_debug_interface::{
DumpJemallocHeapProfileRequest, DumpJemallocHeapProfileResponse, GetNodeDetailsRequest,
GetNodeDetailsResponse,
},
node_debug_interface_grpc::NodeDebugInterface,
};
use futures::Future;
use logger::prelude::*;
use metrics::counters::COUNTER_ADMISSION_CONTROL_CANNOT_SEND_REPLY;

/// Stateless gRPC service backing the node debug interface.
#[derive(Clone, Default)]
pub struct NodeDebugService {}

impl NodeDebugService {
    /// Creates a new (stateless) debug service.
    pub fn new() -> Self {
        Self::default()
    }
}

impl NodeDebugInterface for NodeDebugService {
    // Returns the full set of process metrics as a string->string map.
    fn get_node_details(
        &mut self,
        ctx: ::grpcio::RpcContext<'_>,
        _req: GetNodeDetailsRequest,
        sink: ::grpcio::UnarySink<GetNodeDetailsResponse>,
    ) {
        info!("[GRPC] get_node_details");
        let mut response = GetNodeDetailsResponse::new();
        response.stats = metrics::get_all_metrics();
        // Reply failures are logged (and counted) rather than propagated.
        ctx.spawn(sink.success(response).map_err(default_reply_error_logger))
    }

    // Triggers a jemalloc heap-profile dump and reports the mallctl status
    // code (0 on success) back to the caller.
    fn dump_jemalloc_heap_profile(
        &mut self,
        ctx: ::grpcio::RpcContext<'_>,
        _request: DumpJemallocHeapProfileRequest,
        sink: ::grpcio::UnarySink<DumpJemallocHeapProfileResponse>,
    ) {
        trace!("[GRPC] dump_jemalloc_heap_profile");
        let status_code = match jemalloc::dump_jemalloc_memory_profile() {
            Ok(_) => 0,
            Err(err_code) => err_code,
        };
        let mut resp = DumpJemallocHeapProfileResponse::new();
        resp.status_code = status_code;
        let f = sink.success(resp).map_err(default_reply_error_logger);
        ctx.spawn(f)
    }
}

// Fallback handler for failed gRPC replies: logs the error and bumps a counter.
// NOTE(review): the counter name references admission control — confirm this
// shared counter is intended for the debug service as well.
fn default_reply_error_logger<T: ::std::fmt::Debug>(e: T) {
    COUNTER_ADMISSION_CONTROL_CANNOT_SEND_REPLY.inc();
    error!("Failed to reply error due to {:?}", e)
}
@@ -0,0 +1,3 @@
# Ignore all the generated files.
node_debug_interface.rs
node_debug_interface_grpc.rs
@@ -0,0 +1,5 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

pub mod node_debug_interface;
pub mod node_debug_interface_grpc;
@@ -0,0 +1,27 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

// A Debugging interface to be used to query debug information from a Node
syntax = "proto3";

package debug;

message GetNodeDetailsRequest {}

message GetNodeDetailsResponse { map<string, string> stats = 1; }

message DumpJemallocHeapProfileRequest {}

message DumpJemallocHeapProfileResponse {
// Status code from jemalloc mallctl call. 0 indicates success.
int32 status_code = 1;
}

service NodeDebugInterface {
// Returns debug information about node
rpc GetNodeDetails(GetNodeDetailsRequest) returns (GetNodeDetailsResponse) {}

// Triggers a dump of heap profile.
rpc DumpJemallocHeapProfile(DumpJemallocHeapProfileRequest)
returns (DumpJemallocHeapProfileResponse) {}
}
@@ -0,0 +1,16 @@
[package]
name = "executable_helpers"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
clap = "2.32.0"
slog-scope = "4.0"

config = { path = "../../config" }
crash_handler = { path = "../crash_handler" }
logger = { path = "../logger" }
metrics = { path = "../metrics" }
@@ -0,0 +1,158 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use clap::{value_t, App, Arg, ArgMatches};
use config::config::{NodeConfig, NodeConfigHelpers};
use logger::prelude::*;
use slog_scope::GlobalLoggerGuard;

// General args
pub const ARG_PEER_ID: &str = "--peer_id";
pub const ARG_DISABLE_LOGGING: &str = "--no_logging";
pub const ARG_CONFIG_PATH: &str = "--config_path";

// Used for consensus
pub const ARG_NUM_PAYLOAD: &str = "--num_payload";
pub const ARG_PAYLOAD_SIZE: &str = "--payload_size";

/// Builds a `NodeConfig` from CLI arguments: loads the file given via
/// `--config_path` (with an optional `--peer_id` override), or falls back to
/// the single-node test configuration when no config path was supplied.
pub fn load_configs_from_args(args: &ArgMatches<'_>) -> NodeConfig {
    let node_config = if args.is_present(ARG_CONFIG_PATH) {
        // Allow peer id over-ride via command line
        let peer_id = value_t!(args, ARG_PEER_ID, String).ok();

        let config_path =
            value_t!(args, ARG_CONFIG_PATH, String).expect("Path to config file must be specified");
        info!("Loading node config from: {}", &config_path);
        let config = NodeConfig::load_config(peer_id, &config_path).expect("NodeConfig");

        info!("Starting Full {}", config.base.peer_id);
        config
    } else {
        // Note we will silently ignore --peer_id arg here
        info!("Loading test configs");
        let config = NodeConfigHelpers::get_single_node_test_config(false /* random ports */);

        info!("Starting Single-Mode {}", config.base.peer_id);
        config
    };

    // Node configuration contains important ephemeral port information and should
    // not be subject to being disabled as with other logs
    println!("Using node config {:?}", &node_config);

    node_config
}

/// Starts periodic metric exports for this node: a file dump when a metrics
/// directory is configured, and a Pushgateway push when a push-server address
/// is configured. Either export is skipped when its setting is empty.
pub fn setup_metrics(peer_id: &str, node_config: &NodeConfig) {
    if !node_config.metrics.dir.as_os_str().is_empty() {
        metrics::dump_all_metrics_to_file_periodically(
            &node_config.metrics.dir,
            &format!("{}.metrics", peer_id),
            node_config.metrics.collection_interval_ms,
        );
    }

    // TODO: should we do this differently for different binaries?
    if !node_config.metrics.push_server_addr.is_empty() {
        metrics::push_all_metrics_to_pushgateway_periodically(
            "libra_node",
            &node_config.metrics.push_server_addr,
            peer_id,
            node_config.metrics.collection_interval_ms,
        );
    }
}

/// Performs common setup for the executable. Takes in args that
/// you wish to use for this executable
///
/// Returns the loaded node config, the global logger guard (`None` when
/// logging is disabled), and the parsed argument matches. The returned guard
/// must be kept alive by the caller for logging to keep working.
pub fn setup_executable(
    app_name: String,
    arg_names: Vec<&str>,
) -> (NodeConfig, Option<GlobalLoggerGuard>, ArgMatches<'_>) {
    crash_handler::setup_panic_handler();

    let args = get_arg_matches(app_name, arg_names);
    let is_logging_disabled = args.is_present(ARG_DISABLE_LOGGING);
    // Bootstrap logger with defaults so config loading itself can log.
    let mut _logger = set_default_global_logger(is_logging_disabled, None);

    let config = load_configs_from_args(&args);

    // Reset the global logger using config (for chan_size currently).
    // We need to drop the global logger guard first before resetting it.
    _logger = None;
    let logger = set_default_global_logger(
        is_logging_disabled,
        Some(config.base.node_async_log_chan_size),
    );

    setup_metrics(&config.base.peer_id, &config);

    (config, logger, args)
}

// Installs the async global logger (with the given channel size) and returns
// its guard, or `None` when logging is disabled.
fn set_default_global_logger(
    is_logging_disabled: bool,
    chan_size: Option<usize>,
) -> Option<GlobalLoggerGuard> {
    if is_logging_disabled {
        None
    } else {
        Some(logger::set_default_global_logger(
            true, /* async */
            chan_size, /* chan_size */
        ))
    }
}

/// Builds the clap parser for `app_name`, restricted to the arguments listed
/// in `arg_names` (which must be drawn from the `ARG_*` constants above), and
/// parses the process's command line.
///
/// # Panics
/// Panics if `arg_names` contains an unrecognized argument name.
fn get_arg_matches(app_name: String, arg_names: Vec<&str>) -> ArgMatches<'_> {
    // format! only borrows app_name, so no clone is needed before App::new
    // takes ownership below.
    let service_name = format!("{} Service", app_name);

    let mut app = App::new(app_name)
        .version("0.1.0")
        .author("Libra Association <opensource@libra.org>")
        .about(service_name.as_str());

    for arg in arg_names {
        // (short flag, takes a value, help text) for each known argument.
        let (short, takes_value, help) = match arg {
            ARG_PEER_ID => ("-p", true, "Specify peer id for this node"),
            ARG_CONFIG_PATH => ("-f", true, "Specify the path to the config file"),
            ARG_DISABLE_LOGGING => ("-d", false, "Controls logging"),
            ARG_NUM_PAYLOAD => ("-n", true, "Specify the number of payload each node send"),
            ARG_PAYLOAD_SIZE => ("-s", true, "Specify the byte size of each payload"),
            x => panic!("Invalid argument: {}", x),
        };
        app = app.arg(
            Arg::with_name(arg)
                .short(short)
                .long(arg)
                .takes_value(takes_value)
                .help(help),
        );
    }

    app.get_matches()
}
@@ -0,0 +1,4 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

pub mod helpers;
@@ -0,0 +1,12 @@
[package]
name = "failure_ext"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
failure = "0.1.3"

failure_macros = { path = "failure_macros" }
@@ -0,0 +1,9 @@
[package]
name = "failure_macros"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
@@ -0,0 +1,15 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Collection of convenience macros for error handling

/// Exits a function early with an `Error`.
///
/// Equivalent to the `bail!` macro, except an error value is provided instead
/// of a message; the value is converted into the function's error type via
/// `From::from`.
#[macro_export]
macro_rules! bail_err {
    ($e:expr) => {
        return Err(From::from($e));
    };
}
@@ -0,0 +1,31 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! A common error handling library for the Libra project.
//!
//! ## Usage
//!
//! // This crate must be imported as 'failure' in order to ensure the
//! // procedural derive macro for the `Fail` trait can function properly.
//! failure = { path = "../common/failure_ext", package = "failure_ext" }
//! // Most of the types and macros you'll need can be found in the prelude.
//! use failure::prelude::*;

// Re-export the `failure` crate's core types and macros so downstream crates
// only need to depend on this wrapper crate.
pub use failure::{
    _core, bail, ensure, err_msg, format_err, AsFail, Backtrace, Causes, Compat, Context, Error,
    Fail, ResultExt, SyncFailure,
};

// Custom error handling macros are placed in the failure_macros crate. Due to
// the way intra-crate macro exports currently work, macros can't be exported
// from anywhere but the top level when they are defined in the same crate.
pub use failure_macros::bail_err;

/// Convenience alias: a `Result` whose error type defaults to `failure::Error`.
pub type Result<T> = ::std::result::Result<T, Error>;

/// Prelude module containing most commonly used types/macros this crate exports.
pub mod prelude {
    pub use crate::Result;
    pub use failure::{bail, ensure, err_msg, format_err, Error, Fail, ResultExt};
    pub use failure_macros::bail_err;
}
@@ -0,0 +1,21 @@
[package]
name = "grpc_helpers"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
grpcio = "0.4.3"
futures = { version = "0.3.0-alpha.13", package = "futures-preview", features = ["compat"] }
futures_01 = { version = "0.1.25", package = "futures" }

failure = { package = "failure_ext", path = "../failure_ext" }
logger = { path = "../logger" }
metrics = { path = "../metrics" }

[dependencies.prometheus]
version = "0.4.2"
default-features = false
features = ["nightly", "push"]
@@ -0,0 +1,144 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use failure::{prelude::*, Result};
use futures::{compat::Future01CompatExt, future::Future, prelude::*};
use futures_01::future::Future as Future01;
use grpcio::{EnvBuilder, ServerBuilder};
use logger::prelude::*;
use metrics::counters::SVC_COUNTERS;
use std::{
str::from_utf8,
sync::{
mpsc::{self, Sender},
Arc,
},
thread, time,
};

/// Fallback handler for failed gRPC reply futures: logs the failure and drops it.
/// Used as the `map_err` callback when spawning sink futures, where there is no
/// caller left to propagate the error to.
pub fn default_reply_error_logger<T: std::fmt::Debug>(e: T) {
    error!("Failed to reply error due to {:?}", e)
}

pub fn create_grpc_invalid_arg_status(method: &str, err: ::failure::Error) -> ::grpcio::RpcStatus {
let msg = format!("Request failed {}", err);
error!("{} failed with {}", method, &msg);
::grpcio::RpcStatus::new(::grpcio::RpcStatusCode::InvalidArgument, Some(msg))
}

/// Sends `resp` back over the gRPC `sink` and records the outcome in
/// `SVC_COUNTERS`.
///
/// On `Ok` the payload is forwarded via `sink.success`; on `Err` an
/// `InvalidArgument` status built from the error is sent instead. Failures of
/// the reply itself are only logged (`default_reply_error_logger`), since at
/// this point there is nobody left to report them to.
pub fn provide_grpc_response<ResponseType: std::fmt::Debug>(
    resp: Result<ResponseType>,
    ctx: ::grpcio::RpcContext<'_>,
    sink: ::grpcio::UnarySink<ResponseType>,
) {
    // The match is an expression so `success` never needs to be mutable.
    let success = match resp {
        Ok(payload) => {
            ctx.spawn(sink.success(payload).map_err(default_reply_error_logger));
            true
        }
        Err(e) => {
            let method =
                from_utf8(ctx.method()).expect("Unable to convert function name to string");
            let status = create_grpc_invalid_arg_status(method, e);
            ctx.spawn(sink.fail(status).map_err(default_reply_error_logger));
            false
        }
    };
    SVC_COUNTERS.resp(&ctx, success);
}

/// Starts `service` on a dedicated thread bound to
/// `service_host_address:service_public_port` and returns a handle that stops
/// the server when dropped. Equivalent to
/// `spawn_service_thread_with_drop_closure` with a no-op drop closure.
pub fn spawn_service_thread(
    service: ::grpcio::Service,
    service_host_address: String,
    service_public_port: u16,
    service_name: impl Into<String>,
) -> ServerHandle {
    // Empty closure: nothing extra to run when the handle is dropped.
    let noop = || ();
    spawn_service_thread_with_drop_closure(
        service,
        service_host_address,
        service_public_port,
        service_name,
        noop,
    )
}

/// Starts `service` on a dedicated thread and returns a `ServerHandle`.
///
/// `service_drop_closure` is invoked when the returned handle is dropped,
/// after the server thread has been signalled to stop. Panics if the gRPC
/// server cannot be built (e.g. the port cannot be bound).
pub fn spawn_service_thread_with_drop_closure<F>(
    service: ::grpcio::Service,
    service_host_address: String,
    service_public_port: u16,
    service_name: impl Into<String>,
    service_drop_closure: F,
) -> ServerHandle
where
    F: FnOnce() + 'static,
{
    let environment = Arc::new(EnvBuilder::new().name_prefix(service_name).build());
    let grpc_server = ServerBuilder::new(environment)
        .register_service(service)
        .bind(service_host_address, service_public_port)
        .build()
        .expect("Unable to create grpc server");
    ServerHandle::setup_with_drop_closure(grpc_server, Some(Box::new(service_drop_closure)))
}

/// Handle to a gRPC server running on a background thread; dropping it signals
/// the thread to shut the server down.
pub struct ServerHandle {
    // Tells the background server thread to exit its polling loop.
    stop_sender: Sender<()>,
    // Optional user-supplied finalizer, run once when the handle is dropped.
    drop_closure: Option<Box<dyn FnOnce()>>,
}

impl ServerHandle {
    /// Starts `server` on a new thread and blocks until the server reports it
    /// is up, returning a handle whose drop stops the server. `drop_closure`,
    /// if provided, runs when the handle is dropped.
    pub fn setup_with_drop_closure(
        mut server: ::grpcio::Server,
        drop_closure: Option<Box<dyn FnOnce()>>,
    ) -> Self {
        // `start` channel: thread -> caller, "server is running".
        // `stop` channel: handle -> thread, "shut down now".
        let (start_sender, start_receiver) = mpsc::channel();
        let (stop_sender, stop_receiver) = mpsc::channel();
        let handle = Self {
            stop_sender,
            drop_closure,
        };
        thread::spawn(move || {
            server.start();
            start_sender.send(()).unwrap();
            // Poll for the stop signal every 100ms; exit once it arrives.
            while stop_receiver.try_recv().is_err() {
                thread::sleep(time::Duration::from_millis(100));
            }
        });

        // Block until the thread has actually started the server.
        start_receiver.recv().unwrap();
        handle
    }

    /// Starts `server` with no drop closure.
    pub fn setup(server: ::grpcio::Server) -> Self {
        Self::setup_with_drop_closure(server, None)
    }
}

impl Drop for ServerHandle {
    /// Signals the server thread to stop, then runs the user-supplied drop
    /// closure, if any.
    fn drop(&mut self) {
        // Best-effort stop signal: `send` only fails if the server thread is
        // already gone (e.g. it panicked). Ignore that instead of `unwrap`ing —
        // panicking inside `drop` during an unwind aborts the process.
        let _ = self.stop_sender.send(());
        if let Some(f) = self.drop_closure.take() {
            f()
        }
    }
}

/// Adapts a grpcio client-call result into a futures-0.3 future of `Result<T>`.
///
/// Both failure paths are mapped into `failure::Error` via `convert_grpc_err`:
/// the outer `grpcio::Result` (synchronous call-setup failure) and the error of
/// the inner futures-0.1 future, which is bridged to 0.3 with
/// `Future01CompatExt::compat`.
pub fn convert_grpc_response<T>(
    response: grpcio::Result<impl Future01<Item = T, Error = grpcio::Error>>,
) -> impl Future<Output = Result<T>> {
    future::ready(response.map_err(convert_grpc_err))
        .map_ok(Future01CompatExt::compat)
        .and_then(|x| x.map_err(convert_grpc_err))
}

/// Wraps a grpcio error into a `failure::Error` with a "grpc error" prefix.
fn convert_grpc_err(e: ::grpcio::Error) -> Error {
    format_err!("grpc error: {}", e)
}
@@ -0,0 +1,15 @@
[package]
name = "grpcio-client"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
mktemp = "0.3"
protobuf = "2.6"
protobuf-codegen = "2.*"
protoc = "2.*"
protoc-grpcio = "1.0.1"
regex = "1.1.6"
@@ -0,0 +1,308 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;

use protobuf::{
compiler_plugin,
descriptor::{FileDescriptorProto, MethodDescriptorProto, ServiceDescriptorProto},
descriptorx::{RootScope, WithScope},
};

use super::util;
use protobuf_codegen::code_writer::CodeWriter;

/*
This is mostly copied from grpcio-compiler.
It's copied and not reimplemented or re-used in some way because:
Most methods there are private, and I have to use the same names/structs
for the generated output.
*/

/// Per-method code generator: pairs one `MethodDescriptorProto` with the root
/// scope needed to resolve its input/output message types to Rust paths.
struct MethodGen<'a> {
    proto: &'a MethodDescriptorProto,
    root_scope: &'a RootScope<'a>,
}

impl<'a> MethodGen<'a> {
    /// Bundles a method descriptor with the root scope used for type lookups.
    fn new(proto: &'a MethodDescriptorProto, root_scope: &'a RootScope<'a>) -> MethodGen<'a> {
        MethodGen { proto, root_scope }
    }

    /// Fully-qualified Rust path of the method's input message type. The
    /// `super::` prefix is used because the generated client file is emitted as
    /// a sibling module of the message definitions.
    fn input(&self) -> String {
        format!(
            "super::{}",
            self.root_scope
                .find_message(self.proto.get_input_type())
                .rust_fq_name()
        )
    }

    /// Fully-qualified Rust path of the method's output message type.
    fn output(&self) -> String {
        format!(
            "super::{}",
            self.root_scope
                .find_message(self.proto.get_output_type())
                .rust_fq_name()
        )
    }

    /// Classifies the method by its (client_streaming, server_streaming)
    /// flags, returning both the variant and the `::grpcio::`-qualified type
    /// string that is emitted into generated code.
    fn method_type(&self) -> (util::MethodType, String) {
        match (
            self.proto.get_client_streaming(),
            self.proto.get_server_streaming(),
        ) {
            (false, false) => (
                util::MethodType::Unary,
                util::fq_grpc("util::MethodType::Unary"),
            ),
            (true, false) => (
                util::MethodType::ClientStreaming,
                util::fq_grpc("util::MethodType::ClientStreaming"),
            ),
            (false, true) => (
                util::MethodType::ServerStreaming,
                util::fq_grpc("util::MethodType::ServerStreaming"),
            ),
            (true, true) => (
                util::MethodType::Duplex,
                util::fq_grpc("util::MethodType::Duplex"),
            ),
        }
    }

    /// snake_case Rust method name derived from the proto method name.
    fn name(&self) -> String {
        util::to_snake_case(self.proto.get_name())
    }

    // Method signatures
    /// Unary signature: `name(&self, req: &In) -> ::grpcio::Result<Out>`.
    fn unary(&self, method_name: &str) -> String {
        format!(
            "{}(&self, req: &{}) -> {}<{}>",
            method_name,
            self.input(),
            util::fq_grpc("Result"),
            self.output()
        )
    }

    /// Async unary signature: returns a boxed futures-0.1 future of the output.
    fn unary_async(&self, method_name: &str) -> String {
        format!(
            "{}_async(&self, req: &{}) -> {}<Box<Future<Item={}, Error={}> + Send>>",
            method_name,
            self.input(),
            util::fq_grpc("Result"),
            self.output(),
            util::fq_grpc("Error")
        )
    }

    /// Client-streaming signature: returns a (request sender, response receiver) pair.
    fn client_streaming(&self, method_name: &str) -> String {
        format!(
            "{}(&self) -> {}<({}<{}>, {}<{}>)>",
            method_name,
            util::fq_grpc("Result"),
            util::fq_grpc("ClientCStreamSender"),
            self.input(),
            util::fq_grpc("ClientCStreamReceiver"),
            self.output()
        )
    }

    /// Server-streaming signature: returns a response stream receiver.
    fn server_streaming(&self, method_name: &str) -> String {
        format!(
            "{}(&self, req: &{}) -> {}<{}<{}>>",
            method_name,
            self.input(),
            util::fq_grpc("Result"),
            util::fq_grpc("ClientSStreamReceiver"),
            self.output()
        )
    }

    /// Bidirectional-streaming signature: returns a (sender, receiver) pair.
    fn duplex_streaming(&self, method_name: &str) -> String {
        format!(
            "{}(&self) -> {}<({}<{}>, {}<{}>)>",
            method_name,
            util::fq_grpc("Result"),
            util::fq_grpc("ClientDuplexSender"),
            self.input(),
            util::fq_grpc("ClientDuplexReceiver"),
            self.output()
        )
    }

    /// Emits the method either as a trait item (`has_impl == false`, body is
    /// `unimplemented!`) or as an impl item (`has_impl == true`, body delegates
    /// to `self.<name>(req)`). Only unary methods get a real delegating body;
    /// streaming variants are emitted as `unimplemented!` in both forms.
    fn write_method(&self, has_impl: bool, w: &mut CodeWriter<'_>) {
        let method_name = self.name();
        let method_name = method_name.as_str();

        // some parts are not implemented yet, account for both
        let (sig, implemented) = match self.method_type().0 {
            util::MethodType::Unary => (self.unary(method_name), true),
            util::MethodType::ClientStreaming => (self.client_streaming(method_name), false),
            util::MethodType::ServerStreaming => (self.server_streaming(method_name), false),
            util::MethodType::Duplex => (self.duplex_streaming(method_name), false),
        };

        if has_impl {
            w.def_fn(sig.as_str(), |w| {
                if implemented {
                    w.write_line(format!("self.{}(req)", method_name));
                } else {
                    w.unimplemented();
                }
            });
        } else {
            w.def_fn(sig.as_str(), |w| {
                w.unimplemented();
            });
        }

        // async variant: only implemented for unary methods for now
        if let util::MethodType::Unary = self.method_type().0 {
            w.def_fn(self.unary_async(method_name).as_str(), |w| {
                if has_impl {
                    // Delegate to the client's `<name>_async`, boxing the future.
                    w.match_expr(format!("self.{}_async(req)", method_name), |w| {
                        w.case_expr("Ok(f)", "Ok(Box::new(f))");
                        w.case_expr("Err(e)", "Err(e)");
                    });
                } else {
                    w.unimplemented();
                }
            });
        }
    }

    /// Emits the trait-declaration form of the method.
    fn write_trait(&self, w: &mut CodeWriter<'_>) {
        self.write_method(false, w);
    }

    /// Emits the delegating-impl form of the method.
    fn write_impl(&self, w: &mut CodeWriter<'_>) {
        self.write_method(true, w);
    }
}

/// Per-service generator: emits a `<Service>ClientTrait` definition plus an
/// impl of that trait for the grpcio-generated `<Service>Client`.
struct ClientTraitGen<'a> {
    proto: &'a ServiceDescriptorProto,
    // One generator per service method.
    methods: Vec<MethodGen<'a>>,
    // Rust module name derived from the proto file path (e.g. `foo` for foo.proto).
    base_name: String,
}

impl<'a> ClientTraitGen<'a> {
    /// Builds a generator for `proto`, creating one `MethodGen` per service
    /// method and recording the Rust module name derived from the proto file.
    fn new(
        proto: &'a ServiceDescriptorProto,
        file: &FileDescriptorProto,
        root_scope: &'a RootScope<'_>,
    ) -> ClientTraitGen<'a> {
        let methods = proto
            .get_method()
            .iter()
            .map(|m| MethodGen::new(m, root_scope))
            .collect();

        let base = protobuf::descriptorx::proto_path_to_rust_mod(file.get_name());

        ClientTraitGen {
            proto,
            methods,
            base_name: base,
        }
    }

    /// Service name (currently passed through unchanged; see `util::to_camel_case`).
    fn service_name(&self) -> String {
        util::to_camel_case(self.proto.get_name())
    }

    /// Name of the generated trait, e.g. `FooClientTrait`.
    fn trait_name(&self) -> String {
        format!("{}Trait", self.client_name())
    }

    /// Name of the grpcio-generated client struct, e.g. `FooClient`.
    fn client_name(&self) -> String {
        format!("{}Client", self.service_name())
    }

    /// Emits `pub trait <Service>ClientTrait: Clone + Send + Sync { ... }`
    /// containing one declaration per service method.
    fn write_trait(&self, w: &mut CodeWriter<'_>) {
        w.pub_trait_extend(self.trait_name().as_str(), "Clone + Send + Sync", |w| {
            // methods that go inside the trait

            for method in &self.methods {
                w.write_line("");
                method.write_trait(w);
            }
        });
    }

    /// Emits the trait impl for the grpcio client struct living in the sibling
    /// `<base>_grpc` module.
    fn write_impl(&self, w: &mut CodeWriter<'_>) {
        let type_name = format!("super::{}_grpc::{}", self.base_name, self.client_name());
        w.impl_for_block(self.trait_name(), type_name, |w| {
            for method in &self.methods {
                w.write_line("");
                method.write_impl(w);
            }
        });
    }

    /// Emits the trait definition followed by its impl block.
    fn write(&self, w: &mut CodeWriter<'_>) {
        // Client trait definition
        self.write_trait(w);
        w.write_line("");

        // Impl block for client trait
        self.write_impl(w);
    }
}

/// Generates the `<base>_client.rs` contents for one proto file, or `None`
/// when the file declares no services (nothing to generate).
fn gen_file(
    file: &FileDescriptorProto,
    root_scope: &RootScope<'_>,
) -> Option<compiler_plugin::GenResult> {
    if file.get_service().is_empty() {
        return None;
    }

    let base = protobuf::descriptorx::proto_path_to_rust_mod(file.get_name());

    let mut v = Vec::new();
    {
        // Scope the writer so its borrow of `v` ends before `v` is moved out.
        let mut w = CodeWriter::new(&mut v);
        w.write_generated();
        w.write_line("#![allow(unused_variables)]");
        w.write_line("use futures::Future;");
        w.write_line("");
        for service in file.get_service() {
            w.write_line("");
            ClientTraitGen::new(service, file, root_scope).write(&mut w);
        }
    }

    Some(compiler_plugin::GenResult {
        name: base + "_client.rs",
        content: v,
    })
}

/// Generates client-trait stubs for every file in `files_to_generate` that
/// declares at least one service.
///
/// `file_descriptors` must contain the descriptors of all files to generate
/// plus their transitive imports; files without services produce no output.
pub fn gen(
    file_descriptors: &[FileDescriptorProto],
    files_to_generate: &[&str],
) -> Vec<compiler_plugin::GenResult> {
    let files_map: HashMap<&str, &FileDescriptorProto> =
        file_descriptors.iter().map(|f| (f.get_name(), f)).collect();

    let root_scope = RootScope { file_descriptors };

    let mut results = Vec::new();

    for file_name in files_to_generate {
        // Not all files need client stubs; some are plain protobufs with no
        // service. Index with the `&str` key directly — no need to allocate a
        // `String` per file just to look it up.
        let file = files_map[*file_name];

        if file.get_service().is_empty() {
            continue;
        }

        // `gen_file` returns an `Option`, which `extend` accepts directly.
        results.extend(gen_file(file, &root_scope));
    }

    results
}
@@ -0,0 +1,71 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use protoc_grpcio::CompileResult;
use std::path::Path;

/// This crate provides a library for generating a Client trait for GRPC clients
/// generated by grpc-rs (protoc-grpcio)
///
/// This crate complements the functionality provided by `protoc-grpcio` by defining a Trait for
/// the GRPC client service that can be used instead of the client directly for polymorphism and
/// testing.
///
///
/// ## Usage Example
///
/// To generate client trait as part of `build.rs` script, add:
///
/// ```ignore,no_run
/// grpcio_client::client_stub_gen(
/// &["calculator.proto"], /* service files to generate traits for */
/// &["src/proto", "../deps/src/proto"], /* proto paths & includes */
/// "src/proto", /* target dir */
/// );
/// ```
///
/// This will create the file `calculator_client.rs` under `src/proto` folder.
///
/// The generated file will include 2 structures:
/// ```rust
/// // assuming the service name is `Calculator`
/// pub trait CalculatorClientTrait {
/// // methods
/// }
/// ```
/// and
///
/// ```rust
/// # struct CalculatorClient;
/// # pub trait CalculatorClientTrait {
/// # // methods
/// # }
///
/// impl CalculatorClientTrait for CalculatorClient {
/// // method impl -- calling method from client
/// }
/// ```
mod codegen;
mod util;

/// Generate client trait for the GRPC Client
/// * `from` - the files with the services to generate client traits for
/// * `includes` - the parent folders of the files in `from` and all their includes.
/// * `to` - a path to a folder to store the generated files.
///
/// Generates client traits for the GRPC services defined in the `from` files
/// and writes one `<proto>_client.rs` per service-bearing file into `to`.
///
/// ## Example use:
/// client_stub_gen(&["src/proto/myservice.proto"], &[], "src/proto");
pub fn client_stub_gen<P: AsRef<Path>>(
    from: &[&str],
    includes: &[&str],
    to: P,
) -> CompileResult<()> {
    let descriptor_set = util::protoc_descriptor_set(from, includes)?;
    // Propagate write failures through the declared `CompileResult` return
    // instead of panicking — callers already handle the descriptor-set error.
    util::write_out_generated_files(codegen::gen(descriptor_set.get_file(), from), &to)?;

    Ok(())
}
@@ -0,0 +1,97 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use mktemp::Temp;
use protobuf::{compiler_plugin, descriptor::FileDescriptorSet, error::ProtobufError};
use protoc::{DescriptorSetOutArgs, Protoc};
use protoc_grpcio::CompileResult;
use regex::Regex;
use std::{
fs::File,
io::{Read, Write},
path::Path,
};

/// Writes each generated result as a file under `output_dir`, panicking if a
/// file cannot be created or written.
/// (Copied from protoc-grpcio, where this helper is private.)
pub(crate) fn write_out_generated_files<P>(
    generation_results: Vec<compiler_plugin::GenResult>,
    output_dir: P,
) -> CompileResult<()>
where
    P: AsRef<Path>,
{
    let target_dir = output_dir.as_ref();
    for generated in generation_results {
        let target = target_dir.join(generated.name);
        let mut out = File::create(&target).expect("failed to create file");
        out.write_all(&generated.content).expect("failed to write file");
    }

    Ok(())
}

/// Generate snake case names. This is useful
/// helloWorldFoo => hello_world_foo
/// ID => :( not working for this case
///
/// This needs to be the same as in grpcio-compiler, but I
/// didn't copy it.
pub fn to_snake_case(name: &str) -> String {
    // NOTE: the regex (and therefore the output, quirks included) must stay
    // identical to grpcio-compiler's naming so generated names line up.
    let re = Regex::new("((:?^|(:?[A-Z]))[a-z0-9_]+)").unwrap();
    let lowered: Vec<String> = re
        .captures_iter(name)
        .map(|cap| cap.get(1).unwrap().as_str().to_lowercase())
        .collect();
    lowered.join("_")
}

// TODO: frumious: make camel case
/// Placeholder conversion: currently returns `name` unchanged as an owned
/// `String` — no camel-casing is performed yet.
pub fn to_camel_case(name: &str) -> String {
    // do nothing for now
    String::from(name)
}

/// Fully qualifies `item` with the `::grpcio::` crate path for emission into
/// generated code.
pub fn fq_grpc(item: &str) -> String {
    let mut qualified = String::from("::grpcio::");
    qualified.push_str(item);
    qualified
}

/// The four gRPC method shapes, derived from the client/server streaming flags
/// of a method descriptor.
pub enum MethodType {
    /// Single request, single response.
    Unary,
    /// Streaming requests, single response.
    ClientStreaming,
    /// Single request, streaming responses.
    ServerStreaming,
    /// Streaming in both directions.
    Duplex,
}

/// Runs `protoc` (located via `PATH`) over `from` with the given `includes`
/// and parses the resulting `FileDescriptorSet`, with imports included.
///
/// The descriptor set is round-tripped through a temporary file, which is
/// deleted when `descriptor_set` goes out of scope.
pub fn protoc_descriptor_set(
    from: &[&str],
    includes: &[&str],
) -> Result<FileDescriptorSet, ProtobufError> {
    let protoc = Protoc::from_env_path();
    // Fixed typo in the user-facing message ("availabe" -> "available").
    protoc
        .check()
        .expect("failed to find `protoc`, `protoc` must be available in `PATH`");

    let descriptor_set = Temp::new_file()?;

    protoc
        .write_descriptor_set(DescriptorSetOutArgs {
            out: match descriptor_set.as_ref().to_str() {
                Some(s) => s,
                None => unreachable!("failed to convert path to string"),
            },
            input: from,
            includes,
            include_imports: true,
        })
        .expect("failed to write descriptor set");

    let mut serialized_descriptor_set = Vec::new();
    File::open(&descriptor_set)
        .expect("failed to open descriptor set")
        .read_to_end(&mut serialized_descriptor_set)
        .expect("failed to read descriptor set");

    protobuf::parse_from_bytes::<FileDescriptorSet>(&serialized_descriptor_set)
}
@@ -0,0 +1,11 @@
[package]
name = "grpcio-extras"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
futures = "0.1.25"
grpcio = "0.4.3"
@@ -0,0 +1,14 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use futures::Future;

/// A value whose in-flight work can be cancelled.
pub trait Cancelable {
    fn cancel(&mut self);
}

/// Blanket impl for all futures.
///
/// NOTE(review): `cancel` is a stub that panics via `unimplemented!` when
/// called — callers must not rely on it yet.
impl<T: Future> Cancelable for T {
    fn cancel(&mut self) {
        unimplemented!();
    }
}
@@ -0,0 +1,11 @@
[package]
name = "jemalloc"
version = "0.1.0"
authors = ["Libra Association <opensource@libra.org>"]
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
jemalloc-sys= { version = "0.1.8", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
jemallocator = { version = "0.1.8", features = ["alloc_trait", "profiling"] }
@@ -0,0 +1,37 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use std::{ffi::CString, ptr};

// Route every heap allocation in this binary through jemalloc, so the
// profiling facilities used by `dump_jemalloc_memory_profile` below can
// observe the full heap.
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

/// Tell jemalloc to dump the heap memory profile. This only works if
/// the binary is started with environment variable
///
/// MALLOC_CONF="prof:true,prof_prefix:jeprof.out"
///
/// Calling this function will cause jemalloc to write the memory
/// profile to the current working directory. Then one can process the
/// heap profile to various format with the 'jeprof' utility. ie
///
/// jeprof --pdf target/debug/libra_node jeprof.out.2141437.2.m2.heap > out.pdf
///
/// Returns the error code coming out of jemalloc if heap dump fails.
pub fn dump_jemalloc_memory_profile() -> Result<(), i32> {
    // "prof.dump" is the mallctl name that triggers a profile dump.
    let opt_name = CString::new("prof.dump").expect("CString::new failed.");
    unsafe {
        // SAFETY: `opt_name` is a valid NUL-terminated C string that outlives
        // the call; the out/in pointers are null with zero length, i.e. we
        // neither read nor write through them — see mallctl(3) for the
        // "prof.dump" contract.
        let err_code = jemalloc_sys::mallctl(
            opt_name.as_ptr(),
            ptr::null_mut(),
            ptr::null_mut(),
            ptr::null_mut(),
            0,
        );
        // mallctl returns 0 on success, a non-zero errno-style code otherwise.
        if err_code == 0 {
            Ok(())
        } else {
            Err(err_code)
        }
    }
}