Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/compiler/fileloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ func processAllProgramFiles(
CurrentDirectory: opts.Host.GetCurrentDirectory(),
},
filesParser: &filesParser{
wg: core.NewWorkGroup(singleThreaded),
maxDepth: maxNodeModuleJsDepth,
wg: core.NewWorkGroup(singleThreaded),
resolveSem: core.NewSemaphore(singleThreaded),
parseSem: core.NewSemaphore(singleThreaded),
maxDepth: maxNodeModuleJsDepth,
},
rootTasks: make([]*parseTask, 0, len(rootFiles)+len(compilerOptions.Lib)),
supportedExtensions: supportedExtensions,
Expand Down
15 changes: 15 additions & 0 deletions internal/compiler/filesparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,26 @@ func (t *parseTask) load(loader *fileLoader) {
// to avoid adding spurious lookups to file watcher tracking.
t.metadata = ast.SourceFileMetaData{ImpliedNodeFormat: core.ResolutionModeCommonJS}
} else {
// Resolution: load metadata (package.json lookups involve stat calls).
loader.filesParser.resolveSem.Acquire()
t.metadata = loader.loadSourceFileMetaData(t.normalizedFilePath)
loader.filesParser.resolveSem.Release()
}

// Parsing: parse the source file (CPU-heavy).
loader.filesParser.parseSem.Acquire()
file := loader.parseSourceFile(t)
loader.filesParser.parseSem.Release()
if file == nil {
return
}

t.file = file
t.subTasks = make([]*parseTask, 0, len(file.ReferencedFiles)+len(file.Imports())+len(file.ModuleAugmentations))

// Resolution: resolve references, type directives, and imports (stat-heavy).
loader.filesParser.resolveSem.Acquire()

compilerOptions := loader.opts.Config.CompilerOptions()
if !compilerOptions.NoResolve.IsTrue() {
for index, ref := range file.ReferencedFiles {
Expand Down Expand Up @@ -152,6 +161,8 @@ func (t *parseTask) load(loader *fileLoader) {
}

loader.resolveImportsAndModuleAugmentations(t)

loader.filesParser.resolveSem.Release()
}

func (t *parseTask) redirect(loader *fileLoader, fileName string) {
Expand All @@ -165,6 +176,8 @@ func (t *parseTask) redirect(loader *fileLoader, fileName string) {
}

func (t *parseTask) loadAutomaticTypeDirectives(loader *fileLoader) {
loader.filesParser.resolveSem.Acquire()
defer loader.filesParser.resolveSem.Release()
toParseTypeRefs, typeResolutionsInFile, typeResolutionsTrace, pDiagnostics := loader.resolveAutomaticTypeDirectives(t.normalizedFilePath)
t.typeResolutionsInFile = typeResolutionsInFile
t.typeResolutionsTrace = typeResolutionsTrace
Expand Down Expand Up @@ -197,6 +210,8 @@ func (t *parseTask) addSubTask(ref resolvedRef, libFile *LibFile) {

// filesParser coordinates parsing of program files, bounding the two
// expensive phases independently: resolution (package.json / module
// lookups, stat-heavy) and source-file parsing (CPU-heavy).
type filesParser struct {
	wg             core.WorkGroup
	resolveSem     *core.Semaphore // limits concurrent resolution work; nil when single-threaded (see NewSemaphore)
	parseSem       *core.Semaphore // limits concurrent parseSourceFile calls; nil when single-threaded
	taskDataByPath collections.SyncMap[tspath.Path, *parseTaskData]
	maxDepth       int // set from maxNodeModuleJsDepth at construction
}
Expand Down
2 changes: 1 addition & 1 deletion internal/compiler/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ func (p *Program) Emit(ctx context.Context, options EmitOptions) *EmitResult {
return printer.NewTextWriter(newLine, 0)
},
}
wg := core.NewWorkGroup(p.SingleThreaded())
wg := core.NewThrottledWorkGroup(p.SingleThreaded())
var emitters []*emitter
sourceFiles := p.getSourceFilesToEmit(options.TargetSourceFile, options.EmitOnly == EmitOnlyForcedDts)

Expand Down
69 changes: 69 additions & 0 deletions internal/core/workgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"runtime"
"sync"
"sync/atomic"

Expand All @@ -24,6 +25,49 @@ func NewWorkGroup(singleThreaded bool) WorkGroup {
return &parallelWorkGroup{}
}

// NewThrottledWorkGroup creates a WorkGroup that limits concurrent execution to GOMAXPROCS.
// If singleThreaded is true, all work runs sequentially in RunAndWait.
func NewThrottledWorkGroup(singleThreaded bool) WorkGroup {
	if singleThreaded {
		return &singleThreadedWorkGroup{}
	}
	// Size the semaphore to the scheduler's parallelism so queued tasks
	// never oversubscribe the CPUs.
	limit := runtime.GOMAXPROCS(0)
	return &throttledWorkGroup{sem: make(chan struct{}, limit)}
}

// Semaphore limits concurrent access to a resource.
// A nil *Semaphore is valid and performs no limiting;
// callers are expected to arrange sequential execution themselves
// when no semaphore is provided.
type Semaphore struct {
	ch chan struct{} // buffered channel; one element per held slot, capacity = max holders
}

// NewSemaphore creates a Semaphore sized to GOMAXPROCS.
// Returns nil when singleThreaded is true, as the caller's WorkGroup
// already ensures sequential execution.
func NewSemaphore(singleThreaded bool) *Semaphore {
	if singleThreaded {
		// A nil Semaphore is valid and imposes no limit.
		return nil
	}
	slots := make(chan struct{}, runtime.GOMAXPROCS(0))
	return &Semaphore{ch: slots}
}

// Acquire blocks until a slot is available.
// On a nil receiver it returns immediately, imposing no limit.
func (s *Semaphore) Acquire() {
	if s == nil {
		return
	}
	s.ch <- struct{}{}
}

// Release frees a slot previously taken by Acquire.
// On a nil receiver it is a no-op, mirroring Acquire.
func (s *Semaphore) Release() {
	if s == nil {
		return
	}
	<-s.ch
}

type parallelWorkGroup struct {
done atomic.Bool
wg sync.WaitGroup
Expand Down Expand Up @@ -88,6 +132,31 @@ func (w *singleThreadedWorkGroup) pop() func() {
return fn
}

// throttledWorkGroup runs queued tasks in parallel while capping the
// number that execute simultaneously via a buffered-channel semaphore.
type throttledWorkGroup struct {
	done atomic.Bool // set by RunAndWait; a later Queue call panics
	wg   sync.WaitGroup
	sem  chan struct{} // capacity bounds concurrent task execution
}

// Compile-time check that throttledWorkGroup satisfies WorkGroup.
var _ WorkGroup = (*throttledWorkGroup)(nil)

// Queue schedules fn to run on its own goroutine. The goroutine starts
// immediately but fn itself waits for a semaphore slot, so at most
// cap(w.sem) tasks execute at once. Panics if called after RunAndWait
// has returned.
func (w *throttledWorkGroup) Queue(fn func()) {
	if w.done.Load() {
		panic("Queue called after RunAndWait returned")
	}

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		// Reserve an execution slot; released even if fn panics.
		w.sem <- struct{}{}
		defer func() { <-w.sem }()
		fn()
	}()
}

// RunAndWait blocks until every queued task has finished, then marks
// the group done so any subsequent Queue call panics rather than
// silently leaking work.
func (w *throttledWorkGroup) RunAndWait() {
	// defer ensures done is set even if Wait panics (WaitGroup misuse).
	defer w.done.Store(true)
	w.wg.Wait()
}

// ThrottleGroup is like errgroup.Group but with global concurrency limiting via a semaphore.
type ThrottleGroup struct {
semaphore chan struct{}
Expand Down
Loading