forked from colinmarc/hdfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc.go
73 lines (61 loc) · 1.5 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Package rpc implements some of the lower-level functionality required to
// communicate with the namenode and datanodes.
package rpc
import (
	"bytes"
	"encoding/binary"
	"io"

	"code.google.com/p/goprotobuf/proto"
)
// Constants shared by the rpc package.
const (
// ClientName is passed into the namenode on requests, and identifies this
// client to the namenode.
ClientName = "go-hdfs"
// dataTransferVersion is not used in this part of the file; presumably the
// version number of the datanode data transfer protocol (0x1c == 28).
// NOTE(review): confirm against the Hadoop release being targeted.
dataTransferVersion = 0x1c
// readBlockOp is not used in this part of the file; presumably the opcode
// that selects the "read block" operation on a datanode — TODO confirm.
readBlockOp = 0x51
// checksumBlockOp is not used in this part of the file; presumably the
// opcode that selects the "block checksum" operation on a datanode — TODO
// confirm.
checksumBlockOp = 0x55
)
// makeDelimitedMsg marshals msg and returns the encoded bytes prefixed with
// their length, written as an unsigned varint. If marshalling fails, the
// error is returned and the byte slice is nil.
func makeDelimitedMsg(msg proto.Message) ([]byte, error) {
	payload, err := proto.Marshal(msg)
	if err != nil {
		return nil, err
	}

	// A 64-bit uvarint never occupies more than binary.MaxVarintLen64 (10)
	// bytes, so this scratch space always fits the length prefix.
	var prefix [binary.MaxVarintLen64]byte
	used := binary.PutUvarint(prefix[:], uint64(len(payload)))

	return append(prefix[:used], payload...), nil
}
// makePacket encodes every message in msgs with makeDelimitedMsg and
// concatenates the results behind a 4-byte big-endian header. The header
// holds the combined size of the delimited messages; the header's own four
// bytes are not counted. The first encoding error aborts the packet.
func makePacket(msgs ...proto.Message) ([]byte, error) {
	const headerLen = 4

	// Reserve the header up front; it is filled in once the body size is known.
	buf := make([]byte, headerLen, 128)
	for i := range msgs {
		delimited, err := makeDelimitedMsg(msgs[i])
		if err != nil {
			return nil, err
		}
		buf = append(buf, delimited...)
	}

	// Everything after the header is message data.
	binary.BigEndian.PutUint32(buf, uint32(len(buf)-headerLen))
	return buf, nil
}
// parsePacket decodes a sequence of varint-delimited protobuf messages from b
// into msgs, in order. b must not include the uint32 length header that
// precedes the packet body.
//
// A message whose length prefix is zero is skipped, leaving the corresponding
// entry in msgs untouched. If b ends before a length prefix can be read, the
// error from binary.ReadUvarint is returned. If a length prefix claims more
// bytes than remain in b, io.ErrUnexpectedEOF is returned instead of decoding
// a partially filled buffer. Errors from proto.Unmarshal are returned as-is.
func parsePacket(b []byte, msgs ...proto.Message) error {
	reader := bytes.NewReader(b)
	for _, msg := range msgs {
		msgLength, err := binary.ReadUvarint(reader)
		if err != nil {
			return err
		}
		if msgLength == 0 {
			continue
		}

		// The length prefix comes from the input, so validate it against what
		// is actually left before allocating: a corrupt prefix must not be
		// able to trigger an enormous allocation or a make() panic.
		if msgLength > uint64(reader.Len()) {
			return io.ErrUnexpectedEOF
		}

		msgBytes := make([]byte, msgLength)
		// bytes.Reader.Read may return fewer bytes than requested with a nil
		// error; io.ReadFull guarantees the buffer is filled or fails.
		if _, err := io.ReadFull(reader, msgBytes); err != nil {
			return err
		}
		if err := proto.Unmarshal(msgBytes, msg); err != nil {
			return err
		}
	}
	return nil
}