/
chunk_rpc.go
69 lines (61 loc) · 1.9 KB
/
chunk_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package chunk
import (
"net"
log "github.com/Sirupsen/logrus"
"strconv"
"net/rpc"
. "github.com/JetMuffin/whalefs/types"
"github.com/JetMuffin/whalefs/communication"
"bytes"
)
type ChunkRPC struct {
blockStore *BlockStore
blockSyncDone chan *BlockHeader
}
func NewChunkRPC(blockStore *BlockStore, blockSyncDone chan *BlockHeader) *ChunkRPC {
return &ChunkRPC{
blockStore: blockStore,
blockSyncDone: blockSyncDone,
}
}
func (c *ChunkRPC) Write(block Block, checksum *string) error {
cs, err := c.blockStore.WriteBlock(block.ID, block.Header.Size, block.Reader())
err = c.blockStore.WriteMeta(block.Header, cs)
*checksum = cs
if err != nil {
log.Errorf("Write block %v error: %v", block.ID, err)
return err
}
log.Infof("Successful write block %v with checksum %v", block.ID, cs)
return nil
}
func (c *ChunkRPC) Read(blockID BlockID, reply *communication.BlockMessage) error {
w := bytes.NewBufferString("")
err := c.blockStore.ReadBlock(blockID, w)
checksum, err := c.blockStore.BlockCheckSum(blockID)
reply.Data = w.Bytes()
reply.Checksum = checksum
log.WithField("checksum", checksum).Infof("Read block %v", blockID)
return err
}
func (c *ChunkRPC) Sync(block Block, checksum *string) error {
cs, err := c.blockStore.WriteBlock(block.ID, block.Header.Size, block.Reader())
err = c.blockStore.WriteMeta(block.Header, cs)
if err != nil {
log.Errorf("Write block %v error: %v", block.ID, err)
return err
}
log.Infof("Successful synchronize block %v with checksum %v", block.ID, cs)
c.blockSyncDone <- block.Header
return nil
}
// ListenRPC setup a RPC server on chunk node.
func (c *ChunkServer) ListenRPC() {
rpc.Register(NewChunkRPC(c.store, c.blockSyncDone))
listener, err := net.Listen("tcp", ":" + strconv.Itoa(c.RPCPort))
if err != nil {
log.Fatalf("Error: listen to rpc port error: %v.", err)
}
log.Infof("RPC Server listen on :%v.", strconv.Itoa(c.RPCPort))
go rpc.Accept(listener)
}