diff --git a/replication/src/main/java/net/tomp2p/replication/IndirectReplication.java b/replication/src/main/java/net/tomp2p/replication/IndirectReplication.java index fe17ea839..b497db2a7 100644 --- a/replication/src/main/java/net/tomp2p/replication/IndirectReplication.java +++ b/replication/src/main/java/net/tomp2p/replication/IndirectReplication.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -68,6 +69,7 @@ public class IndirectReplication implements ResponsibilityListener, Runnable { private boolean nRoot = false; private boolean keepData = false; private Replication replication; + private Collection replicationFilters = new HashSet(); private ScheduledFuture scheduledFuture; @@ -189,6 +191,23 @@ public int blockSize() { return blockSize; } + public IndirectReplication addReplicationFilter(ReplicationFilter filter) { + if (replicationFilters == null) { + replicationFilters = new HashSet(1); + } + replicationFilters.add(filter); + return this; + } + + public Collection replicationFilters() { + return replicationFilters; + } + + public IndirectReplication replicationFilters(Collection replicationFilters) { + this.replicationFilters = replicationFilters; + return this; + } + public IndirectReplication start() { if (intervalMillis == -1) { @@ -212,7 +231,11 @@ public int replicationFactor() { }; } - this.replication = new Replication(peer, replicationFactor.replicationFactor(), nRoot, keepData); + if(replicationFilters == null) { + replicationFilters = new HashSet(0); + } + + this.replication = new Replication(peer, replicationFactor.replicationFactor(), nRoot, keepData, replicationFilters); this.replication.addResponsibilityListener(this); if(responsibilityListeners!=null) { for(ResponsibilityListener responsibilityListener:responsibilityListeners) { @@ -344,6 +367,10 @@ protected FutureDone send(final Number160 locationKey, final NavigableMap> retVal = new ArrayList>(replicationFactor); for (PeerStatistic peerStatistic : sortedSet) { + if(replication.rejectReplication(peerStatistic.peerAddress())) { + continue; + } + count++; closePeers.add(peerStatistic.peerAddress()); retVal.add(replicationSender.sendDirect(peerStatistic.peerAddress(), locationKey, dataMapConverted)); diff --git a/replication/src/main/java/net/tomp2p/replication/Replication.java b/replication/src/main/java/net/tomp2p/replication/Replication.java index c3ae9cdd0..d5d28c861 100644 --- a/replication/src/main/java/net/tomp2p/replication/Replication.java +++ b/replication/src/main/java/net/tomp2p/replication/Replication.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -60,6 +61,8 @@ public class Replication implements PeerMapChangeListener, ReplicationListener { final PeerDHT peer; + private final Collection filters; + /** * Constructor. * @@ -76,8 +79,9 @@ public class Replication implements PeerMapChangeListener, ReplicationListener { * @param keepData * flag indicating if data will be kept in memory after loss of replication responsibility */ - public Replication(final PeerDHT peer, final int replicationFactor, final boolean nRoot, final boolean keepData) { + public Replication(final PeerDHT peer, final int replicationFactor, final boolean nRoot, final boolean keepData, Collection filters) { this.peer = peer; + this.filters = filters; this.backend = peer.storageLayer(); this.selfAddress = peer.peerAddress(); this.peerMap = peer.peer().peerBean().peerMap(); @@ -176,6 +180,15 @@ public void addResponsibilityListener(final ResponsibilityListener responsibilit public void removeResponsibilityListener(final ResponsibilityListener responsibilityListener) { listeners.remove(responsibilityListener); } + + public boolean rejectReplication(PeerAddress address) { + for (ReplicationFilter filter : filters) { + if(filter.rejectReplication(address)) { + return true; + } + } + return false; + } /** * Notify if I'm responsible and something needs to change, i.e., to make sure that there are enough replicas. @@ -544,9 +557,19 @@ public void peerUpdated(final PeerAddress peerAddress, final PeerStatistic peerS * @return The peer that is responsible for the location key, including myself. */ private PeerAddress closest(final Number160 locationKey) { - SortedSet tmp = peerMap.closePeers(locationKey, 1); - tmp.add(new PeerStatistic(selfAddress)); - return tmp.iterator().next().peerAddress(); + SortedSet tmp = peerMap.closePeers(locationKey, 1); + tmp.add(new PeerStatistic(selfAddress)); + Iterator it = tmp.iterator(); + while (it.hasNext()) { + PeerStatistic statistic = it.next(); + if (!rejectReplication(statistic.peerAddress())) { + return statistic.peerAddress(); + } + } + + // TODO handle this better + LOG.warn("Not found any peer address that is allowed for replication. Take next best..."); + return tmp.iterator().next().peerAddress(); } /** @@ -563,6 +586,12 @@ private PeerAddress closest(final Number160 locationKey) { */ private boolean isInReplicationRange(final Number160 locationKey, final PeerAddress peerAddress, final int replicationFactor) { + // check filter first + if (rejectReplication(peerAddress)) { + LOG.trace("Rejected replication to peer {}", peerAddress); + return false; + } + SortedSet tmp = peerMap.closePeers(locationKey, replicationFactor); PeerStatistic peerAddressStatistic = new PeerStatistic(peerAddress); PeerStatistic selfStatistic = new PeerStatistic(selfAddress); diff --git a/replication/src/main/java/net/tomp2p/replication/ReplicationFilter.java b/replication/src/main/java/net/tomp2p/replication/ReplicationFilter.java new file mode 100644 index 000000000..42a36837f --- /dev/null +++ b/replication/src/main/java/net/tomp2p/replication/ReplicationFilter.java @@ -0,0 +1,13 @@ +package net.tomp2p.replication; + +import net.tomp2p.peers.PeerAddress; + +/** + * Allows to filter peers that should not be considered for the replication + * @author Nico Rutishauser + * + */ +public interface ReplicationFilter { + + boolean rejectReplication(PeerAddress targetAddress); +} diff --git a/replication/src/main/java/net/tomp2p/replication/SlowReplicationFilter.java b/replication/src/main/java/net/tomp2p/replication/SlowReplicationFilter.java new file mode 100644 index 000000000..f720d4944 --- /dev/null +++ b/replication/src/main/java/net/tomp2p/replication/SlowReplicationFilter.java @@ -0,0 +1,18 @@ +package net.tomp2p.replication; + +import net.tomp2p.peers.PeerAddress; + +/** + * Relieves slow peers from the replication duty + * + * @author Nico Rutishauser + * + */ +public class SlowReplicationFilter implements ReplicationFilter { + + @Override + public boolean rejectReplication(PeerAddress targetAddress) { + return targetAddress.isSlow(); + } + +}