Skip to content

Commit

Permalink
Merge pull request #91 from nicoruti/replication
Browse files Browse the repository at this point in the history
Added possibility to filter replication
  • Loading branch information
tbocek committed Feb 27, 2015
2 parents 930707d + 188964e commit d1ab8c2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 5 deletions.
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class IndirectReplication implements ResponsibilityListener, Runnable {
private boolean nRoot = false;
private boolean keepData = false;
private Replication replication;
private Collection<ReplicationFilter> replicationFilters = new HashSet<ReplicationFilter>();

private ScheduledFuture<?> scheduledFuture;

Expand Down Expand Up @@ -189,6 +191,23 @@ public int blockSize() {
return blockSize;
}

public IndirectReplication addReplicationFilter(ReplicationFilter filter) {
if (replicationFilters == null) {
replicationFilters = new HashSet<ReplicationFilter>(1);
}
replicationFilters.add(filter);
return this;
}

public Collection<ReplicationFilter> replicationFilters() {
return replicationFilters;
}

public IndirectReplication replicationFilters(Collection<ReplicationFilter> replicationFilters) {
this.replicationFilters = replicationFilters;
return this;
}

public IndirectReplication start() {

if (intervalMillis == -1) {
Expand All @@ -212,7 +231,11 @@ public int replicationFactor() {
};
}

this.replication = new Replication(peer, replicationFactor.replicationFactor(), nRoot, keepData);
if(replicationFilters == null) {
replicationFilters = new HashSet<ReplicationFilter>(0);
}

this.replication = new Replication(peer, replicationFactor.replicationFactor(), nRoot, keepData, replicationFilters);
this.replication.addResponsibilityListener(this);
if(responsibilityListeners!=null) {
for(ResponsibilityListener responsibilityListener:responsibilityListeners) {
Expand Down Expand Up @@ -344,6 +367,10 @@ protected FutureDone<?> send(final Number160 locationKey, final NavigableMap<Num
int count = 0;
List<FutureDone<?>> retVal = new ArrayList<FutureDone<?>>(replicationFactor);
for (PeerStatistic peerStatistic : sortedSet) {
if(replication.rejectReplication(peerStatistic.peerAddress())) {
continue;
}

count++;
closePeers.add(peerStatistic.peerAddress());
retVal.add(replicationSender.sendDirect(peerStatistic.peerAddress(), locationKey, dataMapConverted));
Expand Down
37 changes: 33 additions & 4 deletions replication/src/main/java/net/tomp2p/replication/Replication.java
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicReferenceArray;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class Replication implements PeerMapChangeListener, ReplicationListener {

final PeerDHT peer;

private final Collection<ReplicationFilter> filters;

/**
* Constructor.
*
Expand All @@ -76,8 +79,9 @@ public class Replication implements PeerMapChangeListener, ReplicationListener {
* @param keepData
* flag indicating if data will be kept in memory after loss of replication responsibility
*/
public Replication(final PeerDHT peer, final int replicationFactor, final boolean nRoot, final boolean keepData) {
public Replication(final PeerDHT peer, final int replicationFactor, final boolean nRoot, final boolean keepData, Collection<ReplicationFilter> filters) {
this.peer = peer;
this.filters = filters;
this.backend = peer.storageLayer();
this.selfAddress = peer.peerAddress();
this.peerMap = peer.peer().peerBean().peerMap();
Expand Down Expand Up @@ -176,6 +180,15 @@ public void addResponsibilityListener(final ResponsibilityListener responsibilit
public void removeResponsibilityListener(final ResponsibilityListener responsibilityListener) {
listeners.remove(responsibilityListener);
}

public boolean rejectReplication(PeerAddress address) {
for (ReplicationFilter filter : filters) {
if(filter.rejectReplication(address)) {
return true;
}
}
return false;
}

/**
* Notify if I'm responsible and something needs to change, i.e., to make sure that there are enough replicas.
Expand Down Expand Up @@ -544,9 +557,19 @@ public void peerUpdated(final PeerAddress peerAddress, final PeerStatistic peerS
* @return The peer that is responsible for the location key, including myself.
*/
private PeerAddress closest(final Number160 locationKey) {
SortedSet<PeerStatistic> tmp = peerMap.closePeers(locationKey, 1);
tmp.add(new PeerStatistic(selfAddress));
return tmp.iterator().next().peerAddress();
SortedSet<PeerStatistic> tmp = peerMap.closePeers(locationKey, 1);
tmp.add(new PeerStatistic(selfAddress));
Iterator<PeerStatistic> it = tmp.iterator();
while (it.hasNext()) {
PeerStatistic statistic = it.next();
if (!rejectReplication(statistic.peerAddress())) {
return statistic.peerAddress();
}
}

// TODO handle this better
LOG.warn("Not found any peer address that is allowed for replication. Take next best...");
return tmp.iterator().next().peerAddress();
}

/**
Expand All @@ -563,6 +586,12 @@ private PeerAddress closest(final Number160 locationKey) {
*/
private boolean isInReplicationRange(final Number160 locationKey, final PeerAddress peerAddress,
final int replicationFactor) {
// check filter first
if (rejectReplication(peerAddress)) {
LOG.trace("Rejected replication to peer {}", peerAddress);
return false;
}

SortedSet<PeerStatistic> tmp = peerMap.closePeers(locationKey, replicationFactor);
PeerStatistic peerAddressStatistic = new PeerStatistic(peerAddress);
PeerStatistic selfStatistic = new PeerStatistic(selfAddress);
Expand Down
@@ -0,0 +1,13 @@
package net.tomp2p.replication;

import net.tomp2p.peers.PeerAddress;

/**
* Allows to filter peers that should not be considered for the replication
* @author Nico Rutishauser
*
*/
public interface ReplicationFilter {

boolean rejectReplication(PeerAddress targetAddress);
}
@@ -0,0 +1,18 @@
package net.tomp2p.replication;

import net.tomp2p.peers.PeerAddress;

/**
* Relieves slow peers from the replication duty
*
* @author Nico Rutishauser
*
*/
public class SlowReplicationFilter implements ReplicationFilter {

@Override
public boolean rejectReplication(PeerAddress targetAddress) {
return targetAddress.isSlow();
}

}

0 comments on commit d1ab8c2

Please sign in to comment.