Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

Permalink
Relay initiates the connection
Browse files Browse the repository at this point in the history
  • Loading branch information
lbarman committed Jan 30, 2017
1 parent 4dedc8d commit 98de9c6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 19 deletions.
5 changes: 0 additions & 5 deletions sda/services/churn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package services
// This file contains the logic to handle churn.

import (
"time"

"github.com/lbarman/prifi/sda/protocols"
"gopkg.in/dedis/onet.v1"
"gopkg.in/dedis/onet.v1/log"
Expand All @@ -31,9 +29,6 @@ import (
* count the number of participants, if > threshold, start prifi
*/

//Delay before each host re-tried to connect
const DELAY_BEFORE_KEEPALIVE = 5 * time.Second

type waitQueueEntry struct {
serverID *network.ServerIdentity
numericID int
Expand Down
75 changes: 73 additions & 2 deletions sda/services/prifi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@ type StopProtocol struct{}
// by nodes that want to join the protocol.
type ConnectionRequest struct{}

// HelloMsg messages are sent by the relay to the trustee;
// if they are up, they answer with a ConnectionRequest
type HelloMsg struct{}

// DisconnectionRequest messages are sent to the relay
// by nodes that want to leave the protocol.
type DisconnectionRequest struct{}

//Delay before each host re-tried to connect to the relay
const DELAY_BEFORE_CONNECT_TO_RELAY = 5 * time.Second

//Delay before the relay re-tried to connect to the trustees
const DELAY_BEFORE_CONNECT_TO_TRUSTEES = 30 * time.Second

// returns true if the PriFi SDA protocol is running (in any state : init, communicate, etc)
func (s *ServiceState) IsPriFiProtocolRunning() bool {
if s.priFiSDAProtocol != nil {
Expand All @@ -34,6 +44,22 @@ func (s *ServiceState) HandleStop(msg *network.Envelope) {

}

// Packet send by relay when some node connected
func (s *ServiceState) HandleHelloMsg(msg *network.Envelope) {
if s.role != prifi_protocol.Trustee {
log.Error("Received a Hello message, but we're not a trustee ! ignoring.")
return
}

if !s.receivedHello {
//start sending some ConnectionRequests
s.relayIdentity = msg.ServerIdentity
go s.connectToRelay(s.relayIdentity, s.connectToRelayStopChan)
s.receivedHello = true
}

}

// Packet send by relay when some node connected
func (s *ServiceState) HandleConnection(msg *network.Envelope) {
if s.churnHandler == nil {
Expand Down Expand Up @@ -66,6 +92,7 @@ func (s *ServiceState) NetworkErrorHappened(si *network.ServerIdentity) {

if s.role != prifi_protocol.Relay {
log.Lvl3("A network error occurred with node", si, ", but we're not the relay, nothing to do.")
s.connectToRelayStopChan <- true //"nothing" except stop this goroutine
return
}
if s.churnHandler == nil {
Expand Down Expand Up @@ -143,14 +170,46 @@ func (s *ServiceState) stopPriFiCommunicateProtocol() {
// autoConnect sends a connection request to the relay
// every 10 seconds if the node is not participating to
// a PriFi protocol.
func (s *ServiceState) autoConnect(relayID *network.ServerIdentity) {
func (s *ServiceState) connectToTrustees(trusteesIDs []*network.ServerIdentity, stopChan chan bool) {
for _, v := range trusteesIDs {
s.sendHelloMessage(v)
}

tick := time.Tick(DELAY_BEFORE_CONNECT_TO_TRUSTEES)
for range tick {
if !s.IsPriFiProtocolRunning() {
for _, v := range trusteesIDs {
s.sendHelloMessage(v)
}
}

select {
case <-stopChan:
log.Lvl3("Stopping connectToTrustees subroutine.")
return
default:
}
}
}

// connectToRelay sends a connection request to the relay
// every 10 seconds if the node is not participating to
// a PriFi protocol.
func (s *ServiceState) connectToRelay(relayID *network.ServerIdentity, stopChan chan bool) {
s.sendConnectionRequest(relayID)

tick := time.Tick(DELAY_BEFORE_KEEPALIVE)
tick := time.Tick(DELAY_BEFORE_CONNECT_TO_RELAY)
for range tick {
if !s.IsPriFiProtocolRunning() {
s.sendConnectionRequest(relayID)
}

select {
case <-stopChan:
log.Lvl3("Stopping connectToRelay subroutine.")
return
default:
}
}
}

Expand All @@ -165,3 +224,15 @@ func (s *ServiceState) sendConnectionRequest(relayID *network.ServerIdentity) {
log.Error("Connection failed:", err)
}
}

// sendHelloMessage sends a hello message to the trustee.
// It is called by the relay services at startup to
// announce themselves to the trustees.
func (s *ServiceState) sendHelloMessage(trusteeID *network.ServerIdentity) {
log.Lvl2("Sending hello request")
err := s.SendRaw(trusteeID, &HelloMsg{})

if err != nil {
log.Lvl3("Hello failed, ", trusteeID, " isn't online.")
}
}
31 changes: 19 additions & 12 deletions sda/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ type ServiceState struct {
// We need to embed the ServiceProcessor, so that incoming messages
// are correctly handled.
*onet.ServiceProcessor
prifiTomlConfig *prifi_protocol.PrifiTomlConfig
Storage *Storage
path string
role prifi_protocol.PriFiRole
relayIdentity *network.ServerIdentity
prifiTomlConfig *prifi_protocol.PrifiTomlConfig
Storage *Storage
path string
role prifi_protocol.PriFiRole
relayIdentity *network.ServerIdentity
trusteeIDs []*network.ServerIdentity
connectToRelayStopChan chan bool
connectToTrusteesStopChan chan bool
receivedHello bool

//this hold the churn handler; protocol is started there. Only relay has this != nil
churnHandler *churnHandler
Expand All @@ -62,10 +66,12 @@ func newService(c *onet.Context) onet.Service {
s := &ServiceState{
ServiceProcessor: onet.NewServiceProcessor(c),
}
helloMsg := network.RegisterMessage(HelloMsg{})
stopMsg := network.RegisterMessage(StopProtocol{})
connMsg := network.RegisterMessage(ConnectionRequest{})
disconnectMsg := network.RegisterMessage(DisconnectionRequest{})

c.RegisterProcessorFunc(helloMsg, s.HandleHelloMsg)
c.RegisterProcessorFunc(stopMsg, s.HandleStop)
c.RegisterProcessorFunc(connMsg, s.HandleConnection)
c.RegisterProcessorFunc(disconnectMsg, s.HandleDisconnection)
Expand Down Expand Up @@ -126,6 +132,9 @@ func (s *ServiceState) StartRelay(group *app.Group) error {
//the relay has a socks Client
go prifi_socks.StartSocksClient(socksServerConfig.Port, socksServerConfig.UpstreamChannel, socksServerConfig.DownstreamChannel)

s.connectToTrusteesStopChan = make(chan bool)
go s.connectToTrustees(trusteesIDs, s.connectToTrusteesStopChan)

return nil
}

Expand All @@ -135,7 +144,7 @@ func (s *ServiceState) StartClient(group *app.Group) error {
log.Info("Service", s, "running in client mode")
s.role = prifi_protocol.Client

relayID, _ := mapIdentities(group)
relayID, trusteeIDs := mapIdentities(group)
s.relayIdentity = relayID

socksClientConfig = &prifi_protocol.SOCKSConfig{
Expand All @@ -149,7 +158,9 @@ func (s *ServiceState) StartClient(group *app.Group) error {
log.Lvl1("Starting SOCKS server on port", socksClientConfig.Port)
go prifi_socks.StartSocksServer(socksClientConfig.Port, socksClientConfig.PayloadLength, socksClientConfig.UpstreamChannel, socksClientConfig.DownstreamChannel, s.prifiTomlConfig.DoLatencyTests)

go s.autoConnect(relayID)
s.connectToRelayStopChan = make(chan bool)
s.trusteeIDs = trusteeIDs
go s.connectToRelay(relayID, s.connectToRelayStopChan)

return nil
}
Expand Down Expand Up @@ -183,11 +194,7 @@ func (s *ServiceState) StartSocksTunnelOnly() error {
func (s *ServiceState) StartTrustee(group *app.Group) error {
log.Info("Service", s, "running in trustee mode")
s.role = prifi_protocol.Trustee

relayID, _ := mapIdentities(group)
s.relayIdentity = relayID

go s.autoConnect(relayID)
s.connectToRelayStopChan = make(chan bool)

return nil
}
Expand Down

0 comments on commit 98de9c6

Please sign in to comment.