Skip to content

Claude/async typed object factory cz yhz#29

Open
martypitt wants to merge 6 commits intodevelopfrom
claude/async-typed-object-factory-CZYhz
Open

Claude/async typed object factory cz yhz#29
martypitt wants to merge 6 commits intodevelopfrom
claude/async-typed-object-factory-CZYhz

Conversation

@martypitt
Copy link
Contributor

No description provided.

martypitt and others added 4 commits February 20, 2026 06:34
All runBlocking {} call sites in TypedObjectFactory (handleTypeNotFound,
evaluateLambdaExpression, queryForParentType) now execute on a dedicated
blockingBridgeDispatcher backed by an unbounded cached thread pool.

Under concurrent load, multiple factory instances building simultaneously
would exhaust DefaultDispatcher workers — each blocked in runBlocking
waiting for coroutines that themselves needed DefaultDispatcher threads
to execute. The system would stall until restart.

This is a holding measure. The correct fix is to make TypedObjectFactory
fully async (suspend functions throughout).

More detail in original issue.

See ORB-1077
…uspend

Remove all runBlocking calls from TypedObjectFactory and its entire call
chain to eliminate DefaultDispatcher thread pool starvation under
concurrent load. The blockingBridgeDispatcher workaround is also removed.

Key changes:
- TypedObjectFactory: all methods now suspend, lazy field initializers
  replaced with HashMap cache, parallelStream replaced with coroutineScope
- Interfaces made suspend: EvaluationValueSupplier.getValue/readAccessor,
  ValueProjector.project, AccessorHandler.process, InPlaceQueryEngine.evaluate,
  FunctionInvoker.invoke, DeferredTypedInstance.evaluate
- AccessorReader: all read/evaluate methods now suspend
- All 24 NullSafeInvoker implementations: doInvoke now suspend
- All 7 direct NamedFunctionInvoker implementations: invoke now suspend
- Collection stdlib functions (filter, map, fold, reduce, groupBy)
  rewritten from non-suspend lambdas to explicit for loops
- External callers (TypedObject.fromValue, Vyne.evaluate, CSV parsers)
  use runBlocking at the call site as bridge pattern

https://claude.ai/code/session_016NsNPSNfFCADAi4RsyAD4F
…pend further

Fixes compilation errors discovered when building the async TypedObjectFactory
refactoring on a fresh checkout:

- Make xtimed() and timeBucket() inline so suspend calls work inside their lambdas
- Propagate suspend through: newFactory(), withAdditionalScopedFacts(),
  LogicalExpressionEvaluator, ConstraintEvaluation extension functions
- Propagate suspend through taxiql-query-engine: DirectServiceInvocationStrategy,
  QueryOperationInvocationStrategy, TaxiQlGrammarQueryBuilder, Vyne
- Convert non-suspend collection lambdas (fold, associateWith, mapNotNull, map)
  to explicit for loops where they call suspend functions
- Delete dead code DefaultConstraintEvaluator (never instantiated, can't compile
  with suspend Constraint.evaluate extension)
- Fix test compilation with runBlocking at test entry points

Status: mvn compile succeeds. Tests in vyne-core-types and taxiql-query-engine
pass. Remaining modules not yet tested (functions-binding, stream-engine,
schema-server-core may have additional test compilation fixes needed).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@martypitt
Copy link
Contributor Author

@copilot review this PR.

The goal is to undertake the broader refactoring to get from blocking to async using coroutines.

Here's a summary of the issue (taken from the codebase):

   /*
    * THREADING MODEL - READ BEFORE MODIFYING
    * More detail in ORB-1077
    *
    * Background
    * ----------
    * TypedObjectFactory uses per-field `lazy` delegates (LazyThreadSafetyMode.SYNCHRONIZED)
    * to handle cross-field dependencies during expression evaluation (e.g. `total = quantity * price`).
    * This was intentional: ConcurrentHashMap.computeIfAbsent is not re-entrant, so lazy was chosen
    * to allow safe recursive field access within a single factory instance.
    *
    * The problem
    * -----------
    * Several methods (handleTypeNotFound, evaluateLambdaExpression, queryForParentType) bridge
    * async operations back to synchronous code via runBlocking { }. When these are invoked from
    * a DefaultDispatcher worker thread, the worker blocks waiting for a coroutine to complete.
    * That coroutine also needs a DefaultDispatcher thread to run. Under concurrent load (multiple
    * factory instances building simultaneously), all DefaultDispatcher workers become blocked,
    * the coroutines they're waiting on can never be scheduled, and the system stalls indefinitely.
    * This was confirmed by a production thread dump showing all DefaultDispatcher workers parked
    * in runBlocking { } inside handleTypeNotFound, each holding a lazy lock on a different
    * factory instance.
    *
    * Short-term fix
    * --------------
    * All runBlocking { } call sites in this class use a dedicated blockingBridgeDispatcher
    * (unbounded cached thread pool) rather than inheriting the calling thread's dispatcher.
    * This moves the blocking onto threads outside the DefaultDispatcher pool, freeing coroutine
    * workers to complete the async operations being waited on.
    *
    * Why this is still a sticking plaster
    * -------------------------------------
    * The calling thread is still blocked — we've just moved the blockage off the bounded pool.
    * Under sustained high concurrency the bridge pool will grow unboundedly (one thread per
    * concurrent blocked call). This is acceptable in practice because it is bounded by upstream
    * request concurrency, but it is not a principled fix.
    *
    * The correct long-term fix
    * -------------------------
    * Make the entire call chain properly async: replace the lazy + runBlocking pattern with
    * suspend functions (or Deferred), propagating suspension all the way up through getValue(),
    * buildField(), getOrBuild(), and buildAsync(). This eliminates thread blocking entirely —
    * coroutines suspend and yield their workers back to the pool while waiting. This requires
    * changes to the classes that call into TypedObjectFactory and is a non-trivial refactor,
    * hence the bridge approach in the interim.
    */
    ```
    
This PR is attempting the "correct long-term fix" mentioned there.

There are a few changes relating to UI in the PR which just needs rebasing, we can do that later. But, pay close attention to the migration from blockgin to non-blocking, and look for correctness, or any issues that may be lurking here. This is a major refactor of a core part of our system.

Copy link

Copilot AI commented Feb 20, 2026

@martypitt I've opened a new pull request, #30, to work on those changes. Once the pull request is ready, I'll request review from you.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR broadly migrates core “typed object” construction, function invocation, and query evaluation paths to Kotlin coroutines (suspend) to remove internal blocking and address deadlock scenarios, while also extending query-graph reachability for no-arg services and adding some operational/UI improvements.

Changes:

  • Convert core model building / accessor evaluation / function invocation APIs to suspend, pushing runBlocking to the outermost call sites.
  • Replace blocking/parallel constructs (eg. parallelStream, internal runBlocking) with coroutine-based implementations and add deadlock regression tests.
  • Improve query graph traversal for no-arg operations and add UI handling for null/empty results plus repo lifecycle cleanup.

Reviewed changes

Copilot reviewed 83 out of 83 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
vyne-csv-utils/src/main/java/com/orbitalhq/formats/csv/CsvImporterUtil.kt Wraps TypedObjectFactory.build() with runBlocking at call site after build() became suspend.
vyne-core-types/src/main/java/com/orbitalhq/query/caching/AbstractMergingStateStore.kt Wraps sum-type build in runBlocking due to suspend factory build.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/transform/ToRawType.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/transform/Convert.kt Makes doInvoke suspend; adds caching + conversion logic (but uses runBlocking inside cache lambda).
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/math/Round.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/math/Average.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/errors/Throw.kt Makes invoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/dates/ParseDate.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/dates/Now.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/dates/BaseDateMathFunction.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/SingleBy.kt Makes invoker suspend; rewrites grouping loop (but includes an unused re-evaluation).
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/Single.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/OrEmpty.kt Makes invoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/ListOf.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/JoinToString.kt Makes invoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/Intersection.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/IndexOf.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/IfEmpty.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/First.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/FilterEach.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/Filter.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/CollectionPredicateFunctions.kt Refactors predicate evaluation to suspend + explicit loops / Either handling.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/CollectionFilteringFunction.kt Makes filtering/predicate evaluation suspend; refactors to loops for early-exit error propagation.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/collections/Append.kt Makes invoker doInvoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/Strings.kt Makes string stdlib invokers suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/ObjectFunctions.kt Makes object stdlib invokers suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/Functional.kt Makes functional stdlib invokers suspend; refactors loops (but Reduce remains unimplemented and throws).
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/EnumFunctions.kt Makes enum stdlib invokers suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/Collections.kt Makes collection stdlib invokers suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/BaseMathFunction.kt Makes math base invoker suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/Functions.kt Converts core invoker interfaces (FunctionInvoker, NullSafeInvoker) to suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/functions/FunctionRegistry.kt Makes registry invoke suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/constraints/DefaultConstraintEvaluator.kt Removes default evaluator implementation.
vyne-core-types/src/main/java/com/orbitalhq/models/constraints/ConstraintEvaluation.kt Makes constraint evaluation extensions suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/conditional/WhenBlockEvaluator.kt Makes when-block evaluation suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/conditional/LogicalExpressionEvaluator.kt Makes logical expression evaluation suspend; refactors loop.
vyne-core-types/src/main/java/com/orbitalhq/models/conditional/ConditionalFieldSetEvaluator.kt Makes evaluator suspend (still TODO).
vyne-core-types/src/main/java/com/orbitalhq/models/ValueProjector.kt Makes project suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/TypedObjectFactory.kt Large coroutine migration: suspend build/read/eval paths; replace parallelStream with coroutines; add field cache map.
vyne-core-types/src/main/java/com/orbitalhq/models/TypedObject.kt Wraps suspend factory build in runBlocking for sync factory method.
vyne-core-types/src/main/java/com/orbitalhq/models/ProjectionFunctionScopeEvaluator.kt Makes scope build suspend; removes internal runBlocking.
vyne-core-types/src/main/java/com/orbitalhq/models/InPlaceQueryEngine.kt Makes evaluate suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/FactBagValueSupplier.kt Makes value supplier methods suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/EvaluationValueSupplier.kt Makes supplier interface methods suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/DeferredExpression.kt Makes deferred evaluation suspend.
vyne-core-types/src/main/java/com/orbitalhq/models/AccessorReader.kt Converts accessor processing/reads/evaluation to suspend; removes internal runBlocking.
vyne-core-types/src/main/java/com/orbitalhq/formats/csv/CsvCollectionParser.kt Wraps parsing/build in runBlocking due to suspend build.
utils/src/main/java/com/orbitalhq/utils/Timed.kt Marks xtimed inline.
utils/src/main/java/com/orbitalhq/utils/TimeBucketed.kt Marks timeBucket inline.
taxiql-query-engine/src/test/java/com/orbitalhq/query/queryBuilders/TaxiQlGrammarQueryBuilderTest.kt Wraps suspend builder call in runBlocking.
taxiql-query-engine/src/test/java/com/orbitalhq/query/QueryOperationInvocationStrategyTest.kt Wraps new suspend strategy calls in runBlocking.
taxiql-query-engine/src/test/java/com/orbitalhq/models/functions/stdlib/StringsTest.kt Wraps new suspend invoker calls in runBlocking.
taxiql-query-engine/src/test/java/com/orbitalhq/models/ConditionalFieldReaderTest.kt Wraps suspend factory build in runBlocking.
taxiql-query-engine/src/test/java/com/orbitalhq/TypedObjectFactoryDeadlockSpec.kt Adds deadlock regression tests using timeouts and concurrent construction.
taxiql-query-engine/src/test/java/com/orbitalhq/ProjectionsSpec.kt Adds projection regression test for no-arg service reachability.
taxiql-query-engine/src/test/java/com/orbitalhq/ProjectionNamedScopeTest.kt Updates test schema/query to include FilmId parameter on operation.
taxiql-query-engine/src/main/java/com/orbitalhq/schemas/Link.kt Adds IS_NO_ARG_SERVICE relationship enum value.
taxiql-query-engine/src/main/java/com/orbitalhq/query/queryBuilders/TaxiQlGrammarQueryBuilder.kt Makes query building suspend; refactors loops + variable resolution to suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/queryBuilders/QueryGrammarQueryBuilder.kt Makes query builder interface suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/projection/LocalProjectionProvider.kt Makes scoped projection fact build suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/policyManager/PolicyEvaluator.kt Makes policy evaluation suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/policyManager/PolicyAwareOperationInvocationServiceDecorator.kt Makes policy instruction application suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/graph/edges/ParameterFactory.kt Removes internal runBlocking; refactors collection parameter build loop.
taxiql-query-engine/src/main/java/com/orbitalhq/query/graph/edges/EdgeEvaluators.kt Adds evaluator for new no-arg service relationship.
taxiql-query-engine/src/main/java/com/orbitalhq/query/graph/VyneGraphBuilder.kt Adds graph edge for “no-arg service” reachability (but current condition also matches primitive-only params).
taxiql-query-engine/src/main/java/com/orbitalhq/query/graph/SimplifiedSearchPaths.kt Treats no-arg service relationship as non-navigational in simplified paths.
taxiql-query-engine/src/main/java/com/orbitalhq/query/collections/CollectionProjectionBuilder.kt Converts collection projection processing to suspend; removes internal runBlocking.
taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryOperationInvocationStrategy.kt Makes candidate query op selection suspend; refactors to loops.
taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryEngineFactory.kt Registers new no-arg service edge evaluator.
taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryContext.kt Makes expression evaluation suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/ObjectBuilder.kt Makes expression-type evaluation suspend.
taxiql-query-engine/src/main/java/com/orbitalhq/query/DirectServiceInvocationStrategy.kt Makes candidate service selection suspend; refactors loops.
taxiql-query-engine/src/main/java/com/orbitalhq/Vyne.kt Makes context/expression building suspend; refactors fact evaluation; wraps one factory build in runBlocking.
schema-server-core/src/test/java/com/orbitalhq/schemaServer/core/repositories/lifecycle/ReactiveProjectStoreManagerTest.kt Adds tests for git spec removal and subscription disposal.
schema-server-core/src/main/java/com/orbitalhq/schemaServer/core/repositories/lifecycle/ReactiveProjectStoreManager.kt Adds git spec removed subscription and working-dir cleanup; disposes more subscriptions.
query-node-core/src/main/java/com/orbitalhq/query/runtime/core/monitor/ActiveQueryMonitor.kt Reduces logged query payload (logs query source).
pom.xml Bumps Spring Boot parent; pins lz4-java version property + dependency.
pipelines/stream-engine/src/main/java/com/orbitalhq/pipelines/jet/streams/ResultStreamAuthorizationDecorator.kt Re-evaluates policies for stream items (but uses runBlocking in Reactor map).
orbital-ui/src/app/results-table/results-table.component.ts Adds UI empty-result message + hides grid when null result detected.
orbital-ui/src/app/results-table/results-table.component.scss Styles for empty-result message; adds common import.
function-loading/functions-binding/src/main/java/com/orbitalhq/functions/scanner/BoundFunction.kt Makes bound function invoker suspend.
cockpit-core/src/main/java/com/orbitalhq/cockpit/core/security/authentication/oidc/PropelAuthApiKeyValidator.kt Schedules API-key validation on boundedElastic (but ends with unsafe cast).
CLAUDE.md Adds contributor instructions (Maven + testing expectations).
Comments suppressed due to low confidence (1)

vyne-core-types/src/main/java/com/orbitalhq/models/functions/stdlib/Functional.kt:167

  • Reduce.invoke currently contains TODO() and ends with TODO("Not yet implemented"), meaning this stdlib function will always throw at runtime. Either implement the reduce fully (and add/adjust tests) or remove it from the exported function list until it’s complete.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 155 to 159
var acc: TypedInstance = sourceCollection.first()
for (i in 1 until sourceCollection.size) {
val typedInstance = sourceCollection.toList()[i]
val reader = AccessorReader.forFacts(listOf(acc, typedInstance), schema)
val evaluated =
reader.evaluate(typedInstance, expressionReturnType, expression, dataSource = dataSource, format = null)
evaluated
acc = reader.evaluate(typedInstance, expressionReturnType, expression, dataSource = dataSource, format = null)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside the reduce loop, sourceCollection.toList()[i] rebuilds a list on every iteration, turning the operation into O(n²) for non-list-backed collections. Convert once outside the loop (or iterate directly with an iterator) to keep this O(n).

Copilot uses AI. Check for mistakes.
Comment on lines 50 to 54
val evaluated = deferredInstance.evaluate(collectionMember, dataSource, factBag, functionResultCache = resultCache)
if (evaluated is TypedNull) {
deferredInstance.evaluate(collectionMember, dataSource, factBag, functionResultCache = resultCache)
}
evaluated
grouped.getOrPut(evaluated) { mutableListOf() }.add(collectionMember)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second deferredInstance.evaluate(...) call when evaluated is TypedNull discards its result, so it adds extra work but doesn’t change behavior (the key remains TypedNull). If the intent is to retry, assign the result back (or remove the retry entirely) so the extra evaluation is meaningful.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why this change has been proposed. It should just return evaluated, like in the original. Please revert

Comment on lines 254 to 255
workingDir.toFile().deleteRecursively()
logger.info { "Successfully deleted git working directory at $workingDir" }
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteRecursively() returns a boolean indicating success, but the result is ignored and the code logs "Successfully deleted" unconditionally. Please check the return value (and/or verify the directory no longer exists) so failures are detected and logged accurately.

Suggested change
workingDir.toFile().deleteRecursively()
logger.info { "Successfully deleted git working directory at $workingDir" }
val deleted = workingDir.toFile().deleteRecursively()
if (deleted || !java.nio.file.Files.exists(workingDir)) {
logger.info { "Successfully deleted git working directory at $workingDir" }
} else {
logger.warn { "Failed to completely delete git working directory at $workingDir, but continuing. Directory may still exist." }
}

Copilot uses AI. Check for mistakes.
Comment on lines 47 to 52
return stream.mapNotNull { value ->
// first, parse back to a typed instance
val valueAsTypedInstance = TypedInstance.from(instanceType, value, querySchema, source = Provided)
val evaluatedTypedInstance = policyEvaluator.evaluate(valueAsTypedInstance, queryContext, executionScope)
val evaluatedTypedInstance = kotlinx.coroutines.runBlocking { policyEvaluator.evaluate(valueAsTypedInstance, queryContext, executionScope) }
// Convert back to a raw object, since that's what we started with
evaluatedTypedInstance.toRawObject()
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling runBlocking inside Flux.mapNotNull will block Reactor threads while policies are evaluated, which can stall the entire stream pipeline under load. Prefer a non-blocking composition (eg. flatMap { mono { policyEvaluator.evaluate(...) } } / Mono.fromCallable { ... }.subscribeOn(...)) so evaluation happens asynchronously without blocking the reactive scheduler.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved

// jwt
// ))
}
}.subscribeOn(Schedulers.boundedElastic()) as Mono<Authentication>
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The as Mono<Authentication> cast after subscribeOn(...) is a code smell and can mask type inference issues. Mono.create should be parameterized as Mono.create<Authentication> { ... } (or use Mono.fromCallable/Mono.defer) so the method returns a correctly typed Mono<Authentication> without an unsafe cast.

Copilot uses AI. Check for mistakes.
Comment on lines 238 to 251
val projectedFieldValue = if (valueToProject is TypedCollection && targetType.isCollection) {
// Project each member of the collection seperately
valueToProject
.parallelStream()
.map { collectionMember ->
newFactory(
targetType.collectionType!!,
collectionMember,
scopedArguments = projection.projectionFunctionScope
)
.build()
}.collect(Collectors.toList())
.let { projectedCollection ->
// Use arrayOf (instead of from), as the collection may be empty, so we want to be explicit about it's type
TypedCollection.arrayOf(targetType.collectionType!!, projectedCollection, source)
}
// Project each member of the collection using coroutines instead of parallelStream
coroutineScope {
valueToProject.map { collectionMember ->
async {
newFactory(
targetType.collectionType!!,
collectionMember,
scopedArguments = projection.projectionFunctionScope
)
.build()
}
}.awaitAll()
}.let { projectedCollection ->
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coroutineScope { valueToProject.map { async { ... } }.awaitAll() } launches one coroutine per collection element. For large collections this can create very high concurrency, memory pressure, and unfair scheduling compared to the previous bounded parallelStream. Consider bounding parallelism (eg. chunking, a Semaphore, or Dispatchers.Default.limitedParallelism(n)) so projections scale predictably.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented

Comment on lines 50 to 64
val converted = resultCache.getOrPut(resultCacheKey) {
if (targetType.isCollection && source is Collection<*>) {
val typedInstances = source.map { member ->
TypedObjectFactory(targetType.collectionType!!, FactBag.of(member as TypedInstance, schema), schema, source = dataSource, functionRegistry = schema.functionRegistry)
kotlinx.coroutines.runBlocking {
if (targetType.isCollection && source is Collection<*>) {
val typedInstances = source.map { member ->
TypedObjectFactory(targetType.collectionType!!, FactBag.of(member as TypedInstance, schema), schema, source = dataSource, functionRegistry = schema.functionRegistry)
.build()
.convertToRawTypeIfRequired()
}
TypedCollection.arrayOf(targetType.collectionType!!, typedInstances, dataSource)
} else {
TypedObjectFactory(targetType, FactBag.of(source, schema), schema, source = dataSource, functionRegistry = schema.functionRegistry)
.build()
.convertToRawTypeIfRequired()
}
TypedCollection.arrayOf(targetType.collectionType!!, typedInstances, dataSource)
} else {
TypedObjectFactory(targetType, FactBag.of(source, schema), schema, source = dataSource, functionRegistry = schema.functionRegistry)
.build()
.convertToRawTypeIfRequired()
}
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doInvoke is now suspend, but the cached computation uses runBlocking inside resultCache.getOrPut { ... }. This blocks a thread while waiting for suspend work (eg. TypedObjectFactory.build()), which can reintroduce the deadlock this PR is trying to avoid and hurts coroutine scalability. Consider replacing getOrPut with an explicit cache lookup + suspend computation + cache write (or store Deferred/Lazy values) so the computation stays non-blocking.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

operation.parameters
.filter { !it.type.isPrimitive }
val nonPrimitiveParams = operation.parameters.filter { !it.type.isPrimitive }
if (nonPrimitiveParams.isEmpty()) {
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nonPrimitiveParams.isEmpty() is not the same as a no-arg operation: operations with only primitive parameters (or primitive params with defaults) will hit this branch and be linked from STARTING_ELEMENT, allowing the graph to reach/invoke them without satisfying required parameters. This should likely check operation.parameters.isEmpty() instead, while still skipping primitive parameter edges as intended.

Suggested change
if (nonPrimitiveParams.isEmpty()) {
if (operation.parameters.isEmpty()) {

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants