Skip to content
This repository has been archived by the owner on Feb 2, 2021. It is now read-only.

Commit

Permalink
Use a separate PeerGroup for XT connections, so we use the set of all…
Browse files Browse the repository at this point in the history
… possible P2P nodes for regular chain syncing, etc, and don't need any weird balancing hacks like before.
  • Loading branch information
mikehearn committed Jan 12, 2015
1 parent 90f5789 commit 1f48914
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 79 deletions.
125 changes: 66 additions & 59 deletions client/src/main/java/lighthouse/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class Main extends Application {
public String updatesURL = UPDATES_BASE_URL;

public static boolean offline = false;
private PeerGroup xtPeers;

public static void main(String[] args) throws IOException {
// Startup sequence: we use UpdateFX to allow for automatic online updates when the app is running. UpdateFX
Expand Down Expand Up @@ -334,6 +335,13 @@ public void initBitcoin(@Nullable DeterministicSeed restoreFromSeed) throws IOEx
// we give to the app kit is currently an exception and runs on a library thread. It'll get fixed in
// a future version.
Threading.USER_THREAD = Platform::runLater;
// Bring up a dedicated P2P connection group for Bitcoin XT nodes only. It'll be used for getutxo and nothing
// else. Syncing to the network, Bloom filtering, etc, will be done by the WAK peer group. It's just easier
// to do it this way than try to always maintain the correct balance of peers in a single PeerGroup, which
// doesn't have great support for saying e.g. I want 1/3rd of peers to match this criteria and the other 2/3rds
// can be anything.
xtPeers = connectXTPeers();

// Create the app kit. It won't do any heavyweight initialization until after we start it.
bitcoin = new WalletAppKit(params, AppDirectory.dir().toFile(), APP_NAME) {
{
Expand All @@ -343,7 +351,7 @@ public void initBitcoin(@Nullable DeterministicSeed restoreFromSeed) throws IOEx
@Override
protected void onSetupCompleted() {
wallet = (PledgingWallet) bitcoin.wallet();
backend = new LighthouseBackend(CLIENT, vPeerGroup, vChain, wallet);
backend = new LighthouseBackend(CLIENT, vPeerGroup, xtPeers, vChain, wallet);

reached("onSetupCompleted");
walletLoadedLatch.countDown();
Expand All @@ -353,64 +361,8 @@ protected void onSetupCompleted() {
vPeerGroup.addAddress(new PeerAddress(unchecked(InetAddress::getLocalHost), RegTestParams.get().getPort() + 1));
vPeerGroup.setMinBroadcastConnections(1);
vPeerGroup.setUseLocalhostPeerWhenPossible(false);
} else {
// Just a quick check to see if we can resolve DNS names.
if (new InetSocketAddress("google.com", 80).getAddress() == null) {
log.warn("User appears to be offline");
offline = true;
} else {
setupPeerGroup();
}
vPeerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
if (peer.getAddress().getAddr().isLoopbackAddress() && !peer.getPeerVersionMessage().isGetUTXOsSupported()) {
// We connected to localhost but it doesn't have what we need.
log.warn("Localhost peer does not have support for NODE_GETUTXOS, ignoring");
informationalAlert("Local Bitcoin node not usable",
"You have a Bitcoin (Core) node running on your computer, but it doesn't have the protocol support %s needs. %s will still " +
"work but will use the peer to peer network instead, so you won't get upgraded security. " +
"Try installing Bitcoin XT, which is a modified version of Bitcoin Core that has the upgraded protocol support.",
APP_NAME);
vPeerGroup.setUseLocalhostPeerWhenPossible(false);
setupPeerGroup();
}
}
});
}
}

private void setupPeerGroup() {
// We need to find some peers that support the getutxo message, so pick two found via the
// vinumeris.com seed with a custom query, then pick another four from the other DNS seeds
// (i.e. any bitcoin peer). There's unfortunately no way to instruct the other seeds to
// search for a subset of the Bitcoin network so that's why we need to use a new more flexible
// HTTP based protocol here. The seed will find Bitcoin XT nodes as people start and stop them.
//
// Hopefully in future more people will run HTTP seeds, then we can use a MultiplexingDiscovery
// to randomly merge their answers and reduce the influence of any one seed. Additionally if
// more people run Bitcoin XT nodes we can bump up the number we search for here to again
// reduce the influence of any one node. But this needs people to help decentralise.


// TODO: This logic isn't right.

PeerDiscovery disco = new HttpDiscovery(params,
unchecked(() -> new URI("http://main.seed.vinumeris.com:8081/peers?srvmask=3&getutxo=true")),
// auth key used to sign responses.
ECKey.fromPublicOnly(BaseEncoding.base16().decode(
"027a79143a4de36341494d21b6593015af6b2500e720ad2eda1c0b78165f4f38c4".toUpperCase()
)));
vPeerGroup.addPeerDiscovery(disco);
vPeerGroup.setMaxConnections(2);
vPeerGroup.setConnectTimeoutMillis(10000);
vPeerGroup.waitForPeersWithServiceMask(2, GetUTXOsMessage.SERVICE_FLAGS_REQUIRED).addListener(() -> {
vPeerGroup.addPeerDiscovery(new DnsDiscovery(params));
vPeerGroup.setMaxConnections(6);
// Six peers is a tradeoff between reliability, trust and need to be gentle with network
// resources. It could be higher and nothing would break though.
}, Threading.SAME_THREAD);
}
};
if (bitcoin.isChainFileLocked()) {
informationalAlert("Already running",
Expand All @@ -429,8 +381,7 @@ private void setupPeerGroup() {
} else if (params == TestNet3Params.get()) {
bitcoin.setCheckpoints(getClass().getResourceAsStream("checkpoints.testnet"));
}
bitcoin.setPeerNodes() // Hack to prevent WAK adding DnsDiscovery
.setBlockingStartup(false)
bitcoin.setBlockingStartup(false)
.setDownloadListener(MainWindow.bitcoinUIModel.getDownloadListener())
.setUserAgent(APP_NAME, "" + VERSION)
.restoreWalletFromSeed(restoreFromSeed);
Expand All @@ -450,6 +401,61 @@ public void failed(Service.State from, Throwable failure) {
bitcoin.startAsync();
}

public PeerGroup connectXTPeers() {
PeerGroup xtPeers = new PeerGroup(params);
final int XT_PEERS = 4;
if (params == RegTestParams.get()) {
// Use two local regtest nodes for testing.
xtPeers.addAddress(new PeerAddress(unchecked(InetAddress::getLocalHost), RegTestParams.get().getPort()));
xtPeers.addAddress(new PeerAddress(unchecked(InetAddress::getLocalHost), RegTestParams.get().getPort() + 1));
xtPeers.setUseLocalhostPeerWhenPossible(false);
} else {
// Just a quick check to see if we can resolve DNS names.
if (new InetSocketAddress("google.com", 80).getAddress() == null) {
log.warn("User appears to be offline");
offline = true;
} else {
// PeerGroup will use a local Bitcoin node if at all possible, but it may not have what we need.
xtPeers.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
if (peer.getAddress().getAddr().isLoopbackAddress() && !peer.getPeerVersionMessage().isGetUTXOsSupported()) {
// We connected to localhost but it doesn't have what we need.
log.warn("Localhost peer does not have support for NODE_GETUTXOS, ignoring");
informationalAlert("Local Bitcoin node not usable",
"You have a Bitcoin (Core) node running on your computer, but it doesn't have the protocol support %s needs. %s will still " +
"work but will use the peer to peer network instead, so you won't get upgraded security. " +
"Try installing Bitcoin XT, which is a modified version of Bitcoin Core that has the upgraded protocol support.",
APP_NAME);
xtPeers.setUseLocalhostPeerWhenPossible(false);
xtPeers.setMaxConnections(XT_PEERS);
peer.close();
}
}
});
// There's unfortunately no way to instruct the other seeds to search for a subset of the Bitcoin network
// so that's why we need to use a new more flexible HTTP based protocol here. The seed will find
// Bitcoin XT nodes as people start and stop them.
//
// Hopefully in future more people will run HTTP seeds, then we can use a MultiplexingDiscovery
// to randomly merge their answers and reduce the influence of any one seed. Additionally if
// more people run Bitcoin XT nodes we can bump up the number we search for here to again
// reduce the influence of any one node. But this needs people to help decentralise.
xtPeers.addPeerDiscovery(new HttpDiscovery(params,
unchecked(() -> new URI("http://main.seed.vinumeris.com/peers?srvmask=3&getutxo=true")),
// auth key used to sign responses.
ECKey.fromPublicOnly(BaseEncoding.base16().decode(
"027a79143a4de36341494d21b6593015af6b2500e720ad2eda1c0b78165f4f38c4".toUpperCase()
)))
);
xtPeers.setConnectTimeoutMillis(10000);
xtPeers.setMaxConnections(XT_PEERS);
xtPeers.startAsync();
}
}
return xtPeers;
}

private void refreshStylesheets(Scene scene) {
scene.getStylesheets().clear();
TextFieldValidator.configureScene(scene);
Expand Down Expand Up @@ -623,6 +629,7 @@ public <T> OverlayUI<T> overlayUI(String name, @Nullable String title) {
public void stop() throws Exception {
if (bitcoin != null && bitcoin.isRunning()) {
backend.shutdown();
xtPeers.stopAsync();
bitcoin.stopAsync();
bitcoin.awaitTerminated();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void initCoreState() {
gate = new AffinityExecutor.Gate();
executor = new AffinityExecutor.ServiceAffinityExecutor("test thread");
diskManager = new DiskManager(UnitTestParams.get(), executor);
backend = new LighthouseBackend(CLIENT, peerGroup, blockChain, pledgingWallet, diskManager, executor);
backend = new LighthouseBackend(CLIENT, peerGroup, peerGroup, blockChain, pledgingWallet, diskManager, executor);
backend.setMinPeersForUTXOQuery(1);
backend.setMaxJitterSeconds(0);

Expand Down Expand Up @@ -296,7 +296,7 @@ public void serverAndLocalAreDeduped() throws Exception {
executor = new AffinityExecutor.ServiceAffinityExecutor("test thread 2");
diskManager = new DiskManager(UnitTestParams.get(), executor);
writeProjectToDisk();
backend = new LighthouseBackend(CLIENT, peerGroup, blockChain, pledgingWallet, diskManager, executor);
backend = new LighthouseBackend(CLIENT, peerGroup, peerGroup, blockChain, pledgingWallet, diskManager, executor);

// Let's watch out for pledges from the server.
ObservableSet<LHProtos.Pledge> pledges = backend.mirrorOpenPledges(project, gate);
Expand Down
37 changes: 20 additions & 17 deletions common/src/main/java/lighthouse/LighthouseBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class LighthouseBackend extends AbstractBlockChainListener {

private final DiskManager diskManager;
public final AffinityExecutor.ServiceAffinityExecutor executor;
private final PeerGroup peerGroup;
// We have two separate sets of connections to the P2P network. One is unfussy about what kind of peers it has.
// We use this whenever possible, to get more diversity. The other is for peers that know how to answer getutxo.
private final PeerGroup regularP2P, xtP2P;
private final PledgingWallet wallet;
private final CompletableFuture<Boolean> initialized = new CompletableFuture<>();

Expand Down Expand Up @@ -111,20 +113,21 @@ public String toString() {
@GuardedBy("this")
private final Map<String, Project> projectsByUrlPath;

public LighthouseBackend(Mode mode, PeerGroup peerGroup, AbstractBlockChain chain, PledgingWallet wallet) {
this(mode, peerGroup, chain, wallet, new AffinityExecutor.ServiceAffinityExecutor("LighthouseBackend"));
public LighthouseBackend(Mode mode, PeerGroup regularP2P, PeerGroup xtP2P, AbstractBlockChain chain, PledgingWallet wallet) {
this(mode, regularP2P, xtP2P, chain, wallet, new AffinityExecutor.ServiceAffinityExecutor("LighthouseBackend"));
}

public LighthouseBackend(Mode mode, PeerGroup peerGroup, AbstractBlockChain chain, PledgingWallet wallet, AffinityExecutor.ServiceAffinityExecutor executor) {
public LighthouseBackend(Mode mode, PeerGroup regularP2P, PeerGroup xtP2P, AbstractBlockChain chain, PledgingWallet wallet, AffinityExecutor.ServiceAffinityExecutor executor) {
// The disk manager should only auto load projects in server mode where we install/change them by dropping them
// into the server directory. But in client mode we always want explicit import.
this(mode, peerGroup, chain, wallet, new DiskManager(wallet.getParams(), executor), executor);
this(mode, regularP2P, xtP2P, chain, wallet, new DiskManager(wallet.getParams(), executor), executor);
}

public LighthouseBackend(Mode mode, PeerGroup peerGroup, AbstractBlockChain chain, PledgingWallet wallet, DiskManager diskManager, AffinityExecutor.ServiceAffinityExecutor executor) {
public LighthouseBackend(Mode mode, PeerGroup regularP2P, PeerGroup xtP2P, AbstractBlockChain chain, PledgingWallet wallet, DiskManager diskManager, AffinityExecutor.ServiceAffinityExecutor executor) {
this.diskManager = diskManager;
this.executor = executor;
this.peerGroup = peerGroup;
this.regularP2P = regularP2P;
this.xtP2P = xtP2P;
this.openPledges = new HashMap<>();
this.claimedPledges = new HashMap<>();
this.wallet = wallet;
Expand Down Expand Up @@ -240,7 +243,7 @@ private boolean checkClaimConfidence(Transaction transaction, TransactionConfide
case PENDING:
int seenBy = conf.numBroadcastPeers();
log.info("Claim seen by {} peers", seenBy);
if (seenBy < peerGroup.getMinBroadcastConnections())
if (seenBy < regularP2P.getMinBroadcastConnections())
break;
// Fall through ...
case BUILDING:
Expand Down Expand Up @@ -414,10 +417,10 @@ private CompletableFuture<Set<LHProtos.Pledge>> checkPledgesAgainstP2PNetwork(Pr
executor.executeASAP(() -> {
log.info("Checking {} pledge(s) against P2P network for '{}'", pledges.size(), project);
markAsInProgress(project);
ListenableFuture<List<Peer>> peerFuture = peerGroup.waitForPeersOfVersion(minPeersForUTXOQuery, GetUTXOsMessage.MIN_PROTOCOL_VERSION);
ListenableFuture<List<Peer>> peerFuture = xtP2P.waitForPeersOfVersion(minPeersForUTXOQuery, GetUTXOsMessage.MIN_PROTOCOL_VERSION);
if (!peerFuture.isDone()) {
log.info("Waiting to find {} peers that support getutxo", minPeersForUTXOQuery);
for (Peer peer : peerGroup.getConnectedPeers()) {
for (Peer peer : xtP2P.getConnectedPeers()) {
log.info("Connected to: {}", peer);
}
}
Expand Down Expand Up @@ -796,7 +799,7 @@ public void notifyNewBestBlock(StoredBlock block) throws VerificationException {

if (mode == Mode.CLIENT) {
// Don't bother with pointless/noisy server requeries until we're caught up with the chain tip.
if (block.getHeight() > peerGroup.getMostCommonChainHeight() - 2) {
if (block.getHeight() > regularP2P.getMostCommonChainHeight() - 2) {
log.info("New block found, refreshing pledges");
diskManager.getProjects().stream().filter(project -> project.getPaymentURL() != null).forEach(this::jitteredServerRequery);
}
Expand Down Expand Up @@ -932,7 +935,7 @@ private CompletableFuture<LHProtos.Pledge> broadcastDependenciesOf(LHProtos.Pled
// case of remote nodes (maybe we should forbid this later), it may block for a few seconds whilst
// the transactions propagate.
log.info("Broadcasting dependency {} with thirty second timeout", tx.getHash());
peerGroup.broadcastTransaction(tx).get(30, TimeUnit.SECONDS);
regularP2P.broadcastTransaction(tx).get(30, TimeUnit.SECONDS);
}
result.complete(pledge);
}
Expand Down Expand Up @@ -1079,21 +1082,21 @@ private List<LHProtos.Pledge> whichPledgesAreRevokedBy(Transaction t) {
private BloomFilterManager manager = new BloomFilterManager();

private void installBloomFilterProvider() {
peerGroup.addPeerFilterProvider(manager);
peerGroup.addEventListener(manager, executor);
regularP2P.addPeerFilterProvider(manager);
regularP2P.addEventListener(manager, executor);
}

public void shutdown() {
ignoreAndLog(() -> Uninterruptibles.getUninterruptibly(executor.service.submit(() -> {
peerGroup.removePeerFilterProvider(manager);
peerGroup.removeEventListener(manager);
regularP2P.removePeerFilterProvider(manager);
regularP2P.removeEventListener(manager);
diskManager.shutdown();
executor.service.shutdown();
})));
}

public void refreshBloomFilter() {
peerGroup.recalculateFastCatchupAndFilter(PeerGroup.FilterRecalculateMode.SEND_IF_CHANGED);
regularP2P.recalculateFastCatchupAndFilter(PeerGroup.FilterRecalculateMode.SEND_IF_CHANGED);
}

private Map<TransactionOutPoint, LHProtos.Pledge> getAllOpenPledgedOutpoints() {
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/lighthouse/server/PledgeServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception {
// we'll have to split the backend thread from the http server thread.
AffinityExecutor.ServiceAffinityExecutor executor = new AffinityExecutor.ServiceAffinityExecutor("server");
server.setExecutor(executor);
LighthouseBackend backend = new LighthouseBackend(SERVER, kit.peerGroup(), kit.chain(), (PledgingWallet) kit.wallet(), executor);
LighthouseBackend backend = new LighthouseBackend(SERVER, kit.peerGroup(), kit.peerGroup(), kit.chain(), (PledgingWallet) kit.wallet(), executor);
backend.setMinPeersForUTXOQuery(minPeersSupportingGetUTXO);
server.createContext(LHUtils.HTTP_PATH_PREFIX, new ProjectHandler(backend));
server.createContext("/", exchange -> {
Expand Down

0 comments on commit 1f48914

Please sign in to comment.