-
Notifications
You must be signed in to change notification settings - Fork 1
/
PeerPollingService.java
165 lines (145 loc) · 8.03 KB
/
PeerPollingService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.jounaidr.jrc.server.peers.peer.services;
import com.jounaidr.jrc.server.blockchain.Block;
import com.jounaidr.jrc.server.blockchain.Blockchain;
import com.jounaidr.jrc.server.peers.Peers;
import com.jounaidr.jrc.server.peers.peer.Peer;
import com.jounaidr.jrc.server.peers.peer.PeerClient;
import com.jounaidr.jrc.server.peers.peer.util.Status;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.configurationprocessor.json.JSONException;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.concurrent.*;
@Slf4j
public class PeerPollingService implements Runnable{
@Autowired
private Blockchain blockchain; //The blockchain bean instance for this node, injected through spring
@Autowired
private Peers peers; //The peers bean instance for this node, injected through spring
private final Peer peer;
private final PeerClient peerClient;
private final ScheduledThreadPoolExecutor peersExecutor;
//TODO: SET TO 15 secs when deploying only 1sec for testing
private static final long POLLING_FREQUENCY = 5000; //The delay between each poll in ms
private String cashedPeerSocketsList;
/**
* Instantiates a new peer polling service for a given peer
*
* @param peer the peer that this polling service will poll
* @param peersExecutor the the peers thread pool executor
*/
public PeerPollingService(Peer peer, ScheduledThreadPoolExecutor peersExecutor, Blockchain blockchain, Peers peers) {
this.blockchain = blockchain;
this.peers = peers;
this.peer = peer;
this.peerClient = new PeerClient(peer.getPeerSocket()); //Instantiate a new peer client from the peers socket
this.peersExecutor = peersExecutor;
}
/**
* Returns a random initial delay to be used when scheduling this runnable
*
* @return the random initial delay
*/
private long getRandomInitialDelay(){
//Random delay is calculated using a random value with the range specified by the polling frequency
double delay = ThreadLocalRandom.current().nextDouble() * POLLING_FREQUENCY;
return Double.valueOf(Math.ceil(delay)).longValue();
}
/**
* Submit the polling task as a scheduled thread to the peers thread pool executor
*/
public void start() {
//TODO: add a static delay to the initial delay aswel so that beans are deffo initialised
//Schedule the task specified in the run() method to run consecutively
//With an initially random delay, and subsequent fixed delay defined in POLLING_FREQUENCY
peersExecutor.scheduleAtFixedRate(this, this.getRandomInitialDelay(), POLLING_FREQUENCY, TimeUnit.MILLISECONDS);
}
/**
* Gets the peers health status from its actuator endpoint
* and sets its status variable for this polling services respective peer
*
* @throws IOException connection exceptions
* @throws JSONException json conversion exceptions
*/
private void peerHealthCheck() throws IOException, JSONException {
String peerHealth = peerClient.getPeerHealth();
if(peer.getPeerStatus() != Status.UP){
if(peerHealth.equals("UP")){
peer.setPeerStatus(Status.UP);
log.info("Connection reestablished with peer: [{}] ! Setting peer status to UP", peer.getPeerSocket());
}
else{
peer.setPeerStatus(Status.UNKNOWN);
log.info("Peer health check returned invalid response: {}. Setting peer [{}] status to UNKNOWN", peerClient.getPeerHealth(), peer.getPeerSocket());
}
}
}
/**
* The PeerPollingService run task.
*
* First the peers health status is checked, and if healthy
* proceed to get the peers sockets list if it has changed. Then compare
* the peers blockchain size again this nodes local blockchain instance,
* and depending on the difference either get the peers last block or
* attempt to synchronise the whole blockchain if this node is behind the peer
*/
@Override
public void run() {
log.debug("Attempting to poll peer: [{}]",peer.getPeerSocket());
try {
this.peerHealthCheck(); //First check the peers health status
if(peer.getPeerStatus() == Status.UP){
// Get the peers healthy peer list
String peerSocketsList = peerClient.getHealthySocketsList();
if(!peerSocketsList.equals(cashedPeerSocketsList)){ // The peers peer list has changed, update this nodes peer list...
log.info("New peers have been detected from the following peer: [{}] !",peer.getPeerSocket());
peers.addSocketsList(peerSocketsList);
this.cashedPeerSocketsList = peerSocketsList;
}
// Compare the peers blockchain size against this nodes blockchain size...
int chainSizeDiff = peerClient.getPeerBlockchainSize() - this.blockchain.getChain().size();
if(chainSizeDiff == 0){ // The peers are in sync as there is no difference in chain size
log.debug("Peer [{}] is in sync with this node",peer.getPeerSocket());
}
if(chainSizeDiff == 1){ // The peers chain is ahead of this node and has the newest block, request and add it...
log.info("A new block was detected in the following peer: [{}] !",peer.getPeerSocket());
try {
this.blockchain.addBlock(peerClient.getPeerLastBlock());
} catch (InvalidObjectException e) { // The new block is invalid...
log.error("Could not add the new block from peer: [{}]. Reason: {}", peer.getPeerSocket(), e.getMessage());
}
}
if(chainSizeDiff > 1){ // This nodes blockchain is behind the peer by more than one block, attempt to synchronise the blockchain...
log.info("Attempting to synchronize with the following peer: [{}]", peer.getPeerSocket());
ArrayList<Block> chainResponse = peerClient.getPeerBlockchain();
try {
Blockchain incomingBlockchain = new Blockchain(chainResponse);
this.blockchain.replaceChain(incomingBlockchain);
log.info("Successfully synchronized blockchain with the following peer: [{}] !", peer.getPeerSocket());
} catch (InvalidObjectException e) {
// A block in the blockchain is invalid
log.error("Could not synchronize with the following peer: [{}]. Reason: {}", peer.getPeerSocket(), e.getMessage());
}
}
if(chainSizeDiff < 0){ // The peers blockchain is behind this nodes
log.debug("Peer [{}] is behind this node",peer.getPeerSocket());
}
}
} catch (SocketTimeoutException | ConnectException e) { // Currently tested exceptions caused by a lack of connection
if(peer.getPeerStatus() != Status.DOWN){
peer.setPeerStatus(Status.DOWN);
log.info("Could not poll the following peer: [{}]. Reason: {}. Setting peer status to DOWN", peer.getPeerSocket(), e.getMessage());
}
log.debug("Could not poll the following peer: [{}]. Reason: {}. Peer status is: {}", peer.getPeerSocket(), e.getMessage(), peer.getPeerStatus());
} catch (Exception e) {
//TODO: test around this, different exceptions, what happens if non block json is returned, what if different response code
peer.setPeerStatus(Status.UNKNOWN);
log.debug("Could not poll the following peer: [{}]. Reason: {}. Setting peer status to UNKNOWN", peer.getPeerSocket(), e.getMessage());
e.printStackTrace();
}
}
}