Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aws5 #261

Merged
merged 2 commits into from
May 27, 2018
Merged

Aws5 #261

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,52 @@ used later in this demo.
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
```

Now you can start the server:
Before you start the server, make sure you know the IP address of the machine ou want to be the leader for the demo, and make sure that udp ports 8000-10000 are open on all the machines you wan to test with. Now you can start the server:

```bash
$ cat genesis.log | cargo run --release --bin solana-fullnode > transactions0.log
$ cat ./multinode-demo/leader.sh
#!/bin/bash
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
$ ./multinode-demo/leader.sh
```

Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
to start sending it transactions.

Then, in a separate shell, let's execute some transactions. Note we pass in
the JSON configuration file here, not the genesis ledger.
Now you can start some validators:

```bash
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
```

Now kill the server with Ctrl-C, and take a look at the ledger. You should
see something similar to:

```json
{"num_hashes":27,"id":[0, "..."],"event":"Tick"}
{"num_hashes":3,"id":[67, "..."],"event":{"Transaction":{"tokens":42}}}
{"num_hashes":27,"id":[0, "..."],"event":"Tick"}
$ cat ./multinode-demo/validator.sh
#!/bin/bash
rsync -v -e ssh $1:~/solana/mint-demo.json .
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/genesis.log .
rsync -v -e ssh $1:~/solana/leader.log .
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
export RUST_LOG=solana=info
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
$ ./multinode-demo/validator.sh ubuntu@10.0.1.51 #The leader machine
```

Now restart the server from where we left off. Pass it both the genesis ledger, and
the transaction ledger.

```bash
$ cat genesis.log transactions0.log | cargo run --release --bin solana-fullnode > transactions1.log
```

Lastly, run the client demo again, and verify that all funds were spent in the
previous round, and so no additional transactions are added.
Then, in a separate shell, let's execute some transactions. Note we pass in
the JSON configuration file here, not the genesis ledger.
>>>>>>> logs

```bash
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
$ cat ./multinode-demo/client.sh
#!/bin/bash
export RUST_LOG=solana=info
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-full-node -- -l leader.json -c 8100 -n 1
$ ./multinode-demo/client.sh ubuntu@10.0.1.51 #The leader machine
```

Stop the server again, and verify there are only Tick entries, and no Transaction entries.
Try starting a more validators and reruning the client demo!

Developing
===
Expand Down
10 changes: 4 additions & 6 deletions multinode-demo/client.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/bin/bash
cd /home/ubuntu/solana
#git pull
export RUST_LOG=solana::crdt=trace
# scp ubuntu@18.206.1.146:~/solana/leader.json .
# scp ubuntu@18.206.1.146:~/solana/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-multinode-demo -- -l leader.json -c 10.0.5.179:8100 -n 3
export RUST_LOG=solana=info
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json -c 8100 -n 1
6 changes: 2 additions & 4 deletions multinode-demo/leader.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#!/bin/bash
cd /home/ubuntu/solana
git pull
export RUST_LOG=solana=info
cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -b 8000 -d | grep INFO
#cat genesis.log | cargo run --release --bin solana-testnode -- -s leader.json -b 8000 -d
sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log
15 changes: 7 additions & 8 deletions multinode-demo/validator.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#!/bin/bash
cd /home/ubuntu/solana
git pull
scp ubuntu@18.206.1.146:~/solana/mint-demo.json .
scp ubuntu@18.206.1.146:~/solana/leader.json .
scp ubuntu@18.206.1.146:~/solana/genesis.log .
scp ubuntu@18.206.1.146:~/solana/libcuda_verify_ed25519.a .
rsync -v -e ssh $1:~/solana/mint-demo.json .
rsync -v -e ssh $1:~/solana/leader.json .
rsync -v -e ssh $1:~/solana/genesis.log .
rsync -v -e ssh $1:~/solana/leader.log .
rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a .
export RUST_LOG=solana=info
cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s replicator.json -v leader.json -b 9000 -d 2>&1 | tee validator.log

sudo sysctl -w net.core.rmem_max=26214400
cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log
31 changes: 22 additions & 9 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
extern crate futures;
extern crate getopts;
extern crate isatty;
extern crate pnet;
extern crate rayon;
extern crate serde_json;
extern crate solana;

use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use pnet::datalink;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::mint::MintDemo;
Expand All @@ -18,7 +20,7 @@ use solana::transaction::Transaction;
use std::env;
use std::fs::File;
use std::io::{stdin, Read};
use std::net::{SocketAddr, UdpSocket};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
Expand All @@ -36,14 +38,25 @@ fn print_usage(program: &str, opts: Options) {
print!("{}", opts.usage(&brief));
}

fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
return Some(p.ip());
}
}
}
None
}

fn main() {
let mut threads = 4usize;
let mut num_nodes = 10usize;
let mut leader = "leader.json".to_string();

let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client address", "host:port");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optopt(
"n",
Expand All @@ -69,12 +82,13 @@ fn main() {
if matches.opt_present("l") {
leader = matches.opt_str("l").unwrap();
}
let client_addr: Arc<RwLock<SocketAddr>> = if matches.opt_present("c") {
let addr = matches.opt_str("c").unwrap().parse().unwrap();
Arc::new(RwLock::new(addr))
} else {
Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap()))
};
let mut addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
addr.set_port(port);
}
addr.set_ip(get_ip_addr().unwrap());
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
Expand Down Expand Up @@ -230,7 +244,6 @@ fn converge(
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);

let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
Expand Down
19 changes: 14 additions & 5 deletions src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Instruction;
use std::env;
use std::fs::File;
use std::io::{stdin, stdout, Read};
use std::io::{stdin, Read};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
//use std::time::Duration;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
Expand All @@ -38,6 +38,7 @@ fn main() {
opts.optopt("b", "", "bind", "bind to port or address");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt("s", "", "save", "save my identity to path.json");
opts.optopt("l", "", "load", "load my identity to path.json");
opts.optflag("h", "help", "print help");
opts.optopt(
"v",
Expand Down Expand Up @@ -130,6 +131,12 @@ fn main() {
// we need all the receiving sockets to be bound within the expected
// port range that we open on aws
let mut repl_data = make_repl_data(&bind_addr);
if matches.opt_present("l") {
let path = matches.opt_str("l").unwrap();
if let Ok(file) = File::open(path) {
repl_data = serde_json::from_reader(file).expect("parse");
}
}
let threads = if matches.opt_present("v") {
eprintln!("starting validator... {}", repl_data.requests_addr);
let path = matches.opt_str("v").unwrap();
Expand All @@ -149,18 +156,20 @@ fn main() {
} else {
eprintln!("starting leader... {}", repl_data.requests_addr);
repl_data.current_leader_id = repl_data.id.clone();
let file = File::create("leader.log").expect("leader.log create");
let server = Server::new_leader(
bank,
last_id,
Some(Duration::from_millis(1000)),
//Some(Duration::from_millis(1000)),
None,
repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind(repl_data.transactions_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
exit.clone(),
stdout(),
file,
);
server.thread_hdls
};
Expand All @@ -169,7 +178,7 @@ fn main() {
let file = File::create(path).expect("file");
serde_json::to_writer(file, &repl_data).expect("serialize");
}
eprintln!("Ready. Listening on {}", bind_addr);
eprintln!("Ready. Listening on {}", repl_data.transactions_addr);

for t in threads {
t.join().expect("join");
Expand Down
26 changes: 16 additions & 10 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use hash::Hash;
use packet::SharedBlob;
use packet::{SharedBlob, BLOB_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom};
Expand Down Expand Up @@ -226,6 +226,7 @@ impl Crdt {
.expect("set_index in pub fn broadcast");
//TODO profile this, may need multiple sockets for par_iter
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
assert!(blob.meta.size < BLOB_SIZE);
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
e
Expand Down Expand Up @@ -285,6 +286,7 @@ impl Crdt {
v.replicate_addr
);
//TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size < BLOB_SIZE);
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
})
.collect();
Expand Down Expand Up @@ -327,14 +329,16 @@ impl Crdt {
}

pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
if self.table.len() <= 1 {
let daddr = "0.0.0.0:0".parse().unwrap();
let valid: Vec<_> = self.table
.values()
.filter(|r| r.id != self.me && r.replicate_addr != daddr)
.collect();
if valid.is_empty() {
return Err(Error::CrdtTooSmall);
}
let mut n = (Self::random() as usize) % self.table.len();
while self.table.values().nth(n).unwrap().id == self.me {
n = (Self::random() as usize) % self.table.len();
}
let addr = self.table.values().nth(n).unwrap().gossip_addr.clone();
let n = (Self::random() as usize) % valid.len();
let addr = valid[n].gossip_addr.clone();
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
let out = serialize(&req)?;
Ok((addr, out))
Expand Down Expand Up @@ -431,6 +435,7 @@ impl Crdt {
"responding RequestWindowIndex {} {}",
ix, from.replicate_addr
);
assert!(outblob.len() < BLOB_SIZE);
sock.send_to(&outblob, from.replicate_addr)?;
}
Ok(())
Expand All @@ -442,7 +447,7 @@ impl Crdt {
sock: &UdpSocket,
) -> Result<()> {
//TODO cache connections
let mut buf = vec![0u8; 1024 * 64];
let mut buf = vec![0u8; BLOB_SIZE];
trace!("recv_from on {}", sock.local_addr().unwrap());
let (amt, src) = sock.recv_from(&mut buf)?;
trace!("got request from {}", src);
Expand All @@ -451,7 +456,7 @@ impl Crdt {
match r {
// TODO sigverify these
Protocol::RequestUpdates(v, reqdata) => {
trace!("RequestUpdates {}", v);
trace!("RequestUpdates {} from {}", v, src);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read()
Expand All @@ -464,12 +469,13 @@ impl Crdt {
obj.write()
.expect("'obj' write lock in RequestUpdates")
.insert(&reqdata);
assert!(rsp.len() < BLOB_SIZE);
sock.send_to(&rsp, addr)
.expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!");
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates");
trace!("ReceivedUpdates {} from {}", ups, src);
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
Expand Down
2 changes: 1 addition & 1 deletion src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
batches
.iter()
.map(|p| p.read().unwrap().packets.len())
.fold(0, |x, y| x + y)
.sum()
}

#[cfg(not(feature = "cuda"))]
Expand Down
6 changes: 4 additions & 2 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crdt::Crdt;
#[cfg(feature = "erasure")]
use erasure;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets};
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
use result::{Error, Result};
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
Expand Down Expand Up @@ -177,10 +177,11 @@ fn repair_window(
trace!("repair_window counter {} {}", *times, *consumed);
return Ok(());
}
info!("repair_window request {} {}", *consumed, *received);
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
info!("repair_window request {} {} {}", *consumed, *received, to);
assert!(req.len() < BLOB_SIZE);
sock.send_to(&req, to)?;
}
Ok(())
Expand Down Expand Up @@ -510,6 +511,7 @@ mod bench {
let mut num = 0;
for p in msgs_.read().unwrap().packets.iter() {
let a = p.meta.addr();
assert!(p.meta.size < packet::BLOB_SIZE);
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
}
Expand Down