Skip to content

Commit e21acca

Browse files
authored
v2: translate Go's goroutine scheduler (GMP model) to V (#26740)
Add a full goroutine runtime module (vlib/goroutines/) translated from Go's runtime scheduler, implementing the GMP (Goroutine-Machine-Processor) model with work-stealing, lock-free local run queues, and cooperative scheduling. Runtime module: - goroutines.v: Core data structures (Goroutine, Machine, Processor, Sched) - scheduler.v: GMP scheduler with work stealing, goroutine_create() - park.v: gopark/goready for goroutine blocking/unblocking - chan.v: Channel implementation (send, recv, close, select) - init.v: Module init creating P's (one per CPU core) and M0 - context_nix.c.v: ucontext-based context switching (Linux/macOS) - context_windows.c.v: Windows Fiber-based context switching - atomic_ops.c.v: C atomic operation wrappers V2 compiler changes: - ssa/instr.v: Add go_call and spawn_call opcodes - ssa/builder.v: Handle .key_go and .key_spawn in build_keyword_operator, add build_go_or_spawn() helper to extract CallExpr operands - gen/c/c.v: Emit goroutines__goroutine_create() for go_call, pthread_create() for spawn_call, with argument packing structs The transformer now: - Detects `KeywordOperator{ op: .key_go }` expressions - Synthesizes per-function: args struct, trampoline function, and dispatch wrapper that calls goroutines__goroutine_create - Replaces `go foo(a, b)` with a plain call to the dispatch wrapper - Registers wrapper info in needed_go_wrappers map (thread-safe for parallel transform) Backends see only regular CallExpr nodes after transformation. SSA builder and cleanc gen_spawn_expr retain fallback .key_go handling. Benchmark results (V goroutines vs Go goroutines vs V spawn): fan-out/fan-in 500g: V=3017us Go=657us spawn=70465us ping-pong 100K rt: V=117ms Go=41ms spawn=953ms contended 10x1K: V=549us Go=824us spawn=14920us V goroutines are ~3-5x slower than Go on creation/scheduling (expected given Go's mature runtime) but dramatically faster than OS threads (spawn), and competitive on channel throughput.
1 parent 150e87c commit e21acca

22 files changed

Lines changed: 2904 additions & 1 deletion

vlib/goroutines/README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# goroutines
2+
3+
Go-style goroutine runtime for V, implementing the GMP (Goroutine-Machine-Processor) scheduling model translated from the Go runtime (`src/runtime/proc.go`, `runtime2.go`, `chan.go`).
4+
5+
## Overview
6+
7+
This module provides lightweight goroutines for V's `go` keyword, as opposed to `spawn` which creates OS threads.
8+
9+
### GMP Model
10+
11+
- **G (Goroutine)**: Lightweight unit of execution with its own stack (~8KB default)
12+
- **M (Machine)**: OS thread that executes goroutines
13+
- **P (Processor)**: Logical processor with a local run queue (one per CPU core)
14+
15+
### Key Features
16+
17+
- **Work stealing**: Idle processors steal work from busy ones
18+
- **Local run queues**: Lock-free per-P queues minimize contention
19+
- **Global run queue**: Overflow and fairness mechanism
20+
- **Goroutine parking**: Efficient blocking/unblocking for channels
21+
- **G reuse**: Dead goroutines are recycled to reduce allocation
22+
23+
## Usage
24+
25+
```v
26+
// `go` launches a goroutine (lightweight, scheduled by GMP)
27+
go expensive_computation()
28+
29+
// `spawn` launches an OS thread (traditional V behavior)
30+
spawn blocking_io_task()
31+
```
32+
33+
## Architecture
34+
35+
Translated from Go's runtime source:
36+
37+
| Go Source | V Module File | Purpose |
38+
|-----------|---------------|---------|
39+
| `runtime2.go` | `goroutines.v` | Core data structures (G, M, P, Sched) |
40+
| `proc.go` | `scheduler.v` | Scheduler loop, work stealing, run queues |
41+
| `proc.go` | `park.v` | gopark/goready, Sudog, WaitQ |
42+
| `proc.go` | `init.v` | Initialization (schedinit, procresize) |
43+
| `chan.go` | `chan.v` | Channel implementation |
44+
| asm (gogo/gosave) | `context_nix.c.v` | Context switching (ucontext) |
45+
| asm (gogo/gosave) | `context_windows.c.v` | Context switching (Windows fibers) |
46+
47+
## References
48+
49+
- [Go Scheduler Design Doc](https://golang.org/s/go11sched)
50+
- [Go Runtime Source](https://github.com/golang/go/tree/master/src/runtime)

vlib/goroutines/atomic_ops.c.v

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2+
// Use of this source code is governed by an MIT license
3+
// that can be found in the LICENSE file.
4+
//
5+
// Atomic operations and C interop for the goroutine scheduler.
6+
module goroutines
7+
8+
#include <stdlib.h>
9+
#include <string.h>
10+
#include "@VMODROOT/vlib/goroutines/goroutines_tls.h"
11+
12+
#flag @VMODROOT/vlib/goroutines/tls.c
13+
14+
// Thread-local storage
15+
fn C.goroutines_get_current_m() voidptr
16+
fn C.goroutines_set_current_m(mp voidptr)
17+
18+
// Typed atomic operations (implemented in tls.c)
19+
fn C.goroutines_atomic_load_u32(ptr &u32) u32
20+
fn C.goroutines_atomic_store_u32(ptr &u32, val u32)
21+
fn C.goroutines_atomic_fetch_add_u32(ptr &u32, val u32) u32
22+
fn C.goroutines_atomic_fetch_add_i32(ptr &i32, val i32) i32
23+
fn C.goroutines_atomic_fetch_sub_i32(ptr &i32, val i32) i32
24+
fn C.goroutines_atomic_fetch_add_u64(ptr &u64, val u64) u64
25+
fn C.goroutines_atomic_cas_u32(ptr &u32, expected &u32, desired u32) bool
26+
fn C.goroutines_atomic_cas_ptr(ptr voidptr, expected voidptr, desired voidptr) bool
27+
28+
fn C.grt_spinlock_lock(lk &i32)
29+
fn C.grt_spinlock_unlock(lk &i32)
30+
31+
fn C.memcpy(dest voidptr, src voidptr, n usize) voidptr
32+
fn C.memset(dest voidptr, ch int, n usize) voidptr
33+
fn C.rand() int
34+
35+
// SpinLock - ucontext-safe lock (pthreads mutex breaks with swapcontext).
36+
pub struct SpinLock {
37+
mut:
38+
state i32
39+
}
40+
41+
pub fn (mut s SpinLock) acquire() {
42+
C.grt_spinlock_lock(&s.state)
43+
}
44+
45+
pub fn (mut s SpinLock) release() {
46+
C.grt_spinlock_unlock(&s.state)
47+
}

vlib/goroutines/chan.v

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2+
// Use of this source code is governed by an MIT license
3+
// that can be found in the LICENSE file.
4+
//
5+
// Channel implementation for goroutines.
6+
// Translated from Go's runtime/chan.go.
7+
//
8+
// Channels provide goroutine-safe communication between goroutines.
9+
// They support both buffered and unbuffered modes.
10+
//
11+
// Key operations translated from Go:
12+
// - makechan() -> chan_make()
13+
// - chansend() -> chan_send()
14+
// - chanrecv() -> chan_recv()
15+
// - closechan() -> chan_close()
16+
module goroutines
17+
18+
// Chan is a goroutine-safe channel for communication between goroutines.
19+
// Translated from Go's hchan struct in chan.go.
20+
pub struct Chan {
21+
pub mut:
22+
mu SpinLock // protects all fields (spinlock is ucontext-safe)
23+
qcount u32 // total data in the queue
24+
dataqsiz u32 // size of the circular buffer
25+
buf voidptr // circular buffer for buffered channels
26+
elemsize u16 // size of each element
27+
closed bool // true if channel is closed
28+
29+
sendx u32 // send index into circular buffer
30+
recvx u32 // receive index into circular buffer
31+
32+
recvq WaitQ // list of recv waiters
33+
sendq WaitQ // list of send waiters
34+
}
35+
36+
// chan_make creates a new channel.
37+
// If buf_size > 0, creates a buffered channel.
38+
// Translated from Go's makechan() in chan.go.
39+
pub fn chan_make(elem_size int, buf_size int) &Chan {
40+
mut c := &Chan{
41+
elemsize: u16(elem_size)
42+
dataqsiz: u32(buf_size)
43+
}
44+
if buf_size > 0 {
45+
c.buf = unsafe { malloc(elem_size * buf_size) }
46+
}
47+
return c
48+
}
49+
50+
// chan_send sends a value on the channel.
51+
// If block is true, blocks until the send can proceed.
52+
// Returns true if the value was sent.
53+
// Translated from Go's chansend() in chan.go.
54+
pub fn chan_send(c &Chan, ep voidptr, block bool) bool {
55+
if c == unsafe { nil } {
56+
if !block {
57+
return false
58+
}
59+
// Block forever on nil channel (Go behavior)
60+
gopark('chan send (nil chan)')
61+
return false // unreachable
62+
}
63+
64+
mut ch := unsafe { c }
65+
ch.mu.acquire()
66+
67+
if ch.closed {
68+
ch.mu.release()
69+
panic('send on closed channel')
70+
}
71+
72+
// Fast path: try to find a waiting receiver
73+
sg := ch.recvq.dequeue()
74+
if sg != unsafe { nil } {
75+
// Found a waiting receiver - send directly
76+
ch.mu.release()
77+
send_direct(sg, ep, ch.elemsize)
78+
return true
79+
}
80+
81+
// Buffered channel with space available
82+
if ch.qcount < ch.dataqsiz {
83+
// Put data in buffer
84+
dst := chan_buf(ch, ch.sendx)
85+
unsafe { C.memcpy(dst, ep, ch.elemsize) }
86+
ch.sendx++
87+
if ch.sendx == ch.dataqsiz {
88+
ch.sendx = 0
89+
}
90+
ch.qcount++
91+
ch.mu.release()
92+
return true
93+
}
94+
95+
if !block {
96+
ch.mu.release()
97+
return false
98+
}
99+
100+
// Block: enqueue ourselves on the send wait queue
101+
gp := get_current_g()
102+
mut mysg := &Sudog{
103+
g: unsafe { gp }
104+
elem: ep
105+
c: voidptr(ch)
106+
}
107+
ch.sendq.enqueue(mysg)
108+
ch.mu.release()
109+
110+
// Park the goroutine until a receiver wakes us
111+
gopark('chan send')
112+
113+
return true
114+
}
115+
116+
// chan_recv receives a value from the channel.
117+
// If block is true, blocks until a value is available.
118+
// Returns (received, ok). ok is false if channel is closed and empty.
119+
// Translated from Go's chanrecv() in chan.go.
120+
pub fn chan_recv(c &Chan, ep voidptr, block bool) (bool, bool) {
121+
if c == unsafe { nil } {
122+
if !block {
123+
return false, false
124+
}
125+
gopark('chan receive (nil chan)')
126+
return false, false // unreachable
127+
}
128+
129+
mut ch := unsafe { c }
130+
ch.mu.acquire()
131+
132+
// Fast path: try to find a waiting sender
133+
sg := ch.sendq.dequeue()
134+
if sg != unsafe { nil } {
135+
ch.mu.release()
136+
recv_direct(ch, sg, ep)
137+
return true, true
138+
}
139+
140+
// Buffered channel with data available
141+
if ch.qcount > 0 {
142+
src := chan_buf(ch, ch.recvx)
143+
if ep != unsafe { nil } {
144+
unsafe { C.memcpy(ep, src, ch.elemsize) }
145+
}
146+
ch.recvx++
147+
if ch.recvx == ch.dataqsiz {
148+
ch.recvx = 0
149+
}
150+
ch.qcount--
151+
ch.mu.release()
152+
return true, true
153+
}
154+
155+
if ch.closed {
156+
ch.mu.release()
157+
if ep != unsafe { nil } {
158+
unsafe { C.memset(ep, 0, ch.elemsize) }
159+
}
160+
return true, false
161+
}
162+
163+
if !block {
164+
ch.mu.release()
165+
return false, false
166+
}
167+
168+
// Block: enqueue ourselves on the recv wait queue
169+
gp := get_current_g()
170+
mut mysg := &Sudog{
171+
g: unsafe { gp }
172+
elem: ep
173+
c: voidptr(ch)
174+
}
175+
ch.recvq.enqueue(mysg)
176+
ch.mu.release()
177+
178+
// Park until a sender wakes us
179+
gopark('chan receive')
180+
181+
return true, true
182+
}
183+
184+
// chan_close closes the channel.
185+
// Translated from Go's closechan() in chan.go.
186+
pub fn chan_close(c &Chan) {
187+
if c == unsafe { nil } {
188+
panic('close of nil channel')
189+
}
190+
191+
mut ch := unsafe { c }
192+
ch.mu.acquire()
193+
194+
if ch.closed {
195+
ch.mu.release()
196+
panic('close of closed channel')
197+
}
198+
199+
ch.closed = true
200+
201+
// Wake all waiting receivers
202+
for {
203+
mut sg := ch.recvq.dequeue()
204+
if sg == unsafe { nil } {
205+
break
206+
}
207+
if sg.elem != unsafe { nil } {
208+
unsafe { C.memset(sg.elem, 0, ch.elemsize) }
209+
}
210+
sg.success = false
211+
goready(sg.g)
212+
}
213+
214+
// Wake all waiting senders (they will panic)
215+
for {
216+
mut sg := ch.sendq.dequeue()
217+
if sg == unsafe { nil } {
218+
break
219+
}
220+
sg.success = false
221+
goready(sg.g)
222+
}
223+
224+
ch.mu.release()
225+
}
226+
227+
// send_direct sends data directly from sender to a waiting receiver.
228+
// Translated from Go's send() in chan.go.
229+
fn send_direct(sg &Sudog, ep voidptr, elem_size u16) {
230+
if sg.elem != unsafe { nil } {
231+
unsafe { C.memcpy(sg.elem, ep, elem_size) }
232+
}
233+
mut s := unsafe { sg }
234+
s.success = true
235+
goready(sg.g)
236+
}
237+
238+
// recv_direct receives data directly from a waiting sender.
239+
fn recv_direct(ch &Chan, sg &Sudog, ep voidptr) {
240+
if ch.dataqsiz == 0 {
241+
// Unbuffered: copy directly from sender
242+
if ep != unsafe { nil } {
243+
unsafe { C.memcpy(ep, sg.elem, ch.elemsize) }
244+
}
245+
} else {
246+
// Buffered: take from buffer, then copy sender's data into buffer
247+
buf_elem := chan_buf(ch, unsafe { ch }.recvx)
248+
if ep != unsafe { nil } {
249+
unsafe { C.memcpy(ep, buf_elem, ch.elemsize) }
250+
}
251+
unsafe { C.memcpy(buf_elem, sg.elem, ch.elemsize) }
252+
unsafe {
253+
ch.recvx++
254+
if ch.recvx == ch.dataqsiz {
255+
ch.recvx = 0
256+
}
257+
ch.sendx = ch.recvx
258+
}
259+
}
260+
mut s := unsafe { sg }
261+
s.success = true
262+
goready(sg.g)
263+
}
264+
265+
// chan_buf returns a pointer to the i-th slot in the buffer.
266+
// Translated from Go's chanbuf() in chan.go.
267+
fn chan_buf(c &Chan, i u32) voidptr {
268+
return unsafe { voidptr(usize(c.buf) + usize(i) * usize(c.elemsize)) }
269+
}
270+
271+
// chan_len returns the number of elements in the channel buffer.
272+
pub fn chan_len(c &Chan) int {
273+
if c == unsafe { nil } {
274+
return 0
275+
}
276+
return int(c.qcount)
277+
}
278+
279+
// chan_cap returns the capacity of the channel buffer.
280+
pub fn chan_cap(c &Chan) int {
281+
if c == unsafe { nil } {
282+
return 0
283+
}
284+
return int(c.dataqsiz)
285+
}

0 commit comments

Comments
 (0)