Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

Commit

Permalink
mark TCP neighbor as stopped when removed, so Replicator Sink & Sourc…
Browse files Browse the repository at this point in the history
…e thread stop.
  • Loading branch information
Alon Elmaliah authored and paulhandy committed Dec 24, 2017
1 parent 5f06cf0 commit 3049751
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/main/java/com/iota/iri/network/TCPNeighbor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class TCPNeighbor extends Neighbor {
private int tcpPort;

private final ArrayBlockingQueue<ByteBuffer> sendQueue = new ArrayBlockingQueue<>(10);
private boolean stopped = false;

public TCPNeighbor(InetSocketAddress address, boolean isConfigured, final double limit) {
super(address, isConfigured, limit);
Expand All @@ -35,6 +36,11 @@ public Socket getSource() {
public void clear() {
setSource(null);
setSink(null);
this.stopped = true;
}

public boolean isStopped() {
return stopped;
}

public void setSource(Socket source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void run() {
portAsByteArray, 0, ReplicatorSinkPool.PORT_BYTES);
out.write(portAsByteArray);

while (!replicatorSinkPool.shutdown) {
while (!replicatorSinkPool.shutdown && !neighbor.isStopped()) {
try {
ByteBuffer message = neighbor.getNextMessage();
if (neighbor.getSink() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void run() {
connection.setSoTimeout(0); // infinite timeout - blocking read

offset = 0;
while (!shutdown) {
while (!shutdown && !neighbor.isStopped()) {

while ( ((count = stream.read(data, offset, (TRANSACTION_PACKET_SIZE - offset + ReplicatorSinkProcessor.CRC32_BYTES))) != -1)
&& (offset < (TRANSACTION_PACKET_SIZE + ReplicatorSinkProcessor.CRC32_BYTES))) {
Expand Down

0 comments on commit 3049751

Please sign in to comment.