Skip to content
Permalink
Browse files

Must set length greater than length of message code so that it is han…

…dled
  • Loading branch information...
tubackkhoa committed Jan 9, 2019
1 parent 3e4de4a commit c7b267f725c36315d9acaaa7f3fd11924ec99746
Showing with 57 additions and 40 deletions.
  1. +5 −9 OrderBook/main.go
  2. +42 −16 OrderBook/protocol/protocol.go
  3. +0 −13 OrderBook/protocol/protocol_test.go
  4. +10 −2 OrderBook/protocol/service.go
@@ -19,7 +19,6 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/p2p"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
@@ -326,6 +325,7 @@ func startup(p2pPort int, httpPort int, wsPort int, name string, privateKey stri
// register pss and orderbook service
rpcapi := []string{
// "eth",
// "ssh",
"orderbook",
}
dataDir := fmt.Sprintf("%s%d", demo.DatadirPrefix, p2pPort)
@@ -335,15 +335,11 @@ func startup(p2pPort int, httpPort int, wsPort int, name string, privateKey stri
}
orderbookEngine = orderbook.NewEngine(orderbookDir, allowedPairs)

proto := protocol.NewProtocol(msgC, quitC, orderbookEngine)
var protocolArr []p2p.Protocol
if proto != nil {
protocolArr = []p2p.Protocol{*proto}
}

thisNode, err = demo.NewServiceNodeWithPrivateKeyAndDataDirAndProtocols(privkey, dataDir, p2pPort, httpPort, wsPort, protocolArr, rpcapi...)
thisNode, err = demo.NewServiceNodeWithPrivateKeyAndDataDir(privkey, dataDir, p2pPort, httpPort, wsPort, rpcapi...)
// register normal service, protocol is for p2p, service is for rpc calls
err = thisNode.Register(protocol.NewService(orderbookEngine))
service := protocol.NewService(name, msgC, quitC, orderbookEngine)
err = thisNode.Register(service)

if err != nil {
demo.LogCrit("Register orderbook service in servicenode failed", "err", err)
}
@@ -1,8 +1,10 @@
package protocol

import (
"context"
"fmt"
"strconv"
"time"

"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
@@ -74,6 +76,18 @@ type OrderbookHandler struct {
QuitC <-chan struct{}
}

// checkProtoHandshake verifies local and remote protoHandshakes match
func checkProtoHandshake(testVersion uint) func(interface{}) error {
return func(rhs interface{}) error {
remote := rhs.(*OrderbookHandshake)

if remote.V != testVersion {
return fmt.Errorf("%d (!= %d)", remote.V, testVersion)
}
return nil
}
}

// we will receive message in handle
func (orderbookHandler *OrderbookHandler) handle(msg interface{}) error {

@@ -127,34 +141,46 @@ func (orderbookHandler *OrderbookHandler) handle(msg interface{}) error {
return nil
}

return fmt.Errorf("Invalid pssorderbook protocol message")
return fmt.Errorf("Invalid orderbook protocol message")

}

// create the protocol with the protocols extension
func NewProtocol(inC <-chan interface{}, quitC <-chan struct{}, orderbookEngine *orderbook.Engine) *p2p.Protocol {
func NewProtocol(name string, inC <-chan interface{}, quitC <-chan struct{}, orderbookEngine *orderbook.Engine) *p2p.Protocol {
return &p2p.Protocol{
Name: "Orderbook",
Version: 42,
Length: 1,
// we may use more 1 custom message code
Length: uint64(len(OrderbookProtocol.Messages)) + 1,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
demo.LogWarn("running", "peer", p)
var err error
// create the enhanced peer, it will wrap p2p.Send with code from Message Spec
pp := protocols.NewPeer(p, rw, OrderbookProtocol)

// send the message, then handle it to make sure protocol success
go func() {
outmsg := &OrderbookHandshake{
V: 42,
// shortened hex string for terminal logging
Nick: p.Name(),
}
err := pp.Send(outmsg)
if err != nil {
demo.LogError("Send p2p message fail", "err", err)
}
demo.LogInfo("Sending handshake", "peer", p, "handshake", outmsg)
}()
// go func() {
outmsg := &OrderbookHandshake{
V: 42,
// shortened hex string for terminal logging
Nick: name,
}

// check handshake
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
hsCheck := checkProtoHandshake(outmsg.V)
_, err = pp.Handshake(ctx, outmsg, hsCheck)
if err != nil {
return err
}

err = pp.Send(outmsg)
if err != nil {
demo.LogError("Send p2p message fail", "err", err)
}
demo.LogInfo("Sending handshake", "peer", p, "handshake", outmsg)
// }()

// protocols abstraction provides a separate blocking run loop for the peer
// when this returns, the protocol will be terminated
@@ -165,7 +191,7 @@ func NewProtocol(inC <-chan interface{}, quitC <-chan struct{}, orderbookEngine
InC: inC,
QuitC: quitC,
}
err := pp.Run(run.handle)
err = pp.Run(run.handle)
return err
},
}

This file was deleted.

Oops, something went wrong.
@@ -12,6 +12,7 @@ import (
type OrderbookService struct {
V int
Engine *orderbook.Engine
protos []p2p.Protocol
}

// APIs : api service
@@ -30,7 +31,7 @@ func (service *OrderbookService) APIs() []rpc.API {
// these are needed to satisfy the node.Service interface
// in this example they do nothing
func (service *OrderbookService) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
return service.protos
}

func (service *OrderbookService) Start(srv *p2p.Server) error {
@@ -42,11 +43,18 @@ func (service *OrderbookService) Stop() error {
}

// wrapper function for servicenode to start the service
func NewService(orderbookEngine *orderbook.Engine) func(ctx *node.ServiceContext) (node.Service, error) {
func NewService(name string, inC <-chan interface{}, quitC <-chan struct{}, orderbookEngine *orderbook.Engine) func(ctx *node.ServiceContext) (node.Service, error) {
proto := NewProtocol(name, inC, quitC, orderbookEngine)
var protocolArr []p2p.Protocol
if proto != nil {
protocolArr = []p2p.Protocol{*proto}
}

return func(ctx *node.ServiceContext) (node.Service, error) {
return &OrderbookService{
V: 42,
Engine: orderbookEngine,
protos: protocolArr,
}, nil
}
}

0 comments on commit c7b267f

Please sign in to comment.
You can’t perform that action at this time.