Skip to content

Commit

Permalink
load peers from db
Browse files Browse the repository at this point in the history
  • Loading branch information
TATAUFO committed Jan 28, 2020
1 parent 3ada56b commit d8c626b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 16 deletions.
4 changes: 4 additions & 0 deletions cmd/pdu/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func initDB() (db.UDB, error) {
return nil, err
}

if err := udb.CreateBucket(db.BucketPeer); err != nil {
return nil, err
}

return udb, nil
}

Expand Down
10 changes: 10 additions & 0 deletions db/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
var (
errFindMissingLimit = errors.New("find operate missing limit")
errFindArgsNumberNotCorrect = errors.New("find operate number not correct")
errBucketNotExist = errors.New("bucket not exist")
)

// UBoltDB is the db struct by bolt
Expand Down Expand Up @@ -68,6 +69,9 @@ func (u *UBoltDB) DeleteBucket(bucketName string) error {
func (u *UBoltDB) Set(bucketName, key string, val []byte) error {
return u.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return errBucketNotExist
}
return b.Put([]byte(key), val)
})
}
Expand All @@ -76,6 +80,9 @@ func (u *UBoltDB) Set(bucketName, key string, val []byte) error {
func (u *UBoltDB) Get(bucketName, key string) (val []byte, err error) {
err = u.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return errBucketNotExist
}
val = b.Get([]byte(key))
return nil
})
Expand All @@ -99,6 +106,9 @@ func (u *UBoltDB) Find(bucketName, prefix string, args ...int) (rows []*db.Row,

err = u.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return errBucketNotExist
}
c := b.Cursor()
prefixBytes := []byte(prefix)
count := 0
Expand Down
4 changes: 2 additions & 2 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
// BucketConfig is used to save config info when universe be created
BucketConfig = "config"

// BucketSTPrefix is used to save the prefix of the space-time bucket
BucketSTPrefix = "st_"
// BucketPeer is used to save the peer information
BucketPeer = "peer"

// ConfigRoot0 root user which gender is 0
ConfigRoot0 = "root0"
Expand Down
45 changes: 37 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
)

const (
displayInterval = 1000
displayInterval = 1000
maxLoadPeersCount = 1000
)

// Node is struct of node
Expand All @@ -60,17 +61,18 @@ func New(udb db.UDB) (node *Node, err error) {
peers: make(map[common.Hash]*peer.Peer),
}

if err := node.setLocalNodeKey(); err != nil {
if err := node.initUniverse(); err != nil {
return nil, err
}

if err := node.initUniverse(); err != nil {
if err := node.loadMessage(); err != nil {
return nil, err
}

if err := node.loadMessage(); err != nil {
if err := node.initNetwork(); err != nil {
return nil, err
}

return node, nil
}

Expand All @@ -79,6 +81,36 @@ func (n *Node) SetLocalPort(port uint64) {
n.localPort = port
}

func (n *Node) initNetwork() error {
// set local node key
if err := n.setLocalNodeKey(); err != nil {
return err
}

// load peers from db
if err := n.loadPeers(); err != nil {
return err
}

return nil
}

func (n *Node) loadPeers() error {
rows, err := n.udb.Find(db.BucketPeer, "", maxLoadPeersCount)
if err != nil {
return err
}

for _, row := range rows {
var newPeer peer.Peer
if err := json.Unmarshal(row.V, &newPeer); err != nil {
return err
}
n.peers[common.Bytes2Hash([]byte(row.K))] = &newPeer
}
return nil
}

// setLocalNodeKey set the local node key
func (n *Node) setLocalNodeKey() error {
nodeKey, err := n.udb.Get(db.BucketConfig, db.ConfigLocalNodeKey)
Expand Down Expand Up @@ -157,15 +189,12 @@ func (n Node) wsHandler(ws *websocket.Conn) {
}

func (n Node) nodeHandler(w http.ResponseWriter, r *http.Request) {
// todo: w.Write return the basic information of local node
switch r.Method {
case "GET":
w.Write([]byte(r.Method))
case "POST":
w.Write([]byte(r.Method))
case "PUT":
w.Write([]byte(r.Method))
case "DELETE":
w.Write([]byte(r.Method))
default:
w.Write([]byte(""))
}
Expand Down
12 changes: 6 additions & 6 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import (

// Peer contain the info of websocket connection
type Peer struct {
ip string
port uint64
nodeKey string
IP string `json:"ip"`
Port uint64 `json:"port"`
NodeKey string `json:"nodeKey"`
conn *websocket.Conn
}

// New create new Peer
func New(ip string, port uint64, nodeKey string) (*Peer, error) {
return &Peer{ip: ip, port: port, nodeKey: nodeKey}, nil
return &Peer{IP: ip, Port: port, NodeKey: nodeKey}, nil
}

// Dial build ws connection
Expand All @@ -54,10 +54,10 @@ func (p *Peer) Close() error {

// Url show the Peer ws url address
func (p Peer) Url() string {
return fmt.Sprintf("ws://%s:%d/%s", p.ip, p.port, p.nodeKey)
return fmt.Sprintf("ws://%s:%d/%s", p.IP, p.Port, p.NodeKey)
}

// origin used when peer dial
func (p Peer) origin() string {
return fmt.Sprintf("http://%s:%d/", p.ip, p.port)
return fmt.Sprintf("http://%s:%d/", p.IP, p.Port)
}

0 comments on commit d8c626b

Please sign in to comment.