Skip to content

Commit

Permalink
Merge 4a89e2c into 8fb32b0
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed May 7, 2017
2 parents 8fb32b0 + 4a89e2c commit ae3c14d
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
cumulus
*.o
*.a
*.so
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ before_install:
- go get github.com/mattn/goveralls
script:
- $HOME/gopath/bin/goveralls -service=travis-ci
- go test ./...
5 changes: 0 additions & 5 deletions cumulus.go

This file was deleted.

11 changes: 0 additions & 11 deletions cumulus_test.go

This file was deleted.

64 changes: 64 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"flag"
"io/ioutil"

log "github.com/sirupsen/logrus"
"github.com/ubclaunchpad/cumulus/peer"
)

func main() {
log.Info("Starting Cumulus Peer")

// Get and parse command line arguments
// targetPeer is a Multiaddr representing the target peer to connect to
// when joining the Cumulus Network.
// port is the port to communicate over (defaults to peer.DefaultPort).
// ip is the public IP address of the this host.
targetPeer := flag.String("t", "", "target peer to connect to")
port := flag.Int("p", peer.DefaultPort, "TCP port to use for this host")
ip := flag.String("i", peer.DefaultIP, "IP address to use for this host")
debug := flag.Bool("d", false, "Enable debug logging")
flag.Parse()

if *debug {
log.SetLevel(log.DebugLevel)
}

// Set up a new host on the Cumulus network
host, err := peer.NewPeer(*ip, *port)
if err != nil {
log.Fatal(err)
}

// Set the host StreamHandler for the Cumulus Protocol and use
// BasicStreamHandler as its StreamHandler.
host.SetStreamHandler(peer.CumulusProtocol, host.Receive)
if *targetPeer == "" {
// No target was specified, wait for incoming connections
log.Info("No target provided. Listening for incoming connections...")
select {} // Hang until someone connects to us
}

stream, err := host.Connect(*targetPeer)
if err != nil {
log.Error(err)
}

// Send a message to the peer
_, err = stream.Write([]byte("Hello, world!\n"))
if err != nil {
log.Error(err)
}

// Read the reply from the peer
reply, err := ioutil.ReadAll(stream)
if err != nil {
log.Error(err)
}

log.Debugf("Peer %s read reply: %s", host.ID(), string(reply))

stream.Close()
}
178 changes: 178 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package peer

import (
"bufio"
"context"
"fmt"

crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
lpeer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
)

const (
// DefaultPort is the TCP port hosts will communicate over if none is
// provided
DefaultPort = 8765

// CumulusProtocol is the name of the protocol peers communicate over
CumulusProtocol = "/cumulus/0.0.1"

// DefaultIP is the IP address new hosts will use if none if provided
DefaultIP = "127.0.0.1"
)

// Peer is a cumulus Peer composed of a host
type Peer struct {
host.Host
}

// NewPeer creates a Cumulus host with the given IP addr and TCP port.
// This may throw an error if we fail to create a key pair, a pid, or a new
// multiaddress.
func NewPeer(ip string, port int) (*Peer, error) {
// Make sure we received a valid port number

// Generate a key pair for this host. We will only use the pudlic key to
// obtain a valid host ID.
// Cannot throw error with given arguments
_, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)

// Obtain Peer ID from public key.
// Cannot throw error with given argument
pid, _ := lpeer.IDFromPublicKey(pub)

// Create a multiaddress (IP address and TCP port for this peer).
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, port))
if err != nil {
return nil, err
}

ps := pstore.NewPeerstore()

// Create swarm (this is the interface to the libP2P Network) using the
// multiaddress, peerID, and peerStore we just created.
netwrk, err := swarm.NewNetwork(
context.Background(),
[]ma.Multiaddr{addr},
pid,
ps,
nil)
if err != nil {
return nil, err
}

// Actually create the host and peer with the network we just set up.
host := bhost.New(netwrk)
peer := &Peer{Host: host}

// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s",
host.ID().Pretty()))

// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
fullAddr := addr.Encapsulate(hostAddr)
log.Info("I am ", fullAddr)

// Add this host's address to its peerstore (avoid's net/identi error)
ps.AddAddr(pid, fullAddr, pstore.PermanentAddrTTL)

return peer, nil
}

// Receive is the function that gets called when a remote peer
// opens a new stream with the host that SetStreamHandler() is called on.
// This should be passed as the second argument to SetStreamHandler().
// We may want to implement another type of StreamHandler in the future.
func (p *Peer) Receive(s net.Stream) {
log.Debug("Setting basic stream handler.")
defer s.Close()
p.doCumulus(s)
}

// Communicate with peers.
// TODO: Update this to do something useful. For now it just reads from the
// stream and writes back what it read.
func (p *Peer) doCumulus(s net.Stream) {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
log.Error(err)
return
}

log.Debugf("Peer %s read: %s", p.ID(), str)

_, err = s.Write([]byte(str))
if err != nil {
log.Error(err)
return
}
}

// ExtractPeerInfo extracts the peer ID and multiaddress from the
// given multiaddress.
// Returns peer ID (esentially 46 character hash created by the peer)
// and the peer's multiaddress in the form /ip4/<peer IP>/tcp/<CumulusPort>.
func ExtractPeerInfo(peerma string) (lpeer.ID, ma.Multiaddr, error) {
log.Debug("Extracting peer info from ", peerma)

ipfsaddr, err := ma.NewMultiaddr(peerma)
if err != nil {
return "-", nil, err
}

// Cannot throw error when passed P_IPFS
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
return "-", nil, err
}

// Cannot return error if no error was returned in ValueForProtocol
peerid, _ := lpeer.IDB58Decode(pid)

// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, err := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", lpeer.IDB58Encode(peerid)))
if err != nil {
return "-", nil, err
}

trgtAddr := ipfsaddr.Decapsulate(targetPeerAddr)

return peerid, trgtAddr, nil
}

// Connect adds the given multiaddress to p's Peerstore and opens a stream
// with the peer at that multiaddress if the multiaddress is valid, otherwise
// returns error.
func (p *Peer) Connect(peerma string) (net.Stream, error) {
peerid, targetAddr, err := ExtractPeerInfo(peerma)
if err != nil {
return nil, err
}

// Store the peer's address in this host's PeerStore
p.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

log.Debug("Connected to Cumulus Peer:")
log.Debug("Peer ID:", peerid.Pretty())
log.Debug("Peer Address:", targetAddr)

// Open a stream with the peer
stream, err := p.NewStream(context.Background(), peerid,
CumulusProtocol)
if err != nil {
return nil, err
}

return stream, nil
}
Loading

0 comments on commit ae3c14d

Please sign in to comment.