-
Notifications
You must be signed in to change notification settings - Fork 0
/
mock_stream.go
152 lines (122 loc) · 3.15 KB
/
mock_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package mocknet
import (
"errors"
"io"
"net"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
)
// stream implements network.Stream
// on top of a pair of io.Pipe ends. Writes are scheduled through the
// Environment before being delivered to the pipe.
type stream struct {
	// env supplies SynTask, which Write waits on before delivering data.
	env Environment
	// write is the pipe end that Write delivers to; Reset and resetWith
	// close it with an error.
	write *io.PipeWriter
	// read is the pipe end that Read reads from; Reset and resetWith close
	// it with an error.
	read *io.PipeReader
	// conn is the owning connection. NewStream does not set it, and Write,
	// teardown and Conn all read it.
	conn *conn

	// reset receives a non-blocking signal from Reset. NewStream creates it
	// with capacity 1. The receiver is not visible in this file.
	reset chan struct{}
	// close receives a non-blocking signal from Close. NewStream creates it
	// with capacity 1. The receiver is not visible in this file.
	close chan struct{}
	// closed is closed by teardown. Close and Reset block on it, and Write
	// uses it to bail out.
	closed chan struct{}

	// writeErr is set by resetWith. Write returns it once closed is closed,
	// and Close returns it unless it equals ErrClosed.
	writeErr error

	// protocol holds the protocol.ID stored by SetProtocol. It is empty
	// until SetProtocol is called.
	protocol atomic.Value
	// stat is returned by Stat. NewStream fills in its Direction.
	stat network.Stat
}
// Sentinel errors reported by mock streams.
var (
	// ErrReset is the error Reset closes both pipe ends with.
	ErrReset = errors.New("stream reset")

	// ErrClosed marks a regular close: Close returns nil instead of the
	// stored write error when that error equals ErrClosed.
	ErrClosed = errors.New("stream closed")
)
// transportObject pairs a message payload with an arrival time.
// NOTE(review): nothing in the visible part of this file references this
// type; confirm whether it is still used elsewhere in the package.
type transportObject struct {
	// msg is the raw payload bytes.
	msg []byte
	// arrivalTime is stored as a time.Duration. The reference point it is
	// measured from is not visible here (TODO confirm).
	arrivalTime time.Duration
}
// NewStream returns a stream that writes to w and reads from r, scheduled
// through env and tagged with direction dir.
//
// The reset and close signal channels are created with capacity 1, and the
// closed channel is unbuffered. The conn field is left unset; Write and
// teardown dereference it, so the caller has to assign it before use.
func NewStream(env Environment, w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream {
	s := new(stream)

	s.env = env
	s.write, s.read = w, r
	s.stat.Direction = dir

	// Signal channels: one buffered slot each so the non-blocking sends in
	// Close and Reset can leave a pending signal.
	s.reset = make(chan struct{}, 1)
	s.close = make(chan struct{}, 1)
	// closed is only ever closed (by teardown), never sent on.
	s.closed = make(chan struct{})

	return s
}
// Write delivers a copy of p to the stream's pipe writer once the
// environment releases the task scheduled for this message.
//
// The requested delay is the link's latency plus whatever RateLimit returns
// for len(p) bytes. If the stream's closed channel is closed before the
// task is released, Write returns 0 and the stored write error. Otherwise
// it returns the result of the underlying pipe write.
func (s *stream) Write(p []byte) (n int, err error) {
	link := s.conn.link
	wait := link.GetLatency() + link.RateLimit(len(p))

	// Work on a private copy of the payload rather than the caller's slice.
	buf := append(make([]byte, 0, len(p)), p...)

	select {
	case ack := <-s.env.SynTask(wait):
		// TODO: instead of scheduling a task for delivery, the message could be buffered,
		// and split in two racing tasks: schedule delivery when buffer is full, or when X time has passed
		n, err = s.write.Write(buf)
		// Acknowledge the task only after the pipe write has returned.
		ack <- TaskAck{}
		return n, err
	case <-s.closed:
		// The stream is closing; bail out without writing.
		return 0, s.writeErr
	}
}
// Protocol returns the protocol ID stored by SetProtocol, or the zero
// protocol.ID if none has been stored yet.
func (s *stream) Protocol() protocol.ID {
	if id, ok := s.protocol.Load().(protocol.ID); ok {
		return id
	}
	// A failed assertion means nothing has been stored: report "unset".
	var unset protocol.ID
	return unset
}
// Stat returns the stream's stored network.Stat by value.
func (s *stream) Stat() network.Stat {
	return s.stat
}
// SetProtocol atomically stores proto as the stream's protocol ID, which
// Protocol later loads.
func (s *stream) SetProtocol(proto protocol.ID) {
	s.protocol.Store(proto)
}
// Close requests a close of the stream and blocks until the stream has been
// torn down (the closed channel is closed).
//
// It returns nil when the stored write error equals ErrClosed, and the
// stored write error (possibly nil) otherwise.
func (s *stream) Close() error {
	// Non-blocking signal: if one is already pending in the buffered
	// channel, there is nothing more to do.
	select {
	case s.close <- struct{}{}:
	default:
	}

	// Wait for teardown.
	<-s.closed

	// ErrClosed is the expected outcome of a close, not a failure.
	if s.writeErr == ErrClosed {
		return nil
	}
	return s.writeErr
}
// Reset aborts the stream: both pipe ends are closed with ErrReset, a reset
// is signalled, and the call blocks until the stream has been torn down.
// It always returns nil.
func (s *stream) Reset() error {
	// Fail both pipe ends first so pending reads and writes are cancelled
	// with ErrReset. The return values are deliberately discarded.
	_ = s.write.CloseWithError(ErrReset)
	_ = s.read.CloseWithError(ErrReset)

	// Non-blocking signal: skip the send if one is already pending.
	select {
	case s.reset <- struct{}{}:
	default:
	}

	// Wait for teardown. There is no meaningful error to report afterwards.
	<-s.closed
	return nil
}
// teardown detaches the stream from its connection, marks it closed, and
// then tells every notifiee of the connection's network that the stream
// has closed.
func (s *stream) teardown() {
	// Callback handed to notifyAll below; it only runs when notifyAll
	// invokes it.
	announce := func(n network.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	}

	// At this point, no streams are writing.
	s.conn.removeStream(s)

	// Mark as closed; this releases anything blocked on s.closed.
	close(s.closed)

	s.conn.net.notifyAll(announce)
}
// Conn returns the stream's conn field as a network.Conn.
func (s *stream) Conn() network.Conn {
	return s.conn
}
// SetDeadline is not supported by the mock stream. It ignores t and always
// returns a *net.OpError (Op "set", Net "pipe") wrapping a freshly created
// "deadline not supported" error.
func (s *stream) SetDeadline(t time.Time) error {
	unsupported := &net.OpError{
		Op:  "set",
		Net: "pipe",
		Err: errors.New("deadline not supported"),
	}
	return unsupported
}
// SetReadDeadline is not supported by the mock stream. It ignores t and
// always returns a *net.OpError (Op "set", Net "pipe") wrapping a freshly
// created "deadline not supported" error.
func (s *stream) SetReadDeadline(t time.Time) error {
	unsupported := &net.OpError{
		Op:  "set",
		Net: "pipe",
		Err: errors.New("deadline not supported"),
	}
	return unsupported
}
// SetWriteDeadline is not supported by the mock stream. It ignores t and
// always returns a *net.OpError (Op "set", Net "pipe") wrapping a freshly
// created "deadline not supported" error.
func (s *stream) SetWriteDeadline(t time.Time) error {
	unsupported := &net.OpError{
		Op:  "set",
		Net: "pipe",
		Err: errors.New("deadline not supported"),
	}
	return unsupported
}
// Read reads from the stream's pipe reader into b and returns that
// reader's byte count and error unchanged.
func (s *stream) Read(b []byte) (int, error) {
	return s.read.Read(b)
}
// resetWith closes both pipe ends with err and then records err as the
// stream's write error, which Write and Close later report.
// Unlike Reset, it neither signals the reset channel nor waits for closed.
// NOTE(review): the caller is not visible in this file; confirm which
// goroutine invokes this relative to teardown.
func (s *stream) resetWith(err error) {
	// Fail pending and future pipe operations with err.
	s.write.CloseWithError(err)
	s.read.CloseWithError(err)
	// Remember the error for Write (after close) and Close.
	s.writeErr = err
}