Skip to content
This repository has been archived by the owner on Mar 12, 2020. It is now read-only.

Commit

Permalink
feat(cafe): cafe hosts -> bootstrap peers
Browse files Browse the repository at this point in the history
Tricky, have to pass the swarm multi address prefixes to the client via the session. Adding /
removing cafes updates the bootstrap config.

fixes #327
  • Loading branch information
sanderpick committed Nov 2, 2018
1 parent 1222ca9 commit 080d927
Show file tree
Hide file tree
Showing 18 changed files with 304 additions and 157 deletions.
33 changes: 30 additions & 3 deletions core/cafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@ func (t *Textile) RegisterCafe(peerId string) (*repo.CafeSession, error) {
return nil, err
}

return t.datastore.CafeSessions().Get(pid.Pretty()), nil
// add to bootstrap
session := t.datastore.CafeSessions().Get(pid.Pretty())
if session != nil {
var peers []string
for _, s := range session.SwarmAddrs {
peers = append(peers, s+"/ipfs/"+session.CafeId)
}
if err := updateBootstrapConfig(t.repoPath, peers, []string{}); err != nil {
return nil, err
}
}

return session, nil
}

// CafeSessions lists active cafe sessions
Expand All @@ -47,11 +59,11 @@ func (t *Textile) CafeSession(peerId string) (*repo.CafeSession, error) {
}

// RefreshCafeSession attempts to refresh a token with a cafe
func (t *Textile) RefreshCafeSession(cafeId string) (*repo.CafeSession, error) {
func (t *Textile) RefreshCafeSession(peerId string) (*repo.CafeSession, error) {
if !t.Online() {
return nil, ErrOffline
}
session := t.datastore.CafeSessions().Get(cafeId)
session := t.datastore.CafeSessions().Get(peerId)
if session == nil {
return nil, errors.New("session not found")
}
Expand All @@ -63,6 +75,21 @@ func (t *Textile) DeregisterCafe(peerId string) error {
if err := t.touchDatastore(); err != nil {
return err
}
session := t.datastore.CafeSessions().Get(peerId)
if session == nil {
return nil
}

// remove from bootstrap
var peers []string
for _, s := range session.SwarmAddrs {
peers = append(peers, s+"/ipfs/"+session.CafeId)
}
if err := updateBootstrapConfig(t.repoPath, []string{}, peers); err != nil {
return err
}

// delete from datastore
if err := t.datastore.CafeSessions().Delete(peerId); err != nil {
return err
}
Expand Down
81 changes: 53 additions & 28 deletions core/cafe_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/textileio/textile-go/keypair"
"github.com/textileio/textile-go/pb"
"github.com/textileio/textile-go/repo"
"github.com/textileio/textile-go/repo/config"
"github.com/textileio/textile-go/service"
"gx/ipfs/QmVzK524a2VWLqyvtBeiHKsUAWYgeAk4DBeZoY7vpNPNRx/go-block-format"
"gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
Expand Down Expand Up @@ -42,11 +43,12 @@ const CafeServiceProtocol = protocol.ID("/textile/cafe/1.0.0")

// CafeService is a libp2p pinning and offline message service
type CafeService struct {
service *service.Service
datastore repo.Datastore
inbox *CafeInbox
httpAddr string
open bool
service *service.Service
datastore repo.Datastore
inbox *CafeInbox
httpAddr string
swarmAddrs []string
open bool
}

// NewCafeService returns a new threads service
Expand Down Expand Up @@ -147,11 +149,12 @@ func (h *CafeService) Register(cafe peer.ID) error {
return err
}
session := &repo.CafeSession{
CafeId: cafe.Pretty(),
Access: res.Access,
Refresh: res.Refresh,
Expiry: exp,
HttpAddr: res.HttpAddr,
CafeId: cafe.Pretty(),
Access: res.Access,
Refresh: res.Refresh,
Expiry: exp,
HttpAddr: res.HttpAddr,
SwarmAddrs: res.SwarmAddrs,
}
return h.datastore.CafeSessions().AddOrUpdate(session)
}
Expand Down Expand Up @@ -315,7 +318,8 @@ func (h *CafeService) DeleteMessages(cafe peer.ID) error {
}

// sendCafeRequest sends an authenticated request, retrying once after a session refresh
func (h *CafeService) sendCafeRequest(cafe peer.ID, envFactory func(*repo.CafeSession) (*pb.Envelope, error)) (*pb.Envelope, error) {
func (h *CafeService) sendCafeRequest(
cafe peer.ID, envFactory func(*repo.CafeSession) (*pb.Envelope, error)) (*pb.Envelope, error) {
// find access token for this cafe
session := h.datastore.CafeSessions().Get(cafe.Pretty())
if session == nil {
Expand Down Expand Up @@ -397,11 +401,12 @@ func (h *CafeService) refresh(session *repo.CafeSession) (*repo.CafeSession, err
return nil, err
}
refreshed := &repo.CafeSession{
CafeId: session.CafeId,
Access: res.Access,
Refresh: res.Refresh,
Expiry: exp,
HttpAddr: res.HttpAddr,
CafeId: session.CafeId,
Access: res.Access,
Refresh: res.Refresh,
Expiry: exp,
HttpAddr: res.HttpAddr,
SwarmAddrs: res.SwarmAddrs,
}
if err := h.datastore.CafeSessions().AddOrUpdate(refreshed); err != nil {
return nil, err
Expand Down Expand Up @@ -522,7 +527,14 @@ func (h *CafeService) handleRegistration(pid peer.ID, env *pb.Envelope) (*pb.Env
}

// get a session
session, err := jwt.NewSession(h.service.Node.PrivateKey, pid, h.Protocol(), defaultSessionDuration, h.httpAddr)
session, err := jwt.NewSession(
h.service.Node.PrivateKey,
pid,
h.Protocol(),
defaultSessionDuration,
h.httpAddr,
h.swarmAddrs,
)
if err != nil {
return h.service.NewError(500, err.Error(), env.Message.RequestId)
}
Expand Down Expand Up @@ -586,7 +598,14 @@ func (h *CafeService) handleRefreshSession(pid peer.ID, env *pb.Envelope) (*pb.E
if err != nil {
return h.service.NewError(500, err.Error(), env.Message.RequestId)
}
session, err := jwt.NewSession(h.service.Node.PrivateKey, spid, h.Protocol(), defaultSessionDuration, h.httpAddr)
session, err := jwt.NewSession(
h.service.Node.PrivateKey,
spid,
h.Protocol(),
defaultSessionDuration,
h.httpAddr,
h.swarmAddrs,
)
if err != nil {
return h.service.NewError(500, err.Error(), env.Message.RequestId)
}
Expand Down Expand Up @@ -838,23 +857,29 @@ func (h *CafeService) verifyKeyFunc(token *njwt.Token) (interface{}, error) {
return h.service.Node.PrivateKey.GetPublic(), nil
}

// setHttpApi sets the cafe http api address
func (h *CafeService) setHttpAddr(addr string) {
var host string
if cafeApiHost != nil {
var err error
host, err = ipfs.PublicIPv4Addr(h.service.Node)
if err != nil {
log.Infof(err.Error())
}
// setAddrs sets addresses used in sessions generated by this host
func (h *CafeService) setAddrs(bindedHttpAddr string, swarmPorts config.SwarmPorts) {
// get a public host if available
host, err := ipfs.PublicIPv4Addr(h.service.Node)
if err != nil {
log.Infof(err.Error())
}
if host == "" {
host = "127.0.0.1"
}
parts := strings.Split(addr, ":")

// set the http address where peers can reach this cafe
parts := strings.Split(bindedHttpAddr, ":")
if len(parts) < 2 {
return
}
h.httpAddr = fmt.Sprintf("%s:%s", host, parts[1])
log.Infof("cafe http api address: %s", h.httpAddr)

// set the swarm multiaddress(es) where other peers can reach this cafe
h.swarmAddrs = []string{
fmt.Sprintf("/ip4/%s/tcp/%s", host, swarmPorts.TCP),
fmt.Sprintf("/ip4/%s/tcp/%s/ws", host, swarmPorts.WS),
}
log.Infof("cafe multiaddresses: %s, %s", h.swarmAddrs[0], h.swarmAddrs[1])
}
104 changes: 91 additions & 13 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,31 @@ import (
"fmt"
"github.com/textileio/textile-go/repo/config"
"gx/ipfs/QmebqVUQQqQFhg74FtQFszUJo22Vpr3e8qBAkvvV4ho9HH/go-ipfs/repo"
"gx/ipfs/QmebqVUQQqQFhg74FtQFszUJo22Vpr3e8qBAkvvV4ho9HH/go-ipfs/repo/fsrepo"
"math/rand"
"regexp"
"strconv"
"strings"
"time"
)

const minPort = 1024
const maxPort = 49151

var tcpPortRx = regexp.MustCompile("/tcp/([0-9]+)$")
var wsPortRx = regexp.MustCompile("/tcp/([0-9]+)/ws$")

// Config returns the textile configuration file
func (t *Textile) Config() *config.Config {
return t.config
}

// GetRandomPort returns a port within the acceptable range
func GetRandomPort() string {
rand.Seed(time.Now().UTC().UnixNano())
return strconv.Itoa(rand.Intn(maxPort-minPort) + minPort)
}

// applyTextileConfigOptions update textile config w/ init options
func applyTextileConfigOptions(init InitConfig) error {
conf, err := config.Read(init.RepoPath)
Expand Down Expand Up @@ -42,25 +59,86 @@ func applyTextileConfigOptions(init InitConfig) error {
return config.Write(init.RepoPath, conf)
}

//func ensureBootstrapConfig(rep repo.Repo) error {
// return config.UpdateIpfs(rep, "Bootstrap", config.BootstrapAddresses)
// return nil
//}
// updateBootstrapConfig adds additional peers to the bootstrap config
func updateBootstrapConfig(repoPath string, add []string, rm []string) error {
rep, err := fsrepo.Open(repoPath)
if err != nil {
return err
}
defer rep.Close()
conf, err := rep.Config()
if err != nil {
return err
}
var final []string

// get a list that does not include items in rm
outer:
for _, bp := range conf.Bootstrap {
for _, r := range rm {
if bp == r {
continue outer
}
}
final = append(final, bp)
}

// add new
for _, p := range add {
final = append(final, p)
}
return config.UpdateIpfs(rep, "Bootstrap", final)
}

// loadSwarmPorts returns the swarm ports in the ipfs config
func loadSwarmPorts(repoPath string) (*config.SwarmPorts, error) {
rep, err := fsrepo.Open(repoPath)
if err != nil {
return nil, err
}
defer rep.Close()
conf, err := rep.Config()
if err != nil {
return nil, err
}
ports := &config.SwarmPorts{}
for _, p := range conf.Addresses.Swarm {
tcp := tcpPortRx.FindStringSubmatch(p)
if len(tcp) == 2 {
ports.TCP = tcp[1]
}
ws := wsPortRx.FindStringSubmatch(p)
if len(ws) == 2 {
ports.WS = ws[1]
}
}
return ports, nil
}

// applySwarmPortConfigOption sets custom swarm ports (tcp and ws)
func applySwarmPortConfigOption(rep repo.Repo, ports string) error {
parts := strings.Split(ports, ",")
if len(parts) != 2 {
return nil
var parts []string
if ports != "" {
parts = strings.Split(ports, ",")
}
var tcp, ws string
switch len(parts) {
case 1:
tcp = parts[0]
ws = GetRandomPort()
case 2:
tcp = parts[0]
ws = parts[1]
default:
tcp = GetRandomPort()
ws = GetRandomPort()
}
return config.UpdateIpfs(rep, "Addresses.Swarm", []string{
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", parts[0]),
fmt.Sprintf("/ip6/::/tcp/%s", parts[0]),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", parts[1]),
fmt.Sprintf("/ip6/::/tcp/%s/ws", parts[1]),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", tcp),
fmt.Sprintf("/ip6/::/tcp/%s", tcp),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", ws),
fmt.Sprintf("/ip6/::/tcp/%s/ws", ws),
})
log.Infof("applied custom swarm port: %s", ports)
return nil
}

// applyServerConfigOption adds the IPFS server profile to the repo config
Expand Down
11 changes: 10 additions & 1 deletion core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ func (t *Textile) Start() error {
return err
}

// load swarm ports
sports, err := loadSwarmPorts(t.repoPath)
if err != nil {
return err
}
if sports == nil {
return errors.New("failed to load swarm ports")
}

// build update channels
t.online = make(chan struct{})
t.updates = make(chan Update, 10)
Expand Down Expand Up @@ -358,8 +367,8 @@ func (t *Textile) Start() error {

// setup cafe service
t.cafeService = NewCafeService(accnt, t.ipfs, t.datastore, t.cafeInbox)
t.cafeService.setAddrs(t.config.Addresses.CafeAPI, *sports)
if t.config.Cafe.Open {
t.cafeService.setHttpAddr(t.config.Addresses.CafeAPI)
t.cafeService.open = true
t.startCafeApi(t.config.Addresses.CafeAPI)
}
Expand Down
6 changes: 5 additions & 1 deletion core/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,13 @@ func (t *Thread) newBlockHeader() (*pb.ThreadBlockHeader, error) {
if err != nil {
return nil, err
}
var parents []string
if head != "" {
parents = strings.Split(head, ",")
}
return &pb.ThreadBlockHeader{
Date: pdate,
Parents: strings.Split(string(head), ","),
Parents: parents,
Author: t.node().Identity.Pretty(),
}, nil
}
Expand Down
3 changes: 1 addition & 2 deletions desktop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/textileio/textile-go/gateway"
"github.com/textileio/textile-go/keypair"
"github.com/textileio/textile-go/repo"
rconfig "github.com/textileio/textile-go/repo/config"
logger "gx/ipfs/QmQvJiADDe7JR4m968MwXobTCCzUqQkP87aRHe29MEBGHV/go-logging"
"gx/ipfs/QmebqVUQQqQFhg74FtQFszUJo22Vpr3e8qBAkvvV4ho9HH/go-ipfs/repo/fsrepo"
"os"
Expand Down Expand Up @@ -182,7 +181,7 @@ func start(a *astilectron.Astilectron, w []*astilectron.Window, _ *astilectron.M
}()

// start the gateway
gateway.Host.Start(fmt.Sprintf("127.0.0.1:%d", rconfig.GetRandomPort()))
gateway.Host.Start(fmt.Sprintf("127.0.0.1:%s", core.GetRandomPort()))

// save off the server address
gatewayAddr = fmt.Sprintf("http://%s", gateway.Host.Addr())
Expand Down
Loading

0 comments on commit 080d927

Please sign in to comment.