Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#185 from subspace/farmer-farming-test-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ozgunozerk committed Dec 9, 2021
2 parents 09bbf5d + 8959cf9 commit 19051a0
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 11 deletions.
43 changes: 43 additions & 0 deletions crates/subspace-farmer/src/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::plot::Plot;
use async_lock::Mutex;
use async_std::io;
use async_std::path::PathBuf;
#[cfg(test)]
use log::info;
use log::{error, trace};
use lru::LruCache;
use rayon::prelude::*;
Expand All @@ -14,6 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use subspace_core_primitives::{FlatPieces, Salt, Tag, PIECE_SIZE};
use thiserror::Error;
#[cfg(test)]
use tokio::{sync::mpsc, time::sleep, time::Duration};

const BATCH_SIZE: u64 = (16 * 1024 * 1024 / PIECE_SIZE) as u64;
const COMMITMENTS_CACHE_SIZE: usize = 2;
Expand Down Expand Up @@ -411,4 +415,43 @@ impl Commitments {

solutions_fut.await.unwrap().into_iter().next()
}

#[cfg(test)]
pub async fn on_recommitment(&self, salt: Salt) -> mpsc::Receiver<()> {
let (sender, receiver) = mpsc::channel(1);

let commitment_clone = self.clone();
tokio::spawn(async move {
loop {
let guard = commitment_clone.inner.commitment_databases.lock().await;
let status = guard.metadata_cache.get(&salt);
if status.is_none() {
info!(
"Could not retrieve the DB with salt: {:?}, will try again VERY soon...",
salt
);
drop(guard);
sleep(Duration::from_millis(100)).await;
continue;
}
info!("Successfully retrieved the DB with salt: {:?}", salt);
match status.unwrap() {
CommitmentStatus::InProgress => {
// drop the guard, so commitment can make progress
drop(guard);
sleep(Duration::from_millis(100)).await;
}
CommitmentStatus::Created => {
sender
.send(())
.await
.expect("Cannot send the notification to the test environment!");
break;
}
}
}
});

receiver
}
}
6 changes: 3 additions & 3 deletions crates/subspace-farmer/src/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum FarmingError {
/// Farming Instance that stores a channel to stop/pause the background farming task
/// and a handle to make it possible to wait on this background task
pub struct Farming {
stop_sender: Option<async_oneshot::Sender<()>>,
stop_sender: async_oneshot::Sender<()>,
handle: Option<JoinHandle<Result<(), FarmingError>>>,
}

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Farming {
});

Farming {
stop_sender: Some(stop_sender),
stop_sender,
handle: Some(farming_handle),
}
}
Expand All @@ -87,7 +87,7 @@ impl Farming {

impl Drop for Farming {
fn drop(&mut self) {
let _ = self.stop_sender.take().unwrap().send(());
let _ = self.stop_sender.send(());
}
}

Expand Down
31 changes: 23 additions & 8 deletions crates/subspace-farmer/src/farming/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,40 @@ async fn farming_simulator(slots: Vec<SlotInfo>, tags: Vec<Tag>) {
identity.clone(),
);

let mut counter = 0;
let mut latest_salt = slots.first().unwrap().salt;
for (slot, tag) in slots.into_iter().zip(tags) {
let client_copy = client.clone();
async move {
// commitment in the background cannot keep up with the speed, so putting a little delay in here
// commitment usually takes around 0.002-0.003 second on my machine (M1 iMac), putting 100 microseconds here to be safe
sleep(Duration::from_millis(100)).await;
counter += 1;
async {
client_copy.send_slot(slot.clone()).await;

// if salt will change, wait for background recommitment to finish first
if slot.next_salt.unwrap() != latest_salt {
latest_salt = slot.next_salt.unwrap();
let mut current_commitment_notifier = commitments.clone().on_recommitment(slot.salt).await;
let mut upcoming_commitment_notifier = commitments.clone().on_recommitment(latest_salt).await;
tokio::select! {
_ = current_commitment_notifier.recv() => {
// also wait for the recommitment for the upcoming salt
// it locks the commitment database, and causing racy behavior
upcoming_commitment_notifier.recv().await;
},
_ = sleep(Duration::from_secs(3)) => { panic!("Cannot finish recommitments......"); }
}
}

tokio::select! {
Some(solution) = client_copy.receive_solution() => {
if let Some(solution) = solution.maybe_solution {
if solution.tag != tag {
panic!("Wrong Tag! The expected value was: {:?}", tag);
}
} else {
panic!("Solution was None!")
panic!("Solution was None! For challenge #: {}", counter);
}
},
_ = sleep(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => { panic!("Something is taking too much time!"); },
}
}
.await;
Expand All @@ -83,7 +98,7 @@ async fn farming_happy_path() {
slot_number: 3,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 1],
next_salt: None,
next_salt: Some([1, 1, 1, 1, 1, 1, 1, 2]),
solution_range: u64::MAX,
};
let slots = vec![slot_info];
Expand Down Expand Up @@ -114,7 +129,7 @@ async fn farming_salt_change() {
slot_number: 3,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 2],
next_salt: None,
next_salt: Some([1, 1, 1, 1, 1, 1, 1, 2]),
solution_range: u64::MAX,
};
let slots = vec![first_slot, second_slot, third_slot];
Expand Down

0 comments on commit 19051a0

Please sign in to comment.