From 285b31e555a35717aab7074e673eeac70cea907e Mon Sep 17 00:00:00 2001 From: TATAUFO Date: Sat, 1 Feb 2020 23:18:14 +0800 Subject: [PATCH] add wave --- galaxy/wave.go | 131 +++++++++++++++++++++++++++++++++++++++++ galaxy/wave_test.go | 17 ++++++ galaxy/wavemsgs.go | 27 +++++++++ galaxy/waveping.go | 26 ++++++++ galaxy/wavepong.go | 26 ++++++++ galaxy/wavequestion.go | 26 ++++++++ galaxy/waveroots.go | 26 ++++++++ galaxy/waveuser.go | 26 ++++++++ galaxy/waveversion.go | 26 ++++++++ node/node.go | 45 ++++++-------- peer/peer.go | 10 +++- 11 files changed, 359 insertions(+), 27 deletions(-) create mode 100644 galaxy/wave.go create mode 100644 galaxy/wave_test.go create mode 100644 galaxy/wavemsgs.go create mode 100644 galaxy/waveping.go create mode 100644 galaxy/wavepong.go create mode 100644 galaxy/wavequestion.go create mode 100644 galaxy/waveroots.go create mode 100644 galaxy/waveuser.go create mode 100644 galaxy/waveversion.go diff --git a/galaxy/wave.go b/galaxy/wave.go new file mode 100644 index 0000000..0afcf87 --- /dev/null +++ b/galaxy/wave.go @@ -0,0 +1,131 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" +) + +// WaveSize is the number of bytes in a wave +// WaveHeaderSize 24 bytes + waveBody +const WaveSize = 2048 + +// MessageHeaderSize is the number of bytes in a wave header +// command 12 bytes + length 4 bytes + checksum 4 bytes +const WaveHeaderSize = 24 + +// CommandSize is the fixed size of all commands +const CommandSize = 12 + +// Commands used in wave which describe the type of wave. +const ( + CmdQuestion = "question" + CmdVersion = "version" + CmdRoots = "roots" + CmdMessages = "messages" + CmdPing = "ping" + CmdPong = "aler" + CmdUser = "user" +) + +var ( + errWaveLengthTooLong = errors.New("wave length too long") +) + +// Wave is an interface that describes a galaxy information. +type Wave interface { + Command() string +} + +func makeEmptyWave(command string) (Wave, error) { + var wave Wave + switch command { + case CmdQuestion: + wave = &WaveQuestion{} + case CmdVersion: + wave = &WaveVersion{} + case CmdRoots: + wave = &WaveRoots{} + case CmdMessages: + wave = &WaveMessages{} + case CmdPing: + wave = &WavePing{} + case CmdPong: + wave = &WavePong{} + case CmdUser: + wave = &WaveUser{} + default: + return nil, fmt.Errorf("unhandled command [%s]", command) + } + return wave, nil +} + +// SendWave send a wave message to w +func SendWave(w io.Writer, wave Wave) (int, error) { + var magic, waveLen, checkSum [4]byte + var command [CommandSize]byte + cmd := wave.Command() + if len(cmd) > CommandSize { + return 0, fmt.Errorf("command [%s] is too long [max %v]", wave, CommandSize) + } + copy(command[:], []byte(cmd)) + copy(magic[:], []byte("")) + copy(waveLen[:], []byte("")) + copy(checkSum[:], []byte("")) + + waveHeader := bytes.NewBuffer(make([]byte, 0, WaveHeaderSize)) + waveHeader.Write((magic[:])) + waveHeader.Write(command[:]) + waveHeader.Write(waveLen[:]) + waveHeader.Write(checkSum[:]) + waveBody, err := json.Marshal(wave) + if err != nil { + return 0, err + } + waveBytes := append(waveHeader.Bytes(), waveBody...) + if len(waveBytes) > WaveSize { + return 0, errWaveLengthTooLong + } + return w.Write(waveBytes) +} + +// ReceiveWave receive a wave message from r +func ReceiveWave(r io.Reader) (Wave, error) { + waveBytes := make([]byte, WaveSize) + n, err := r.Read(waveBytes) + if err != nil { + return nil, err + } + waveHeader := waveBytes[:WaveHeaderSize] + waveBody := waveBytes[WaveHeaderSize:n] + + // Strip trailing zeros from command string. + command := string(bytes.TrimRight(waveHeader[4:CommandSize+4], string(0))) + msg, err := makeEmptyWave(command) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(waveBody, msg); err != nil { + return nil, err + } + return msg, nil +} diff --git a/galaxy/wave_test.go b/galaxy/wave_test.go new file mode 100644 index 0000000..664fe62 --- /dev/null +++ b/galaxy/wave_test.go @@ -0,0 +1,17 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy diff --git a/galaxy/wavemsgs.go b/galaxy/wavemsgs.go new file mode 100644 index 0000000..431fa39 --- /dev/null +++ b/galaxy/wavemsgs.go @@ -0,0 +1,27 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WaveMessages implements the Wave interface and represents a broadcast new message. +type WaveMessages struct { + Msgs [][]byte `json:"msgs"` +} + +// Command returns the protocol command string for the wave. +func (w *WaveMessages) Command() string { + return CmdMessages +} diff --git a/galaxy/waveping.go b/galaxy/waveping.go new file mode 100644 index 0000000..65ef99e --- /dev/null +++ b/galaxy/waveping.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WavePing implements the Wave interface and represents a ping message +type WavePing struct { +} + +// Command returns the protocol command string for the wave. +func (w *WavePing) Command() string { + return CmdPing +} diff --git a/galaxy/wavepong.go b/galaxy/wavepong.go new file mode 100644 index 0000000..c7f9e9c --- /dev/null +++ b/galaxy/wavepong.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WavePong implements the Wave interface and represents a pong message +type WavePong struct { +} + +// Command returns the protocol command string for the wave. +func (w *WavePong) Command() string { + return CmdPong +} diff --git a/galaxy/wavequestion.go b/galaxy/wavequestion.go new file mode 100644 index 0000000..2d81489 --- /dev/null +++ b/galaxy/wavequestion.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WaveQuestion implements the Wave interface and represents request info message. +type WaveQuestion struct { +} + +// Command returns the protocol command string for the wave. +func (w *WaveQuestion) Command() string { + return CmdQuestion +} diff --git a/galaxy/waveroots.go b/galaxy/waveroots.go new file mode 100644 index 0000000..21730cf --- /dev/null +++ b/galaxy/waveroots.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WaveRoots implements the Wave interface and represents a getRoots message. +type WaveRoots struct { +} + +// Command returns the protocol command string for the wave. +func (w *WaveRoots) Command() string { + return CmdRoots +} diff --git a/galaxy/waveuser.go b/galaxy/waveuser.go new file mode 100644 index 0000000..d0b86aa --- /dev/null +++ b/galaxy/waveuser.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WaveUser implements the Wave interface and represents a checkUser message. +type WaveUser struct { +} + +// Command returns the protocol command string for the wave. +func (w *WaveUser) Command() string { + return CmdUser +} diff --git a/galaxy/waveversion.go b/galaxy/waveversion.go new file mode 100644 index 0000000..3b6a23e --- /dev/null +++ b/galaxy/waveversion.go @@ -0,0 +1,26 @@ +// Copyright 2019 The PDU Authors +// This file is part of the PDU library. +// +// The PDU library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The PDU library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the PDU library. If not, see . + +package galaxy + +// WaveVersion implements the Wave interface and represents a galaxy protocol version message. +type WaveVersion struct { +} + +// Command returns the protocol command string for the wave. +func (w *WaveVersion) Command() string { + return CmdVersion +} diff --git a/node/node.go b/node/node.go index be24786..7b81f15 100644 --- a/node/node.go +++ b/node/node.go @@ -34,6 +34,7 @@ import ( "github.com/pdupub/go-pdu/core" "github.com/pdupub/go-pdu/crypto" "github.com/pdupub/go-pdu/db" + "github.com/pdupub/go-pdu/galaxy" "github.com/pdupub/go-pdu/peer" "golang.org/x/net/websocket" ) @@ -226,35 +227,27 @@ func (n *Node) Run(c <-chan os.Signal) { } func (n Node) wsHandler(ws *websocket.Conn) { - var err error - var content string - var msg core.Message - - // todo: move into galaxy - type Response struct { - Result string `json:"result"` - } - resByte, err := json.Marshal(Response{Result: "OK"}) - if err != nil { - log.Error(err) - } - for { - if err = websocket.Message.Receive(ws, &content); err != nil { + w, err := galaxy.ReceiveWave(ws) + if err != nil { + log.Error(err) break } - if err = json.Unmarshal([]byte(content), &msg); err != nil { - log.Error("Decode message fail", err) - } - log.Info("Msg received", common.Hash2String(msg.ID())) - if err = websocket.Message.Send(ws, resByte); err != nil { - log.Error("Send fail", err) - } - // save msg (universe & udb) - if err = n.saveMsg(&msg); err != nil { - log.Error("Add new msg fail", err) - } else if err = n.broadcastMsg(&msg); err != nil { - log.Error("Broadcast", common.Hash2String(msg.ID())) + if w.Command() == galaxy.CmdMessages { + var msg core.Message + wm := w.(*galaxy.WaveMessages) + for _, wmsg := range wm.Msgs { + if err = json.Unmarshal(wmsg, &msg); err != nil { + log.Error("Decode message fail", err) + } + log.Info("Msg received", common.Hash2String(msg.ID())) + } + // save msg (universe & udb) + if err = n.saveMsg(&msg); err != nil { + log.Error("Add new msg fail", err) + } else if err = n.broadcastMsg(&msg); err != nil { + log.Error("Broadcast", common.Hash2String(msg.ID())) + } } } } diff --git a/peer/peer.go b/peer/peer.go index 46b1f87..7fc88e6 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -20,7 +20,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/pdupub/go-pdu/core" + "github.com/pdupub/go-pdu/galaxy" "golang.org/x/net/websocket" ) @@ -78,11 +80,17 @@ func (p *Peer) SendMsg(msg *core.Message) error { return errPeerNotReachable } + var msgs [][]byte msgBytes, err := json.Marshal(msg) if err != nil { return err } - _, err = p.conn.Write(msgBytes) + msgs = append(msgs, msgBytes) + wave := &galaxy.WaveMessages{ + Msgs: msgs, + } + + _, err = galaxy.SendWave(p.conn, wave) if err != nil { return err }