From 2f213514ea9f7a59f0022c0b7f899aef25eebb68 Mon Sep 17 00:00:00 2001 From: TATAUFO Date: Fri, 7 Feb 2020 02:03:24 +0800 Subject: [PATCH] sync universe --- cmd/pdu/create.go | 146 +---------------------- cmd/pdu/start.go | 37 ++++-- cmd/pdu/utils.go | 103 +++++++++++++++- common/hash.go | 7 ++ db/db.go | 16 +++ db/utils.go | 211 +++++++++++++++++++++++++++++++++ galaxy/wavequestion.go | 2 + galaxy/waveroots.go | 3 + node/node.go | 261 ++++++++++++++++++++++++++--------------- peer/peer.go | 101 ++++++++++++++-- 10 files changed, 626 insertions(+), 261 deletions(-) create mode 100644 db/utils.go diff --git a/cmd/pdu/create.go b/cmd/pdu/create.go index 66e2a4f..f284a08 100644 --- a/cmd/pdu/create.go +++ b/cmd/pdu/create.go @@ -17,23 +17,17 @@ package main import ( - "bufio" - "encoding/json" "errors" "fmt" - "github.com/mitchellh/go-homedir" + "os" + "strings" + "github.com/pdupub/go-pdu/common" "github.com/pdupub/go-pdu/core" "github.com/pdupub/go-pdu/crypto" "github.com/pdupub/go-pdu/db" - "github.com/pdupub/go-pdu/db/bolt" "github.com/pdupub/go-pdu/params" "github.com/spf13/cobra" - "github.com/spf13/viper" - "math/big" - "os" - "path" - "strings" ) // createCmd represents the create command @@ -41,16 +35,7 @@ var createCmd = &cobra.Command{ Use: "create", Short: "Create a new PDU Universe", RunE: func(_ *cobra.Command, args []string) error { - - if err := initDir(); err != nil { - return err - } - - if err := initConfig(); err != nil { - return err - } - - udb, err := initDB() + udb, err := initNodeDir() if err != nil { return err } @@ -77,7 +62,7 @@ func initUniverseAndSave(udb db.UDB) error { return err } - if err := saveRootUsers(users, udb); err != nil { + if err := db.SaveRootUsers(udb, users); err != nil { return err } @@ -101,7 +86,7 @@ func initUniverseAndSave(udb db.UDB) error { return err } - if err := saveMsg(msg, udb); err != nil { + if err := db.SaveMsg(udb, msg); err != nil { return err } return nil @@ -139,72 +124,6 @@ func createFirstMsg(users []*core.User, priKeys []*crypto.PrivateKey) (*core.Mes return msg, nil } -func scanLine(input *string) { - reader := bufio.NewReader(os.Stdin) - data, _, _ := reader.ReadLine() - *input = string(data) -} - -func saveMsg(msg *core.Message, udb db.UDB) error { - msgBytes, err := json.Marshal(msg) - if err != nil { - return err - } - countBytes, err := udb.Get(db.BucketConfig, db.ConfigMsgCount) - if err != nil { - return err - } - count := new(big.Int).SetBytes(countBytes) - err = udb.Set(db.BucketMsg, common.Hash2String(msg.ID()), msgBytes) - if err != nil { - return err - } - - err = udb.Set(db.BucketMID, count.String(), []byte(common.Hash2String(msg.ID()))) - if err != nil { - return err - } - count = count.Add(count, big.NewInt(1)) - err = udb.Set(db.BucketConfig, db.ConfigMsgCount, count.Bytes()) - if err != nil { - return err - } - - err = udb.Set(db.BucketLastMID, common.Hash2String(msg.SenderID), []byte(common.Hash2String(msg.ID()))) - if err != nil { - return err - } - return nil -} - -func saveRootUsers(users []*core.User, udb db.UDB) (err error) { - // save root users - var root0, root1 []byte - if root0, err = json.Marshal(users[0]); err != nil { - return err - } - if err = udb.Set(db.BucketConfig, db.ConfigRoot0, root0); err != nil { - return err - } - if err = udb.Set(db.BucketUser, common.Hash2String(users[0].ID()), root0); err != nil { - return err - } - - if root1, err = json.Marshal(users[1]); err != nil { - return err - } - - if err = udb.Set(db.BucketConfig, db.ConfigRoot1, root1); err != nil { - return err - } - - if err = udb.Set(db.BucketUser, common.Hash2String(users[1].ID()), root1); err != nil { - return err - } - - return nil -} - func createRootUsers() (users []*core.User, priKeys []*crypto.PrivateKey, err error) { for i := 0; i < 2; i++ { @@ -233,59 +152,6 @@ func createRootUsers() (users []*core.User, priKeys []*crypto.PrivateKey, err er return users, priKeys, err } -func initDB() (db.UDB, error) { - dbFilePath := path.Join(dataDir, "u.db") - udb, err := bolt.NewDB(dbFilePath) - if err != nil { - return nil, err - } - - if err := udb.CreateBucket(db.BucketConfig); err != nil { - return nil, err - } - if err := udb.Set(db.BucketConfig, db.ConfigMsgCount, big.NewInt(0).Bytes()); err != nil { - return nil, err - } - - if err := udb.CreateBucket(db.BucketUser); err != nil { - return nil, err - } - if err := udb.CreateBucket(db.BucketMsg); err != nil { - return nil, err - } - if err := udb.CreateBucket(db.BucketMID); err != nil { - return nil, err - } - if err := udb.CreateBucket(db.BucketLastMID); err != nil { - return nil, err - } - - if err := udb.CreateBucket(db.BucketPeer); err != nil { - return nil, err - } - - return udb, nil -} - -func initConfig() error { - viper.SetConfigType(params.DefaultConfigType) - viper.Set("CONFIG_NAME", "PDU") - return viper.WriteConfigAs(path.Join(dataDir, params.DefaultConfigFile)) -} - -func initDir() error { - if dataDir == "" { - home, _ := homedir.Dir() - dataDir = path.Join(home, params.DefaultPath) - } - err := os.Mkdir(dataDir, os.ModePerm) - if err != nil { - return err - } - - return nil -} - func init() { createCmd.PersistentFlags().StringVar(&dataDir, "datadir", "", fmt.Sprintf("(default $HOME/%s)", params.DefaultPath)) rootCmd.AddCommand(createCmd) diff --git a/cmd/pdu/start.go b/cmd/pdu/start.go index 799f838..4f3abae 100644 --- a/cmd/pdu/start.go +++ b/cmd/pdu/start.go @@ -20,6 +20,10 @@ import ( "encoding/json" "errors" "fmt" + "os" + "os/signal" + "path" + "github.com/mitchellh/go-homedir" "github.com/pdupub/go-pdu/common" "github.com/pdupub/go-pdu/common/log" @@ -31,9 +35,6 @@ import ( "github.com/pdupub/go-pdu/params" "github.com/spf13/cobra" "github.com/spf13/viper" - "os" - "os/signal" - "path" ) // startCmd represents the start command @@ -41,11 +42,25 @@ var startCmd = &cobra.Command{ Use: "start", Short: "Start to run PDU Universe", RunE: func(_ *cobra.Command, args []string) error { - err := initConfigLoad() - if err != nil { + + if err := updateDataDir(); err != nil { + return err + } + + if exist, err := pathExists(dataDir); err != nil { return err + } else if !exist { + if newdb, err := initNodeDir(); err != nil { + return err + } else if err := newdb.Close(); err != nil { + return err + } + log.Info("Database initialized successfully", dataDir) } + if err := initConfigLoad(); err != nil { + return err + } log.Info("Starting p2p node") log.Info("CONFIG_NAME", viper.GetString("CONFIG_NAME")) @@ -53,12 +68,10 @@ var startCmd = &cobra.Command{ if err != nil { return err } - pn, err := node.New(udb) if err != nil { return err } - // for all node mode need to unlock account var unlockedUser core.User var unlockedPrivateKey *crypto.PrivateKey @@ -129,8 +142,7 @@ var startCmd = &cobra.Command{ }, } -// initConfigLoad reads in config file and ENV variables if set. -func initConfigLoad() error { +func updateDataDir() error { if dataDir == "" { // Find home directory. home, err := homedir.Dir() @@ -138,9 +150,12 @@ func initConfigLoad() error { return err } dataDir = path.Join(home, params.DefaultPath) - } + return nil +} +// initConfigLoad reads in config file and ENV variables if set. +func initConfigLoad() error { viper.SetConfigFile(path.Join(dataDir, params.DefaultConfigFile)) viper.SetConfigType("yml") viper.AutomaticEnv() // read in environment variables that match @@ -164,7 +179,7 @@ func initDBLoad() (db.UDB, error) { func init() { startCmd.PersistentFlags().StringVar(&dataDir, "datadir", "", fmt.Sprintf("(default $HOME/%s)", params.DefaultPath)) startCmd.PersistentFlags().StringVar(&nodeAddressList, "nodes", "", "pdu nodes list, split by comma [userid@ip:port/nodeKey]") - startCmd.PersistentFlags().Uint64Var(&localPort, "localPort", node.DefaultLocalPort, "local port") + startCmd.PersistentFlags().Uint64Var(&localPort, "port", node.DefaultLocalPort, "local port") // time proof startCmd.PersistentFlags().BoolVar(&nodeTPEnable, "tp", false, "time proof enable") diff --git a/cmd/pdu/utils.go b/cmd/pdu/utils.go index fe29cfd..7f7818e 100644 --- a/cmd/pdu/utils.go +++ b/cmd/pdu/utils.go @@ -17,14 +17,96 @@ package main import ( + "bufio" "fmt" + "io/ioutil" + "math/big" + "os" + "path" + "strings" + "github.com/howeyc/gopass" + "github.com/mitchellh/go-homedir" "github.com/pdupub/go-pdu/core" "github.com/pdupub/go-pdu/crypto" - "io/ioutil" - "strings" + "github.com/pdupub/go-pdu/db" + "github.com/pdupub/go-pdu/db/bolt" + "github.com/pdupub/go-pdu/params" + "github.com/spf13/viper" ) +// initNodeDir initialize node dir and config file and db, and open db +func initNodeDir() (db.UDB, error) { + if err := initDir(); err != nil { + return nil, err + } + + if err := initConfig(); err != nil { + return nil, err + } + + udb, err := initDB() + if err != nil { + return nil, err + } + return udb, nil +} + +func initConfig() error { + viper.SetConfigType(params.DefaultConfigType) + viper.Set("CONFIG_NAME", "PDU") + return viper.WriteConfigAs(path.Join(dataDir, params.DefaultConfigFile)) +} + +func initDir() error { + if dataDir == "" { + home, _ := homedir.Dir() + dataDir = path.Join(home, params.DefaultPath) + } + err := os.Mkdir(dataDir, os.ModePerm) + if err != nil { + return err + } + + return nil +} + +func initDB() (db.UDB, error) { + dbFilePath := path.Join(dataDir, "u.db") + udb, err := bolt.NewDB(dbFilePath) + if err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketConfig); err != nil { + return nil, err + } + if err := udb.Set(db.BucketConfig, db.ConfigMsgCount, big.NewInt(0).Bytes()); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketUser); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketMsg); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketMID); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketMOD); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketLastMID); err != nil { + return nil, err + } + if err := udb.CreateBucket(db.BucketPeer); err != nil { + return nil, err + } + if err := udb.Set(db.BucketConfig, db.ConfigCurrentStep, big.NewInt(db.StepInitDB).Bytes()); err != nil { + return nil, err + } + return udb, nil +} + func unlockKeyByCmd() (*crypto.PrivateKey, *crypto.PublicKey, error) { var keyFile string fmt.Print("KeyFile path: ") @@ -55,3 +137,20 @@ func unlockKeyByFile(keyFile, passFile string) (*crypto.PrivateKey, *crypto.Publ return core.DecryptKey(keyJson, strings.TrimSpace(string(passwd))) } + +func scanLine(input *string) { + reader := bufio.NewReader(os.Stdin) + data, _, _ := reader.ReadLine() + *input = string(data) +} + +func pathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} diff --git a/common/hash.go b/common/hash.go index c90b51f..0d174ba 100644 --- a/common/hash.go +++ b/common/hash.go @@ -51,3 +51,10 @@ func Bytes2Hash(b []byte) Hash { copy(hash[HashLength-len(b):], b) return hash } + +// Hash2Bytes is transform Hash to []byte +func Hash2Bytes(h Hash) (b []byte) { + b = make([]byte, HashLength) + copy(b, h[:]) + return b +} diff --git a/db/db.go b/db/db.go index cae2d76..49cf958 100644 --- a/db/db.go +++ b/db/db.go @@ -26,6 +26,9 @@ const ( // BucketMID is used to save msg.ID by order (order/msg.ID) BucketMID = "mid" + // BucketMOD is used to save msg received sequence (msg.ID/order) + BucketMOD = "mod" + // BucketLastMID is used to save last msg.ID by user.ID BucketLastMID = "lmid" @@ -44,10 +47,23 @@ const ( // ConfigMsgCount is the current message count in the universe ConfigMsgCount = "msg_count" + // ConfigCurrentStep is the current step of initialize the universe + // step 0 - create bucket + // step 1 - roots saved + ConfigCurrentStep = "current_step" + // ConfigLocalNodeKey is the local node key ConfigLocalNodeKey = "local_node_key" ) +const ( + // StepInitDB is the step which all bucket in db have been created + StepInitDB = iota + + // StepRootsSaved is the step which two roots have been saved into db + StepRootsSaved +) + // Row is the key/value pair from db type Row struct { K string diff --git a/db/utils.go b/db/utils.go new file mode 100644 index 0000000..ee81fef --- /dev/null +++ b/db/utils.go @@ -0,0 +1,211 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package db + +import ( + "encoding/json" + "errors" + "math/big" + + "github.com/pdupub/go-pdu/common" + "github.com/pdupub/go-pdu/core" +) + +var ( + // ErrMessageNotFound returns when the message not be found + ErrMessageNotFound = errors.New("message can not be found") +) + +func SaveRootUsers(udb UDB, users []*core.User) (err error) { + // save root users + var root0, root1 []byte + if root0, err = json.Marshal(users[0]); err != nil { + return err + } + if err = udb.Set(BucketConfig, ConfigRoot0, root0); err != nil { + return err + } + if err = udb.Set(BucketUser, common.Hash2String(users[0].ID()), root0); err != nil { + return err + } + + if root1, err = json.Marshal(users[1]); err != nil { + return err + } + + if err = udb.Set(BucketConfig, ConfigRoot1, root1); err != nil { + return err + } + + if err = udb.Set(BucketUser, common.Hash2String(users[1].ID()), root1); err != nil { + return err + } + + if err := udb.Set(BucketConfig, ConfigCurrentStep, big.NewInt(StepRootsSaved).Bytes()); err != nil { + return err + } + return nil +} + +func GetRootUsers(udb UDB) (*core.User, *core.User, error) { + var user0, user1 core.User + var err error + root0, err := udb.Get(BucketConfig, ConfigRoot0) + if err != nil { + return nil, nil, err + } + root1, err := udb.Get(BucketConfig, ConfigRoot1) + if err != nil { + return nil, nil, err + } + if err := json.Unmarshal(root0, &user0); err != nil { + return nil, nil, err + } + if err := json.Unmarshal(root1, &user1); err != nil { + return nil, nil, err + } + return &user0, &user1, nil +} + +func SaveMsg(udb UDB, msg *core.Message) error { + msgBytes, err := json.Marshal(msg) + if err != nil { + return err + } + countBytes, err := udb.Get(BucketConfig, ConfigMsgCount) + if err != nil { + return err + } + count := new(big.Int).SetBytes(countBytes) + err = udb.Set(BucketMsg, common.Hash2String(msg.ID()), msgBytes) + if err != nil { + return err + } + err = udb.Set(BucketMID, count.String(), common.Hash2Bytes(msg.ID())) + if err != nil { + return err + } + err = udb.Set(BucketMOD, common.Hash2String(msg.ID()), count.Bytes()) + if err != nil { + return err + } + count = count.Add(count, big.NewInt(1)) + err = udb.Set(BucketConfig, ConfigMsgCount, count.Bytes()) + if err != nil { + return err + } + + err = udb.Set(BucketLastMID, common.Hash2String(msg.SenderID), common.Hash2Bytes(msg.ID())) + if err != nil { + return err + } + return nil +} + +func GetLastMsg(udb UDB) (*core.Message, error) { + var msg core.Message + countBytes, err := udb.Get(BucketConfig, ConfigMsgCount) + if err != nil { + return nil, err + } + count := new(big.Int).SetBytes(countBytes) + mid, err := udb.Get(BucketMID, count.Sub(count, big.NewInt(1)).String()) + if err != nil { + return nil, err + } else if mid == nil { + return nil, ErrMessageNotFound + } + msgBytes, err := udb.Get(BucketMsg, common.Bytes2String(mid)) + if err != nil { + return nil, err + } else if msgBytes == nil { + return nil, ErrMessageNotFound + } + err = json.Unmarshal(msgBytes, &msg) + if err != nil { + return nil, err + } + return &msg, nil +} + +func GetMsgByOrder(udb UDB, start *big.Int, size int) (msgs []*core.Message) { + for ; size > 0; size-- { + mid, err := udb.Get(BucketMID, start.String()) + if err != nil || mid == nil { + continue + } + msgBytes, err := udb.Get(BucketMsg, common.Bytes2String(mid)) + if err != nil || msgBytes == nil { + continue + } + var msg core.Message + err = json.Unmarshal(msgBytes, &msg) + if err != nil { + continue + } + msgs = append(msgs, &msg) + start = start.Add(start, big.NewInt(1)) + } + return msgs +} + +func GetOrderCntByMsg(udb UDB, mid common.Hash) (order *big.Int, count *big.Int, err error) { + orderBytes, err := udb.Get(BucketMOD, common.Hash2String(mid)) + if err != nil { + return nil, nil, err + } else if orderBytes == nil { + return nil, nil, ErrMessageNotFound + } + order = new(big.Int).SetBytes(orderBytes) + + count, err = GetMsgCount(udb) + if err != nil { + return nil, nil, err + } + return order, count, nil +} + +func GetMsgCount(udb UDB) (count *big.Int, err error) { + countBytes, err := udb.Get(BucketConfig, ConfigMsgCount) + if err != nil { + return nil, err + } + count = new(big.Int).SetBytes(countBytes) + return count, nil +} + +func GetLastMsgByUser(udb UDB, userID common.Hash) (*core.Message, error) { + var msg core.Message + lastMsgBytes, err := udb.Get(BucketLastMID, common.Hash2String(userID)) + if err != nil { + return nil, err + } else if lastMsgBytes == nil { + return nil, ErrMessageNotFound + } + msgBytes, err := udb.Get(BucketMsg, common.Bytes2String(lastMsgBytes)) + if err != nil { + return nil, err + } else if msgBytes == nil { + return nil, ErrMessageNotFound + } + err = json.Unmarshal(msgBytes, &msg) + if err != nil { + return nil, err + } + + return &msg, nil +} diff --git a/galaxy/wavequestion.go b/galaxy/wavequestion.go index 2d81489..273f6c7 100644 --- a/galaxy/wavequestion.go +++ b/galaxy/wavequestion.go @@ -18,6 +18,8 @@ package galaxy // WaveQuestion implements the Wave interface and represents request info message. type WaveQuestion struct { + Cmd string `json:"cmd"` + Args [][]byte `json:"args"` } // Command returns the protocol command string for the wave. diff --git a/galaxy/waveroots.go b/galaxy/waveroots.go index 21730cf..47981f0 100644 --- a/galaxy/waveroots.go +++ b/galaxy/waveroots.go @@ -16,8 +16,11 @@ package galaxy +import "github.com/pdupub/go-pdu/core" + // WaveRoots implements the Wave interface and represents a getRoots message. type WaveRoots struct { + Users [2]*core.User `json:"users"` } // Command returns the protocol command string for the wave. diff --git a/node/node.go b/node/node.go index 7b81f15..30d3d2d 100644 --- a/node/node.go +++ b/node/node.go @@ -42,7 +42,7 @@ import ( const ( displayInterval = 1000 maxLoadPeersCount = 1000 - checkPeerInterval = 20 + checkPeerInterval = 10 ) var ( @@ -60,6 +60,7 @@ type Node struct { localPort uint64 localNodeKey string peers map[common.Hash]*peer.Peer + initStep uint64 } // New is used to create new node @@ -71,11 +72,7 @@ func New(udb db.UDB) (node *Node, err error) { peers: make(map[common.Hash]*peer.Peer), } - if err := node.initUniverse(); err != nil { - return nil, err - } - - if err := node.loadMessage(); err != nil { + if err := node.loadUniverse(); err != nil { return nil, err } @@ -248,6 +245,49 @@ func (n Node) wsHandler(ws *websocket.Conn) { } else if err = n.broadcastMsg(&msg); err != nil { log.Error("Broadcast", common.Hash2String(msg.ID())) } + } else if w.Command() == galaxy.CmdQuestion { + wm := w.(*galaxy.WaveQuestion) + log.Info("Received question", wm.Cmd) + switch wm.Cmd { + case galaxy.CmdRoots: + user0, user1, err := db.GetRootUsers(n.udb) + if err != nil { + log.Error(err) + } + p := peer.Peer{Conn: ws} + if err = p.SendRoots(user0, user1); err != nil { + log.Error(err) + } + case galaxy.CmdMessages: + var order, count *big.Int + var err error + var msgs []*core.Message + msgID := common.Bytes2Hash(wm.Args[0]) + + log.Debug(common.Hash2String(msgID)) + if msgID != common.Bytes2Hash([]byte{}) { + order, count, err = db.GetOrderCntByMsg(n.udb, msgID) + if err != nil { + log.Error(err) + } + order = order.Add(order, big.NewInt(1)) + } else { + order = big.NewInt(0) + count, err = db.GetMsgCount(n.udb) + if err != nil { + log.Error(err) + } + } + + if order != nil && count != nil && count.Uint64()-order.Uint64() > peer.MaxMsgCountPerWave { + log.Info("Send msg from order", order, "size", peer.MaxMsgCountPerWave) + msgs = db.GetMsgByOrder(n.udb, order, peer.MaxMsgCountPerWave) + } + p := peer.Peer{Conn: ws} + if err = p.SendMsgs(msgs); err != nil { + log.Error(err) + } + } } } } @@ -272,12 +312,111 @@ func (n *Node) runLocalServe() { } } +func (n *Node) syncCreateUniverse() { + log.Info("Start sync universe start", "create universe") + for _, peer := range n.peers { + if !peer.Connected() { + continue + } + if err := peer.SendQuestion(galaxy.CmdRoots); err != nil { + log.Error(err) + continue + } + + w, err := galaxy.ReceiveWave(peer.Conn) + if err != nil { + log.Error(err) + continue + } + if w.Command() == galaxy.CmdRoots { + mw := w.(*galaxy.WaveRoots) + user0 := mw.Users[0] + user1 := mw.Users[1] + log.Info("user0", common.Hash2String(user0.ID())) + log.Info("user1", common.Hash2String(user1.ID())) + // update init step + n.initStep = db.StepRootsSaved + n.universe, err = core.NewUniverse(user0, user1) + if err != nil { + log.Error(err) + continue + } + if err := db.SaveRootUsers(n.udb, mw.Users[:]); err != nil { + log.Error(err) + continue + } + break + } + } +} + +func (n *Node) syncMsgFromPeers() { + log.Info("Start Sync message from peers") + lastMsg, err := db.GetLastMsg(n.udb) + var lastMsgID common.Hash + if err != nil && err != db.ErrMessageNotFound { + log.Error(err) + return + } + if lastMsg != nil { + lastMsgID = lastMsg.ID() + } + for _, peer := range n.peers { + if !peer.Connected() { + continue + } + if err := peer.SendQuestion(galaxy.CmdMessages, lastMsgID); err != nil { + log.Error(err) + continue + } + + w, err := galaxy.ReceiveWave(peer.Conn) + if err != nil { + log.Error(err) + continue + } + if w.Command() == galaxy.CmdMessages { + mw := w.(*galaxy.WaveMessages) + for _, mb := range mw.Msgs { + var msg core.Message + err := json.Unmarshal(mb, &msg) + if err != nil { + log.Error(err) + continue + } + n.saveMsg(&msg) + } + } + } +} + func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) { + // create universe if need + if n.initStep < db.StepRootsSaved { + for { + if n.initStep >= db.StepRootsSaved { + break + } + select { + case <-time.After(time.Second): + n.updatePeersStatus() + n.syncCreateUniverse() + case <-sig: + log.Info("Stop server") + close(wait) + return + + } + } + } + + // run node for { select { case <-time.After(time.Second * time.Duration(checkPeerInterval)): log.Info("Update peers status") n.updatePeersStatus() + n.syncMsgFromPeers() case <-sig: log.Info("Stop server") close(wait) @@ -306,14 +445,14 @@ func (n *Node) runTimeProof(sig <-chan struct{}, wait chan<- struct{}) { case <-time.After(time.Second * time.Duration(n.tpInterval)): var refs []*core.MsgReference // load last msg in universe - lastMsg, err := n.getLastMsg() + lastMsg, err := db.GetLastMsg(n.udb) if err != nil { log.Error(err) continue } refs = append(refs, &core.MsgReference{SenderID: lastMsg.SenderID, MsgID: lastMsg.ID()}) // load last msg from unlock user if exist - lastMsgByUser, err := n.getLastMsgByUser(n.tpUnlockedUser.ID()) + lastMsgByUser, err := db.GetLastMsgByUser(n.udb, n.tpUnlockedUser.ID()) if err != nil { log.Error(err) continue @@ -357,119 +496,48 @@ func (n Node) broadcastMsg(msg *core.Message) error { } func (n Node) saveMsg(msg *core.Message) error { - err := n.universe.AddMsg(msg) - if err != nil { + if err := n.universe.AddMsg(msg); err != nil { return err } - msgBytes, err := json.Marshal(msg) - if err != nil { - return err - } - countBytes, err := n.udb.Get(db.BucketConfig, db.ConfigMsgCount) - if err != nil { - return err - } - count := new(big.Int).SetBytes(countBytes) - err = n.udb.Set(db.BucketMsg, common.Hash2String(msg.ID()), msgBytes) - if err != nil { - return err - } - - err = n.udb.Set(db.BucketMID, count.String(), []byte(common.Hash2String(msg.ID()))) - if err != nil { - return err - } - count = count.Add(count, big.NewInt(1)) - err = n.udb.Set(db.BucketConfig, db.ConfigMsgCount, count.Bytes()) - if err != nil { - return err - } - - err = n.udb.Set(db.BucketLastMID, common.Hash2String(msg.SenderID), []byte(common.Hash2String(msg.ID()))) - if err != nil { + if err := db.SaveMsg(n.udb, msg); err != nil { return err } return nil } -func (n Node) getLastMsgByUser(userID common.Hash) (*core.Message, error) { - var msg core.Message - lastMsgBytes, err := n.udb.Get(db.BucketLastMID, common.Hash2String(userID)) - if err != nil { - return nil, err - } - msgBytes, err := n.udb.Get(db.BucketMsg, string(lastMsgBytes)) - if err != nil { - return nil, err - } - err = json.Unmarshal(msgBytes, &msg) - if err != nil { - return nil, err - } - - return &msg, nil -} - -func (n Node) getLastMsg() (*core.Message, error) { - var msg core.Message - countBytes, err := n.udb.Get(db.BucketConfig, db.ConfigMsgCount) - if err != nil { - return nil, err - } - count := new(big.Int).SetBytes(countBytes) - mid, err := n.udb.Get(db.BucketMID, count.Sub(count, big.NewInt(1)).String()) - if err != nil { - return nil, err - } - msgBytes, err := n.udb.Get(db.BucketMsg, string(mid)) - if err != nil { - return nil, err - } - err = json.Unmarshal(msgBytes, &msg) - if err != nil { - return nil, err - } - return &msg, nil -} - -func (n *Node) initUniverse() error { - var user0, user1 core.User - - root0, err := n.udb.Get(db.BucketConfig, db.ConfigRoot0) - if err != nil { - return err - } - root1, err := n.udb.Get(db.BucketConfig, db.ConfigRoot1) +func (n *Node) loadUniverse() (err error) { + stepBytes, err := n.udb.Get(db.BucketConfig, db.ConfigCurrentStep) if err != nil { return err } - if err := json.Unmarshal(root0, &user0); err != nil { - return err + currentStep := new(big.Int).SetBytes(stepBytes).Uint64() + var user0, user1 *core.User + if currentStep < db.StepRootsSaved { + return nil } - if err := json.Unmarshal(root1, &user1); err != nil { + user0, user1, err = db.GetRootUsers(n.udb) + if err != nil { return err } + // update init step + n.initStep = db.StepRootsSaved log.Info("root0", common.Hash2String(user0.ID())) log.Info("root1", common.Hash2String(user1.ID())) - n.universe, err = core.NewUniverse(&user0, &user1) + n.universe, err = core.NewUniverse(user0, user1) if err != nil { return err } - return nil -} - -func (n *Node) loadMessage() error { - cntBytes, err := n.udb.Get(db.BucketConfig, db.ConfigMsgCount) + msgCount, err := db.GetMsgCount(n.udb) if err != nil { return err } - msgCount := new(big.Int).SetBytes(cntBytes).Uint64() - for i := uint64(0); i < msgCount; i++ { + for i := uint64(0); i < msgCount.Uint64(); i++ { + // todo : replace by db.GetMsgByOrder() mid, err := n.udb.Get(db.BucketMID, new(big.Int).SetUint64(i).String()) if err != nil { return err } - msgBytes, err := n.udb.Get(db.BucketMsg, string(mid)) + msgBytes, err := n.udb.Get(db.BucketMsg, common.Bytes2String(mid)) if err != nil { return err } @@ -478,6 +546,7 @@ func (n *Node) loadMessage() error { if err != nil { return err } + err = n.universe.AddMsg(&msg) if err != nil { return err diff --git a/peer/peer.go b/peer/peer.go index 7fc88e6..376994f 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -20,7 +20,9 @@ import ( "encoding/json" "errors" "fmt" + "math/big" + "github.com/pdupub/go-pdu/common" "github.com/pdupub/go-pdu/core" "github.com/pdupub/go-pdu/galaxy" "golang.org/x/net/websocket" @@ -28,6 +30,13 @@ import ( var ( errPeerNotReachable = errors.New("this peer not reachable right now") + errArgsNotSupport = errors.New("arguments not support") + errMsgsNeedSplit = errors.New("messages need split into waves") +) + +const ( + // MaxMsgCountPerWave is the max number of msg per wave + MaxMsgCountPerWave = 2 ) // Peer contain the info of websocket connection @@ -35,7 +44,7 @@ type Peer struct { IP string `json:"ip"` Port uint64 `json:"port"` NodeKey string `json:"nodeKey"` - conn *websocket.Conn + Conn *websocket.Conn } // New create new Peer @@ -49,14 +58,14 @@ func (p *Peer) Dial() error { if err != nil { return err } - p.conn = conn + p.Conn = conn return nil } // Close the ws connection, func (p *Peer) Close() error { - if p.conn != nil { - return p.conn.Close() + if p.Conn != nil { + return p.Conn.Close() } return nil } @@ -68,29 +77,97 @@ func (p Peer) Url() string { // Connected return true if this peer is connected right now func (p *Peer) Connected() bool { - if p.conn != nil { + if p.Conn != nil { return true } return false } -// SendMsg is used to send msg to this peer -func (p *Peer) SendMsg(msg *core.Message) error { +// SendQuestion is used to send question to peer +func (p *Peer) SendQuestion(cmd string, args ...interface{}) error { if !p.Connected() { return errPeerNotReachable } - var msgs [][]byte - msgBytes, err := json.Marshal(msg) + newArgs, err := p.buildArgs(args...) + if err != nil { + return err + } + wave := &galaxy.WaveQuestion{ + Cmd: cmd, + Args: newArgs, + } + _, err = galaxy.SendWave(p.Conn, wave) if err != nil { return err } - msgs = append(msgs, msgBytes) + return nil +} + +func (p Peer) buildArgs(args ...interface{}) (result [][]byte, err error) { + for _, arg := range args { + var item []byte + switch arg.(type) { + case uint64: + item = new(big.Int).SetUint64(arg.(uint64)).Bytes() + case string: + item = []byte(arg.(string)) + case *big.Int: + item = arg.(*big.Int).Bytes() + case []byte: + item = arg.([]byte) + case common.Hash: + item = common.Hash2Bytes(arg.(common.Hash)) + default: + return nil, errArgsNotSupport + } + result = append(result, item) + } + return result, nil +} + +// SendMsg is used to send msg to peer +func (p *Peer) SendMsg(msg *core.Message) error { + return p.SendMsgs([]*core.Message{msg}) +} + +func (p *Peer) SendMsgs(msgs []*core.Message) error { + if len(msgs) > MaxMsgCountPerWave { + msgs = msgs[:MaxMsgCountPerWave] + } + if !p.Connected() { + return errPeerNotReachable + } + var msgsB [][]byte + for _, msg := range msgs { + msgBytes, err := json.Marshal(msg) + if err != nil { + return err + } + msgsB = append(msgsB, msgBytes) + } wave := &galaxy.WaveMessages{ - Msgs: msgs, + Msgs: msgsB, + } + if _, err := galaxy.SendWave(p.Conn, wave); err != nil { + return err + } + return nil +} + +// SendRoots is used to send 2 roots to peer +func (p *Peer) SendRoots(user0, user1 *core.User) error { + if !p.Connected() { + return errPeerNotReachable + } + var users [2]*core.User + users[0] = user0 + users[1] = user1 + wave := &galaxy.WaveRoots{ + Users: users, } - _, err = galaxy.SendWave(p.conn, wave) + _, err := galaxy.SendWave(p.Conn, wave) if err != nil { return err }