Skip to content

Commit

Permalink
move decrypt task to block proposer
Browse files Browse the repository at this point in the history
remove unused import
remove block proposer infinite loop
  • Loading branch information
raynear committed Sep 18, 2023
1 parent 936c7d9 commit c0bd2ee
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 124 deletions.
165 changes: 136 additions & 29 deletions crates/client/block-proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use futures::{future, select};
use hyper::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use hyper::{Body, Client, Request};
use log::{debug, error, info, trace, warn};
use mc_transaction_pool::{EPool, EncryptedPool, EncryptedTransactionPool};
use mp_starknet::transaction::types::{Transaction as MPTransaction, TxType};
use mc_rpc::submit_extrinsic_with_order;
use mc_transaction_pool::decryptor::Decryptor;
use mc_transaction_pool::{EPool, EncryptedTransactionPool};
use mp_starknet::execution::types::Felt252Wrapper;
use mp_starknet::transaction::types::{EncryptedInvokeTransaction, Transaction as MPTransaction, TxType};
use pallet_starknet::runtime_api::{ConvertTransactionRuntimeApi, StarknetRuntimeApi};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
Expand All @@ -38,7 +41,6 @@ use sp_runtime::generic::BlockId as SPBlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_runtime::{Digest, Percent, SaturatedConversion};
use tokio;
use tokio::sync::Mutex;

/// Default block size limit in bytes used by [`Proposer`].
///
Expand Down Expand Up @@ -261,7 +263,7 @@ const MAX_SKIPPED_TRANSACTIONS: usize = 8;

impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
where
A: EncryptedTransactionPool<Block = Block> + EPool,
A: EncryptedTransactionPool<Block = Block> + EPool + 'static,
B: backend::Backend<Block> + Send + Sync + 'static,
Block: BlockT,
C: BlockBuilderProvider<B, Block, C> + HeaderBackend<Block> + ProvideRuntimeApi<Block> + Send + Sync + 'static,
Expand Down Expand Up @@ -340,8 +342,6 @@ where
// Print the summary of the proposal.
self.print_summary(&block, end_reason, block_took, propose_with_timer.elapsed());

// println!("start block_height {}", self.parent_number.to_string().parse::<u64>().unwrap());

Ok(Proposal { block, proof, storage_changes })
}

Expand Down Expand Up @@ -406,23 +406,27 @@ where
block_size_limit: Option<usize>,
) -> Result<EndProposingReason, sp_blockchain::Error> {
let epool = self.transaction_pool.epool().clone();
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;

let mut enabled = false;
{
let enabled = {
let lock = epool.lock().await;
enabled = lock.is_enabled();
}
lock.is_enabled()
};

if enabled {
// wait decryption
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
let closed = {
let lock = epool.lock().await;
let exist = lock.exist(block_height);
if exist { lock.is_closed(block_height).unwrap() } else { false }
};

if enabled && !closed {
// add temporary pool tx to pool
let mut temporary_pool: Vec<(u64, MPTransaction)> = vec![];
{
let mut lock = epool.lock().await;
if lock.exist(block_height) {
println!("close on {}", block_height);
lock.close(block_height);
let _ = lock.close(block_height);

let txs = lock.get_txs(block_height).unwrap();

Expand Down Expand Up @@ -450,24 +454,127 @@ where
.await;
}

let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
let cnt = {
let lock = epool.lock().await;

loop {
{
let lock = epool.lock().await;
if lock.exist(block_height) {
let tx_cnt = lock.get_tx_cnt(block_height);
let dec_cnt = lock.get_decrypted_cnt(block_height);
let ready_cnt = self.transaction_pool.status().ready as u64;
// println!("{} waiting {}:{}:{}", block_height, tx_cnt, dec_cnt, ready_cnt);
if tx_cnt == dec_cnt && dec_cnt == ready_cnt {
break;
}
} else {
break;
lock.len(block_height) as u64
};

for order in 0..cnt {
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
let best_block_hash = self.client.info().best_hash;
let client = self.client.clone();
let pool = self.transaction_pool.clone();
let chain_id = Felt252Wrapper(client.runtime_api().chain_id(best_block_hash).unwrap().into());
let epool = self.transaction_pool.epool().clone();
self.spawn_handle.spawn_blocking(
"Decryptor",
None,
Box::pin(
// tokio::task::spawn(
async move {
tokio::time::sleep(Duration::from_secs(1)).await;

println!("stompesi - start delay function block_height {} order {}", block_height, order);

let encrypted_invoke_transaction: EncryptedInvokeTransaction;
{
let lock = epool.lock().await;
// println!("check key_received on block_height {} order {}", block_height, order);
let did_received_key = lock.get_key_received(block_height, order);

if did_received_key == true {
println!("Received key");
return;
}
println!("Not received key");
encrypted_invoke_transaction = lock.get(block_height, order).unwrap().clone();
}

let decryptor = Decryptor::new();
let invoke_tx = decryptor
.decrypt_encrypted_invoke_transaction(encrypted_invoke_transaction, None)
.await;
// println!("decrypt done on block_height {} order {}", block_height, order);

{
let mut lock = epool.lock().await;
// println!("check key_received on block_height {} order {}", block_height, order);
let did_received_key = lock.get_key_received(block_height, order);

if did_received_key == true {
println!("Received key");
return;
}

lock.increase_decrypted_cnt(block_height);

let previous_closed = match lock.is_closed(block_height - 1) {
Ok(closed) => {
if closed {
println!("{} is closed", block_height - 1);
} else {
println!("{} is not closed", block_height - 1);
};
closed
}
Err(_e) => {
println!("no state for {}", block_height - 1);
false
}
};

if previous_closed {
let mut txs = match lock.get_txs(block_height) {
Ok(txs) => txs.clone(),
Err(_e) => panic!("no case of this"),
};

// let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
// let extrinsic = convert_transaction(client, best_block_hash,
// transaction.clone(), TxType::Invoke)
// .await .expect("Failed to submit
// extrinsic");

let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
println!(
"{} is closed.. push on temporary pool of {}",
block_height - 1,
block_height
);
txs.add_tx_to_temporary_pool(order, transaction);
return;
}
}

let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
let extrinsic = client
.runtime_api()
.convert_transaction(best_block_hash, transaction.clone(), TxType::Invoke)
.unwrap()
.expect("Failed to submit extrinsic");

submit_extrinsic_with_order(pool, best_block_hash, extrinsic, order)
.await
.expect("Failed to submit extrinsic");
},
),
)
}
}

if enabled {
{
let lock = epool.lock().await;
if lock.exist(block_height) {
let tx_cnt = lock.get_tx_cnt(block_height);
let dec_cnt = lock.get_decrypted_cnt(block_height);
let ready_cnt = self.transaction_pool.status().ready as u64;
println!("{} waiting {}:{}:{}", block_height, tx_cnt, dec_cnt, ready_cnt);
if !(tx_cnt == dec_cnt && dec_cnt == ready_cnt) {
return Err(sp_blockchain::Error::TransactionPoolNotReady);
}
}
interval.tick().await;
}
}

Expand Down
97 changes: 9 additions & 88 deletions crates/client/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ mod types;
use core::str::FromStr;
use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use encryptor::SequencerPoseidonEncryption;
use errors::StarknetRpcApiError;
Expand All @@ -28,7 +26,7 @@ use mc_rpc_core::Felt;
pub use mc_rpc_core::StarknetRpcApiServer;
use mc_storage::OverrideHandle;
use mc_transaction_pool::decryptor::Decryptor;
use mc_transaction_pool::{ChainApi, EPool, EncryptedPool, EncryptedTransactionPool, Pool};
use mc_transaction_pool::{ChainApi, EPool, EncryptedTransactionPool, Pool};
use mp_starknet::crypto::merkle_patricia_tree::merkle_tree::ProofNode;
use mp_starknet::execution::types::Felt252Wrapper;
use mp_starknet::traits::hash::HasherT;
Expand Down Expand Up @@ -57,8 +55,6 @@ use starknet_core::types::{
Transaction, TransactionStatus,
};
use starknet_crypto::{get_public_key, sign, verify};
use tokio;
use tokio::sync::Mutex;
use vdf::{ReturnData, VDF};

use crate::constants::{MAX_EVENTS_CHUNK_SIZE, MAX_EVENTS_KEYS, MAX_STORAGE_PROOF_KEYS_BY_QUERY};
Expand Down Expand Up @@ -488,10 +484,10 @@ where
let extrinsic =
convert_transaction(self.client.clone(), best_block_hash, transaction.clone(), TxType::Invoke).await?;

let mut block_number = self.current_block_number().unwrap() + 1;
let block_number = self.current_block_number().unwrap() + 1;
let epool = self.pool.epool().clone();

let mut order;
let order;
{
let mut lock = epool.lock().await;
lock.initialize_if_not_exist(block_number);
Expand Down Expand Up @@ -543,15 +539,15 @@ where
convert_transaction(self.client.clone(), best_block_hash, transaction.clone(), TxType::DeployAccount)
.await?;

let mut block_number = self.current_block_number().unwrap() + 1;
let block_number = self.current_block_number().unwrap() + 1;
let epool = self.pool.epool().clone();

let mut order;
let order;
{
let mut lock = epool.lock().await;
lock.initialize_if_not_exist(block_number);
order = lock.get_order(block_number);
lock.increase_not_encrypted_cnt(block_number);
let _ = lock.increase_not_encrypted_cnt(block_number);
}

submit_extrinsic_with_order(self.pool.clone(), best_block_hash, extrinsic, order).await?;
Expand Down Expand Up @@ -829,15 +825,15 @@ where
let extrinsic =
convert_transaction(self.client.clone(), best_block_hash, transaction.clone(), TxType::Declare).await?;

let mut block_number = self.current_block_number().unwrap() + 1;
let block_number = self.current_block_number().unwrap() + 1;
let epool = self.pool.epool().clone();

let mut order;
let order;
{
let mut lock = epool.lock().await;
lock.initialize_if_not_exist(block_number);
order = lock.get_order(block_number);
lock.increase_not_encrypted_cnt(block_number);
let _ = lock.increase_not_encrypted_cnt(block_number);
}

submit_extrinsic_with_order(self.pool.clone(), best_block_hash, extrinsic, order).await?;
Expand Down Expand Up @@ -1148,81 +1144,6 @@ where
let client = self.client.clone();
let pool = self.pool.clone();

tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;

println!("stompesi - start delay function");

let encrypted_invoke_transaction: EncryptedInvokeTransaction;
{
let mut lock = epool.lock().await;
let did_received_key = lock.get_key_received(block_number, order);

if did_received_key == true {
println!("Received key");
return;
}
println!("Not received key");
encrypted_invoke_transaction = lock.get(block_number, order).unwrap().clone();
}

let decryptor = Decryptor::new();
let invoke_tx = decryptor.decrypt_encrypted_invoke_transaction(encrypted_invoke_transaction, None).await;

{
let mut lock = epool.lock().await;
let did_received_key = lock.get_key_received(block_number, order);

if did_received_key == true {
println!("Received key");
return;
}

lock.increase_decrypted_cnt(block_number);

let previous_closed = match lock.is_closed(block_number - 1) {
Ok(closed) => {
if closed {
println!("{} is closed", block_number - 1);
} else {
println!("{} is not closed", block_number - 1);
};
closed
}
Err(e) => {
println!("no state for {}", block_number - 1);
false
}
};

if previous_closed {
let mut txs = match lock.get_txs(block_number) {
Ok(txs) => txs.clone(),
Err(e) => panic!("no case of this"),
};

// let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
// let extrinsic = convert_transaction(client, best_block_hash, transaction.clone(), TxType::Invoke)
// .await
// .expect("Failed to submit extrinsic");

let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
println!("{} is closed.. push on temporary pool of {}", block_number - 1, block_number);
txs.add_tx_to_temporary_pool(order, transaction);
return;
}
}

let transaction: MPTransaction = invoke_tx.from_invoke(chain_id);
let extrinsic = convert_transaction(client, best_block_hash, transaction.clone(), TxType::Invoke)
.await
.expect("Failed to submit extrinsic");

submit_extrinsic_with_order(pool, best_block_hash, extrinsic, order)
.await
.expect("Failed to submit extrinsic");
});

// Generate commitment

// 1. Get sequencer private key
Expand Down
Loading

0 comments on commit c0bd2ee

Please sign in to comment.