diff --git a/Cargo.lock b/Cargo.lock index eea957f88..093b8e9b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4850,12 +4850,12 @@ dependencies = [ "crossbeam-skiplist", "rand 0.9.1", "rstest", + "thiserror 2.0.12", "tokio", "torrust-tracker-clock", "torrust-tracker-configuration", "torrust-tracker-primitives", "torrust-tracker-test-helpers", - "tracing", ] [[package]] diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index e584fadf4..2cc02a720 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -19,11 +19,11 @@ version.workspace = true aquatic_udp_protocol = "0" bittorrent-primitives = "0.1.0" crossbeam-skiplist = "0" +thiserror = "2.0.12" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" } -tracing = "0" [dev-dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index c985f7a2b..a4e7d9c5d 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -23,7 +23,7 @@ pub trait LockTrackedTorrent { fn lock_or_panic(&self) -> MutexGuard<'_, Swarm>; } -impl LockTrackedTorrent for Arc> { +impl LockTrackedTorrent for SwarmHandle { fn lock_or_panic(&self) -> MutexGuard<'_, Swarm> { self.lock().expect("can't acquire lock for tracked torrent handle") } diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 936f49d22..828e8c030 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use bittorrent_primitives::info_hash::InfoHash; use crossbeam_skiplist::SkipMap; @@ -8,12 +8,11 @@ use torrust_tracker_primitives::swarm_metadata::{AggregateSwarmMetadata, SwarmMe use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use crate::swarm::Swarm; -use crate::{LockTrackedTorrent, SwarmHandle}; +use crate::SwarmHandle; #[derive(Default, Debug)] pub struct Swarms { - // todo: this needs to be public only to insert a peerless torrent (empty swarm). - pub swarms: SkipMap, + swarms: SkipMap, } impl Swarms { @@ -34,34 +33,37 @@ impl Swarms { /// Returns `true` if the number of downloads was increased because the peer /// completed the download. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - pub fn upsert_peer( + /// This function panics if the lock for the swarm handle cannot be acquired. + pub fn handle_announcement( &self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option, - ) -> bool { - if let Some(existing_entry) = self.swarms.get(info_hash) { - tracing::debug!("Torrent already exists: {:?}", info_hash); - - existing_entry.value().lock_or_panic().handle_announcement(peer) + ) -> Result { + let swarm_handle = if let Some(number_of_downloads) = opt_persistent_torrent { + SwarmHandle::new(Swarm::new(number_of_downloads).into()) } else { - tracing::debug!("Inserting new torrent: {:?}", info_hash); + SwarmHandle::default() + }; - let new_entry = if let Some(number_of_downloads) = opt_persistent_torrent { - SwarmHandle::new(Swarm::new(number_of_downloads).into()) - } else { - SwarmHandle::default() - }; + let swarm_handle = self.swarms.get_or_insert(*info_hash, swarm_handle); - let inserted_entry = self.swarms.get_or_insert(*info_hash, new_entry); + let mut swarm = swarm_handle.value().lock()?; - let mut torrent_guard = inserted_entry.value().lock_or_panic(); + Ok(swarm.handle_announcement(peer)) + } - torrent_guard.handle_announcement(peer) - } + /// Inserts a new swarm. + pub fn insert(&self, info_hash: &InfoHash, swarm: Swarm) { + // code-review: swarms builder? + // It's only used for testing purposes. It allows to pre-define the + // initial state of the swarm without having to go through the upsert + // process. + + let swarm_handle = Arc::new(Mutex::new(swarm)); + self.swarms.insert(*info_hash, swarm_handle); } /// Removes a torrent entry from the repository. @@ -79,13 +81,17 @@ impl Swarms { /// A peer is considered inactive if its last update timestamp is older than /// the provided cutoff time. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - pub fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - for entry in &self.swarms { - entry.value().lock_or_panic().remove_inactive(current_cutoff); + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. + pub fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> Result<(), Error> { + for swarm_handle in &self.swarms { + let mut swarm = swarm_handle.value().lock()?; + swarm.remove_inactive(current_cutoff); } + + Ok(()) } /// Retrieves a tracked torrent handle by its infohash. @@ -132,14 +138,17 @@ impl Swarms { /// /// A `SwarmMetadata` struct containing the aggregated torrent data if found. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - #[must_use] - pub fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { - self.swarms - .get(info_hash) - .map(|entry| entry.value().lock_or_panic().metadata()) + /// This function panics if the lock for the swarm handle cannot be acquired. + pub fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Result, Error> { + match self.swarms.get(info_hash) { + None => Ok(None), + Some(swarm_handle) => { + let swarm = swarm_handle.value().lock()?; + Ok(Some(swarm.metadata())) + } + } } /// Retrieves swarm metadata for a given torrent. @@ -148,11 +157,16 @@ impl Swarms { /// /// A `SwarmMetadata` struct containing the aggregated torrent data if it's /// found or a zeroed metadata struct if not. - #[must_use] - pub fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> SwarmMetadata { + /// + /// # Errors + /// + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. + pub fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> Result { match self.get_swarm_metadata(info_hash) { - Some(swarm_metadata) => swarm_metadata, - None => SwarmMetadata::zeroed(), + Ok(Some(swarm_metadata)) => Ok(swarm_metadata), + Ok(None) => Ok(SwarmMetadata::zeroed()), + Err(err) => Err(err), } } @@ -168,14 +182,22 @@ impl Swarms { /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent, excluding the requesting client. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the torrent entry cannot be obtained. - #[must_use] - pub fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. + pub fn get_peers_peers_excluding( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + limit: usize, + ) -> Result>, Error> { match self.get(info_hash) { - None => vec![], - Some(entry) => entry.lock_or_panic().peers_excluding(&peer.peer_addr, Some(limit)), + None => Ok(vec![]), + Some(swarm_handle) => { + let swarm = swarm_handle.lock()?; + Ok(swarm.peers_excluding(&peer.peer_addr, Some(limit))) + } } } @@ -189,14 +211,17 @@ impl Swarms { /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the torrent entry cannot be obtained. - #[must_use] - pub fn get_torrent_peers(&self, info_hash: &InfoHash, limit: usize) -> Vec> { + /// This function returns an error if it fails to acquire the lock for the + /// swarm handle. + pub fn get_swarm_peers(&self, info_hash: &InfoHash, limit: usize) -> Result>, Error> { match self.get(info_hash) { - None => vec![], - Some(entry) => entry.lock_or_panic().peers(Some(limit)), + None => Ok(vec![]), + Some(swarm_handle) => { + let swarm = swarm_handle.lock()?; + Ok(swarm.peers(Some(limit))) + } } } @@ -205,17 +230,22 @@ impl Swarms { /// Depending on the tracker policy, torrents without any peers may be /// removed to conserve memory. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - pub fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { - for entry in &self.swarms { - if entry.value().lock_or_panic().meets_retaining_policy(policy) { + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. + pub fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> Result<(), Error> { + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock()?; + + if swarm.meets_retaining_policy(policy) { continue; } - entry.remove(); + swarm_handle.remove(); } + + Ok(()) } /// Imports persistent torrent data into the in-memory repository. @@ -247,22 +277,45 @@ impl Swarms { /// /// A [`AggregateSwarmMetadata`] struct with the aggregated metrics. /// - /// # Panics + /// # Errors /// - /// This function panics if the lock for the entry cannot be obtained. - #[must_use] - pub fn get_aggregate_swarm_metadata(&self) -> AggregateSwarmMetadata { + /// This function returns an error if it fails to acquire the lock for any + /// swarm handle. + pub fn get_aggregate_swarm_metadata(&self) -> Result { let mut metrics = AggregateSwarmMetadata::default(); for entry in &self.swarms { - let stats = entry.value().lock_or_panic().metadata(); + let swarm = entry.value().lock()?; + let stats = swarm.metadata(); metrics.total_complete += u64::from(stats.complete); metrics.total_downloaded += u64::from(stats.downloaded); metrics.total_incomplete += u64::from(stats.incomplete); metrics.total_torrents += 1; } - metrics + Ok(metrics) + } + + #[must_use] + pub fn len(&self) -> usize { + self.swarms.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.swarms.is_empty() + } +} + +#[derive(thiserror::Error, Debug, Clone)] +pub enum Error { + #[error("Can't acquire swarm lock")] + CannotAcquireSwarmLock, +} + +impl From>> for Error { + fn from(_error: std::sync::PoisonError>) -> Self { + Error::CannotAcquireSwarmLock } } @@ -310,25 +363,25 @@ mod tests { #[tokio::test] async fn it_should_add_the_first_peer_to_the_torrent_peer_list() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - assert!(torrent_repository.get(&info_hash).is_some()); + assert!(swarms.get(&info_hash).is_some()); } #[tokio::test] async fn it_should_allow_adding_the_same_peer_twice_to_the_torrent_peer_list() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - assert!(torrent_repository.get(&info_hash).is_some()); + assert!(swarms.get(&info_hash).is_some()); } } @@ -347,30 +400,30 @@ mod tests { #[tokio::test] async fn it_should_return_the_peers_for_a_given_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let peers = torrent_repository.get_torrent_peers(&info_hash, 74); + let peers = swarms.get_swarm_peers(&info_hash, 74).unwrap(); assert_eq!(peers, vec![Arc::new(peer)]); } #[tokio::test] async fn it_should_return_an_empty_list_or_peers_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let peers = torrent_repository.get_torrent_peers(&sample_info_hash(), 74); + let peers = swarms.get_swarm_peers(&sample_info_hash(), 74).unwrap(); assert!(peers.is_empty()); } #[tokio::test] async fn it_should_return_74_peers_at_the_most_for_a_given_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); @@ -385,10 +438,10 @@ mod tests { event: AnnounceEvent::Completed, }; - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); } - let peers = torrent_repository.get_torrent_peers(&info_hash, 74); + let peers = swarms.get_swarm_peers(&info_hash, 74).unwrap(); assert_eq!(peers.len(), 74); } @@ -409,36 +462,40 @@ mod tests { #[tokio::test] async fn it_should_return_an_empty_peer_list_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let peers = torrent_repository.get_peers_for(&sample_info_hash(), &sample_peer(), TORRENT_PEERS_LIMIT); + let peers = swarms + .get_peers_peers_excluding(&sample_info_hash(), &sample_peer(), TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers, vec![]); } #[tokio::test] async fn it_should_return_the_peers_for_a_given_torrent_excluding_a_given_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let peers = torrent_repository.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT); + let peers = swarms + .get_peers_peers_excluding(&info_hash, &peer, TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers, vec![]); } #[tokio::test] async fn it_should_return_74_peers_at_the_most_for_a_given_torrent_when_it_filters_out_a_given_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let excluded_peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &excluded_peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &excluded_peer, None); // Add 74 peers for idx in 2..=75 { @@ -452,10 +509,12 @@ mod tests { event: AnnounceEvent::Completed, }; - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); } - let peers = torrent_repository.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT); + let peers = swarms + .get_peers_peers_excluding(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT) + .unwrap(); assert_eq!(peers.len(), 74); } @@ -477,60 +536,64 @@ mod tests { #[tokio::test] async fn it_should_remove_a_torrent_entry() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &sample_peer(), None); - let _unused = torrent_repository.remove(&info_hash); + let _unused = swarms.remove(&info_hash); - assert!(torrent_repository.get(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); } #[tokio::test] async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); // Cut off time is 1 second after the peer was updated - torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); + swarms + .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) + .unwrap(); - assert!(!torrent_repository.get_torrent_peers(&info_hash, 74).contains(&Arc::new(peer))); + assert!(!swarms.get_swarm_peers(&info_hash, 74).unwrap().contains(&Arc::new(peer))); } fn initialize_repository_with_one_torrent_without_peers(info_hash: &InfoHash) -> Arc { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert a sample peer for the torrent to force adding the torrent entry let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = torrent_repository.upsert_peer(info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(info_hash, &peer, None); // Remove the peer - torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); + swarms + .remove_inactive_peers(peer.updated.add(Duration::from_secs(1))) + .unwrap(); - torrent_repository + swarms } #[tokio::test] async fn it_should_remove_torrents_without_peers() { let info_hash = sample_info_hash(); - let torrent_repository = initialize_repository_with_one_torrent_without_peers(&info_hash); + let swarms = initialize_repository_with_one_torrent_without_peers(&info_hash); let tracker_policy = TrackerPolicy { remove_peerless_torrents: true, ..Default::default() }; - torrent_repository.remove_peerless_torrents(&tracker_policy); + swarms.remove_peerless_torrents(&tracker_policy).unwrap(); - assert!(torrent_repository.get(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); } } mod returning_torrent_entries { @@ -573,14 +636,14 @@ mod tests { #[tokio::test] async fn it_should_return_one_torrent_entry_by_infohash() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let torrent_entry = torrent_repository.get(&info_hash).unwrap(); + let torrent_entry = swarms.get(&info_hash).unwrap(); assert_eq!( TorrentEntryInfo { @@ -607,13 +670,13 @@ mod tests { #[tokio::test] async fn without_pagination() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let info_hash = sample_info_hash(); let peer = sample_peer(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash, &peer, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash, &peer, None); - let torrent_entries = torrent_repository.get_paginated(None); + let torrent_entries = swarms.get_paginated(None); assert_eq!(torrent_entries.len(), 1); @@ -648,20 +711,20 @@ mod tests { #[tokio::test] async fn it_should_return_the_first_page() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 0, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 0, limit: 1 })); assert_eq!(torrent_entries.len(), 1); @@ -683,20 +746,20 @@ mod tests { #[tokio::test] async fn it_should_return_the_second_page() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); assert_eq!(torrent_entries.len(), 1); @@ -718,20 +781,20 @@ mod tests { #[tokio::test] async fn it_should_allow_changing_the_page_size() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); + let _number_of_downloads_increased = swarms.handle_announcement(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 - let torrent_entries = torrent_repository.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); + let torrent_entries = swarms.get_paginated(Some(&Pagination { offset: 1, limit: 1 })); assert_eq!(torrent_entries.len(), 1); } @@ -753,9 +816,9 @@ mod tests { #[tokio::test] async fn it_should_get_empty_aggregate_swarm_metadata_when_there_are_no_torrents() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -770,11 +833,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_leecher() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &leecher(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -789,11 +852,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_seeder() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &seeder(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &seeder(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -808,11 +871,11 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_is_a_completed_peer() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&sample_info_hash(), &complete_peer(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&sample_info_hash(), &complete_peer(), None); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); assert_eq!( aggregate_swarm_metadata, @@ -827,17 +890,16 @@ mod tests { #[tokio::test] async fn it_should_return_the_aggregate_swarm_metadata_when_there_are_multiple_torrents() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let start_time = std::time::Instant::now(); for i in 0..1_000_000 { - let _number_of_downloads_increased = - torrent_repository.upsert_peer(&gen_seeded_infohash(&i), &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&gen_seeded_infohash(&i), &leecher(), None); } let result_a = start_time.elapsed(); let start_time = std::time::Instant::now(); - let aggregate_swarm_metadata = torrent_repository.get_aggregate_swarm_metadata(); + let aggregate_swarm_metadata = swarms.get_aggregate_swarm_metadata().unwrap(); let result_b = start_time.elapsed(); assert_eq!( @@ -864,13 +926,13 @@ mod tests { #[tokio::test] async fn it_should_get_swarm_metadata_for_an_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let infohash = sample_info_hash(); - let _number_of_downloads_increased = torrent_repository.upsert_peer(&infohash, &leecher(), None); + let _number_of_downloads_increased = swarms.handle_announcement(&infohash, &leecher(), None); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&infohash).unwrap(); assert_eq!( swarm_metadata, @@ -884,9 +946,9 @@ mod tests { #[tokio::test] async fn it_should_return_zeroed_swarm_metadata_for_a_non_existing_torrent() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&sample_info_hash()); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&sample_info_hash()).unwrap(); assert_eq!(swarm_metadata, SwarmMetadata::zeroed()); } @@ -903,7 +965,7 @@ mod tests { #[tokio::test] async fn it_should_allow_importing_persisted_torrent_entries() { - let torrent_repository = Arc::new(Swarms::default()); + let swarms = Arc::new(Swarms::default()); let infohash = sample_info_hash(); @@ -911,9 +973,9 @@ mod tests { persistent_torrents.insert(infohash, 1); - torrent_repository.import_persistent(&persistent_torrents); + swarms.import_persistent(&persistent_torrents); - let swarm_metadata = torrent_repository.get_swarm_metadata_or_default(&infohash); + let swarm_metadata = swarms.get_swarm_metadata_or_default(&infohash).unwrap(); // Only the number of downloads is persisted. assert_eq!(swarm_metadata.downloaded, 1); diff --git a/packages/torrent-repository/tests/common/mod.rs b/packages/torrent-repository/tests/common/mod.rs index e083a05cc..c77ca2769 100644 --- a/packages/torrent-repository/tests/common/mod.rs +++ b/packages/torrent-repository/tests/common/mod.rs @@ -1,2 +1 @@ -pub mod torrent; pub mod torrent_peer_builder; diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs deleted file mode 100644 index a1899621f..000000000 --- a/packages/torrent-repository/tests/common/torrent.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use torrust_tracker_configuration::TrackerPolicy; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; -use torrust_tracker_torrent_repository::{swarm, LockTrackedTorrent, SwarmHandle}; - -#[derive(Debug, Clone)] -pub(crate) enum Torrent { - Single(swarm::Swarm), - MutexStd(SwarmHandle), -} - -impl Torrent { - pub(crate) fn get_stats(&self) -> SwarmMetadata { - match self { - Torrent::Single(entry) => entry.metadata(), - Torrent::MutexStd(entry) => entry.lock_or_panic().metadata(), - } - } - - pub(crate) fn meets_retaining_policy(&self, policy: &TrackerPolicy) -> bool { - match self { - Torrent::Single(entry) => entry.meets_retaining_policy(policy), - Torrent::MutexStd(entry) => entry.lock_or_panic().meets_retaining_policy(policy), - } - } - - pub(crate) fn peers_is_empty(&self) -> bool { - match self { - Torrent::Single(entry) => entry.is_empty(), - Torrent::MutexStd(entry) => entry.lock_or_panic().is_empty(), - } - } - - pub(crate) fn get_peers_len(&self) -> usize { - match self { - Torrent::Single(entry) => entry.len(), - Torrent::MutexStd(entry) => entry.lock_or_panic().len(), - } - } - - pub(crate) fn get_peers(&self, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.peers(limit), - Torrent::MutexStd(entry) => entry.lock_or_panic().peers(limit), - } - } - - pub(crate) fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { - match self { - Torrent::Single(entry) => entry.peers_excluding(client, limit), - Torrent::MutexStd(entry) => entry.lock_or_panic().peers_excluding(client, limit), - } - } - - pub(crate) fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { - match self { - Torrent::Single(entry) => entry.handle_announcement(peer), - Torrent::MutexStd(entry) => entry.lock_or_panic().handle_announcement(peer), - } - } - - pub(crate) fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { - match self { - Torrent::Single(entry) => entry.remove_inactive(current_cutoff), - Torrent::MutexStd(entry) => entry.lock_or_panic().remove_inactive(current_cutoff), - } - } -} diff --git a/packages/torrent-repository/tests/integration.rs b/packages/torrent-repository/tests/integration.rs index 5aab67b03..b3e057075 100644 --- a/packages/torrent-repository/tests/integration.rs +++ b/packages/torrent-repository/tests/integration.rs @@ -7,8 +7,8 @@ use torrust_tracker_clock::clock; pub mod common; -mod entry; -mod repository; +mod swarm; +mod swarms; /// This code needs to be copied into each crate. /// Working version, for production. diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/swarm/mod.rs similarity index 67% rename from packages/torrent-repository/tests/entry/mod.rs rename to packages/torrent-repository/tests/swarm/mod.rs index 4607fd9c7..d529b0243 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/swarm/mod.rs @@ -9,19 +9,14 @@ use torrust_tracker_clock::clock::{self, Time as _}; use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::peer; use torrust_tracker_primitives::peer::Peer; -use torrust_tracker_torrent_repository::{swarm, SwarmHandle}; +use torrust_tracker_torrent_repository::Swarm; -use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; use crate::CurrentClock; #[fixture] -fn single() -> Torrent { - Torrent::Single(swarm::Swarm::default()) -} -#[fixture] -fn mutex_std() -> Torrent { - Torrent::MutexStd(SwarmHandle::default()) +fn swarm() -> Swarm { + Swarm::default() } #[fixture] @@ -52,39 +47,39 @@ pub enum Makes { Three, } -fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { +fn make(swarm: &mut Swarm, makes: &Makes) -> Vec { match makes { Makes::Empty => vec![], Makes::Started => { let peer = a_started_peer(1); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Completed => { let peer = a_completed_peer(2); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Downloaded => { let mut peer = a_started_peer(3); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); vec![peer] } Makes::Three => { let peer_1 = a_started_peer(1); - torrent.upsert_peer(&peer_1); + swarm.handle_announcement(&peer_1); let peer_2 = a_completed_peer(2); - torrent.upsert_peer(&peer_2); + swarm.handle_announcement(&peer_2); let mut peer_3 = a_started_peer(3); - torrent.upsert_peer(&peer_3); + swarm.handle_announcement(&peer_3); peer_3.event = AnnounceEvent::Completed; peer_3.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer_3); + swarm.handle_announcement(&peer_3); vec![peer_1, peer_2, peer_3] } } @@ -93,10 +88,10 @@ fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { #[rstest] #[case::empty(&Makes::Empty)] #[tokio::test] -async fn it_should_be_empty_by_default(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_be_empty_by_default(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); - assert_eq!(torrent.get_peers_len(), 0); + assert_eq!(swarm.len(), 0); } #[rstest] @@ -107,33 +102,33 @@ async fn it_should_be_empty_by_default(#[values(single(), mutex_std())] mut torr #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy( - #[values(single(), mutex_std())] mut torrent: Torrent, + #[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { - make(&mut torrent, makes); + make(&mut swarm, makes); - let has_peers = !torrent.peers_is_empty(); - let has_downloads = torrent.get_stats().downloaded != 0; + let has_peers = !swarm.is_empty(); + let has_downloads = swarm.metadata().downloaded != 0; match (policy.remove_peerless_torrents, policy.persistent_torrent_completed_stat) { // remove torrents without peers, and keep completed download stats (true, true) => match (has_peers, has_downloads) { // no peers, but has downloads // peers, with or without downloads - (false, true) | (true, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (false, true) | (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)), // no peers and no downloads - (false, false) => assert!(!torrent.meets_retaining_policy(&policy)), + (false, false) => assert!(!swarm.meets_retaining_policy(&policy)), }, // remove torrents without peers and drop completed download stats (true, false) => match (has_peers, has_downloads) { // peers, with or without downloads - (true, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)), // no peers and with or without downloads - (false, true | false) => assert!(!torrent.meets_retaining_policy(&policy)), + (false, true | false) => assert!(!swarm.meets_retaining_policy(&policy)), }, // keep torrents without peers, but keep or drop completed download stats - (false, true | false) => assert!(torrent.meets_retaining_policy(&policy)), + (false, true | false) => assert!(swarm.meets_retaining_policy(&policy)), } } @@ -144,10 +139,10 @@ async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_for_torrent_entry(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_get_peers_for_torrent_entry(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + let peers = make(&mut swarm, makes); - let torrent_peers = torrent.get_peers(None); + let torrent_peers = swarm.peers(None); assert_eq!(torrent_peers.len(), peers.len()); @@ -163,15 +158,15 @@ async fn it_should_get_peers_for_torrent_entry(#[values(single(), mutex_std())] #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { - make(&mut torrent, makes); +async fn it_should_update_a_peer(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); // Make and insert a new peer. let mut peer = a_started_peer(-1); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); // Get the Inserted Peer by Id. - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let original = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -181,10 +176,10 @@ async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: T // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); // Get the Updated Peer by Id. - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let updated = peers .iter() .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) @@ -200,20 +195,17 @@ async fn it_should_update_a_peer(#[values(single(), mutex_std())] mut torrent: T #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_a_peer_upon_stopped_announcement( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { use torrust_tracker_primitives::peer::ReadInfo as _; - make(&mut torrent, makes); + make(&mut swarm, makes); let mut peer = a_started_peer(-1); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); // The started peer should be inserted. - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let original = peers .iter() .find(|p| p.get_id() == peer.get_id()) @@ -223,10 +215,10 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); // It should be removed now. - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); assert_eq!( peers.iter().find(|p| p.get_id() == peer.get_id()), @@ -242,13 +234,13 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single(), mutex_std())] mut torrent: Torrent, + #[values(swarm())] mut torrent: Swarm, #[case] makes: &Makes, ) { make(&mut torrent, makes); - let downloaded = torrent.get_stats().downloaded; + let downloaded = torrent.metadata().downloaded; - let peers = torrent.get_peers(None); + let peers = torrent.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let is_already_completed = peer.event == AnnounceEvent::Completed; @@ -256,8 +248,8 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + torrent.handle_announcement(&peer); + let stats = torrent.metadata(); if is_already_completed { assert_eq!(stats.downloaded, downloaded); @@ -272,19 +264,19 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_a_seeder(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_update_a_peer_as_a_seeder(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + let peers = make(&mut swarm, makes); let completed = u32::try_from(peers.iter().filter(|p| p.is_seeder()).count()).expect("it_should_not_be_so_many"); - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let is_already_non_left = peer.left == NumberOfBytes::new(0); // Set Bytes Left to Zero peer.left = NumberOfBytes::new(0); - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + swarm.handle_announcement(&peer); + let stats = swarm.metadata(); if is_already_non_left { // it was already complete @@ -301,19 +293,19 @@ async fn it_should_update_a_peer_as_a_seeder(#[values(single(), mutex_std())] mu #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer_as_incomplete(#[values(single(), mutex_std())] mut torrent: Torrent, #[case] makes: &Makes) { - let peers = make(&mut torrent, makes); +async fn it_should_update_a_peer_as_incomplete(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + let peers = make(&mut swarm, makes); let incomplete = u32::try_from(peers.iter().filter(|p| !p.is_seeder()).count()).expect("it should not be so many"); - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let completed_already = peer.left == NumberOfBytes::new(0); // Set Bytes Left to no Zero peer.left = NumberOfBytes::new(1); - torrent.upsert_peer(&peer); - let stats = torrent.get_stats(); + swarm.handle_announcement(&peer); + let stats = swarm.metadata(); if completed_already { // now it is incomplete @@ -330,13 +322,10 @@ async fn it_should_update_a_peer_as_incomplete(#[values(single(), mutex_std())] #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_get_peers_excluding_the_client_socket( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { - make(&mut torrent, makes); +async fn it_should_get_peers_excluding_the_client_socket(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); - let peers = torrent.get_peers(None); + let peers = swarm.peers(None); let mut peer = **peers.first().expect("there should be a peer"); let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081); @@ -345,14 +334,14 @@ async fn it_should_get_peers_excluding_the_client_socket( assert_ne!(peer.peer_addr, socket); // it should get the peer as it dose not share the socket. - assert!(torrent.get_peers_for_client(&socket, None).contains(&peer.into())); + assert!(swarm.peers_excluding(&socket, None).contains(&peer.into())); // set the address to the socket. peer.peer_addr = socket; - torrent.upsert_peer(&peer); // Add peer + swarm.handle_announcement(&peer); // Add peer // It should not include the peer that has the same socket. - assert!(!torrent.get_peers_for_client(&socket, None).contains(&peer.into())); + assert!(!swarm.peers_excluding(&socket, None).contains(&peer.into())); } #[rstest] @@ -362,19 +351,16 @@ async fn it_should_get_peers_excluding_the_client_socket( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_limit_the_number_of_peers_returned( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { - make(&mut torrent, makes); +async fn it_should_limit_the_number_of_peers_returned(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { + make(&mut swarm, makes); // We add one more peer than the scrape limit for peer_number in 1..=74 + 1 { let peer = a_started_peer(peer_number); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); } - let peers = torrent.get_peers(Some(TORRENT_PEERS_LIMIT)); + let peers = swarm.peers(Some(TORRENT_PEERS_LIMIT)); assert_eq!(peers.len(), 74); } @@ -386,14 +372,11 @@ async fn it_should_limit_the_number_of_peers_returned( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_remove_inactive_peers_beyond_cutoff( - #[values(single(), mutex_std())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) { const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = Duration::from_secs(121); - let peers = make(&mut torrent, makes); + let peers = make(&mut swarm, makes); let mut peer = a_completed_peer(-1); @@ -402,12 +385,12 @@ async fn it_should_remove_inactive_peers_beyond_cutoff( peer.updated = now.sub(EXPIRE); - torrent.upsert_peer(&peer); + swarm.handle_announcement(&peer); - assert_eq!(torrent.get_peers_len(), peers.len() + 1); + assert_eq!(swarm.len(), peers.len() + 1); let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default(); - torrent.remove_inactive_peers(current_cutoff); + swarm.remove_inactive(current_cutoff); - assert_eq!(torrent.get_peers_len(), peers.len()); + assert_eq!(swarm.len(), peers.len()); } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs similarity index 79% rename from packages/torrent-repository/tests/repository/mod.rs rename to packages/torrent-repository/tests/swarms/mod.rs index 071a187fa..8e58b9e76 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/swarms/mod.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, HashSet}; use std::hash::{DefaultHasher, Hash, Hasher}; -use std::sync::{Arc, Mutex}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use bittorrent_primitives::info_hash::InfoHash; @@ -15,7 +14,7 @@ use torrust_tracker_torrent_repository::{LockTrackedTorrent, Swarms}; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; #[fixture] -fn skip_list_mutex_std() -> Swarms { +fn swarms() -> Swarms { Swarms::default() } @@ -33,27 +32,27 @@ fn default() -> Entries { #[fixture] fn started() -> Entries { - let mut torrent = Swarm::default(); - torrent.handle_announcement(&a_started_peer(1)); - vec![(InfoHash::default(), torrent)] + let mut swarm = Swarm::default(); + swarm.handle_announcement(&a_started_peer(1)); + vec![(InfoHash::default(), swarm)] } #[fixture] fn completed() -> Entries { - let mut torrent = Swarm::default(); - torrent.handle_announcement(&a_completed_peer(2)); - vec![(InfoHash::default(), torrent)] + let mut swarm = Swarm::default(); + swarm.handle_announcement(&a_completed_peer(2)); + vec![(InfoHash::default(), swarm)] } #[fixture] fn downloaded() -> Entries { - let mut torrent = Swarm::default(); + let mut swarm = Swarm::default(); let mut peer = a_started_peer(3); - torrent.handle_announcement(&peer); + swarm.handle_announcement(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes::new(0); - torrent.handle_announcement(&peer); - vec![(InfoHash::default(), torrent)] + swarm.handle_announcement(&peer); + vec![(InfoHash::default(), swarm)] } #[fixture] @@ -148,11 +147,9 @@ fn persistent_three() -> PersistentTorrents { t.iter().copied().collect() } -fn make(repo: &Swarms, entries: &Entries) { - for (info_hash, entry) in entries { - let new = Arc::new(Mutex::new(entry.clone())); - // todo: use a public method to insert an empty swarm. - repo.swarms.insert(*info_hash, new); +fn make(swarms: &Swarms, entries: &Entries) { + for (info_hash, swarm) in entries { + swarms.insert(info_hash, swarm.clone()); } } @@ -201,13 +198,13 @@ fn policy_remove_persist() -> TrackerPolicy { #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_get_a_torrent_entry(#[values(swarms())] repo: Swarms, #[case] entries: Entries) { make(&repo, &entries); - if let Some((info_hash, torrent)) = entries.first() { + if let Some((info_hash, swarm)) = entries.first() { assert_eq!( Some(repo.get(info_hash).unwrap().lock_or_panic().clone()), - Some(torrent.clone()) + Some(swarm.clone()) ); } else { assert!(repo.get(&InfoHash::default()).is_none()); @@ -225,7 +222,7 @@ async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Sw #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] repo: Swarms, #[case] entries: Entries, many_out_of_order: Entries, ) { @@ -258,7 +255,7 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] repo: Swarms, #[case] entries: Entries, #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination, ) { @@ -270,13 +267,13 @@ async fn it_should_get_paginated( match paginated { // it should return empty if limit is zero. Pagination { limit: 0, .. } => { - let torrents: Vec<(InfoHash, Swarm)> = repo + let swarms: Vec<(InfoHash, Swarm)> = repo .get_paginated(Some(&paginated)) .iter() - .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone())) + .map(|(i, swarm_handle)| (*i, swarm_handle.lock_or_panic().clone())) .collect(); - assert_eq!(torrents, vec![]); + assert_eq!(swarms, vec![]); } // it should return a single entry if the limit is one. @@ -313,10 +310,10 @@ async fn it_should_get_paginated( #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_get_metrics(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { use torrust_tracker_primitives::swarm_metadata::AggregateSwarmMetadata; - make(&repo, &entries); + make(&swarms, &entries); let mut metrics = AggregateSwarmMetadata::default(); @@ -329,7 +326,7 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[ metrics.total_downloaded += u64::from(stats.downloaded); } - assert_eq!(repo.get_aggregate_swarm_metadata(), metrics); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap(), metrics); } #[rstest] @@ -343,21 +340,21 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Swarms, #[ #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_import_persistent_torrents( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] swarms: Swarms, #[case] entries: Entries, #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents, ) { - make(&repo, &entries); + make(&swarms, &entries); - let mut downloaded = repo.get_aggregate_swarm_metadata().total_downloaded; + let mut downloaded = swarms.get_aggregate_swarm_metadata().unwrap().total_downloaded; persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d)); - repo.import_persistent(&persistent_torrents); + swarms.import_persistent(&persistent_torrents); - assert_eq!(repo.get_aggregate_swarm_metadata().total_downloaded, downloaded); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap().total_downloaded, downloaded); for (entry, _) in persistent_torrents { - assert!(repo.get(&entry).is_some()); + assert!(swarms.get(&entry).is_some()); } } @@ -371,21 +368,24 @@ async fn it_should_import_persistent_torrents( #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { - make(&repo, &entries); +async fn it_should_remove_an_entry(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { + make(&swarms, &entries); for (info_hash, torrent) in entries { assert_eq!( - Some(repo.get(&info_hash).unwrap().lock_or_panic().clone()), + Some(swarms.get(&info_hash).unwrap().lock_or_panic().clone()), Some(torrent.clone()) ); - assert_eq!(Some(repo.remove(&info_hash).unwrap().lock_or_panic().clone()), Some(torrent)); + assert_eq!( + Some(swarms.remove(&info_hash).unwrap().lock_or_panic().clone()), + Some(torrent) + ); - assert!(repo.get(&info_hash).is_none()); - assert!(repo.remove(&info_hash).is_none()); + assert!(swarms.get(&info_hash).is_none()); + assert!(swarms.remove(&info_hash).is_none()); } - assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, 0); + assert_eq!(swarms.get_aggregate_swarm_metadata().unwrap().total_torrents, 0); } #[rstest] @@ -398,7 +398,7 @@ async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Swarms #[case::out_of_order(many_out_of_order())] #[case::in_order(many_hashed_in_order())] #[tokio::test] -async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: Swarms, #[case] entries: Entries) { +async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) { use std::ops::Sub as _; use std::time::Duration; @@ -411,7 +411,7 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: const TIMEOUT: Duration = Duration::from_secs(120); const EXPIRE: Duration = Duration::from_secs(121); - make(&repo, &entries); + make(&swarms, &entries); let info_hash: InfoHash; let mut peer: peer::Peer; @@ -435,15 +435,18 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - repo.upsert_peer(&info_hash, &peer, None); - assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, entries.len() as u64 + 1); + swarms.handle_announcement(&info_hash, &peer, None).unwrap(); + assert_eq!( + swarms.get_aggregate_swarm_metadata().unwrap().total_torrents, + entries.len() as u64 + 1 + ); } // Insert the infohash and peer into the repository // and verify the swarm metadata was updated. { - repo.upsert_peer(&info_hash, &peer, None); - let stats = repo.get_swarm_metadata(&info_hash); + swarms.handle_announcement(&info_hash, &peer, None).unwrap(); + let stats = swarms.get_swarm_metadata(&info_hash).unwrap(); assert_eq!( stats, Some(SwarmMetadata { @@ -456,19 +459,21 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: // Verify that this new peer was inserted into the repository. { - let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some"); + let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some"); let entry = lock_tracked_torrent.lock_or_panic(); assert!(entry.peers(None).contains(&peer.into())); } // Remove peers that have not been updated since the timeout (120 seconds ago). { - repo.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")); + swarms + .remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")) + .unwrap(); } // Verify that the this peer was removed from the repository. { - let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some"); + let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some"); let entry = lock_tracked_torrent.lock_or_panic(); assert!(!entry.peers(None).contains(&peer.into())); } @@ -485,15 +490,15 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_peerless_torrents( - #[values(skip_list_mutex_std())] repo: Swarms, + #[values(swarms())] swarms: Swarms, #[case] entries: Entries, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { - make(&repo, &entries); + make(&swarms, &entries); - repo.remove_peerless_torrents(&policy); + swarms.remove_peerless_torrents(&policy).unwrap(); - let torrents: Vec<(InfoHash, Swarm)> = repo + let torrents: Vec<(InfoHash, Swarm)> = swarms .get_paginated(None) .iter() .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone())) diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 67e532e86..38593bf3c 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -20,8 +20,8 @@ use torrust_tracker_torrent_repository::{SwarmHandle, Swarms}; /// used in production. Other implementations are kept for reference. #[derive(Debug, Default)] pub struct InMemoryTorrentRepository { - /// The underlying in-memory data structure that stores torrent entries. - torrents: Arc, + /// The underlying in-memory data structure that stores swarms data. + swarms: Arc, } impl InMemoryTorrentRepository { @@ -39,6 +39,10 @@ impl InMemoryTorrentRepository { /// # Returns /// /// `true` if the peer stats were updated. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. #[must_use] pub fn upsert_peer( &self, @@ -46,7 +50,9 @@ impl InMemoryTorrentRepository { peer: &peer::Peer, opt_persistent_torrent: Option, ) -> bool { - self.torrents.upsert_peer(info_hash, peer, opt_persistent_torrent) + self.swarms + .handle_announcement(info_hash, peer, opt_persistent_torrent) + .expect("Failed to upsert the peer in swarms") } /// Removes a torrent entry from the repository. @@ -65,7 +71,7 @@ impl InMemoryTorrentRepository { #[cfg(test)] #[must_use] pub(crate) fn remove(&self, key: &InfoHash) -> Option { - self.torrents.remove(key) + self.swarms.remove(key) } /// Removes inactive peers from all torrent entries. @@ -77,8 +83,14 @@ impl InMemoryTorrentRepository { /// /// * `current_cutoff` - The cutoff timestamp; peers not updated since this /// time will be removed. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. pub(crate) fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - self.torrents.remove_inactive_peers(current_cutoff); + self.swarms + .remove_inactive_peers(current_cutoff) + .expect("Failed to remove inactive peers from swarms"); } /// Removes torrent entries that have no active peers. @@ -90,8 +102,14 @@ impl InMemoryTorrentRepository { /// /// * `policy` - The tracker policy containing the configuration for /// removing peerless torrents. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. pub(crate) fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { - self.torrents.remove_peerless_torrents(policy); + self.swarms + .remove_peerless_torrents(policy) + .expect("Failed to remove peerless torrents from swarms"); } /// Retrieves a torrent entry by its infohash. @@ -105,7 +123,7 @@ impl InMemoryTorrentRepository { /// An `Option` containing the torrent entry if found. #[must_use] pub(crate) fn get(&self, key: &InfoHash) -> Option { - self.torrents.get(key) + self.swarms.get(key) } /// Retrieves a paginated list of torrent entries. @@ -123,7 +141,7 @@ impl InMemoryTorrentRepository { /// A vector of `(InfoHash, TorrentEntry)` tuples. #[must_use] pub(crate) fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, SwarmHandle)> { - self.torrents.get_paginated(pagination) + self.swarms.get_paginated(pagination) } /// Retrieves swarm metadata for a given torrent. @@ -139,9 +157,15 @@ impl InMemoryTorrentRepository { /// # Returns /// /// A `SwarmMetadata` struct containing the aggregated torrent data. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error.s #[must_use] pub(crate) fn get_swarm_metadata_or_default(&self, info_hash: &InfoHash) -> SwarmMetadata { - self.torrents.get_swarm_metadata_or_default(info_hash) + self.swarms + .get_swarm_metadata_or_default(info_hash) + .expect("Failed to get swarm metadata") } /// Retrieves torrent peers for a given torrent and client, excluding the @@ -161,9 +185,15 @@ impl InMemoryTorrentRepository { /// /// A vector of peers (wrapped in `Arc`) representing the active peers for /// the torrent, excluding the requesting client. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. #[must_use] pub(crate) fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec> { - self.torrents.get_peers_for(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + self.swarms + .get_peers_peers_excluding(info_hash, peer, max(limit, TORRENT_PEERS_LIMIT)) + .expect("Failed to get other peers in swarm") } /// Retrieves the list of peers for a given torrent. @@ -182,11 +212,13 @@ impl InMemoryTorrentRepository { /// /// # Panics /// - /// This function panics if the lock for the torrent entry cannot be obtained. + /// This function panics if the underling swarms return an error. #[must_use] pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec> { // todo: pass the limit as an argument like `get_peers_for` - self.torrents.get_torrent_peers(info_hash, TORRENT_PEERS_LIMIT) + self.swarms + .get_swarm_peers(info_hash, TORRENT_PEERS_LIMIT) + .expect("Failed to get other peers in swarm") } /// Calculates and returns overall torrent metrics. @@ -198,9 +230,15 @@ impl InMemoryTorrentRepository { /// # Returns /// /// A [`AggregateSwarmMetadata`] struct with the aggregated metrics. + /// + /// # Panics + /// + /// This function panics if the underling swarms return an error. #[must_use] pub fn get_aggregate_swarm_metadata(&self) -> AggregateSwarmMetadata { - self.torrents.get_aggregate_swarm_metadata() + self.swarms + .get_aggregate_swarm_metadata() + .expect("Failed to get aggregate swarm metadata") } /// Imports persistent torrent data into the in-memory repository. @@ -212,6 +250,6 @@ impl InMemoryTorrentRepository { /// /// * `persistent_torrents` - A reference to the persisted torrent data. pub fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { - self.torrents.import_persistent(persistent_torrents); + self.swarms.import_persistent(persistent_torrents); } }