From 6f5cb279083ee3b8b47f849e111019dfdea9c3b3 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 10:03:18 +0100 Subject: [PATCH 1/7] refactor: [#1495] remove test for SwarmHandle Integration tests will be removed because unit tests have been added. Besides, there is no point in testing only the wrapper. SwarmHandle is only a wrapper over Swarm. --- .../torrent-repository/tests/common/mod.rs | 1 - .../tests/common/torrent.rs | 71 ---------- .../torrent-repository/tests/entry/mod.rs | 127 ++++++++---------- 3 files changed, 55 insertions(+), 144 deletions(-) delete mode 100644 packages/torrent-repository/tests/common/torrent.rs diff --git a/packages/torrent-repository/tests/common/mod.rs b/packages/torrent-repository/tests/common/mod.rs index e083a05cc..c77ca2769 100644 --- a/packages/torrent-repository/tests/common/mod.rs +++ b/packages/torrent-repository/tests/common/mod.rs @@ -1,2 +1 @@ -pub mod torrent; pub mod torrent_peer_builder; diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs deleted file mode 100644 index a1899621f..000000000 --- a/packages/torrent-repository/tests/common/torrent.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use torrust_tracker_configuration::TrackerPolicy; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; -use torrust_tracker_torrent_repository::{swarm, LockTrackedTorrent, SwarmHandle}; - -#[derive(Debug, Clone)] -pub(crate) enum Torrent { -    Single(swarm::Swarm), -    MutexStd(SwarmHandle), -} - -impl Torrent { -    pub(crate) fn get_stats(&self) -> SwarmMetadata { -        match self { -            Torrent::Single(entry) => entry.metadata(), -            Torrent::MutexStd(entry) => entry.lock_or_panic().metadata(), -        } -    } - -    pub(crate) fn meets_retaining_policy(&self, policy: &TrackerPolicy) -> bool { -        match self { -            Torrent::Single(entry) => 
entry.meets_retaining_policy(policy), - Torrent::MutexStd(entry) => entry.lock_or_panic().meets_retaining_policy(policy), - } - } - - pub(crate) fn peers_is_empty(&self) -> bool { - match self { - Torrent::Single(entry) => entry.is_empty(), - Torrent::MutexStd(entry) => entry.lock_or_panic().is_empty(), - } - } - - pub(crate) fn get_peers_len(&self) -> usize { - match self { - Torrent::Single(entry) => entry.len(), - Torrent::MutexStd(entry) => entry.lock_or_panic().len(), - } - } - - pub(crate) fn get_peers(&self, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.peers(limit), - Torrent::MutexStd(entry) => entry.lock_or_panic().peers(limit), - } - } - - pub(crate) fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.peers_excluding(client, limit), - Torrent::MutexStd(entry) => entry.lock_or_panic().peers_excluding(client, limit), - } - } - - pub(crate) fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { - match self { - Torrent::Single(entry) => entry.handle_announcement(peer), - Torrent::MutexStd(entry) => entry.lock_or_panic().handle_announcement(peer), - } - } - - pub(crate) fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { - match self { - Torrent::Single(entry) => entry.remove_inactive(current_cutoff), - Torrent::MutexStd(entry) => entry.lock_or_panic().remove_inactive(current_cutoff), - } - } -} diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index 4607fd9c7..491b77a90 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -9,19 +9,14 @@ use torrust_tracker_clock::clock::{self, Time as _}; use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::peer; use torrust_tracker_primitives::peer::Peer; -use torrust_tracker_torrent_repository::{swarm, SwarmHandle}; +use 
torrust_tracker_torrent_repository::Swarm; -use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; use crate::CurrentClock; #[fixture] -fn single() -> Torrent { - Torrent::Single(swarm::Swarm::default()) -} -#[fixture] -fn mutex_std() -> Torrent { - Torrent::MutexStd(SwarmHandle::default()) +fn single() -> Swarm { + Swarm::default() } #[fixture] @@ -52,39 +47,39 @@ pub enum Makes { Three, } -fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { +fn make(torrent: &mut Swarm, makes: &Makes) -> Vec { match makes { Makes::Empty => vec![], Makes::Started => { let peer = a_started_peer(1); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); vec![peer] } Makes::Completed => { let peer = a_completed_peer(2); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); vec![peer] } Makes::Downloaded => { let mut peer = a_started_peer(3); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); vec![peer] } Makes::Three => { let peer_1 = a_started_peer(1); - torrent.upsert_peer(&peer_1); + torrent.handle_announcement(&peer_1); let peer_2 = a_completed_peer(2); - torrent.upsert_peer(&peer_2); + torrent.handle_announcement(&peer_2); let mut peer_3 = a_started_peer(3); - torrent.upsert_peer(&peer_3); + torrent.handle_announcement(&peer_3); peer_3.event = AnnounceEvent::Completed; peer_3.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer_3); + torrent.handle_announcement(&peer_3); vec![peer_1, peer_2, peer_3] } } @@ -93,10 +88,10 @@ fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { #[rstest] #[case::empty(&Makes::Empty)] #[tokio::test] -async fn it_should_be_empty_by_default(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { +async fn it_should_be_empty_by_default(#[values(single())] mut torrent: Swarm, 
#[case] makes: &Makes) { make(&mut torrent, makes); - assert_eq!(torrent.get_peers_len(), 0); + assert_eq!(torrent.len(), 0); } #[rstest] @@ -107,14 +102,14 @@ async fn it_should_be_empty_by_default(#[values(single(), mutex_std())] mut torr #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy( - #[values(single(), mutex_std())] mut torrent: Torrent, + #[values(single())] mut torrent: Swarm, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { make(&mut torrent, makes); - let has_peers = !torrent.peers_is_empty(); - let has_downloads = torrent.get_stats().downloaded != 0; + let has_peers = !torrent.is_empty(); + let has_downloads = torrent.metadata().downloaded != 0; match (policy.remove_peerless_torrents, policy.persistent_torrent_completed_stat) { // remove torrents without peers, and keep completed download stats @@ -144,10 +139,10 @@ async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_for_torrent_entry(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { +async fn it_should_get_peers_for_torrent_entry(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { let peers = make(&mut torrent, makes); - let torrent_peers = torrent.get_peers(None); + let torrent_peers = torrent.peers(None); assert_eq!(torrent_peers.len(), peers.len()); @@ -163,15 +158,15 @@ async fn it_should_get_peers_for_torrent_entry(#[values(single(), mutex_std())] #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { +async fn it_should_update_a_peer(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { make(&mut 
torrent, makes); // Make and insert a new peer. let mut peer = a_started_peer(-1); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); // Get the Inserted Peer by Id. - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let original = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -181,10 +176,10 @@ async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: T // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); // Get the Updated Peer by Id. - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let updated = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -200,20 +195,17 @@ async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: T #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_a_peer_upon_stopped_announcement( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { use torrust_tracker_primitives::peer::ReadInfo as _; make(&mut torrent, makes); let mut peer = a_started_peer(-1); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); // The started peer should be inserted. - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let original = peers .iter() .find(|p| p.get_id() == peer.get_id()) @@ -223,10 +215,10 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); // It should be removed now. 
- let peers = torrent.get_peers(None); + let peers = torrent.peers(None); assert_eq!( peers.iter().find(|p| p.get_id() == peer.get_id()), @@ -242,13 +234,13 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single(), mutex_std())] mut torrent: Torrent, + #[values(single())] mut torrent: Swarm, #[case] makes: &Makes, ) { make(&mut torrent, makes); - let downloaded = torrent.get_stats().downloaded; + let downloaded = torrent.metadata().downloaded; - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let is_already_completed = peer.event == AnnounceEvent::Completed; @@ -256,8 +248,8 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + torrent.handle_announcement(&peer); + let stats = torrent.metadata(); if is_already_completed { assert_eq!(stats.downloaded, downloaded); @@ -272,19 +264,19 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_a_seeder(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { +async fn it_should_update_a_peer_as_a_seeder(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { let peers = make(&mut torrent, makes); let completed = u32::try_from(peers.iter().filter(|p| p.is_seeder()).count()).expect("it_should_not_be_so_many"); - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let is_already_non_left = peer.left == NumberOfBytes::new(0); // Set 
Bytes Left to Zero peer.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + torrent.handle_announcement(&peer); + let stats = torrent.metadata(); if is_already_non_left { // it was already complete @@ -301,19 +293,19 @@ async fn it_should_update_a_peer_as_a_seeder(#[values(single(), mutex_std())] mu #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_incomplete(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { +async fn it_should_update_a_peer_as_incomplete(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { let peers = make(&mut torrent, makes); let incomplete = u32::try_from(peers.iter().filter(|p| !p.is_seeder()).count()).expect("it should not be so many"); - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let completed_already = peer.left == NumberOfBytes::new(0); // Set Bytes Left to no Zero peer.left = NumberOfBytes::new(1); - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + torrent.handle_announcement(&peer); + let stats = torrent.metadata(); if completed_already { // now it is incomplete @@ -330,13 +322,10 @@ async fn it_should_update_a_peer_as_incomplete(#[values(single(), mutex_std())] #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_excluding_the_client_socket( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_get_peers_excluding_the_client_socket(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { make(&mut torrent, makes); - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081); @@ -345,14 +334,14 @@ 
async fn it_should_get_peers_excluding_the_client_socket( assert_ne!(peer.peer_addr, socket); // it should get the peer as it dose not share the socket. - assert!(torrent.get_peers_for_client(&socket, None).contains(&peer.into())); + assert!(torrent.peers_excluding(&socket, None).contains(&peer.into())); // set the address to the socket. peer.peer_addr = socket; - torrent.upsert_peer(&peer); // Add peer + torrent.handle_announcement(&peer); // Add peer // It should not include the peer that has the same socket. - assert!(!torrent.get_peers_for_client(&socket, None).contains(&peer.into())); + assert!(!torrent.peers_excluding(&socket, None).contains(&peer.into())); } #[rstest] @@ -362,19 +351,16 @@ async fn it_should_get_peers_excluding_the_client_socket( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_limit_the_number_of_peers_returned( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_limit_the_number_of_peers_returned(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { make(&mut torrent, makes); // We add one more peer than the scrape limit for peer_number in 1..=74 + 1 { let peer = a_started_peer(peer_number); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); } - let peers = torrent.get_peers(Some(TORRENT_PEERS_LIMIT)); + let peers = torrent.peers(Some(TORRENT_PEERS_LIMIT)); assert_eq!(peers.len(), 74); } @@ -386,10 +372,7 @@ async fn it_should_limit_the_number_of_peers_returned( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_inactive_peers_beyond_cutoff( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = 
Duration::from_secs(121); @@ -402,12 +385,12 @@ async fn it_should_remove_inactive_peers_beyond_cutoff( peer.updated = now.sub(EXPIRE); - torrent.upsert_peer(&peer); + torrent.handle_announcement(&peer); - assert_eq!(torrent.get_peers_len(), peers.len() + 1); + assert_eq!(torrent.len(), peers.len() + 1); let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default(); - torrent.remove_inactive_peers(current_cutoff); + torrent.remove_inactive(current_cutoff); - assert_eq!(torrent.get_peers_len(), peers.len()); + assert_eq!(torrent.len(), peers.len()); } From 5413e597b7054a4ea7f32a4f36ce9b801c78e832 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 10:15:40 +0100 Subject: [PATCH 2/7] refactor: [#1495] renamings to follow latest changes in torrent-repository pkg --- .../torrent-repository/tests/integration.rs | 4 +- .../tests/{entry => swarm}/mod.rs | 128 +++++++++--------- .../tests/{repository => swarms}/mod.rs | 99 +++++++------- .../src/torrent/repository/in_memory.rs | 26 ++-- 4 files changed, 130 insertions(+), 127 deletions(-) rename packages/torrent-repository/tests/{entry => swarm}/mod.rs (73%) rename packages/torrent-repository/tests/{repository => swarms}/mod.rs (81%) diff --git a/packages/torrent-repository/tests/integration.rs b/packages/torrent-repository/tests/integration.rs index 5aab67b03..b3e057075 100644 --- a/packages/torrent-repository/tests/integration.rs +++ b/packages/torrent-repository/tests/integration.rs @@ -7,8 +7,8 @@ use torrust_tracker_clock::clock; pub mod common; -mod entry; -mod repository; +mod swarm; +mod swarms; /// This code needs to be copied into each crate. /// Working version, for production. 
diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/swarm/mod.rs similarity index 73% rename from packages/torrent-repository/tests/entry/mod.rs rename to packages/torrent-repository/tests/swarm/mod.rs index 491b77a90..d529b0243 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/swarm/mod.rs @@ -15,7 +15,7 @@ use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; use crate::CurrentClock; #[fixture] -fn single() -> Swarm { +fn swarm() -> Swarm { Swarm::default() } @@ -47,39 +47,39 @@ pub enum Makes { Three, } -fn make(torrent: &mut Swarm, makes: &Makes) -> Vec { +fn make(swarm: &mut Swarm, makes: &Makes) -> Vec { match makes { Makes::Empty => vec![], Makes::Started => { let peer = a_started_peer(1); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Completed => { let peer = a_completed_peer(2); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Downloaded => { let mut peer = a_started_peer(3); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes::new(0); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Three => { let peer_1 = a_started_peer(1); - torrent.handle_announcement(&peer_1); + swarm.handle_announcement(&peer_1); let peer_2 = a_completed_peer(2); - torrent.handle_announcement(&peer_2); + swarm.handle_announcement(&peer_2); let mut peer_3 = a_started_peer(3); - torrent.handle_announcement(&peer_3); + swarm.handle_announcement(&peer_3); peer_3.event = AnnounceEvent::Completed; peer_3.left = NumberOfBytes::new(0); - torrent.handle_announcement(&peer_3); + swarm.handle_announcement(&peer_3); vec![peer_1, peer_2, peer_3] } } @@ -88,10 +88,10 @@ fn make(torrent: &mut Swarm, makes: &Makes) -> Vec { #[rstest] #[case::empty(&Makes::Empty)] 
#[tokio::test] -async fn it_should_be_empty_by_default(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_be_empty_by_default(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); - assert_eq!(torrent.len(), 0); + assert_eq!(swarm.len(), 0); } #[rstest] @@ -102,33 +102,33 @@ async fn it_should_be_empty_by_default(#[values(single())] mut torrent: Swarm, # #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy( - #[values(single())] mut torrent: Swarm, + #[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { - make(&mut torrent, makes); + make(&mut swarm, makes); - let has_peers = !torrent.is_empty(); - let has_downloads = torrent.metadata().downloaded != 0; + let has_peers = !swarm.is_empty(); + let has_downloads = swarm.metadata().downloaded != 0; match (policy.remove_peerless_torrents, policy.persistent_torrent_completed_stat) { // remove torrents without peers, and keep completed download stats (true, true) => match (has_peers, has_downloads) { // no peers, but has downloads // peers, with or without downloads - (false, true) | (true, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (false, true) | (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)), // no peers and no downloads - (false, false) => assert!(!torrent.meets_retaining_policy(&policy)), + (false, false) => assert!(!swarm.meets_retaining_policy(&policy)), }, // remove torrents without peers and drop completed download stats (true, false) => match (has_peers, has_downloads) { // peers, with or without downloads - (true, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)), // no peers and with or 
without downloads - (false, true | false) => assert!(!torrent.meets_retaining_policy(&policy)), + (false, true | false) => assert!(!swarm.meets_retaining_policy(&policy)), }, // keep torrents without peers, but keep or drop completed download stats - (false, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (false, true | false) => assert!(swarm.meets_retaining_policy(&policy)), } } @@ -139,10 +139,10 @@ async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_for_torrent_entry(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_get_peers_for_torrent_entry(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + let peers = make(&mut swarm, makes); - let torrent_peers = torrent.peers(None); + let torrent_peers = swarm.peers(None); assert_eq!(torrent_peers.len(), peers.len()); @@ -158,15 +158,15 @@ async fn it_should_get_peers_for_torrent_entry(#[values(single())] mut torrent: #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_update_a_peer(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); // Make and insert a new peer. let mut peer = a_started_peer(-1); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); // Get the Inserted Peer by Id. - let peers = torrent.peers(None); + let peers = swarm.peers(None); let original = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -176,10 +176,10 @@ async fn it_should_update_a_peer(#[values(single())] mut torrent: Swarm, #[case] // Announce "Completed" torrent download event. 
peer.event = AnnounceEvent::Completed; - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); // Get the Updated Peer by Id. - let peers = torrent.peers(None); + let peers = swarm.peers(None); let updated = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -195,17 +195,17 @@ async fn it_should_update_a_peer(#[values(single())] mut torrent: Swarm, #[case] #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { +async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { use torrust_tracker_primitives::peer::ReadInfo as _; - make(&mut torrent, makes); + make(&mut swarm, makes); let mut peer = a_started_peer(-1); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); // The started peer should be inserted. - let peers = torrent.peers(None); + let peers = swarm.peers(None); let original = peers .iter() .find(|p| p.get_id() == peer.get_id()) @@ -215,10 +215,10 @@ async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(single())] m // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); // It should be removed now. 
- let peers = torrent.peers(None); + let peers = swarm.peers(None); assert_eq!( peers.iter().find(|p| p.get_id() == peer.get_id()), @@ -234,7 +234,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(single())] m #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single())] mut torrent: Swarm, + #[values(swarm())] mut torrent: Swarm, #[case] makes: &Makes, ) { make(&mut torrent, makes); @@ -264,19 +264,19 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_a_seeder(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_update_a_peer_as_a_seeder(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + let peers = make(&mut swarm, makes); let completed = u32::try_from(peers.iter().filter(|p| p.is_seeder()).count()).expect("it_should_not_be_so_many"); - let peers = torrent.peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let is_already_non_left = peer.left == NumberOfBytes::new(0); // Set Bytes Left to Zero peer.left = NumberOfBytes::new(0); - torrent.handle_announcement(&peer); - let stats = torrent.metadata(); + swarm.handle_announcement(&peer); + let stats = swarm.metadata(); if is_already_non_left { // it was already complete @@ -293,19 +293,19 @@ async fn it_should_update_a_peer_as_a_seeder(#[values(single())] mut torrent: Sw #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_incomplete(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_update_a_peer_as_incomplete(#[values(swarm())] mut swarm: Swarm, #[case] makes: 
&Makes) { + let peers = make(&mut swarm, makes); let incomplete = u32::try_from(peers.iter().filter(|p| !p.is_seeder()).count()).expect("it should not be so many"); - let peers = torrent.peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let completed_already = peer.left == NumberOfBytes::new(0); // Set Bytes Left to no Zero peer.left = NumberOfBytes::new(1); - torrent.handle_announcement(&peer); - let stats = torrent.metadata(); + swarm.handle_announcement(&peer); + let stats = swarm.metadata(); if completed_already { // now it is incomplete @@ -322,10 +322,10 @@ async fn it_should_update_a_peer_as_incomplete(#[values(single())] mut torrent: #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_excluding_the_client_socket(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_get_peers_excluding_the_client_socket(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); - let peers = torrent.peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081); @@ -334,14 +334,14 @@ async fn it_should_get_peers_excluding_the_client_socket(#[values(single())] mut assert_ne!(peer.peer_addr, socket); // it should get the peer as it dose not share the socket. - assert!(torrent.peers_excluding(&socket, None).contains(&peer.into())); + assert!(swarm.peers_excluding(&socket, None).contains(&peer.into())); // set the address to the socket. peer.peer_addr = socket; - torrent.handle_announcement(&peer); // Add peer + swarm.handle_announcement(&peer); // Add peer // It should not include the peer that has the same socket. 
- assert!(!torrent.peers_excluding(&socket, None).contains(&peer.into())); + assert!(!swarm.peers_excluding(&socket, None).contains(&peer.into())); } #[rstest] @@ -351,16 +351,16 @@ async fn it_should_get_peers_excluding_the_client_socket(#[values(single())] mut #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_limit_the_number_of_peers_returned(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_limit_the_number_of_peers_returned(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); // We add one more peer than the scrape limit for peer_number in 1..=74 + 1 { let peer = a_started_peer(peer_number); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); } - let peers = torrent.peers(Some(TORRENT_PEERS_LIMIT)); + let peers = swarm.peers(Some(TORRENT_PEERS_LIMIT)); assert_eq!(peers.len(), 74); } @@ -372,11 +372,11 @@ async fn it_should_limit_the_number_of_peers_returned(#[values(single())] mut to #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(single())] mut torrent: Swarm, #[case] makes: &Makes) { +async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = Duration::from_secs(121); - let peers = make(&mut torrent, makes); + let peers = make(&mut swarm, makes); let mut peer = a_completed_peer(-1); @@ -385,12 +385,12 @@ async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(single())] mut t peer.updated = now.sub(EXPIRE); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); - assert_eq!(torrent.len(), peers.len() + 1); + assert_eq!(swarm.len(), peers.len() + 1); let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default(); - 
torrent.remove_inactive(current_cutoff); + swarm.remove_inactive(current_cutoff); - assert_eq!(torrent.len(), peers.len()); + assert_eq!(swarm.len(), peers.len()); } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs similarity index 81% rename from packages/torrent-repository/tests/repository/mod.rs rename to packages/torrent-repository/tests/swarms/mod.rs index 071a187fa..20c6255fa 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/swarms/mod.rs @@ -15,7 +15,7 @@ use torrust_tracker_torrent_repository::{LockTrackedTorrent, Swarms}; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; #[fixture] -fn skip_list_mutex_std() -> Swarms { +fn swarms() -> Swarms { Swarms::default() } @@ -33,27 +33,27 @@ fn default() -> Entries { #[fixture] fn started() -> Entries { - let mut torrent = Swarm::default(); - torrent.handle_announcement(&a_started_peer(1)); - vec![(InfoHash::default(), torrent)] + let mut swarm = Swarm::default(); + swarm.handle_announcement(&a_started_peer(1)); + vec![(InfoHash::default(), swarm)] } #[fixture] fn completed() -> Entries { - let mut torrent = Swarm::default(); - torrent.handle_announcement(&a_completed_peer(2)); - vec![(InfoHash::default(), torrent)] + let mut swarm = Swarm::default(); + swarm.handle_announcement(&a_completed_peer(2)); + vec![(InfoHash::default(), swarm)] } #[fixture] fn downloaded() -> Entries { - let mut torrent = Swarm::default(); + let mut swarm = Swarm::default(); let mut peer = a_started_peer(3); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes::new(0); - torrent.handle_announcement(&peer); - vec![(InfoHash::default(), torrent)] + swarm.handle_announcement(&peer); + vec![(InfoHash::default(), swarm)] } #[fixture] @@ -201,13 +201,13 @@ fn policy_remove_persist() -> TrackerPolicy { 
#[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_get_a_torrent_entry(#[values(swarms())] repo: Swarms, #[case] entries: Entries) { make(&repo, &entries); - if let Some((info_hash, torrent)) = entries.first() { + if let Some((info_hash, swarm)) = entries.first() { assert_eq!( Some(repo.get(info_hash).unwrap().lock_or_panic().clone()), - Some(torrent.clone()) + Some(swarm.clone()) ); } else { assert!(repo.get(&InfoHash::default()).is_none()); @@ -225,7 +225,7 @@ async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Sw #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] repo: Swarms, #[case] entries: Entries, many_out_of_order: Entries, ) { @@ -258,7 +258,7 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] repo: Swarms, #[case] entries: Entries, #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination, ) { @@ -270,13 +270,13 @@ async fn it_should_get_paginated( match paginated { // it should return empty if limit is zero. Pagination { limit: 0, .. } => { - let torrents: Vec<(InfoHash, Swarm)> = repo + let swarms: Vec<(InfoHash, Swarm)> = repo .get_paginated(Some(&paginated)) .iter() - .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone())) + .map(|(i, swarm_handle)| (*i, swarm_handle.lock_or_panic().clone())) .collect(); - assert_eq!(torrents, vec![]); + assert_eq!(swarms, vec![]); } // it should return a single entry if the limit is one. 
@@ -313,10 +313,10 @@ async fn it_should_get_paginated( #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_get_metrics(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { use torrust_tracker_primitives::swarm_metadata::AggregateSwarmMetadata; - make(&repo, &entries); + make(&swarms, &entries); let mut metrics = AggregateSwarmMetadata::default(); @@ -329,7 +329,7 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[ metrics.total_downloaded += u64::from(stats.downloaded); } - assert_eq!(repo.get_aggregate_swarm_metadata(), metrics); + assert_eq!(swarms.get_aggregate_swarm_metadata(), metrics); } #[rstest] @@ -343,21 +343,21 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[ #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_import_persistent_torrents( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] swarms: Swarms, #[case] entries: Entries, #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents, ) { - make(&repo, &entries); + make(&swarms, &entries); - let mut downloaded = repo.get_aggregate_swarm_metadata().total_downloaded; + let mut downloaded = swarms.get_aggregate_swarm_metadata().total_downloaded; persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d)); - repo.import_persistent(&persistent_torrents); + swarms.import_persistent(&persistent_torrents); - assert_eq!(repo.get_aggregate_swarm_metadata().total_downloaded, downloaded); + assert_eq!(swarms.get_aggregate_swarm_metadata().total_downloaded, downloaded); for (entry, _) in persistent_torrents { - assert!(repo.get(&entry).is_some()); + assert!(swarms.get(&entry).is_some()); } } @@ -371,21 +371,24 @@ async fn 
it_should_import_persistent_torrents( #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { - make(&repo, &entries); +async fn it_should_remove_an_entry(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { + make(&swarms, &entries); for (info_hash, torrent) in entries { assert_eq!( - Some(repo.get(&info_hash).unwrap().lock_or_panic().clone()), + Some(swarms.get(&info_hash).unwrap().lock_or_panic().clone()), Some(torrent.clone()) ); - assert_eq!(Some(repo.remove(&info_hash).unwrap().lock_or_panic().clone()), Some(torrent)); + assert_eq!( + Some(swarms.remove(&info_hash).unwrap().lock_or_panic().clone()), + Some(torrent) + ); - assert!(repo.get(&info_hash).is_none()); - assert!(repo.remove(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); + assert!(swarms.remove(&info_hash).is_none()); } - assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, 0); + assert_eq!(swarms.get_aggregate_swarm_metadata().total_torrents, 0); } #[rstest] @@ -398,7 +401,7 @@ async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Swarms #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { use std::ops::Sub as _; use std::time::Duration; @@ -411,7 +414,7 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = Duration::from_secs(121); - make(&repo, &entries); + make(&swarms, &entries); let info_hash: InfoHash; let mut peer: peer::Peer; @@ -435,15 +438,15 @@ async fn 
it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - repo.upsert_peer(&info_hash, &peer, None); - assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, entries.len() as u64 + 1); + swarms.upsert_peer(&info_hash, &peer, None); + assert_eq!(swarms.get_aggregate_swarm_metadata().total_torrents, entries.len() as u64 + 1); } // Insert the infohash and peer into the repository // and verify the swarm metadata was updated. { - repo.upsert_peer(&info_hash, &peer, None); - let stats = repo.get_swarm_metadata(&info_hash); + swarms.upsert_peer(&info_hash, &peer, None); + let stats = swarms.get_swarm_metadata(&info_hash); assert_eq!( stats, Some(SwarmMetadata { @@ -456,19 +459,19 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: // Verify that this new peer was inserted into the repository. { - let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some"); + let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some"); let entry = lock_tracked_torrent.lock_or_panic(); assert!(entry.peers(None).contains(&peer.into())); } // Remove peers that have not been updated since the timeout (120 seconds ago). { - repo.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")); + swarms.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")); } // Verify that the this peer was removed from the repository. 
{ - let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some"); + let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some"); let entry = lock_tracked_torrent.lock_or_panic(); assert!(!entry.peers(None).contains(&peer.into())); } @@ -485,15 +488,15 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_peerless_torrents( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] swarms: Swarms, #[case] entries: Entries, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { - make(&repo, &entries); + make(&swarms, &entries); - repo.remove_peerless_torrents(&policy); + swarms.remove_peerless_torrents(&policy); - let torrents: Vec<(InfoHash, Swarm)> = repo + let torrents: Vec<(InfoHash, Swarm)> = swarms .get_paginated(None) .iter() .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone())) diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 67e532e86..5902f6735 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -20,8 +20,8 @@ use torrust_tracker_torrent_repository::{SwarmHandle, Swarms}; /// used in production. Other implementations are kept for reference. #[derive(Debug, Default)] pub struct InMemoryTorrentRepository { - /// The underlying in-memory data structure that stores torrent entries. - torrents: Arc, + /// The underlying in-memory data structure that stores swarms data. 
+ swarms: Arc, } impl InMemoryTorrentRepository { @@ -46,7 +46,7 @@ impl InMemoryTorrentRepository { peer: &peer::Peer, opt_persistent_torrent: Option, ) -> bool { - self.torrents.upsert_peer(info_hash, peer, opt_persistent_torrent) + self.swarms.upsert_peer(info_hash, peer, opt_persistent_torrent) } /// Removes a torrent entry from the repository. @@ -65,7 +65,7 @@ impl InMemoryTorrentRepository { #[cfg(test)] #[must_use] pub(crate) fn remove(&self, key: &InfoHash) -> Option { - self.torrents.remove(key) + self.swarms.remove(key) } /// Removes inactive peers from all torrent entries. @@ -78,7 +78,7 @@ impl InMemoryTorrentRepository { /// * `current_cutoff` - The cutoff timestamp; peers not updated since this /// time will be removed. pub(crate) fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - self.torrents.remove_inactive_peers(current_cutoff); + self.swarms.remove_inactive_peers(current_cutoff); } /// Removes torrent entries that have no active peers. @@ -91,7 +91,7 @@ impl InMemoryTorrentRepository { /// * `policy` - The tracker policy containing the configuration for /// removing peerless torrents. pub(crate) fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { - self.torrents.remove_peerless_torrents(policy); + self.swarms.remove_peerless_torrents(policy); } /// Retrieves a torrent entry by its infohash. @@ -105,7 +105,7 @@ impl InMemoryTorrentRepository { /// An `Option` containing the torrent entry if found. #[must_use] pub(crate) fn get(&self, key: &InfoHash) -> Option { - self.torrents.get(key) + self.swarms.get(key) } /// Retrieves a paginated list of torrent entries. @@ -123,7 +123,7 @@ impl InMemoryTorrentRepository { /// A vector of `(InfoHash, TorrentEntry)` tuples. #[must_use] pub(crate) fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, SwarmHandle)> { - self.torrents.get_paginated(pagination) + self.swarms.get_paginated(pagination) } /// Retrieves swarm metadata for a given torrent. 
@@ -141,7 +141,7 @@ impl InMemoryTorrentRepository { /// A `SwarmMetadata` struct containing the aggregated torrent data. #[must_use] pub(crate) fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> SwarmMetadata { - self.torrents.get_swarm_metadata_or_default(info_hash) + self.swarms.get_swarm_metadata_or_default(info_hash) } /// Retrieves torrent peers for a given torrent and client, excluding the @@ -163,7 +163,7 @@ impl InMemoryTorrentRepository { /// the torrent, excluding the requesting client. #[must_use] pub(crate) fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { - self.torrents.get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + self.swarms.get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) } /// Retrieves the list of peers for a given torrent. @@ -186,7 +186,7 @@ impl InMemoryTorrentRepository { #[must_use] pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec> { // todo: pass the limit as an argument like `get_peers_for` - self.torrents.get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) + self.swarms.get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) } /// Calculates and returns overall torrent metrics. @@ -200,7 +200,7 @@ impl InMemoryTorrentRepository { /// A [`AggregateSwarmMetadata`] struct with the aggregated metrics. #[must_use] pub fn get_aggregate_swarm_metadata(&self) -> AggregateSwarmMetadata { - self.torrents.get_aggregate_swarm_metadata() + self.swarms.get_aggregate_swarm_metadata() } /// Imports persistent torrent data into the in-memory repository. @@ -212,6 +212,6 @@ impl InMemoryTorrentRepository { /// /// * `persistent_torrents` - A reference to the persisted torrent data. 
pub fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { - self.torrents.import_persistent(persistent_torrents); + self.swarms.import_persistent(persistent_torrents); } } From 6d50fa083cd334bfc1f23a96d3754e98ed6ae51b Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 11:45:50 +0100 Subject: [PATCH 3/7] refactor: [#1495] remove panics from Swarms type They have been moved one level up to the InMemoryTorrentRepository type. We should bubble them up to the final user, returning an error in the UDP or HTTP tracker when the swarm handle lock cannot be acquired. A new issue will be opened to address that. --- Cargo.lock | 2 +- packages/torrent-repository/Cargo.toml | 2 +- packages/torrent-repository/src/lib.rs | 2 +- packages/torrent-repository/src/swarms.rs | 189 +++++++++++------- .../torrent-repository/tests/swarms/mod.rs | 25 ++- .../src/torrent/repository/in_memory.rs | 54 ++++- 6 files changed, 182 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eea957f88..093b8e9b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4850,12 +4850,12 @@ dependencies = [ "crossbeam-skiplist", "rand 0.9.1", "rstest", + "thiserror 2.0.12", "tokio", "torrust-tracker-clock", "torrust-tracker-configuration", "torrust-tracker-primitives", "torrust-tracker-test-helpers", - "tracing", ] [[package]] diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index e584fadf4..2cc02a720 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -19,11 +19,11 @@ version.workspace = true aquatic_udp_protocol = "0" bittorrent-primitives = "0.1.0" crossbeam-skiplist = "0" +thiserror = "2.0.12" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } torrust-tracker-primitives = { 
version = "3.0.0-develop", path = "../primitives" } -tracing = "0" [dev-dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index c985f7a2b..a4e7d9c5d 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -23,7 +23,7 @@ pub trait LockTrackedTorrent { fn lock_or_panic(&self) -> MutexGuard<'_, Swarm>; } -impl LockTrackedTorrent for Arc> { +impl LockTrackedTorrent for SwarmHandle { fn lock_or_panic(&self) -> MutexGuard<'_, Swarm> { self.lock().expect("can't acquire lock for tracked torrent handle") } diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 936f49d22..222bea60a 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -8,7 +8,7 @@ use torrust_tracker_primitives::swarm_metadata::{AggregateSwarmMetadata, SwarmMe use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use crate::swarm::Swarm; -use crate::{LockTrackedTorrent, SwarmHandle}; +use crate::SwarmHandle; #[derive(Default, Debug)] pub struct Swarms { @@ -34,33 +34,31 @@ impl Swarms { /// Returns `true` if the number of downloads was increased because the peer /// completed the download. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. + /// This function returns an error if the lock for the swarm handle cannot be acquired. 
pub fn upsert_peer( &self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option, - ) -> bool { - if let Some(existing_entry) = self.swarms.get(info_hash) { - tracing::debug!("Torrent already exists: {:?}", info_hash); + ) -> Result { + if let Some(existing_swarm_handle) = self.swarms.get(info_hash) { + let mut swarm = existing_swarm_handle.value().lock()?; - existing_entry.value().lock_or_panic().handle_announcement(peer) + Ok(swarm.handle_announcement(peer)) } else { - tracing::debug!("Inserting new torrent: {:?}", info_hash); - - let new_entry = if let Some(number_of_downloads) = opt_persistent_torrent { + let new_swarm_handle = if let Some(number_of_downloads) = opt_persistent_torrent { SwarmHandle::new(Swarm::new(number_of_downloads).into()) } else { SwarmHandle::default() }; - let inserted_entry = self.swarms.get_or_insert(*info_hash, new_entry); + let inserted_swarm_handle = self.swarms.get_or_insert(*info_hash, new_swarm_handle); - let mut torrent_guard = inserted_entry.value().lock_or_panic(); + let mut swarm = inserted_swarm_handle.value().lock()?; - torrent_guard.handle_announcement(peer) + Ok(swarm.handle_announcement(peer)) } } @@ -79,13 +77,17 @@ impl Swarms { /// A peer is considered inactive if its last update timestamp is older than /// the provided cutoff time. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - pub fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - for entry in &self.swarms { - entry.value().lock_or_panic().remove_inactive(current_cutoff); + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. + pub fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> Result<(), Error> { + for swarm_handle in &self.swarms { + let mut swarm = swarm_handle.value().lock()?; + swarm.remove_inactive(current_cutoff); } + + Ok(()) } /// Retrieves a tracked torrent handle by its infohash. 
@@ -132,14 +134,17 @@ impl Swarms { /// /// A `SwarmMetadata` struct containing the aggregated torrent data if found. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - #[must_use] - pub fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { - self.swarms - .get(info_hash) - .map(|entry| entry.value().lock_or_panic().metadata()) + /// This function returns an error if the lock for the swarm handle cannot be acquired. + pub fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Result, Error> { + match self.swarms.get(info_hash) { + None => Ok(None), + Some(swarm_handle) => { + let swarm = swarm_handle.value().lock()?; + Ok(Some(swarm.metadata())) + } + } } /// Retrieves swarm metadata for a given torrent. @@ -148,11 +153,16 @@ impl Swarms { /// /// A `SwarmMetadata` struct containing the aggregated torrent data if it's /// found or a zeroed metadata struct if not. - #[must_use] - pub fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> SwarmMetadata { + /// + /// # Errors + /// + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. + pub fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> Result { match self.get_swarm_metadata(info_hash) { - Some(swarm_metadata) => swarm_metadata, - None => SwarmMetadata::zeroed(), + Ok(Some(swarm_metadata)) => Ok(swarm_metadata), + Ok(None) => Ok(SwarmMetadata::zeroed()), + Err(err) => Err(err), } } @@ -168,14 +178,17 @@ impl Swarms { /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent, excluding the requesting client. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the torrent entry cannot be obtained. - #[must_use] - pub fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. 
+ pub fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Result>, Error> { match self.get(info_hash) { - None => vec![], - Some(entry) => entry.lock_or_panic().peers_excluding(&peer.peer_addr, Some(limit)), + None => Ok(vec![]), + Some(swarm_handle) => { + let swarm = swarm_handle.lock()?; + Ok(swarm.peers_excluding(&peer.peer_addr, Some(limit))) + } } } @@ -189,14 +202,17 @@ impl Swarms { /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the torrent entry cannot be obtained. - #[must_use] - pub fn get_torrent_peers(&self, info_hash: &InfoHash, limit: usize) -> Vec> { + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. + pub fn get_torrent_peers(&self, info_hash: &InfoHash, limit: usize) -> Result>, Error> { match self.get(info_hash) { - None => vec![], - Some(entry) => entry.lock_or_panic().peers(Some(limit)), + None => Ok(vec![]), + Some(swarm_handle) => { + let swarm = swarm_handle.lock()?; + Ok(swarm.peers(Some(limit))) + } } } @@ -205,17 +221,22 @@ impl Swarms { /// Depending on the tracker policy, torrents without any peers may be /// removed to conserve memory. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - pub fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { - for entry in &self.swarms { - if entry.value().lock_or_panic().meets_retaining_policy(policy) { + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. 
+ pub fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> Result<(), Error> { + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock()?; + + if swarm.meets_retaining_policy(policy) { continue; } - entry.remove(); + swarm_handle.remove(); } + + Ok(()) } /// Imports persistent torrent data into the in-memory repository. @@ -247,22 +268,35 @@ impl Swarms { /// /// A [`AggregateSwarmMetadata`] struct with the aggregated metrics. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - #[must_use] - pub fn get_aggregate_swarm_metadata(&self) -> AggregateSwarmMetadata { + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. + pub fn get_aggregate_swarm_metadata(&self) -> Result { let mut metrics = AggregateSwarmMetadata::default(); for entry in &self.swarms { - let stats = entry.value().lock_or_panic().metadata(); + let swarm = entry.value().lock()?; + let stats = swarm.metadata(); metrics.total_complete += u64::from(stats.complete); metrics.total_downloaded += u64::from(stats.downloaded); metrics.total_incomplete += u64::from(stats.incomplete); metrics.total_torrents += 1; } - metrics + Ok(metrics) + } +} + +#[derive(thiserror::Error, Debug, Clone)] +pub enum Error { + #[error("Can't acquire swarm lock")] + CannotAcquireSwarmLock, +} + +impl From>> for Error { + fn from(_error: std::sync::PoisonError>) -> Self { + Error::CannotAcquireSwarmLock } } @@ -354,7 +388,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); - let peers = torrent_repository.get_torrent_peers(&info_hash, 74); + let peers = torrent_repository.get_torrent_peers(&info_hash, 74).unwrap(); assert_eq!(peers, vec![Arc::new(peer)]); } @@ -363,7 +397,7 @@ mod tests { async fn it_should_return_an_empty_list_or_peers_for_a_non_existing_torrent() { let torrent_repository = Arc::new(Swarms::default()); - let peers = 
torrent_repository.get_torrent_peers(&sample_info_hash(), 74); + let peers = torrent_repository.get_torrent_peers(&sample_info_hash(), 74).unwrap(); assert!(peers.is_empty()); } @@ -388,7 +422,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); } - let peers = torrent_repository.get_torrent_peers(&info_hash, 74); + let peers = torrent_repository.get_torrent_peers(&info_hash, 74).unwrap(); assert_eq!(peers.len(), 74); } @@ -411,7 +445,9 @@ mod tests { async fn it_should_return_an_empty_peer_list_for_a_non_existing_torrent() { let torrent_repository = Arc::new(Swarms::default()); - let peers = torrent_repository.get_peers_for(&sample_info_hash(), &sample_peer(), TORRENT_PEERS_LIMIT); + let peers = torrent_repository + .get_peers_for(&sample_info_hash(), &sample_peer(), TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers, vec![]); } @@ -425,7 +461,9 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); - let peers = torrent_repository.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT); + let peers = torrent_repository + .get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers, vec![]); } @@ -455,7 +493,9 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); } - let peers = torrent_repository.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT); + let peers = torrent_repository + .get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers.len(), 74); } @@ -498,9 +538,14 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); // Cut off time is 1 second after the peer was updated - torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); + torrent_repository + .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) + .unwrap(); - 
assert!(!torrent_repository.get_torrent_peers(&info_hash, 74).contains(&Arc::new(peer))); + assert!(!torrent_repository + .get_torrent_peers(&info_hash, 74) + .unwrap() + .contains(&Arc::new(peer))); } fn initialize_repository_with_one_torrent_without_peers(info_hash: &InfoHash) -> Arc { @@ -512,7 +557,9 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(info_hash, &peer, None); // Remove the peer - torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); + torrent_repository + .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) + .unwrap(); torrent_repository } @@ -528,7 +575,7 @@ mod tests { ..Default::default() }; - torrent_repository.remove_peerless_torrents(&tracker_policy); + torrent_repository.remove_peerless_torrents(&tracker_policy).unwrap(); assert!(torrent_repository.get(&info_hash).is_none()); } @@ -755,7 +802,7 @@ mod tests { async fn it_should_get_empty_aggregate_swarm_metadata_when_there_are_no_torrents() { let torrent_repository = Arc::new(Swarms::default()); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -774,7 +821,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &leecher(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -793,7 +840,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &seeder(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -812,7 
+859,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &complete_peer(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -837,7 +884,7 @@ mod tests { let result_a = start_time.elapsed(); let start_time = std::time::Instant::now(); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); let result_b = start_time.elapsed(); assert_eq!( @@ -870,7 +917,7 @@ mod tests { let _number_of_downloads_increased = torrent_repository.upsert_peer(&infohash, &leecher(), None); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash); + let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash).unwrap(); assert_eq!( swarm_metadata, @@ -886,7 +933,7 @@ mod tests { async fn it_should_return_zeroed_swarm_metadata_for_a_non_existing_torrent() { let torrent_repository = Arc::new(Swarms::default()); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&sample_info_hash()); + let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&sample_info_hash()).unwrap(); assert_eq!(swarm_metadata, SwarmMetadata::zeroed()); } @@ -913,7 +960,7 @@ mod tests { torrent_repository.import_persistent(&persistent_torrents); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash); + let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash).unwrap(); // Only the number of downloads is persisted. 
assert_eq!(swarm_metadata.downloaded, 1); diff --git a/packages/torrent-repository/tests/swarms/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs index 20c6255fa..82247bfcb 100644 --- a/packages/torrent-repository/tests/swarms/mod.rs +++ b/packages/torrent-repository/tests/swarms/mod.rs @@ -329,7 +329,7 @@ async fn it_should_get_metrics(#[values(swarms())] swarms: Swarms, #[case] entri metrics.total_downloaded += u64::from(stats.downloaded); } - assert_eq!(swarms.get_aggregate_swarm_metadata(), metrics); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap(), metrics); } #[rstest] @@ -349,12 +349,12 @@ async fn it_should_import_persistent_torrents( ) { make(&swarms, &entries); - let mut downloaded = swarms.get_aggregate_swarm_metadata().total_downloaded; + let mut downloaded = swarms.get_aggregate_swarm_metadata().unwrap().total_downloaded; persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d)); swarms.import_persistent(&persistent_torrents); - assert_eq!(swarms.get_aggregate_swarm_metadata().total_downloaded, downloaded); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap().total_downloaded, downloaded); for (entry, _) in persistent_torrents { assert!(swarms.get(&entry).is_some()); @@ -388,7 +388,7 @@ async fn it_should_remove_an_entry(#[values(swarms())] swarms: Swarms, #[case] e assert!(swarms.remove(&info_hash).is_none()); } - assert_eq!(swarms.get_aggregate_swarm_metadata().total_torrents, 0); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap().total_torrents, 0); } #[rstest] @@ -438,15 +438,18 @@ async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[c // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. 
{ - swarms.upsert_peer(&info_hash, &peer, None); - assert_eq!(swarms.get_aggregate_swarm_metadata().total_torrents, entries.len() as u64 + 1); + swarms.upsert_peer(&info_hash, &peer, None).unwrap(); + assert_eq!( + swarms.get_aggregate_swarm_metadata().unwrap().total_torrents, + entries.len() as u64 + 1 + ); } // Insert the infohash and peer into the repository // and verify the swarm metadata was updated. { - swarms.upsert_peer(&info_hash, &peer, None); - let stats = swarms.get_swarm_metadata(&info_hash); + swarms.upsert_peer(&info_hash, &peer, None).unwrap(); + let stats = swarms.get_swarm_metadata(&info_hash).unwrap(); assert_eq!( stats, Some(SwarmMetadata { @@ -466,7 +469,9 @@ async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[c // Remove peers that have not been updated since the timeout (120 seconds ago). { - swarms.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")); + swarms + .remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")) + .unwrap(); } // Verify that the this peer was removed from the repository. @@ -494,7 +499,7 @@ async fn it_should_remove_peerless_torrents( ) { make(&swarms, &entries); - swarms.remove_peerless_torrents(&policy); + swarms.remove_peerless_torrents(&policy).unwrap(); let torrents: Vec<(InfoHash, Swarm)> = swarms .get_paginated(None) diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 5902f6735..8c93f3605 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -39,6 +39,10 @@ impl InMemoryTorrentRepository { /// # Returns /// /// `true` if the peer stats were updated. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. 
#[must_use] pub fn upsert_peer( &self, @@ -46,7 +50,9 @@ impl InMemoryTorrentRepository { peer: &peer::Peer, opt_persistent_torrent: Option, ) -> bool { - self.swarms.upsert_peer(info_hash, peer, opt_persistent_torrent) + self.swarms + .upsert_peer(info_hash, peer, opt_persistent_torrent) + .expect("Failed to upsert the peer in swarms") } /// Removes a torrent entry from the repository. @@ -77,8 +83,14 @@ impl InMemoryTorrentRepository { /// /// * `current_cutoff` - The cutoff timestamp; peers not updated since this /// time will be removed. + /// + /// # Panics + /// + /// This function panics if the underlying swarms return an error. pub(crate) fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - self.swarms.remove_inactive_peers(current_cutoff); + self.swarms + .remove_inactive_peers(current_cutoff) + .expect("Failed to remove inactive peers from swarms"); } /// Removes torrent entries that have no active peers. @@ -90,8 +102,14 @@ impl InMemoryTorrentRepository { /// /// * `policy` - The tracker policy containing the configuration for /// removing peerless torrents. + /// + /// # Panics + /// + /// This function panics if the underlying swarms return an error. pub(crate) fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { - self.swarms.remove_peerless_torrents(policy); + self.swarms + .remove_peerless_torrents(policy) + .expect("Failed to remove peerless torrents from swarms"); } /// Retrieves a torrent entry by its infohash. @@ -139,9 +157,15 @@ impl InMemoryTorrentRepository { /// # Returns /// /// A `SwarmMetadata` struct containing the aggregated torrent data. 
+ /// + /// # Panics + /// + /// This function panics if the underling swarms return an error.s #[must_use] pub(crate) fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> SwarmMetadata { - self.swarms.get_swarm_metadata_or_default(info_hash) + self.swarms + .get_swarm_metadata_or_default(info_hash) + .expect("Failed to get swarm metadata") } /// Retrieves torrent peers for a given torrent and client, excluding the @@ -161,9 +185,15 @@ impl InMemoryTorrentRepository { /// /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent, excluding the requesting client. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. #[must_use] pub(crate) fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { - self.swarms.get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + self.swarms + .get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + .expect("Failed to get other peers in swarm") } /// Retrieves the list of peers for a given torrent. @@ -182,11 +212,13 @@ impl InMemoryTorrentRepository { /// /// # Panics /// - /// This function panics if the lock for the torrent entry cannot be obtained. + /// This function panics if the underling swarms return an error. #[must_use] pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec> { // todo: pass the limit as an argument like `get_peers_for` - self.swarms.get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) + self.swarms + .get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) + .expect("Failed to get other peers in swarm") } /// Calculates and returns overall torrent metrics. @@ -198,9 +230,15 @@ impl InMemoryTorrentRepository { /// # Returns /// /// A [`AggregateSwarmMetadata`] struct with the aggregated metrics. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. 
#[must_use] pub fn get_aggregate_swarm_metadata(&self) -> AggregateSwarmMetadata { - self.swarms.get_aggregate_swarm_metadata() + self.swarms + .get_aggregate_swarm_metadata() + .expect("Failed to get aggregate swarm metadata") } /// Imports persistent torrent data into the in-memory repository. From 31f1fbf32216fbb1f1fc43c5c103af44e25bb462 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 12:07:15 +0100 Subject: [PATCH 4/7] refactgor: [#1495] make field private It was public only to allow setting a pre-defined state in tests. A new public method have been adding temporarily to explain its usage. --- packages/torrent-repository/src/swarms.rs | 14 +++++++++++--- packages/torrent-repository/tests/swarms/mod.rs | 9 +++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 222bea60a..34cd52d3b 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use bittorrent_primitives::info_hash::InfoHash; use crossbeam_skiplist::SkipMap; @@ -12,8 +12,7 @@ use crate::SwarmHandle; #[derive(Default, Debug)] pub struct Swarms { - // todo: this needs to be public only to insert a peerless torrent (empty swarm). - pub swarms: SkipMap, + swarms: SkipMap, } impl Swarms { @@ -62,6 +61,15 @@ impl Swarms { } } + /// Inserts a new swarm. It's only used for testing purposes. It allows to + /// pre-define the initial state of the swarm without having to go through + /// the upsert process. + pub fn insert_swarm(&self, info_hash: &InfoHash, swarm: Swarm) { + // code-review: swarms builder? + let swarm_handle = Arc::new(Mutex::new(swarm)); + self.swarms.insert(*info_hash, swarm_handle); + } + /// Removes a torrent entry from the repository. 
/// /// # Returns diff --git a/packages/torrent-repository/tests/swarms/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs index 82247bfcb..43571eb83 100644 --- a/packages/torrent-repository/tests/swarms/mod.rs +++ b/packages/torrent-repository/tests/swarms/mod.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, HashSet}; use std::hash::{DefaultHasher, Hash, Hasher}; -use std::sync::{Arc, Mutex}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use bittorrent_primitives::info_hash::InfoHash; @@ -148,11 +147,9 @@ fn persistent_three() -> PersistentTorrents { t.iter().copied().collect() } -fn make(repo: &Swarms, entries: &Entries) { - for (info_hash, entry) in entries { - let new = Arc::new(Mutex::new(entry.clone())); - // todo: use a public method to insert an empty swarm. - repo.swarms.insert(*info_hash, new); +fn make(swarms: &Swarms, entries: &Entries) { + for (info_hash, swarm) in entries { + swarms.insert_swarm(info_hash, swarm.clone()); } } From 5c2c1e0f77c767a945823fb3abf7091caeb17129 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 12:13:42 +0100 Subject: [PATCH 5/7] feat: [#1495] add len and is_empty methods to Swarms type --- packages/torrent-repository/src/swarms.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 34cd52d3b..a03b9d7e6 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -294,6 +294,16 @@ impl Swarms { Ok(metrics) } + + #[must_use] + pub fn len(&self) -> usize { + self.swarms.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.swarms.is_empty() + } } #[derive(thiserror::Error, Debug, Clone)] From 5b3142f6bae735750aa0ebead74b3587bb441f01 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 12:17:35 +0100 Subject: [PATCH 6/7] refactor: [#1495] refactor Swarms::upsert_peer --- packages/torrent-repository/src/swarms.rs | 20 
+++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index a03b9d7e6..fb6652ba5 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -42,23 +42,17 @@ impl Swarms { peer: &peer::Peer, opt_persistent_torrent: Option, ) -> Result { - if let Some(existing_swarm_handle) = self.swarms.get(info_hash) { - let mut swarm = existing_swarm_handle.value().lock()?; - - Ok(swarm.handle_announcement(peer)) + let swarm_handle = if let Some(number_of_downloads) = opt_persistent_torrent { + SwarmHandle::new(Swarm::new(number_of_downloads).into()) } else { - let new_swarm_handle = if let Some(number_of_downloads) = opt_persistent_torrent { - SwarmHandle::new(Swarm::new(number_of_downloads).into()) - } else { - SwarmHandle::default() - }; + SwarmHandle::default() + }; - let inserted_swarm_handle = self.swarms.get_or_insert(*info_hash, new_swarm_handle); + let swarm_handle = self.swarms.get_or_insert(*info_hash, swarm_handle); - let mut swarm = inserted_swarm_handle.value().lock()?; + let mut swarm = swarm_handle.value().lock()?; - Ok(swarm.handle_announcement(peer)) - } + Ok(swarm.handle_announcement(peer)) } /// Inserts a new swarm. It's only used for testing purposes. 
It allows to From 4d91738d05cc2220ebdea4cb512badbf1809074f Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 7 May 2025 12:58:21 +0100 Subject: [PATCH 7/7] refactor: [#1495] renamings in torrent-repository pkg --- packages/torrent-repository/src/swarms.rs | 189 +++++++++--------- .../torrent-repository/tests/swarms/mod.rs | 6 +- .../src/torrent/repository/in_memory.rs | 6 +- 3 files changed, 102 insertions(+), 99 deletions(-) diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index fb6652ba5..828e8c030 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -36,7 +36,7 @@ impl Swarms { /// # Errors /// /// This function panics if the lock for the swarm handle cannot be acquired. - pub fn upsert_peer( + pub fn handle_announcement( &self, info_hash: &InfoHash, peer: &peer::Peer, @@ -55,11 +55,13 @@ impl Swarms { Ok(swarm.handle_announcement(peer)) } - /// Inserts a new swarm. It's only used for testing purposes. It allows to - /// pre-define the initial state of the swarm without having to go through - /// the upsert process. - pub fn insert_swarm(&self, info_hash: &InfoHash, swarm: Swarm) { + /// Inserts a new swarm. + pub fn insert(&self, info_hash: &InfoHash, swarm: Swarm) { // code-review: swarms builder? + // It's only used for testing purposes. It allows to pre-define the + // initial state of the swarm without having to go through the upsert + // process. + let swarm_handle = Arc::new(Mutex::new(swarm)); self.swarms.insert(*info_hash, swarm_handle); } @@ -184,7 +186,12 @@ impl Swarms { /// /// This function returns an error if it fails to acquire the lock for the /// swarm handle. 
- pub fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Result>, Error> { + pub fn get_peers_peers_excluding( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + limit: usize, + ) -> Result>, Error> { match self.get(info_hash) { None => Ok(vec![]), Some(swarm_handle) => { @@ -208,7 +215,7 @@ impl Swarms { /// /// This function returns an error if it fails to acquire the lock for the /// swarm handle. - pub fn get_torrent_peers(&self, info_hash: &InfoHash, limit: usize) -> Result>, Error> { + pub fn get_swarm_peers(&self, info_hash: &InfoHash, limit: usize) -> Result>, Error> { match self.get(info_hash) { None => Ok(vec![]), Some(swarm_handle) => { @@ -356,25 +363,25 @@ mod tests { #[tokio::test] async fn it_should_add_the_first_peer_to_the_torrent_peer_list() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - assert!(torrent_repository.get(&info_hash).is_some()); + assert!(swarms.get(&info_hash).is_some()); } #[tokio::test] async fn it_should_allow_adding_the_same_peer_twice_to_the_torrent_peer_list() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - assert!(torrent_repository.get(&info_hash).is_some()); + assert!(swarms.get(&info_hash).is_some()); 
} } @@ -393,30 +400,30 @@ mod tests { #[tokio::test] async fn it_should_return_the_peers_for_a_given_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let peers = torrent_repository.get_torrent_peers(&info_hash, 74).unwrap(); + let peers = swarms.get_swarm_peers(&info_hash, 74).unwrap(); assert_eq!(peers, vec![Arc::new(peer)]); } #[tokio::test] async fn it_should_return_an_empty_list_or_peers_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let peers = torrent_repository.get_torrent_peers(&sample_info_hash(), 74).unwrap(); + let peers = swarms.get_swarm_peers(&sample_info_hash(), 74).unwrap(); assert!(peers.is_empty()); } #[tokio::test] async fn it_should_return_74_peers_at_the_most_for_a_given_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); @@ -431,10 +438,10 @@ mod tests { event: AnnounceEvent::Completed, }; - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); } - let peers = torrent_repository.get_torrent_peers(&info_hash, 74).unwrap(); + let peers = swarms.get_swarm_peers(&info_hash, 74).unwrap(); assert_eq!(peers.len(), 74); } @@ -455,10 +462,10 @@ mod tests { #[tokio::test] async fn it_should_return_an_empty_peer_list_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let peers = torrent_repository - .get_peers_for(&sample_info_hash(), &sample_peer(), 
TORRENT_PEERS_LIMIT) + let peers = swarms + .get_peers_peers_excluding(&sample_info_hash(), &sample_peer(), TORRENT_PEERS_LIMIT) .unwrap(); assert_eq!(peers, vec![]); @@ -466,15 +473,15 @@ mod tests { #[tokio::test] async fn it_should_return_the_peers_for_a_given_torrent_excluding_a_given_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let peers = torrent_repository - .get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT) + let peers = swarms + .get_peers_peers_excluding(&info_hash, &peer, TORRENT_PEERS_LIMIT) .unwrap(); assert_eq!(peers, vec![]); @@ -482,13 +489,13 @@ mod tests { #[tokio::test] async fn it_should_return_74_peers_at_the_most_for_a_given_torrent_when_it_filters_out_a_given_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let excluded_peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &excluded_peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &excluded_peer, None); // Add 74 peers for idx in 2..=75 { @@ -502,11 +509,11 @@ mod tests { event: AnnounceEvent::Completed, }; - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); } - let peers = torrent_repository - .get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT) + let peers = swarms + .get_peers_peers_excluding(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT) .unwrap(); assert_eq!(peers.len(), 74); @@ -529,67 +536,64 @@ mod tests { #[tokio::test] async fn 
it_should_remove_a_torrent_entry() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - let _unused = torrent_repository.remove(&info_hash); + let _unused = swarms.remove(&info_hash); - assert!(torrent_repository.get(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); } #[tokio::test] async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); // Cut off time is 1 second after the peer was updated - torrent_repository + swarms .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) .unwrap(); - assert!(!torrent_repository - .get_torrent_peers(&info_hash, 74) - .unwrap() - .contains(&Arc::new(peer))); + assert!(!swarms.get_swarm_peers(&info_hash, 74).unwrap().contains(&Arc::new(peer))); } fn initialize_repository_with_one_torrent_without_peers(info_hash: &InfoHash) -> Arc { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert a sample peer for the torrent to force adding the torrent entry let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = torrent_repository.upsert_peer(info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(info_hash, &peer, None); // Remove the peer - 
torrent_repository + swarms .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) .unwrap(); - torrent_repository + swarms } #[tokio::test] async fn it_should_remove_torrents_without_peers() { let info_hash = sample_info_hash(); - let torrent_repository = initialize_repository_with_one_torrent_without_peers(&info_hash); + let swarms = initialize_repository_with_one_torrent_without_peers(&info_hash); let tracker_policy = TrackerPolicy { remove_peerless_torrents: true, ..Default::default() }; - torrent_repository.remove_peerless_torrents(&tracker_policy).unwrap(); + swarms.remove_peerless_torrents(&tracker_policy).unwrap(); - assert!(torrent_repository.get(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); } } mod returning_torrent_entries { @@ -632,14 +636,14 @@ mod tests { #[tokio::test] async fn it_should_return_one_torrent_entry_by_infohash() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let torrent_entry = torrent_repository.get(&info_hash).unwrap(); + let torrent_entry = swarms.get(&info_hash).unwrap(); assert_eq!( TorrentEntryInfo { @@ -666,13 +670,13 @@ mod tests { #[tokio::test] async fn without_pagination() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let torrent_entries = torrent_repository.get_paginated(None); + let torrent_entries = swarms.get_paginated(None); assert_eq!(torrent_entries.len(), 1); @@ -707,20 +711,20 @@ 
mod tests { #[tokio::test] async fn it_should_return_the_first_page() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 0, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 0, limit: 1 })); assert_eq!(torrent_entries.len(), 1); @@ -742,20 +746,20 @@ mod tests { #[tokio::test] async fn it_should_return_the_second_page() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the 
first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); assert_eq!(torrent_entries.len(), 1); @@ -777,20 +781,20 @@ mod tests { #[tokio::test] async fn it_should_allow_changing_the_page_size() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); assert_eq!(torrent_entries.len(), 1); } @@ -812,9 +816,9 @@ mod tests { #[tokio::test] async fn it_should_get_empty_aggregate_swarm_metadata_when_there_are_no_torrents() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -829,11 +833,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_leecher() { - let 
torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &leecher(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -848,11 +852,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_seeder() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &seeder(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &seeder(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -867,11 +871,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_completed_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &complete_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &complete_peer(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -886,17 +890,16 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_are_multiple_torrents() { 
- let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let start_time = std::time::Instant::now(); for i in 0..1_000_000 { - let _number_of_downloads_increased = - torrent_repository.upsert_peer(&gen_seeded_infohash(&i), &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&gen_seeded_infohash(&i), &leecher(), None); } let result_a = start_time.elapsed(); let start_time = std::time::Instant::now(); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata().unwrap(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); let result_b = start_time.elapsed(); assert_eq!( @@ -923,13 +926,13 @@ mod tests { #[tokio::test] async fn it_should_get_swarm_metadata_for_an_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let infohash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&infohash, &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&infohash, &leecher(), None); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash).unwrap(); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&infohash).unwrap(); assert_eq!( swarm_metadata, @@ -943,9 +946,9 @@ mod tests { #[tokio::test] async fn it_should_return_zeroed_swarm_metadata_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&sample_info_hash()).unwrap(); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&sample_info_hash()).unwrap(); assert_eq!(swarm_metadata, SwarmMetadata::zeroed()); } @@ -962,7 +965,7 @@ mod tests { #[tokio::test] async fn it_should_allow_importing_persisted_torrent_entries() { - let torrent_repository = Arc::new(Swarms::default()); + let 
swarms = Arc::new(Swarms::default()); let infohash = sample_info_hash(); @@ -970,9 +973,9 @@ mod tests { persistent_torrents.insert(infohash, 1); - torrent_repository.import_persistent(&persistent_torrents); + swarms.import_persistent(&persistent_torrents); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash).unwrap(); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&infohash).unwrap(); // Only the number of downloads is persisted. assert_eq!(swarm_metadata.downloaded, 1); diff --git a/packages/torrent-repository/tests/swarms/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs index 43571eb83..8e58b9e76 100644 --- a/packages/torrent-repository/tests/swarms/mod.rs +++ b/packages/torrent-repository/tests/swarms/mod.rs @@ -149,7 +149,7 @@ fn persistent_three() -> PersistentTorrents { fn make(swarms: &Swarms, entries: &Entries) { for (info_hash, swarm) in entries { - swarms.insert_swarm(info_hash, swarm.clone()); + swarms.insert(info_hash, swarm.clone()); } } @@ -435,7 +435,7 @@ async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[c // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - swarms.upsert_peer(&info_hash, &peer, None).unwrap(); + swarms.handle_announcement(&info_hash, &peer, None).unwrap(); assert_eq!( swarms.get_aggregate_swarm_metadata().unwrap().total_torrents, entries.len() as u64 + 1 @@ -445,7 +445,7 @@ async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[c // Insert the infohash and peer into the repository // and verify the swarm metadata was updated. 
{ - swarms.upsert_peer(&info_hash, &peer, None).unwrap(); + swarms.handle_announcement(&info_hash, &peer, None).unwrap(); let stats = swarms.get_swarm_metadata(&info_hash).unwrap(); assert_eq!( stats, diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 8c93f3605..38593bf3c 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -51,7 +51,7 @@ impl InMemoryTorrentRepository { opt_persistent_torrent: Option, ) -> bool { self.swarms - .upsert_peer(info_hash, peer, opt_persistent_torrent) + .handle_announcement(info_hash, peer, opt_persistent_torrent) .expect("Failed to upsert the peer in swarms") } @@ -192,7 +192,7 @@ impl InMemoryTorrentRepository { #[must_use] pub(crate) fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { self.swarms - .get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + .get_peers_peers_excluding(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) .expect("Failed to get other peers in swarm") } @@ -217,7 +217,7 @@ impl InMemoryTorrentRepository { pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec> { // todo: pass the limit as an argument like `get_peers_for` self.swarms - .get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) + .get_swarm_peers(info_hash, TORRENT_PEERS_LIMIT) .expect("Failed to get other peers in swarm") }