-
Notifications
You must be signed in to change notification settings - Fork 171
/
channel.go
292 lines (265 loc) · 8.25 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package vm
import (
"fmt"
"github.com/goby-lang/goby/vm/classes"
"github.com/goby-lang/goby/vm/errors"
"sync"
)
// ChannelObject represents Goby's "channel", which equips the Golang' channel and works with `thread`.
// `thread` is actually a "goroutine".
// A channel object can relay any kind of objects and guarantees thread-safe communications.
// You should always use channel objects for safe communications between threads.
// `Channel#new` is available.
//
// Note that channels are not like files and you don't need to explicitly close them (e.g.: exiting a loop).
// See https://tour.golang.org/concurrency/4
//
// ```ruby
// def f(from)
// i = 0
// while i < 3 do
// puts(from + ": " + i.to_s)
// i += 1
// end
// end
//
// f("direct")
//
// c = Channel.new # spawning a channel object
//
// thread do
// puts(c.receive)
// f("thread")
// end
//
// thread do
// puts("going")
// c.deliver(10)
// end
//
// sleep(2) # This is to prevent main program finished before goroutine.
// ```
//
// Note that the possibility of race conditions still exists. Handle them with care.
//
// ```ruby
// c = Channel.new
//
// i = 0
// thread do
// i += 1
// c.deliver(i) # sends `i` to channel `c`
// end
//
// # If we put a bare `i += 1` here, then it will execute along with other thread,
// # which will cause a race condition.
// # The following "receive" is needed to block the main process until thread is finished
// c.receive
// i += 1
//
// c.close # Redundant: just for explanation and you don't need to call this here
// ```
type ChannelObject struct {
*baseObj
Chan chan int
ChannelState int
}
// Channel's state.
// To Goby language contributors: Golang's channels should be carefully handled because:
// - You cannot write to closed channels, or got a panic.
// - You cannot close closed channels, or got a panic.
// - You cannot write to nil channels, or causes a deadlock.
// - You cannot read nil channels, or causes a deadlock.
// Ref: https://beatsync.net/main/log20150325.html
const (
chOpen = iota
chClosed
)
// Class methods --------------------------------------------------------
func builtinChannelClassMethods() []*BuiltinMethodObject {
return []*BuiltinMethodObject{
{
// Creates an instance of `Channel` class, taking no arguments.
//
// ```ruby
// c = Channel.new
// c.class #=> Channel
// ```
//
// @return [Channel]
Name: "new",
Fn: func(receiver Object, sourceLine int, t *Thread, args []Object, blockFrame *normalCallFrame) Object {
c := &ChannelObject{baseObj: &baseObj{class: t.vm.topLevelClass(classes.ChannelClass)}, Chan: make(chan int, chOpen)}
return c
},
},
}
}
// Instance methods -----------------------------------------------------
func builtinChannelInstanceMethods() []*BuiltinMethodObject {
return []*BuiltinMethodObject{
{
// Just to close and the channel to declare no more objects will be sent.
// Channel is not like files, and you don't need to call `close` explicitly unless
// you definitely need to notify that no more objects will be sent,
// Well, you can call `#close` against the same channel twice or more, which is redundant.
// (Go's channel cannot do that)
// See https://tour.golang.org/concurrency/4
//
// ```ruby
// c = Channel.new
//
// 1001.times do |i|
// thread do
// c.deliver(i)
// end
// end
//
// r = 0
// 1001.times do
// r = r + c.receive
// end
//
// c.close # close the channel
//
// puts(r)
// ```
//
// If you call `close` twice against the same channel, an error is returned.
//
// It takes no argument.
//
// @return [Null]
Name: "close",
Fn: func(receiver Object, sourceLine int, t *Thread, args []Object, blockFrame *normalCallFrame) Object {
if len(args) != 0 {
return t.vm.InitErrorObject(errors.ArgumentError, sourceLine, errors.WrongNumberOfArgumentFormat, 0, len(args))
}
c := receiver.(*ChannelObject)
if c.ChannelState == chClosed {
return t.vm.InitErrorObject(errors.ChannelCloseError, sourceLine, errors.ChannelIsClosed)
}
c.ChannelState = chClosed
close(receiver.(*ChannelObject).Chan)
receiver = nil
return NULL
},
},
{
// Sends an object to the receiver (channel), then returns the object.
// Note that the method suspends the process until the object is actually received.
// Thus if you call `deliver` outside thread, the main process would suspend.
// Note that you don't need to send dummy object just to resume; use `close` instead.
//
// ```ruby
// c = Channel.new
//
// i = 0
// thread do
// i += 1
// c.deliver(i) # sends `i` to channel `c`
// end
//
// c.receive # receives `i`
// ```
//
// If you call `deliver` against the closed channel, an error is returned.
//
// It takes 1 argument.
//
// @param object [Object]
// @return [Object]
Name: "deliver",
Fn: func(receiver Object, sourceLine int, t *Thread, args []Object, blockFrame *normalCallFrame) Object {
if len(args) != 1 {
return t.vm.InitErrorObject(errors.ArgumentError, sourceLine, errors.WrongNumberOfArgumentFormat, 1, len(args))
}
c := receiver.(*ChannelObject)
if c.ChannelState == chClosed {
return t.vm.InitErrorObject(errors.ChannelCloseError, sourceLine, errors.ChannelIsClosed)
}
id := t.vm.channelObjectMap.storeObj(args[0])
c.Chan <- id
return args[0]
},
},
{
// Receives objects from other threads' `deliver` method, then returns it.
// The method works as if the channel would receive objects perpetually from outside.
// Note that the method suspends the process until it actually receives something via `deliver`.
// Thus if you call `receive` outside thread, the main process would suspend.
// This also means you can resume a code by using the `receive` method.
//
// ```ruby
// c = Channel.new
//
// thread do
// puts(c.receive) # prints the object received from other threads.
// f("thread")
// end
// ```
//
// If you call `receive` against the closed channel, an error is returned.
//
// It takes no arguments.
//
// @return [Object]
Name: "receive",
Fn: func(receiver Object, sourceLine int, t *Thread, args []Object, blockFrame *normalCallFrame) Object {
if len(args) != 0 {
if len(args) != 0 {
return t.vm.InitErrorObject(errors.ArgumentError, sourceLine, errors.WrongNumberOfArgumentFormat, 0, len(args))
}
}
c := receiver.(*ChannelObject)
if c.ChannelState == chClosed {
return t.vm.InitErrorObject(errors.ChannelCloseError, sourceLine, errors.ChannelIsClosed)
}
num := <-c.Chan
return t.vm.channelObjectMap.retrieveObj(num)
},
},
}
}
// Internal functions ===================================================
// Functions for initialization -----------------------------------------
func (vm *VM) initChannelClass() *RClass {
class := vm.initializeClass(classes.ChannelClass)
class.setBuiltinMethods(builtinChannelClassMethods(), true)
class.setBuiltinMethods(builtinChannelInstanceMethods(), false)
return class
}
// Polymorphic helper functions -----------------------------------------
// Value returns the object
func (co *ChannelObject) Value() interface{} {
return co.Chan
}
// toString returns the object's name as the string format
func (co *ChannelObject) toString() string {
return fmt.Sprintf("<Channel: %p>", co.Chan)
}
// toJSON just delegates to toString
func (co *ChannelObject) toJSON(t *Thread) string {
return co.toString()
}
// copy returns the duplicate of the Array object
func (co *ChannelObject) copy() Object {
newC := &ChannelObject{baseObj: &baseObj{class: co.class}, Chan: make(chan int)}
return newC
}
// objectMap ==========================================================
type objectMap struct {
store *sync.Map
}
// Polymorphic helper functions -----------------------------------------
// storeObj stores objects into the container map
// and update containerCount at the same time
func (m *objectMap) storeObj(obj Object) int {
m.store.Store(obj.id(), obj)
return obj.id()
}
// retrieveObj returns the objects with the number specified
func (m *objectMap) retrieveObj(num int) Object {
obj, _ := m.store.Load(num)
return obj.(Object)
}