ZiplineScope #769

Closed
swankjesse opened this issue Dec 13, 2022 · 5 comments

Comments

@swankjesse
Contributor

It’s difficult bookkeeping to call ZiplineService.close() in all the places where it’s necessary.

Flow<T> is even worse. I’ve found a case where a Flow is never collected, and that causes it to leak. (We currently close flows after the first collect() completes; this is painful both for flows that are never collected and for flows that are collected multiple times.)

I propose the following API:

/**
 * Closes [ziplineService] when this scope completes (either through success,
 * failure, or cancelation). This will also attach all services returned from
 * [ziplineService] to this scope.
 *
 * This requires that [ziplineService] is a bridged proxy, either via
 * [Zipline.take], as a called function’s return value, or an implemented
 * function’s parameter value.
 */
fun CoroutineScope.attach(ziplineService: ZiplineService) {
  ...
}

One place this falls short is Flow<T>, which isn’t a ZiplineService. Worse, it’s difficult to get the ZiplineService from a Flow<T>.

We could do magic for inbound calls:

/**
 * Attaches all inbound services to this scope, so they will be closed
 * when this scope completes. Attached services will either be parameters
 * to this function, or properties of the parameters to this function.
 * 
 * This must be called in the body of a ZiplineService function.
 */
fun CoroutineScope.attachZiplineInboundCall() {
  ...
}

There’s a lot of room to experiment with syntax here, and I’m not particularly attached to the proposals above. But I do think there’s something handy about linking ZiplineService.close() to a coroutine scope.

@JakeWharton
Member

The light/loose convention for placing things into a CoroutineScope's lifecycle is to have an "In"-suffixed method. In coroutines itself you see things like shareIn or launchIn. In Turbine we have testIn.

So what about doing something like takeIn<T>(scope) where that service and all sub-services become bound to a scope?

@swankjesse
Contributor Author

Discussed with @JakeWharton offline.

One thing we decided is it’s probably best to create our own scope abstraction instead of borrowing the one from coroutines. That’ll feel lighter and likely be simpler to implement.

We also decided that this works a bit like ownership tagging in a garbage collector, and it’s possible we can borrow ideas from that domain.

We should be careful that the scope doesn’t artificially extend the visibility of a service that has been closed manually with close().
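
One way to read that last point: a scope that tracks its services should forget a service as soon as the service is closed by hand. Here is a minimal sketch of that idea. `SketchScope` and `SketchService` are hypothetical stand-ins invented for illustration, not Zipline APIs, and the real design may well look different.

```kotlin
// Hypothetical stand-ins for illustration; these are not Zipline types.
class SketchScope {
  private val services = mutableSetOf<SketchService>()

  /** Number of services this scope still references. */
  val size: Int get() = services.size

  internal fun add(service: SketchService) {
    services.add(service)
  }

  internal fun remove(service: SketchService) {
    services.remove(service)
  }

  /** Closes every service that is still attached. */
  fun close() {
    // Copy first, because each close() call removes the service from the set.
    for (service in services.toList()) {
      service.close()
    }
  }
}

class SketchService(val name: String, private val scope: SketchScope) {
  var isClosed = false
    private set

  init {
    scope.add(this)
  }

  fun close() {
    if (isClosed) return
    isClosed = true
    // Detach so the scope no longer keeps this service reachable.
    scope.remove(this)
  }
}

fun main() {
  val scope = SketchScope()
  val a = SketchService("a", scope)
  val b = SketchService("b", scope)

  a.close() // Manual close: the scope drops its reference to a.
  println(scope.size) // prints 1

  scope.close() // Closes whatever is left, here only b.
  println(b.isClosed) // prints true
}
```

The trade-off in this sketch is that every service holds a reference back to its scope. An ID-based link would avoid that, at the cost of a lookup.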

@swankjesse
Copy link
Contributor Author

swankjesse commented Dec 15, 2022

Here’s a start.

/**
 * If you implement this on your (inbound) ZiplineService, any services passed as
 * parameters to it by Zipline will attach to this service’s scope.
 *
 * All (outbound) services produced by Zipline will implement this interface. 
 * Closing the scope will close all services in that scope and all services in child
 * scopes.
 */
interface ZiplineScoped {
  val scope: ZiplineScope
}

/**
 * A set of services that can be closed as a unit. Closing a parent scope
 * automatically closes all child scopes.
 */
class ZiplineScope(
  parent: ZiplineScope? = null,
) {
  fun close() {
  }
}
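
To make the parent and child relationship concrete, here is a rough sketch of how closing could cascade. `TreeScope` and its `onClose` hook are assumptions made up for this example. They are not part of the proposal above, which leaves `close()` empty.

```kotlin
// Hypothetical stand-in; the real ZiplineScope may be structured differently.
class TreeScope(private val parent: TreeScope? = null) {
  private val children = mutableListOf<TreeScope>()
  private val closeActions = mutableListOf<() -> Unit>()

  var isClosed = false
    private set

  init {
    parent?.children?.add(this)
  }

  /** Registers work to run when this scope closes, such as closing a service. */
  fun onClose(action: () -> Unit) {
    check(!isClosed) { "scope is already closed" }
    closeActions.add(action)
  }

  fun close() {
    if (isClosed) return
    isClosed = true

    // Close children first. Iterate over a copy, because each child removes
    // itself from this list when it closes.
    for (child in children.toList()) child.close()

    for (action in closeActions) action()
    closeActions.clear()

    // Detach from the parent so a closed scope is not kept reachable.
    parent?.children?.remove(this)
  }
}

fun main() {
  val log = mutableListOf<String>()
  val parent = TreeScope()
  val child = TreeScope(parent)
  parent.onClose { log.add("parent service closed") }
  child.onClose { log.add("child service closed") }

  parent.close()
  println(log) // prints [child service closed, parent service closed]
}
```

Closing children before running the parent's own actions is one possible ordering. Nothing in the thread says which order the final implementation uses.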

@swankjesse
Contributor Author

I’ve got an implementation idea that might work. I’ll assume no parent scopes as a simplification.

The scope class is an ID wrapper that keeps track of which endpoints it has services on:

class ZiplineScope(
  parent: ZiplineScope? = null,
) {
  internal var isClosed = false

  internal val id = nextScopeId++

  /** Endpoints that have services on this scope. */
  internal val endpoints = mutableListOf<Endpoint>()

  fun close() {
    if (isClosed) return
    isClosed = true

    for (endpoint in endpoints) {
      endpoint.scopeClosed(id)
    }
    endpoints.clear()
  }

  private companion object {
    var nextScopeId = 1
  }
}

When doing a pass-by-reference, we take a scope ID from the bind() call and add it to the InboundService:

internal class InboundService<T : ZiplineService>(
  internal val service: T,
  internal val scopeId: Int,
  private val endpoint: Endpoint,
  functionsList: List<ZiplineFunction<T>>,
) {
}

Finally, Endpoint.scopeClosed() tells the peer endpoint to iterate its inbound services and close 'em if the scopes match.

fun scopeClosed(id: Int) {
  for (inboundService in inboundServices.values) {
    if (inboundService.scopeId == id) inboundService.close()
  }
}

(I’m not 100% sure I’ve got inbound/outbound all exactly right here! But I really like the option to make scopes IDs; it avoids all kinds of potential problems where scopes end up preventing services from being GC'd.)
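
For anyone following along, here is how the three pieces above might fit together in a single process. Everything in it is a hypothetical stand-in (`IdScope`, `SketchEndpoint`, `SketchInboundService`, and the direct `peer` reference). The real endpoints talk across a bridge, which this sketch does not model.

```kotlin
// Hypothetical, in-process stand-ins for illustration only.
class SketchInboundService(val name: String, val scopeId: Int) {
  var isClosed = false
    private set

  fun close() {
    isClosed = true
  }
}

class SketchEndpoint {
  /** The endpoint on the other side of the (imaginary) bridge. */
  lateinit var peer: SketchEndpoint

  val inboundServices = mutableMapOf<String, SketchInboundService>()

  /** Called on the side that owns the scope; forwards the ID to the peer. */
  fun scopeClosed(id: Int) {
    peer.closeInboundServices(id)
  }

  /** Called on the side that hosts the services. */
  fun closeInboundServices(id: Int) {
    val iterator = inboundServices.values.iterator()
    while (iterator.hasNext()) {
      val service = iterator.next()
      if (service.scopeId == id) {
        service.close()
        iterator.remove() // Safe removal while iterating.
      }
    }
  }
}

class IdScope {
  val id = nextScopeId++
  val endpoints = mutableListOf<SketchEndpoint>()
  private var isClosed = false

  fun close() {
    if (isClosed) return
    isClosed = true
    for (endpoint in endpoints) endpoint.scopeClosed(id)
    endpoints.clear()
  }

  private companion object {
    var nextScopeId = 1
  }
}

fun main() {
  val host = SketchEndpoint()
  val client = SketchEndpoint()
  host.peer = client
  client.peer = host

  val scope = IdScope()
  scope.endpoints.add(client)
  val otherScope = IdScope()

  host.inboundServices["a"] = SketchInboundService("a", scope.id)
  host.inboundServices["b"] = SketchInboundService("b", otherScope.id)

  scope.close() // Only services tagged with scope.id are closed.
  println(host.inboundServices.keys) // prints [b]
}
```

Because the scope holds only an integer ID and a list of endpoints, it never references a service directly, which is the GC-friendly property mentioned above.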

@swankjesse changed the title from CoroutineScope.attach() to ZiplineScope on Dec 15, 2022
@swankjesse
Contributor Author

The implementation with IDs didn’t work 'cause the endpoint that knows the IDs doesn’t have a list of services to enumerate. But I got something that works regardless.

With these PRs we’ve got a simple implementation that works nicely.

We’ve got a few remaining follow-ups, which I’ll create as separate tracking issues.
