Skip to content

Commit

Permalink
network: stop p2p server before peer updates executor
Browse files Browse the repository at this point in the history
  • Loading branch information
xiphon committed Feb 18, 2020
1 parent 2808446 commit b712012
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 29 deletions.
18 changes: 2 additions & 16 deletions main.go
Expand Up @@ -255,9 +255,9 @@ func run(cliContext *cli.Context) error {
nonce := utils.Serialize(key.Public)

peers := network.NewPeersList()
peerUpdates := make(chan pasl.PeerInfo)
peerUpdates := make(chan network.PeerInfo)
return pasl.WithManager(nonce, blockchain, p2pPort, peers, peerUpdates, blockchain.BlocksUpdates, blockchain.TxPoolUpdates, defaults.TimeoutRequest, func(manager *pasl.Manager) error {
return network.WithNode(config, peers, manager.OnNewConnection, func(node network.Node) error {
return network.WithNode(config, peers, peerUpdates, manager.OnNewConnection, func(node network.Node) error {
cancel := make(chan os.Signal, 2)
coreRPC := api.NewApi(blockchain)
RPCBindAddress := fmt.Sprintf("%s:%d", cliContext.GlobalString(rpcIPFlag.GetName()), defaults.RPCPort)
Expand Down Expand Up @@ -311,20 +311,6 @@ func run(cliContext *cli.Context) error {
})
})
}()

updatesListener := concurrent.NewUnboundedExecutor()
updatesListener.Go(func(ctx context.Context) {
for {
select {
case peer := <-peerUpdates:
//utils.Ftracef(cliContext.App.Writer, " %s:%d last seen %s ago", peer.Host, peer.Port, time.Since(time.Unix(int64(peer.LastConnect), 0)))
node.AddPeer(fmt.Sprintf("tcp://%s:%d", peer.Host, peer.Port))
case <-ctx.Done():
return
}
}
})
defer updatesListener.StopAndWaitForever()
}

RPCHandlers := coreRPC.GetHandlers()
Expand Down
8 changes: 7 additions & 1 deletion network/network.go
Expand Up @@ -64,7 +64,13 @@ type Connection struct {
OnStateUpdated func()
}

func WithNode(config Config, peers *PeersList, onNewConnection func(context.Context, *Connection) error, fn func(node Node) error) error {
type PeerInfo struct {
Host string
Port uint16
LastConnect uint32
}

func WithNode(config Config, peers *PeersList, peerUpdates <-chan PeerInfo, onNewConnection func(context.Context, *Connection) error, fn func(node Node) error) error {
node := Node{
config: config,
peers: peers,
Expand Down
2 changes: 1 addition & 1 deletion network/pasl/connection.go
Expand Up @@ -47,7 +47,7 @@ type PascalConnection struct {
peers *network.PeersList
nonce []byte
remoteNonce []byte
peerUpdates chan<- PeerInfo
peerUpdates chan<- network.PeerInfo
onStateUpdate chan<- eventConnectionState
onNewBlock chan *eventNewBlock
onNewOperation chan<- *eventNewOperation
Expand Down
4 changes: 2 additions & 2 deletions network/pasl/manager.go
Expand Up @@ -77,7 +77,7 @@ type Manager struct {
onSyncState chan syncState
p2pPort uint16
peers *network.PeersList
peerUpdates chan<- PeerInfo
peerUpdates chan<- network.PeerInfo
prevSyncState syncState
timeoutRequest time.Duration
txPoolUpdates <-chan tx.CommonOperation
Expand All @@ -89,7 +89,7 @@ func WithManager(
blockchain *blockchain.Blockchain,
p2pPort uint16,
peers *network.PeersList,
peerUpdates chan<- PeerInfo,
peerUpdates chan<- network.PeerInfo,
blocksUpdates <-chan safebox.SerializedBlock,
txPoolUpdates <-chan tx.CommonOperation,
timeoutRequest time.Duration,
Expand Down
12 changes: 3 additions & 9 deletions network/pasl/packet_hello.go
Expand Up @@ -29,23 +29,17 @@ import (
"github.com/pasl-project/pasl/utils"
)

type PeerInfo struct {
Host string
Port uint16
LastConnect uint32
}

type packetHello struct {
NodePort uint16
Nonce []byte
Time uint32
Block safebox.SerializedBlockHeader
Peers []PeerInfo
Peers []network.PeerInfo
UserAgent string
}

func generateHello(nodePort uint16, nonce []byte, pendingBlock safebox.SerializedBlockHeader, peers map[string]network.Peer, userAgent string) []byte {
whitePeers := make([]PeerInfo, 0, len(peers))
whitePeers := make([]network.PeerInfo, 0, len(peers))
for address := range peers {
parsed, err := url.Parse(address)
if err != nil {
Expand All @@ -55,7 +49,7 @@ func generateHello(nodePort uint16, nonce []byte, pendingBlock safebox.Serialize
if err != nil {
continue
}
whitePeers = append(whitePeers, PeerInfo{
whitePeers = append(whitePeers, network.PeerInfo{
Host: parsed.Hostname(),
Port: uint16(port),
LastConnect: peers[address].LastConnectTimestamp,
Expand Down

0 comments on commit b712012

Please sign in to comment.