@@ -6,6 +6,8 @@ module builder
66import v2.ast
77import v2.transformer
88import runtime
9+ import os
10+ import time
911
1012$if ! windows {
1113 struct TransformChunkArgs {
@@ -28,6 +30,8 @@ $if !windows {
2830 fn transform_chunk_thread (arg voidptr ) voidptr {
2931 a := unsafe { & TransformChunkArgs (arg) }
3032 t := unsafe { & transformer.Transformer (a.t) }
33+ wprof := os.getenv ('V2_TTIME' ) != ''
34+ mut wsw := time.new_stopwatch ()
3135 mut w := t.new_worker_clone (a.worker_idx)
3236 if unsafe { a.flat != nil } {
3337 // Streaming rehydration: rehydrate one file at a time, transform it,
@@ -52,6 +56,9 @@ $if !windows {
5256 for i := 0 ; i < a.files.len; i++ {
5357 result << w.transform_file_pub (a.files[i])
5458 }
59+ if wprof {
60+ eprintln (' [ttime] worker ${a.worker_idx} : ${a.files.len} files in ${wsw.elapsed().milliseconds()} ms' )
61+ }
5562 unsafe {
5663 * (& []ast.File (a.result_ptr)) = result
5764 * (& voidptr (a.worker_ptr)) = voidptr (w)
@@ -61,8 +68,17 @@ $if !windows {
6168}
6269
// transform_files_parallel runs the parallel transform without the post pass
// (b.transform_files_parallel_no_post_pass), then applies trans.post_pass to
// the combined result and returns it.
// When the V2_TTIME environment variable is set to a non-empty value, the
// elapsed milliseconds of each of those two phases are reported via eprintln.
6370fn (mut b Builder) transform_files_parallel (mut trans transformer.Transformer) []ast.File {
// Timing output is opt-in: an unset or empty V2_TTIME disables it.
71+ timing := os.getenv ('V2_TTIME' ) != ''
72+ mut sw := time.new_stopwatch ()
6473 mut result := b.transform_files_parallel_no_post_pass (mut trans)
74+ if timing {
75+ eprintln (' [ttime] (parallel) prepare+fanout: ${sw.elapsed().milliseconds()} ms' )
// Start a fresh stopwatch so the post_pass figure below excludes the time
// already reported for prepare+fanout.
76+ sw = time.new_stopwatch ()
77+ }
6578 trans.post_pass (mut result)
79+ if timing {
80+ eprintln (' [ttime] (parallel) post_pass: ${sw.elapsed().milliseconds()} ms' )
81+ }
6682 return result
6783}
6884
@@ -93,6 +109,8 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
93109 } else {
94110 trans.pre_pass (b.files)
95111 }
112+ timing_impl := os.getenv ('V2_TTIME' ) != ''
113+ mut sw_impl := time.new_stopwatch ()
96114 mut stream_files_from_flat := stream_from_flat
97115 mut files_to_transform := []ast.File{}
98116 if trans.needs_full_files_for_transform () {
@@ -102,6 +120,15 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
102120 } else if ! stream_from_flat {
103121 files_to_transform = b.files.clone ()
104122 }
123+ if timing_impl {
124+ eprintln (' [ttime] prepare_files_for_transform total: ${sw_impl.elapsed().milliseconds()} ms' )
125+ sw_impl = time.new_stopwatch ()
126+ }
127+ defer {
128+ if timing_impl {
129+ eprintln (' [ttime] per-file fanout: ${sw_impl.elapsed().milliseconds()} ms' )
130+ }
131+ }
105132
106133 // In flat mode, workers stream the rehydration per file (one legacy
107134 // ast.File in flight per worker at a time). Otherwise b.files is the
@@ -143,8 +170,32 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
143170 return result
144171 }
145172
146- // Split files into chunks and spawn workers via pthreads
147- chunk_size := (n_files + n_jobs - 1 ) / n_jobs // ceiling division
173+ // Assign files to workers. Contiguous chunks badly unbalance the load:
174+ // the few huge files (transformer.v, monomorphize.v, the cleanc gen
175+ // files, ...) cluster into adjacent chunks, so 2-3 workers run ~10s
176+ // while the rest finish in <0.5s and idle. For the non-flat path we
177+ // instead use longest-processing-time-first (LPT) bucketing keyed on a
178+ // cheap size proxy, then scatter each worker's results back to their
179+ // original file index after the join (no concurrent writes — workers
180+ // each fill their own chunk_results slot, the merge happens serially).
181+ mut bucket_indices := [][]int {len: n_jobs}
182+ if stream_files_from_flat {
183+ // Flat streaming still uses contiguous [start,end) ranges.
184+ chunk_size := (n_files + n_jobs - 1 ) / n_jobs
185+ mut i := 0
186+ mut w := 0
187+ for i < n_files {
188+ end := if i + chunk_size < n_files { i + chunk_size } else { n_files }
189+ for j in i .. end {
190+ bucket_indices[w] << j
191+ }
192+ i = end
193+ w++
194+ }
195+ } else {
196+ bucket_indices = lpt_buckets (files_to_transform, n_jobs)
197+ }
198+
148199 mut chunk_results := [][]ast.File{len: n_jobs}
149200 mut worker_ptrs := []voidptr {len: n_jobs, init: unsafe { nil }}
150201 mut thread_ids := []C.pthread_t{len: n_jobs}
@@ -159,21 +210,26 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
159210 C.pthread_attr_setstacksize (attr, 64 * 1024 * 1024 )
160211
161212 mut chunk_idx := 0
162- mut i := 0
163- for i < n_files {
164- end := if i + chunk_size < n_files { i + chunk_size } else { n_files }
213+ for w in 0 .. n_jobs {
214+ idxs := bucket_indices[w]
215+ if idxs.len == 0 {
216+ continue
217+ }
165218 if stream_files_from_flat {
166219 args << TransformChunkArgs{
167220 t: unsafe { voidptr (trans) }
168221 flat: unsafe { & b.flat }
169- flat_start: i
170- flat_end: end
222+ flat_start: idxs[ 0 ]
223+ flat_end: idxs[idxs.len - 1 ] + 1
171224 result_ptr: unsafe { voidptr (& chunk_results[chunk_idx]) }
172225 worker_ptr: unsafe { voidptr (& worker_ptrs[chunk_idx]) }
173226 worker_idx: chunk_idx
174227 }
175228 } else {
176- chunk := files_to_transform[i..end].clone ()
229+ mut chunk := []ast.File{cap: idxs.len}
230+ for fi in idxs {
231+ chunk << files_to_transform[fi]
232+ }
177233 args << TransformChunkArgs{
178234 t: unsafe { voidptr (trans) }
179235 files: chunk
@@ -184,7 +240,6 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
184240 }
185241 C.pthread_create (unsafe { & thread_ids[chunk_idx] }, attr, transform_chunk_thread,
186242 unsafe { voidptr (& args[chunk_idx]) })
187- i = end
188243 chunk_idx++
189244 }
190245 C.pthread_attr_destroy (attr)
@@ -194,22 +249,88 @@ fn (mut b Builder) transform_files_parallel_no_post_pass_impl(mut trans transfor
194249 C.pthread_join (thread_ids[ci], unsafe { nil })
195250 }
196251
197- // Collect results in chunk order and merge worker accumulated state
198- mut result := []ast.File{cap: n_files}
199- for ci := 0 ; ci < chunk_idx; ci++ {
252+ // Scatter each worker's results back to original file order and merge
253+ // accumulated state. bucket_indices[w] lists the original indices the
254+ // w-th spawned worker processed, in the same order it produced results.
255+ mut result := []ast.File{len: n_files}
256+ mut ci := 0
257+ for w in 0 .. n_jobs {
258+ idxs := bucket_indices[w]
259+ if idxs.len == 0 {
260+ continue
261+ }
200262 chunk_files := chunk_results[ci]
201- for k := 0 ; k < chunk_files.len; k++ {
202- result << chunk_files[k]
263+ for k, fi in idxs {
264+ if k < chunk_files.len {
265+ result[fi] = chunk_files[k]
266+ }
203267 }
204- w := unsafe { & transformer.Transformer (worker_ptrs[ci]) }
205- trans.merge_worker (w)
268+ worker := unsafe { & transformer.Transformer (worker_ptrs[ci]) }
269+ trans.merge_worker (worker)
270+ ci++
206271 }
207272 // Set synth_pos_counter past all worker ranges to avoid ID collisions in post_pass.
208273 trans.set_synth_pos_counter (- (chunk_idx * 100_000 ) - 1 )
209274 return result
210275 }
211276}
212277
278+ // lpt_buckets distributes file indices across n_jobs workers using the
279+ // longest-processing-time-first heuristic: process files largest-first and
280+ // always append to the currently least-loaded worker. This keeps the heaviest
281+ // files on separate workers so the fan-out wall time approaches
282+ // total_work / n_jobs instead of being pinned to one overloaded contiguous
283+ // chunk. The cost proxy is 1 + the number of top-level statements + the
284+ // statement count of every top-level ast.FnDecl body. Deterministic: files are
285+ // ordered by (cost desc, index asc) and ties pick the lowest worker index.
// Returns n_jobs index lists (one per worker); a list stays empty when there
// are fewer files than workers.
// NOTE(review): assumes n_jobs >= 1 - with n_jobs == 0 and a non-empty `files`
// the unconditional buckets[0] / load[0] access below would be out of range.
// Confirm that callers guarantee this.
286+ fn lpt_buckets (files []ast.File, n_jobs int ) [][]int {
287+ n := files.len
288+ mut cost := []int {len: n}
289+ for i in 0 .. n {
290+ // Cost proxy: count function bodies, not just top-level declarations, so
291+ // a file of a few huge functions (transformer.v, the cleanc gen files)
292+ // outranks one with many tiny ones. Deterministic; one level deep is
293+ // enough to separate the heavyweight files that drove the imbalance.
// Starts at 1, so even a file without statements gets a non-zero cost.
294+ mut c := 1
295+ for stmt in files[i].stmts {
296+ c++
297+ if stmt is ast.FnDecl {
298+ c + = stmt.stmts.len
299+ }
300+ }
301+ cost[i] = c
302+ }
303+ // order = file indices by cost descending. Implemented as a plain insertion
304+ // sort (n is small, a few hundred) rather than sort_with_compare: this file
305+ // must self-host through every backend, and capturing closures / pointer
306+ // comparators are not reliably codegen'd by the v2 cleanc and arm64 paths.
307+ // Stable on index (only shifts on strictly-greater), so deterministic.
// `init: index` seeds order with 0, 1, ..., n-1 (the original file indices).
308+ mut order := []int {len: n, init: index}
309+ for i in 1 .. n {
310+ key := order[i]
311+ kc := cost[key]
312+ mut j := i - 1
// Move `key` left past entries whose cost is strictly smaller; entries of
// equal cost are never crossed, which keeps equal-cost files in index order.
313+ for j > = 0 && cost[order[j]] < kc {
314+ order[j + 1 ] = order[j]
315+ j--
316+ }
317+ order[j + 1 ] = key
318+ }
// Greedy assignment: buckets[w] collects the file indices given to worker w,
// load[w] is the summed cost proxy of those files.
319+ mut buckets := [][]int {len: n_jobs}
320+ mut load := []i64 {len: n_jobs}
321+ for fi in order {
// Linear scan for the least-loaded worker; the strict `<` means a tie keeps
// the lowest worker index.
322+ mut mw := 0
323+ for w in 1 .. n_jobs {
324+ if load[w] < load[mw] {
325+ mw = w
326+ }
327+ }
328+ buckets[mw] << fi
329+ load[mw] + = i64 (cost[fi])
330+ }
331+ return buckets
332+ }
333+
213334// transform_files_parallel_to_flat is the parallel counterpart of
214335// Transformer.transform_files_to_flat. Today it composes the existing
215336// parallel transform with a boundary flatten_files() — same total work
0 commit comments