From af0118ad6409bbe54bc5d0f8347458ea68703946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Fri, 1 Sep 2023 16:09:06 -0700 Subject: [PATCH] Improve `sass --embedded` performance (#2013) Co-authored-by: Natalie Weizenbaum --- CHANGELOG.md | 8 + lib/src/embedded/dispatcher.dart | 221 +++++++++--------- lib/src/embedded/host_callable.dart | 6 +- lib/src/embedded/importer/file.dart | 48 ++-- lib/src/embedded/importer/host.dart | 64 +++-- lib/src/embedded/isolate_dispatcher.dart | 153 ++++-------- lib/src/embedded/reusable_isolate.dart | 142 +++++++++++ .../util/explicit_close_transformer.dart | 38 --- pubspec.yaml | 3 +- 9 files changed, 363 insertions(+), 320 deletions(-) create mode 100644 lib/src/embedded/reusable_isolate.dart delete mode 100644 lib/src/embedded/util/explicit_close_transformer.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 49739cdc2..1666f753f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 1.66.2 + +### Embedded Sass + +* Substantially improve the embedded compiler's performance when compiling many + files or files that require many importer or function call round-trips with + the embedded host. + ## 1.66.1 ### JS API diff --git a/lib/src/embedded/dispatcher.dart b/lib/src/embedded/dispatcher.dart index fae22b458..22df57fca 100644 --- a/lib/src/embedded/dispatcher.dart +++ b/lib/src/embedded/dispatcher.dart @@ -2,15 +2,15 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:isolate'; import 'dart:typed_data'; +import 'package:native_synchronization/mailbox.dart'; import 'package:path/path.dart' as p; import 'package:protobuf/protobuf.dart'; import 'package:sass/sass.dart' as sass; -import 'package:stream_channel/stream_channel.dart'; import 'embedded_sass.pb.dart'; import 'function_registry.dart'; @@ -30,83 +30,69 @@ final _outboundRequestId = 0; /// A class that dispatches messages to and from the host for a single /// compilation. final class Dispatcher { - /// The channel of encoded protocol buffers, connected to the host. - final StreamChannel _channel; + /// The mailbox for receiving messages from the host. + final Mailbox _mailbox; + + /// The send port for sending messages to the host. + final SendPort _sendPort; /// The compilation ID for which this dispatcher is running. /// - /// This is added to outgoing messages but is _not_ parsed from incoming - /// messages, since that's already handled by the [IsolateDispatcher]. - final int _compilationId; + /// This is used in error messages. + late int _compilationId; /// [_compilationId], serialized as a varint. - final Uint8List _compilationIdVarint; - - /// Whether this dispatcher has received its compile request. - var _compiling = false; + /// + /// This is used in outgoing messages. + late Uint8List _compilationIdVarint; - /// A completer awaiting a response to an outbound request. + /// Whether we detected a [ProtocolError] while parsing an incoming response. /// - /// Since each [Dispatcher] is only running a single-threaded compilation, it - /// can only ever have one request outstanding. - Completer? _outstandingRequest; + /// If we have, we don't want to send the final compilation result because + /// it'll just be a wrapper around the error. + var _requestError = false; - /// Creates a [Dispatcher] that sends and receives encoded protocol buffers - /// over [channel]. - Dispatcher(this._channel, this._compilationId) - : _compilationIdVarint = serializeVarint(_compilationId); + /// Creates a [Dispatcher] that receives encoded protocol buffers through + /// [_mailbox] and sends them through [_sendPort]. + Dispatcher(this._mailbox, this._sendPort); /// Listens for incoming `CompileRequests` and runs their compilations. - /// - /// This may only be called once. Returns whether or not the compilation - /// succeeded. - Future listen() async { - var success = false; - await _channel.stream.listen((binaryMessage) async { - // Wait a single microtask tick so that we're running in a separate - // microtask from the initial request dispatch. Otherwise, [waitFor] will - // deadlock the event loop fiber that would otherwise be checking stdin - // for new input. - await Future.value(); + void listen() { + do { + var packet = _mailbox.take(); + if (packet.isEmpty) break; try { - InboundMessage? message; + var (compilationId, messageBuffer) = parsePacket(packet); + + _compilationId = compilationId; + _compilationIdVarint = serializeVarint(compilationId); + + InboundMessage message; try { - message = InboundMessage.fromBuffer(binaryMessage); + message = InboundMessage.fromBuffer(messageBuffer); } on InvalidProtocolBufferException catch (error) { throw parseError(error.message); } switch (message.whichMessage()) { - case InboundMessage_Message.versionRequest: - throw paramsError("VersionRequest must have compilation ID 0."); - case InboundMessage_Message.compileRequest: - if (_compiling) { - throw paramsError( - "A CompileRequest with compilation ID $_compilationId is " - "already active."); - } - _compiling = true; - var request = message.compileRequest; - var response = await _compile(request); - _send(OutboundMessage()..compileResponse = response); - success = true; - // Each Dispatcher runs a single compilation and then closes. - _channel.sink.close(); - - case InboundMessage_Message.canonicalizeResponse: - _dispatchResponse(message.id, message.canonicalizeResponse); - - case InboundMessage_Message.importResponse: - _dispatchResponse(message.id, message.importResponse); + var response = _compile(request); + if (!_requestError) { + _send(OutboundMessage()..compileResponse = response); + } - case InboundMessage_Message.fileImportResponse: - _dispatchResponse(message.id, message.fileImportResponse); + case InboundMessage_Message.versionRequest: + throw paramsError("VersionRequest must have compilation ID 0."); - case InboundMessage_Message.functionCallResponse: - _dispatchResponse(message.id, message.functionCallResponse); + case InboundMessage_Message.canonicalizeResponse || + InboundMessage_Message.importResponse || + InboundMessage_Message.fileImportResponse || + InboundMessage_Message.functionCallResponse: + throw paramsError( + "Response ID ${message.id} doesn't match any outstanding requests" + " in compilation $_compilationId."); case InboundMessage_Message.notSet: throw parseError("InboundMessage.message is not set."); @@ -115,16 +101,14 @@ final class Dispatcher { throw parseError( "Unknown message type: ${message.toDebugString()}"); } - } on ProtocolError catch (error, stackTrace) { - sendError(handleError(error, stackTrace)); - _channel.sink.close(); + } catch (error, stackTrace) { + _handleError(error, stackTrace); } - }).asFuture(); - return success; + } while (!_requestError); } - Future _compile( - InboundMessage_CompileRequest request) async { + OutboundMessage_CompileResponse _compile( + InboundMessage_CompileRequest request) { var functions = FunctionRegistry(); var style = request.style == OutputStyle.COMPRESSED @@ -159,7 +143,6 @@ final class Dispatcher { verbose: request.verbose, sourceMap: request.sourceMap, charset: request.charset); - break; case InboundMessage_CompileRequest_Input.path: if (request.path.isEmpty) { @@ -188,7 +171,6 @@ final class Dispatcher { ..end = SourceSpan_SourceLocation() ..url = p.toUri(request.path).toString())); } - break; case InboundMessage_CompileRequest_Input.notSet: throw mandatoryError("CompileRequest.input"); @@ -245,59 +227,86 @@ final class Dispatcher { void sendError(ProtocolError error) => _send(OutboundMessage()..error = error); - Future sendCanonicalizeRequest( + InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( OutboundMessage_CanonicalizeRequest request) => _sendRequest( OutboundMessage()..canonicalizeRequest = request); - Future sendImportRequest( + InboundMessage_ImportResponse sendImportRequest( OutboundMessage_ImportRequest request) => _sendRequest( OutboundMessage()..importRequest = request); - Future sendFileImportRequest( + InboundMessage_FileImportResponse sendFileImportRequest( OutboundMessage_FileImportRequest request) => _sendRequest( OutboundMessage()..fileImportRequest = request); - Future sendFunctionCallRequest( + InboundMessage_FunctionCallResponse sendFunctionCallRequest( OutboundMessage_FunctionCallRequest request) => _sendRequest( OutboundMessage()..functionCallRequest = request); /// Sends [request] to the host and returns the message sent in response. - Future _sendRequest( - OutboundMessage request) async { - request.id = _outboundRequestId; - _send(request); - - if (_outstandingRequest != null) { - throw StateError( - "Dispatcher.sendRequest() can't be called when another request is " - "active."); - } + T _sendRequest(OutboundMessage message) { + message.id = _outboundRequestId; + _send(message); - return (_outstandingRequest = Completer()).future; - } + var packet = _mailbox.take(); - /// Dispatches [response] to the appropriate outstanding request. - /// - /// Throws an error if there's no outstanding request with the given [id] or - /// if that request is expecting a different type of response. - void _dispatchResponse(int? id, T response) { - var completer = _outstandingRequest; - _outstandingRequest = null; - if (completer == null || id != _outboundRequestId) { - throw paramsError( - "Response ID $id doesn't match any outstanding requests in " - "compilation $_compilationId."); - } else if (completer is! Completer) { - throw paramsError( - "Request ID $id doesn't match response type ${response.runtimeType} " - "in compilation $_compilationId."); + try { + var messageBuffer = + Uint8List.sublistView(packet, _compilationIdVarint.length); + + InboundMessage message; + try { + message = InboundMessage.fromBuffer(messageBuffer); + } on InvalidProtocolBufferException catch (error) { + throw parseError(error.message); + } + + var response = switch (message.whichMessage()) { + InboundMessage_Message.canonicalizeResponse => + message.canonicalizeResponse, + InboundMessage_Message.importResponse => message.importResponse, + InboundMessage_Message.fileImportResponse => message.fileImportResponse, + InboundMessage_Message.functionCallResponse => + message.functionCallResponse, + InboundMessage_Message.compileRequest => throw paramsError( + "A CompileRequest with compilation ID $_compilationId is already " + "active."), + InboundMessage_Message.versionRequest => + throw paramsError("VersionRequest must have compilation ID 0."), + InboundMessage_Message.notSet => + throw parseError("InboundMessage.message is not set."), + _ => + throw parseError("Unknown message type: ${message.toDebugString()}") + }; + + if (message.id != _outboundRequestId) { + throw paramsError( + "Response ID ${message.id} doesn't match any outstanding requests " + "in compilation $_compilationId."); + } else if (response is! T) { + throw paramsError( + "Request ID $_outboundRequestId doesn't match response type " + "${response.runtimeType} in compilation $_compilationId."); + } + + return response; + } catch (error, stackTrace) { + _handleError(error, stackTrace); + _requestError = true; + rethrow; } + } - completer.complete(response); + /// Handles an error thrown by the dispatcher or code it dispatches to. + /// + /// The [messageId] indicate the IDs of the message being responded to, if + /// available. + void _handleError(Object error, StackTrace stackTrace, {int? messageId}) { + sendError(handleError(error, stackTrace, messageId: messageId)); } /// Sends [message] to the host with the given [wireId]. @@ -306,16 +315,18 @@ final class Dispatcher { message.writeToCodedBufferWriter(protobufWriter); // Add one additional byte to the beginning to indicate whether or not the - // compilation is finished, so the [IsolateDispatcher] knows whether to - // treat this isolate as inactive. + // compilation has finished (1) or encountered a fatal error (2), so the + // [IsolateDispatcher] knows whether to treat this isolate as inactive or + // close out entirely. var packet = Uint8List( 1 + _compilationIdVarint.length + protobufWriter.lengthInBytes); - packet[0] = - message.whichMessage() == OutboundMessage_Message.compileResponse - ? 1 - : 0; + packet[0] = switch (message.whichMessage()) { + OutboundMessage_Message.compileResponse => 1, + OutboundMessage_Message.error => 2, + _ => 0 + }; packet.setAll(1, _compilationIdVarint); protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); - _channel.sink.add(packet); + _sendPort.send(packet); } } diff --git a/lib/src/embedded/host_callable.dart b/lib/src/embedded/host_callable.dart index bb1770ea4..448cce217 100644 --- a/lib/src/embedded/host_callable.dart +++ b/lib/src/embedded/host_callable.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../callable.dart'; import '../exception.dart'; import 'dispatcher.dart'; @@ -37,8 +34,7 @@ Callable hostCallable( request.name = callable.name; } - // ignore: deprecated_member_use - var response = waitFor(dispatcher.sendFunctionCallRequest(request)); + var response = dispatcher.sendFunctionCallRequest(request); try { switch (response.whichResult()) { case InboundMessage_FunctionCallResponse_Result.success: diff --git a/lib/src/embedded/importer/file.dart b/lib/src/embedded/importer/file.dart index b945cba2e..8250515eb 100644 --- a/lib/src/embedded/importer/file.dart +++ b/lib/src/embedded/importer/file.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../../importer.dart'; import '../dispatcher.dart'; import '../embedded_sass.pb.dart' hide SourceSpan; @@ -27,30 +24,27 @@ final class FileImporter extends ImporterBase { Uri? canonicalize(Uri url) { if (url.scheme == 'file') return _filesystemImporter.canonicalize(url); - // ignore: deprecated_member_use - return waitFor(() async { - var response = await dispatcher - .sendFileImportRequest(OutboundMessage_FileImportRequest() - ..importerId = _importerId - ..url = url.toString() - ..fromImport = fromImport); - - switch (response.whichResult()) { - case InboundMessage_FileImportResponse_Result.fileUrl: - var url = parseAbsoluteUrl("The file importer", response.fileUrl); - if (url.scheme != 'file') { - throw 'The file importer must return a file: URL, was "$url"'; - } - - return _filesystemImporter.canonicalize(url); - - case InboundMessage_FileImportResponse_Result.error: - throw response.error; - - case InboundMessage_FileImportResponse_Result.notSet: - return null; - } - }()); + var response = + dispatcher.sendFileImportRequest(OutboundMessage_FileImportRequest() + ..importerId = _importerId + ..url = url.toString() + ..fromImport = fromImport); + + switch (response.whichResult()) { + case InboundMessage_FileImportResponse_Result.fileUrl: + var url = parseAbsoluteUrl("The file importer", response.fileUrl); + if (url.scheme != 'file') { + throw 'The file importer must return a file: URL, was "$url"'; + } + + return _filesystemImporter.canonicalize(url); + + case InboundMessage_FileImportResponse_Result.error: + throw response.error; + + case InboundMessage_FileImportResponse_Result.notSet: + return null; + } } ImporterResult? load(Uri url) => _filesystemImporter.load(url); diff --git a/lib/src/embedded/importer/host.dart b/lib/src/embedded/importer/host.dart index e4a952100..a1d5eed31 100644 --- a/lib/src/embedded/importer/host.dart +++ b/lib/src/embedded/importer/host.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../../importer.dart'; import '../dispatcher.dart'; import '../embedded_sass.pb.dart' hide SourceSpan; @@ -19,44 +16,35 @@ final class HostImporter extends ImporterBase { HostImporter(Dispatcher dispatcher, this._importerId) : super(dispatcher); Uri? canonicalize(Uri url) { - // ignore: deprecated_member_use - return waitFor(() async { - var response = await dispatcher - .sendCanonicalizeRequest(OutboundMessage_CanonicalizeRequest() - ..importerId = _importerId - ..url = url.toString() - ..fromImport = fromImport); - - return switch (response.whichResult()) { - InboundMessage_CanonicalizeResponse_Result.url => - parseAbsoluteUrl("The importer", response.url), - InboundMessage_CanonicalizeResponse_Result.error => - throw response.error, - InboundMessage_CanonicalizeResponse_Result.notSet => null - }; - }()); + var response = + dispatcher.sendCanonicalizeRequest(OutboundMessage_CanonicalizeRequest() + ..importerId = _importerId + ..url = url.toString() + ..fromImport = fromImport); + + return switch (response.whichResult()) { + InboundMessage_CanonicalizeResponse_Result.url => + parseAbsoluteUrl("The importer", response.url), + InboundMessage_CanonicalizeResponse_Result.error => throw response.error, + InboundMessage_CanonicalizeResponse_Result.notSet => null + }; } ImporterResult? load(Uri url) { - // ignore: deprecated_member_use - return waitFor(() async { - var response = - await dispatcher.sendImportRequest(OutboundMessage_ImportRequest() - ..importerId = _importerId - ..url = url.toString()); - - return switch (response.whichResult()) { - InboundMessage_ImportResponse_Result.success => ImporterResult( - response.success.contents, - sourceMapUrl: response.success.sourceMapUrl.isEmpty - ? null - : parseAbsoluteUrl( - "The importer", response.success.sourceMapUrl), - syntax: syntaxToSyntax(response.success.syntax)), - InboundMessage_ImportResponse_Result.error => throw response.error, - InboundMessage_ImportResponse_Result.notSet => null - }; - }()); + var response = dispatcher.sendImportRequest(OutboundMessage_ImportRequest() + ..importerId = _importerId + ..url = url.toString()); + + return switch (response.whichResult()) { + InboundMessage_ImportResponse_Result.success => ImporterResult( + response.success.contents, + sourceMapUrl: response.success.sourceMapUrl.isEmpty + ? null + : parseAbsoluteUrl("The importer", response.success.sourceMapUrl), + syntax: syntaxToSyntax(response.success.syntax)), + InboundMessage_ImportResponse_Result.error => throw response.error, + InboundMessage_ImportResponse_Result.notSet => null + }; } String toString() => "HostImporter"; diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 78d340997..044e2b9c2 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -7,45 +7,33 @@ import 'dart:ffi'; import 'dart:isolate'; import 'dart:typed_data'; +import 'package:native_synchronization/mailbox.dart'; import 'package:pool/pool.dart'; import 'package:protobuf/protobuf.dart'; -import 'package:stream_channel/isolate_channel.dart'; import 'package:stream_channel/stream_channel.dart'; import 'dispatcher.dart'; import 'embedded_sass.pb.dart'; -import 'util/explicit_close_transformer.dart'; +import 'reusable_isolate.dart'; import 'util/proto_extensions.dart'; import 'utils.dart'; -/// The message sent to a previously-inactive isolate to initiate a new -/// compilation session. -/// -/// The [SendPort] is used to establish a dedicated [IsolateChannel] for this -/// compilation and the [int] is the compilation ID to use on the wire. -/// -/// We apply the compilation ID in the isolate for efficiency reasons: it allows -/// us to write the protobuf directly to the same buffer as the wire ID, which -/// saves a copy for each message. -typedef _InitialMessage = (SendPort, int); - /// A class that dispatches messages between the host and various isolates that /// are each running an individual compilation. class IsolateDispatcher { /// The channel of encoded protocol buffers, connected to the host. final StreamChannel _channel; - /// A map from compilation IDs to the sinks for isolates running those - /// compilations. - final _activeIsolates = >{}; - - /// A set of isolates that are _not_ actively running compilations. - final _inactiveIsolates = >{}; - - /// The actual isolate objects that have been spawned. + /// All isolates that have been spawned to dispatch to. /// /// Only used for cleaning up the process when the underlying channel closes. - final _allIsolates = >[]; + final _allIsolates = >[]; + + /// The isolates that aren't currently running compilations + final _inactiveIsolates = {}; + + /// A map from active compilationIds to isolates running those compilations. + final _activeIsolates = {}; /// A pool controlling how many isolates (and thus concurrent compilations) /// may be live at once. @@ -55,10 +43,6 @@ class IsolateDispatcher { /// See https://github.com/sass/dart-sass/pull/2019 final _isolatePool = Pool(sizeOf() <= 4 ? 7 : 15); - /// Whether the underlying channel has closed and the dispatcher is shutting - /// down. - var _closed = false; - IsolateDispatcher(this._channel); void listen() { @@ -70,13 +54,15 @@ class IsolateDispatcher { (compilationId, messageBuffer) = parsePacket(packet); if (compilationId != 0) { - // TODO(nweiz): Consider using the techniques described in - // https://github.com/dart-lang/language/issues/124#issuecomment-988718728 - // or https://github.com/dart-lang/language/issues/3118 for low-cost - // inter-isolate transfers. - (_activeIsolates[compilationId] ?? await _getIsolate(compilationId)) - .add(messageBuffer); - return; + var isolate = _activeIsolates[compilationId] ?? + await _getIsolate(compilationId); + try { + isolate.send(packet); + return; + } on StateError catch (_) { + throw paramsError( + "Received multiple messages for compilation ID $compilationId"); + } } try { @@ -102,20 +88,9 @@ class IsolateDispatcher { }, onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); }, onDone: () async { - _closed = true; for (var isolate in _allIsolates) { (await isolate).kill(); } - - // Killing isolates isn't sufficient to make sure the process closes; we - // also have to close all the [ReceivePort]s we've constructed (by closing - // the [IsolateChannel]s). - for (var sink in _activeIsolates.values) { - sink.close(); - } - for (var channel in _inactiveIsolates) { - channel.sink.close(); - } }); } @@ -123,63 +98,37 @@ class IsolateDispatcher { /// /// This re-uses an existing isolate if possible, and spawns a new one /// otherwise. - Future> _getIsolate(int compilationId) async { + Future _getIsolate(int compilationId) async { var resource = await _isolatePool.request(); + ReusableIsolate isolate; if (_inactiveIsolates.isNotEmpty) { - return _activate(_inactiveIsolates.first, compilationId, resource); + isolate = _inactiveIsolates.first; + _inactiveIsolates.remove(isolate); + } else { + var future = ReusableIsolate.spawn(_isolateMain); + _allIsolates.add(future); + isolate = await future; } - var receivePort = ReceivePort(); - var future = Isolate.spawn(_isolateMain, receivePort.sendPort); - _allIsolates.add(future); - await future; - - var channel = IsolateChannel<_InitialMessage?>.connectReceive(receivePort) - .transform(const ExplicitCloseTransformer()); - channel.stream.listen(null, - onError: (Object error, StackTrace stackTrace) => - _handleError(error, stackTrace), - onDone: _channel.sink.close); - return _activate(channel, compilationId, resource); - } + _activeIsolates[compilationId] = isolate; + isolate.checkOut().listen(_channel.sink.add, + onError: (Object error, StackTrace stackTrace) { + if (error is ProtocolError) { + // Protocol errors have already been through [_handleError] in the child + // isolate, so we just send them as-is and close out the underlying + // channel. + sendError(compilationId, error); + _channel.sink.close(); + } else { + _handleError(error, stackTrace); + } + }, onDone: () { + _activeIsolates.remove(compilationId); + _inactiveIsolates.add(isolate); + resource.release(); + }); - /// Activates [isolate] for a new individual compilation. - /// - /// This pipes all the outputs from the given isolate through to [_channel]. - /// The [resource] is released once the isolate is no longer active. - StreamSink _activate(StreamChannel<_InitialMessage> isolate, - int compilationId, PoolResource resource) { - _inactiveIsolates.remove(isolate); - - // Each individual compilation has its own dedicated [IsolateChannel], which - // closes once the compilation is finished. - var receivePort = ReceivePort(); - isolate.sink.add((receivePort.sendPort, compilationId)); - - var channel = IsolateChannel.connectReceive(receivePort); - channel.stream.listen( - (message) { - // The first byte of messages from isolates indicates whether the - // entire compilation is finished. Sending this as part of the message - // buffer rather than a separate message avoids a race condition where - // the host might send a new compilation request with the same ID as - // one that just finished before the [IsolateDispatcher] receives word - // that the isolate with that ID is done. See sass/dart-sass#2004. - if (message[0] == 1) { - channel.sink.close(); - _activeIsolates.remove(compilationId); - _inactiveIsolates.add(isolate); - resource.release(); - } - _channel.sink.add(Uint8List.sublistView(message, 1)); - }, - onError: (Object error, StackTrace stackTrace) => - _handleError(error, stackTrace), - onDone: () { - if (_closed) isolate.sink.close(); - }); - _activeIsolates[compilationId] = channel.sink; - return channel.sink; + return isolate; } /// Creates a [OutboundMessage_VersionResponse] @@ -211,14 +160,6 @@ class IsolateDispatcher { _send(compilationId, OutboundMessage()..error = error); } -void _isolateMain(SendPort sendPort) { - var channel = IsolateChannel<_InitialMessage?>.connectSend(sendPort) - .transform(const ExplicitCloseTransformer()); - channel.stream.listen((initialMessage) async { - var (compilationSendPort, compilationId) = initialMessage; - var compilationChannel = - IsolateChannel.connectSend(compilationSendPort); - var success = await Dispatcher(compilationChannel, compilationId).listen(); - if (!success) channel.sink.close(); - }); +void _isolateMain(Mailbox mailbox, SendPort sendPort) { + Dispatcher(mailbox, sendPort).listen(); } diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart new file mode 100644 index 000000000..0ed9eec8c --- /dev/null +++ b/lib/src/embedded/reusable_isolate.dart @@ -0,0 +1,142 @@ +// Copyright 2023 Google Inc. Use of this source code is governed by an +// MIT-style license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +import 'dart:async'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:native_synchronization/mailbox.dart'; +import 'package:native_synchronization/sendable.dart'; +import 'embedded_sass.pb.dart'; +import 'utils.dart'; + +/// The entrypoint for a [ReusableIsolate]. +/// +/// This must be a static global function. It's run when the isolate is spawned, +/// and is passed a [Mailbox] that receives messages from [ReusableIsolate.send] +/// and a [SendPort] that sends messages to the stream returned by +/// [ReusableIsolate.checkOut]. +/// +/// If the [sendPort] sends a message before [ReusableIsolate.checkOut] is +/// called, this will throw an unhandled [StateError]. +typedef ReusableIsolateEntryPoint = FutureOr Function( + Mailbox mailbox, SendPort sink); + +class ReusableIsolate { + /// The wrapped isolate. + final Isolate _isolate; + + /// The mailbox used to send messages to this isolate. + final Mailbox _mailbox; + + /// The [ReceivePort] that receives messages from the wrapped isolate. + final ReceivePort _receivePort; + + /// The subscription to [_port]. + final StreamSubscription _subscription; + + /// Whether [checkOut] has been called and the returned stream has not yet + /// closed. + bool _checkedOut = false; + + ReusableIsolate._(this._isolate, this._mailbox, this._receivePort) + : _subscription = _receivePort.listen(_defaultOnData); + + /// Spawns a [ReusableIsolate] that runs the given [entryPoint]. + static Future spawn( + ReusableIsolateEntryPoint entryPoint) async { + var mailbox = Mailbox(); + var receivePort = ReceivePort(); + var isolate = await Isolate.spawn( + _isolateMain, (entryPoint, mailbox.asSendable, receivePort.sendPort)); + return ReusableIsolate._(isolate, mailbox, receivePort); + } + + /// Checks out this isolate and returns a stream of messages from it. + /// + /// This isolate is considered "checked out" until the returned stream + /// completes. While checked out, messages may be sent to the isolate using + /// [send]. + /// + /// Throws a [StateError] if this is called while the isolate is already + /// checked out. + Stream checkOut() { + if (_checkedOut) { + throw StateError( + "Can't call ResuableIsolate.checkOut until the previous stream has " + "completed."); + } + _checkedOut = true; + + var controller = StreamController(sync: true); + + _subscription.onData((message) { + var fullBuffer = message as Uint8List; + + // The first byte of messages from isolates indicates whether the entire + // compilation is finished (1) or if it encountered an error (2). Sending + // this as part of the message buffer rather than a separate message + // avoids a race condition where the host might send a new compilation + // request with the same ID as one that just finished before the + // [IsolateDispatcher] receives word that the isolate with that ID is + // done. See sass/dart-sass#2004. + var category = fullBuffer[0]; + var packet = Uint8List.sublistView(fullBuffer, 1); + + if (category == 2) { + // Parse out the compilation ID and surface the [ProtocolError] as an + // error. This allows the [IsolateDispatcher] to notice that an error + // has occurred and close out the underlying channel. + var (_, buffer) = parsePacket(packet); + controller.addError(OutboundMessage.fromBuffer(buffer).error); + return; + } + + controller.sink.add(packet); + if (category == 1) { + _checkedOut = false; + _subscription.onData(_defaultOnData); + _subscription.onError(null); + controller.close(); + } + }); + + _subscription.onError(controller.addError); + + return controller.stream; + } + + /// Sends [message] to the isolate. + /// + /// Throws a [StateError] if this is called while the isolate isn't checked + /// out, or if a second message is sent before the isolate has processed the + /// first one. + void send(Uint8List message) { + _mailbox.put(message); + } + + /// Shuts down the isolate. + void kill() { + _isolate.kill(); + _receivePort.close(); + + // If the isolate is blocking on [Mailbox.take], it won't even process a + // kill event, so we send an empty message to make sure it wakes up. + try { + _mailbox.put(Uint8List(0)); + } on StateError catch (_) {} + } +} + +/// The default handler for data events from the wrapped isolate when it's not +/// checked out. +void _defaultOnData(Object? _) { + throw StateError("Shouldn't receive a message before being checked out."); +} + +void _isolateMain( + (ReusableIsolateEntryPoint, Sendable, SendPort) message) { + var (entryPoint, sendableMailbox, sendPort) = message; + entryPoint(sendableMailbox.materialize(), sendPort); +} diff --git a/lib/src/embedded/util/explicit_close_transformer.dart b/lib/src/embedded/util/explicit_close_transformer.dart deleted file mode 100644 index 43a5334dd..000000000 --- a/lib/src/embedded/util/explicit_close_transformer.dart +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2023 Google Inc. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import 'dart:async'; - -import 'package:async/async.dart'; -import 'package:stream_channel/stream_channel.dart'; - -/// A [StreamChannelTransformer] that explicitly ensures that when one endpoint -/// closes its sink, the other endpoint will close as well. -/// -/// This must be applied to both ends of the channel, and the underlying channel -/// must reserve `null` for a close event. -class ExplicitCloseTransformer - implements StreamChannelTransformer { - const ExplicitCloseTransformer(); - - StreamChannel bind(StreamChannel channel) { - var closedUnderlyingSink = false; - return StreamChannel.withCloseGuarantee(channel.stream - .transform(StreamTransformer.fromHandlers(handleData: (data, sink) { - if (data == null) { - channel.sink.close(); - closedUnderlyingSink = true; - } else { - sink.add(data); - } - })), channel.sink - .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) { - if (!closedUnderlyingSink) { - closedUnderlyingSink = true; - sink.add(null); - sink.close(); - } - }))); - } -} diff --git a/pubspec.yaml b/pubspec.yaml index 8c93d7766..c77344546 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: sass -version: 1.66.1 +version: 1.66.2-dev description: A Sass implementation in Dart. homepage: https://github.com/sass/dart-sass @@ -20,6 +20,7 @@ dependencies: http: "^1.1.0" js: ^0.6.3 meta: ^1.3.0 + native_synchronization: ^0.2.0 node_interop: ^2.1.0 package_config: ^2.0.0 path: ^1.8.0