/
p2p.go
120 lines (109 loc) · 3.36 KB
/
p2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package handler
import (
kb "github.com/lsds/KungFu/srcs/go/kungfu/base"
"github.com/lsds/KungFu/srcs/go/plan"
"github.com/lsds/KungFu/srcs/go/rchannel/client"
"github.com/lsds/KungFu/srcs/go/rchannel/connection"
"github.com/lsds/KungFu/srcs/go/store"
)
const defaultVersionCount = 3
type PeerToPeerEndpoint struct {
versionedStore *store.VersionedStore
store *store.Store
waitQ *BufferPool
recvQ *BufferPool
client *client.Client
}
func NewPeerToPeerEndpoint(client *client.Client) *PeerToPeerEndpoint {
return &PeerToPeerEndpoint{
versionedStore: store.NewVersionedStore(defaultVersionCount),
store: store.NewStore(),
waitQ: newBufferPool(1),
recvQ: newBufferPool(1),
client: client,
}
}
// Handle implements ConnHandler.Handle interface
func (e *PeerToPeerEndpoint) Handle(conn connection.Connection) (int, error) {
return connection.Stream(conn, e.accept, e.handle)
}
func (e *PeerToPeerEndpoint) Request(a plan.Addr, version string, m connection.Message) (bool, error) {
e.waitQ.require(a) <- &m
if err := e.client.Send(a, []byte(version), connection.ConnPeerToPeer, connection.NoFlag); err != nil {
<-e.waitQ.require(a)
return false, err // FIXME: allow send to fail
}
pm := <-e.recvQ.require(a)
if !m.Same(pm) {
return false, errRegisteredBufferNotUsed
}
return !pm.HasFlag(connection.RequestFailed), nil
}
func (e *PeerToPeerEndpoint) Save(name string, buf *kb.Vector) error {
blob, err := e.store.GetOrCreate(name, len(buf.Data))
if err != nil {
return err
}
return blob.CopyFrom(buf.Data)
}
func (e *PeerToPeerEndpoint) SaveVersion(version, name string, buf *kb.Vector) error {
blob, err := e.versionedStore.GetOrCreate(version, name, len(buf.Data))
if err != nil {
return err
}
return blob.CopyFrom(buf.Data)
}
func (e *PeerToPeerEndpoint) accept(conn connection.Connection) (string, *connection.Message, error) {
var mh connection.MessageHeader
if err := mh.ReadFrom(conn.Conn()); err != nil {
return "", nil, err
}
name := string(mh.Name)
if mh.HasFlag(connection.IsResponse) {
m := <-e.waitQ.require(conn.Src().WithName(name))
m.Flags = mh.Flags
if mh.HasFlag(connection.RequestFailed) {
var empty connection.Message
if err := empty.ReadInto(conn.Conn()); err != nil {
return "", nil, err
}
return name, m, nil
}
if err := m.ReadInto(conn.Conn()); err != nil {
return "", nil, err
}
return name, m, nil
}
var m connection.Message
m.Flags = mh.Flags
if err := m.ReadFrom(conn.Conn()); err != nil {
return "", nil, err
}
return name, &m, nil
}
func (e *PeerToPeerEndpoint) handle(name string, msg *connection.Message, conn connection.Connection) {
if msg.HasFlag(connection.IsResponse) {
e.recvQ.require(conn.Src().WithName(name)) <- msg
return
}
go e.response(name, msg.Data, conn.Src()) // FIXME: check error, use one queue
}
func (e *PeerToPeerEndpoint) response(name string, version []byte, remote plan.PeerID) error {
var blob *store.Blob
var err error
if len(version) == 0 {
blob, err = e.store.Get(name)
} else {
blob, err = e.versionedStore.Get(string(version), name)
}
flags := connection.IsResponse
var buf []byte
if err == nil {
blob.RLock()
defer blob.RUnlock()
buf = blob.Data
} else {
flags |= connection.RequestFailed
}
return e.client.Send(remote.WithName(name), buf, connection.ConnPeerToPeer, flags)
}