diff --git a/bin/dart_sass_embedded.dart b/bin/dart_sass_embedded.dart index 0f762c1..42fb1db 100644 --- a/bin/dart_sass_embedded.dart +++ b/bin/dart_sass_embedded.dart @@ -5,24 +5,15 @@ import 'dart:io'; import 'dart:convert'; -import 'package:path/path.dart' as p; -import 'package:sass/sass.dart' as sass; import 'package:stream_channel/stream_channel.dart'; -import 'package:sass_embedded/src/dispatcher.dart'; -import 'package:sass_embedded/src/embedded_sass.pb.dart'; -import 'package:sass_embedded/src/function_registry.dart'; -import 'package:sass_embedded/src/host_callable.dart'; -import 'package:sass_embedded/src/importer/file.dart'; -import 'package:sass_embedded/src/importer/host.dart'; -import 'package:sass_embedded/src/logger.dart'; +import 'package:sass_embedded/src/isolate_dispatcher.dart'; import 'package:sass_embedded/src/util/length_delimited_transformer.dart'; -import 'package:sass_embedded/src/utils.dart'; void main(List args) { if (args.isNotEmpty) { if (args.first == "--version") { - var response = Dispatcher.versionResponse(); + var response = IsolateDispatcher.versionResponse(); response.id = 0; stdout.writeln( JsonEncoder.withIndent(" ").convert(response.toProto3Json())); @@ -37,125 +28,9 @@ void main(List args) { return; } - var dispatcher = Dispatcher( + var dispatcher = IsolateDispatcher( StreamChannel.withGuarantees(stdin, stdout, allowSinkErrors: false) .transform(lengthDelimited)); - dispatcher.listen((request) async { - var functions = FunctionRegistry(); - - var style = request.style == OutputStyle.COMPRESSED - ? sass.OutputStyle.compressed - : sass.OutputStyle.expanded; - var logger = Logger(dispatcher, request.id, - color: request.alertColor, ascii: request.alertAscii); - - try { - var importers = request.importers.map((importer) => - _decodeImporter(dispatcher, request, importer) ?? - (throw mandatoryError("Importer.importer"))); - - var globalFunctions = request.globalFunctions.map((signature) { - try { - return hostCallable(dispatcher, functions, request.id, signature); - } on sass.SassException catch (error) { - throw paramsError('CompileRequest.global_functions: $error'); - } - }); - - late sass.CompileResult result; - switch (request.whichInput()) { - case InboundMessage_CompileRequest_Input.string: - var input = request.string; - result = sass.compileStringToResult(input.source, - color: request.alertColor, - logger: logger, - importers: importers, - importer: _decodeImporter(dispatcher, request, input.importer) ?? - (input.url.startsWith("file:") ? null : sass.Importer.noOp), - functions: globalFunctions, - syntax: syntaxToSyntax(input.syntax), - style: style, - url: input.url.isEmpty ? null : input.url, - quietDeps: request.quietDeps, - verbose: request.verbose, - sourceMap: request.sourceMap, - charset: request.charset); - break; - - case InboundMessage_CompileRequest_Input.path: - if (request.path.isEmpty) { - throw mandatoryError("CompileRequest.Input.path"); - } - - try { - result = sass.compileToResult(request.path, - color: request.alertColor, - logger: logger, - importers: importers, - functions: globalFunctions, - style: style, - quietDeps: request.quietDeps, - verbose: request.verbose, - sourceMap: request.sourceMap, - charset: request.charset); - } on FileSystemException catch (error) { - return OutboundMessage_CompileResponse() - ..failure = (OutboundMessage_CompileResponse_CompileFailure() - ..message = error.path == null - ? error.message - : "${error.message}: ${error.path}" - ..span = (SourceSpan() - ..start = SourceSpan_SourceLocation() - ..end = SourceSpan_SourceLocation() - ..url = p.toUri(request.path).toString())); - } - break; - - case InboundMessage_CompileRequest_Input.notSet: - throw mandatoryError("CompileRequest.input"); - } - - var success = OutboundMessage_CompileResponse_CompileSuccess() - ..css = result.css - ..loadedUrls.addAll(result.loadedUrls.map((url) => url.toString())); - - var sourceMap = result.sourceMap; - if (sourceMap != null) { - success.sourceMap = json.encode(sourceMap.toJson( - includeSourceContents: request.sourceMapIncludeSources)); - } - return OutboundMessage_CompileResponse()..success = success; - } on sass.SassException catch (error) { - var formatted = withGlyphs( - () => error.toString(color: request.alertColor), - ascii: request.alertAscii); - return OutboundMessage_CompileResponse() - ..failure = (OutboundMessage_CompileResponse_CompileFailure() - ..message = error.message - ..span = protofySpan(error.span) - ..stackTrace = error.trace.toString() - ..formatted = formatted); - } - }); -} - -/// Converts [importer] into a [sass.Importer]. -sass.Importer? _decodeImporter( - Dispatcher dispatcher, - InboundMessage_CompileRequest request, - InboundMessage_CompileRequest_Importer importer) { - switch (importer.whichImporter()) { - case InboundMessage_CompileRequest_Importer_Importer.path: - return sass.FilesystemImporter(importer.path); - - case InboundMessage_CompileRequest_Importer_Importer.importerId: - return HostImporter(dispatcher, request.id, importer.importerId); - - case InboundMessage_CompileRequest_Importer_Importer.fileImporterId: - return FileImporter(dispatcher, request.id, importer.fileImporterId); - - case InboundMessage_CompileRequest_Importer_Importer.notSet: - return null; - } + dispatcher.listen(); } diff --git a/lib/src/dispatcher.dart b/lib/src/dispatcher.dart index 12fe403..aef171b 100644 --- a/lib/src/dispatcher.dart +++ b/lib/src/dispatcher.dart @@ -3,20 +3,29 @@ // https://opensource.org/licenses/MIT. import 'dart:async'; +import 'dart:convert'; import 'dart:io'; -import 'dart:typed_data'; +import 'package:path/path.dart' as p; import 'package:protobuf/protobuf.dart'; +import 'package:sass/sass.dart' as sass; import 'package:stack_trace/stack_trace.dart'; -import 'package:stream_channel/stream_channel.dart'; import 'embedded_sass.pb.dart'; +import 'function_registry.dart'; +import 'host_callable.dart'; +import 'importer/file.dart'; +import 'importer/host.dart'; +import 'logger.dart'; import 'utils.dart'; /// A class that dispatches messages to and from the host. class Dispatcher { - /// The channel of encoded protocol buffers, connected to the host. - final StreamChannel _channel; + /// The stream of messages sent from the host to the compiler. + final Stream _stream; + + /// The sink for sending messages from the compiler to the host. + final StreamSink _sink; /// Completers awaiting responses to outbound requests. /// @@ -27,7 +36,7 @@ class Dispatcher { /// Creates a [Dispatcher] that sends and receives encoded protocol buffers /// over [channel]. - Dispatcher(this._channel); + Dispatcher(this._stream, this._sink); /// Listens for incoming `CompileRequests` and passes them to [callback]. /// @@ -37,35 +46,15 @@ class Dispatcher { /// `id` fields; the [Dispatcher] will take care of that. /// /// This may only be called once. - void listen( - FutureOr callback( - InboundMessage_CompileRequest request)) { - _channel.stream.listen((binaryMessage) async { - // Wait a single microtask tick so that we're running in a separate - // microtask from the initial request dispatch. Otherwise, [waitFor] will - // deadlock the event loop fiber that would otherwise be checking stdin - // for new input. - await Future.value(); - - InboundMessage? message; + void listen() { + _stream.listen((message) async { try { - try { - message = InboundMessage.fromBuffer(binaryMessage); - } on InvalidProtocolBufferException catch (error) { - throw _parseError(error.message); - } - switch (message.whichMessage()) { - case InboundMessage_Message.versionRequest: - var request = message.versionRequest; - var response = versionResponse(); - response.id = request.id; - _send(OutboundMessage()..versionResponse = response); - break; + // VersionRequest is handled by the isolate dispatcher. case InboundMessage_Message.compileRequest: var request = message.compileRequest; - var response = await callback(request); + var response = await _compile(request); response.id = request.id; _send(OutboundMessage()..compileResponse = response); break; @@ -91,10 +80,10 @@ class Dispatcher { break; case InboundMessage_Message.notSet: - throw _parseError("InboundMessage.message is not set."); + throw parseError("InboundMessage.message is not set."); default: - throw _parseError( + throw parseError( "Unknown message type: ${message.toDebugString()}"); } } on ProtocolError catch (error) { @@ -105,7 +94,7 @@ class Dispatcher { sendError(error); // PROTOCOL error from https://bit.ly/2poTt90 exitCode = 76; - _channel.sink.close(); + _sink.close(); } catch (error, stackTrace) { var errorMessage = "$error\n${Chain.forTrace(stackTrace)}"; stderr.write("Internal compiler error: $errorMessage"); @@ -113,11 +102,128 @@ class Dispatcher { ..type = ProtocolErrorType.INTERNAL ..id = _inboundId(message) ?? errorId ..message = errorMessage); - _channel.sink.close(); + _sink.close(); } }); } + Future _compile( + InboundMessage_CompileRequest request) async { + var functions = FunctionRegistry(); + + var style = request.style == OutputStyle.COMPRESSED + ? sass.OutputStyle.compressed + : sass.OutputStyle.expanded; + var logger = Logger(this, request.id, + color: request.alertColor, ascii: request.alertAscii); + + try { + var importers = request.importers.map((importer) => + _decodeImporter(request, importer) ?? + (throw mandatoryError("Importer.importer"))); + + var globalFunctions = request.globalFunctions.map((signature) { + try { + return hostCallable(this, functions, request.id, signature); + } on sass.SassException catch (error) { + throw paramsError('CompileRequest.global_functions: $error'); + } + }); + + late sass.CompileResult result; + switch (request.whichInput()) { + case InboundMessage_CompileRequest_Input.string: + var input = request.string; + result = sass.compileStringToResult(input.source, + color: request.alertColor, + logger: logger, + importers: importers, + importer: _decodeImporter(request, input.importer) ?? + (input.url.startsWith("file:") ? null : sass.Importer.noOp), + functions: globalFunctions, + syntax: syntaxToSyntax(input.syntax), + style: style, + url: input.url.isEmpty ? null : input.url, + quietDeps: request.quietDeps, + verbose: request.verbose, + sourceMap: request.sourceMap, + charset: request.charset); + break; + + case InboundMessage_CompileRequest_Input.path: + if (request.path.isEmpty) { + throw mandatoryError("CompileRequest.Input.path"); + } + + try { + result = sass.compileToResult(request.path, + color: request.alertColor, + logger: logger, + importers: importers, + functions: globalFunctions, + style: style, + quietDeps: request.quietDeps, + verbose: request.verbose, + sourceMap: request.sourceMap, + charset: request.charset); + } on FileSystemException catch (error) { + return OutboundMessage_CompileResponse() + ..failure = (OutboundMessage_CompileResponse_CompileFailure() + ..message = error.path == null + ? error.message + : "${error.message}: ${error.path}" + ..span = (SourceSpan() + ..start = SourceSpan_SourceLocation() + ..end = SourceSpan_SourceLocation() + ..url = p.toUri(request.path).toString())); + } + break; + + case InboundMessage_CompileRequest_Input.notSet: + throw mandatoryError("CompileRequest.input"); + } + + var success = OutboundMessage_CompileResponse_CompileSuccess() + ..css = result.css + ..loadedUrls.addAll(result.loadedUrls.map((url) => url.toString())); + + var sourceMap = result.sourceMap; + if (sourceMap != null) { + success.sourceMap = json.encode(sourceMap.toJson( + includeSourceContents: request.sourceMapIncludeSources)); + } + return OutboundMessage_CompileResponse()..success = success; + } on sass.SassException catch (error) { + var formatted = withGlyphs( + () => error.toString(color: request.alertColor), + ascii: request.alertAscii); + return OutboundMessage_CompileResponse() + ..failure = (OutboundMessage_CompileResponse_CompileFailure() + ..message = error.message + ..span = protofySpan(error.span) + ..stackTrace = error.trace.toString() + ..formatted = formatted); + } + } + + /// Converts [importer] into a [sass.Importer]. + sass.Importer? _decodeImporter(InboundMessage_CompileRequest request, + InboundMessage_CompileRequest_Importer importer) { + switch (importer.whichImporter()) { + case InboundMessage_CompileRequest_Importer_Importer.path: + return sass.FilesystemImporter(importer.path); + + case InboundMessage_CompileRequest_Importer_Importer.importerId: + return HostImporter(this, request.id, importer.importerId); + + case InboundMessage_CompileRequest_Importer_Importer.fileImporterId: + return FileImporter(this, request.id, importer.fileImporterId); + + case InboundMessage_CompileRequest_Importer_Importer.notSet: + return null; + } + } + /// Sends [event] to the host. void sendLog(OutboundMessage_LogEvent event) => _send(OutboundMessage()..logEvent = event); @@ -193,13 +299,7 @@ class Dispatcher { } /// Sends [message] to the host. - void _send(OutboundMessage message) => - _channel.sink.add(message.writeToBuffer()); - - /// Returns a [ProtocolError] with type `PARSE` and the given [message]. - ProtocolError _parseError(String message) => ProtocolError() - ..type = ProtocolErrorType.PARSE - ..message = message; + void _send(OutboundMessage message) => _sink.add(message); /// Returns the id for [message] if it it's a request, or `null` /// otherwise. @@ -213,6 +313,7 @@ class Dispatcher { } } + // TODO before landing: Make this an extension method /// Sets the id for [message] to [id]. /// /// Throws an [ArgumentError] if [message] doesn't have an id field. @@ -240,14 +341,4 @@ class Dispatcher { throw ArgumentError("Unknown message type: ${message.toDebugString()}"); } } - - /// Creates a [OutboundMessage_VersionResponse] - static OutboundMessage_VersionResponse versionResponse() { - return OutboundMessage_VersionResponse() - ..protocolVersion = const String.fromEnvironment("protocol-version") - ..compilerVersion = const String.fromEnvironment("compiler-version") - ..implementationVersion = - const String.fromEnvironment("implementation-version") - ..implementationName = "Dart Sass"; - } } diff --git a/lib/src/isolate_dispatcher.dart b/lib/src/isolate_dispatcher.dart new file mode 100644 index 0000000..29fbd08 --- /dev/null +++ b/lib/src/isolate_dispatcher.dart @@ -0,0 +1,181 @@ +// Copyright 2019 Google Inc. Use of this source code is governed by an +// MIT-style license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +import 'dart:io'; + +import 'dart:async'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:protobuf/protobuf.dart'; +import 'package:stack_trace/stack_trace.dart'; +import 'package:stream_channel/isolate_channel.dart'; +import 'package:stream_channel/stream_channel.dart'; + +import 'embedded_sass.pb.dart'; +import 'utils.dart'; +import 'dispatcher.dart'; + +/// A class that dispatches messages to and from the host. +class IsolateDispatcher { + /// The channel of encoded protocol buffers, connected to the host. + final StreamChannel _channel; + + /// The set of all sinks for communicating with isolates. + final _allIsolates = >{}; + + /// A list whose indexes are outstanding request IDs and whose elements are + /// the sinks for isolates that are waiting for responses to those requests. + /// + /// A `null` element indicates an ID whose request has been responded to. + final _outstandingRequests = ?>[]; + + IsolateDispatcher(this._channel); + + void listen() { + _channel.stream.listen((binaryMessage) async { + late InboundMessage message; + try { + message = InboundMessage.fromBuffer(binaryMessage); + } on InvalidProtocolBufferException catch (error) { + throw parseError(error.message); + } + + switch (message.whichMessage()) { + case InboundMessage_Message.versionRequest: + var request = message.versionRequest; + var response = versionResponse(); + response.id = request.id; + _send(OutboundMessage()..versionResponse = response); + break; + + case InboundMessage_Message.compileRequest: + var request = message.compileRequest; + var response = await _compile(message); + response.id = request.id; + _send(OutboundMessage()..compileResponse = response); + break; + + case InboundMessage_Message.canonicalizeResponse: + var response = message.canonicalizeResponse; + _dispatchResponse(response.id, response); + break; + + case InboundMessage_Message.importResponse: + var response = message.importResponse; + _dispatchResponse(response.id, response); + break; + + case InboundMessage_Message.fileImportResponse: + var response = message.fileImportResponse; + _dispatchResponse(response.id, response); + break; + + case InboundMessage_Message.functionCallResponse: + var response = message.functionCallResponse; + _dispatchResponse(response.id, response); + break; + + case InboundMessage_Message.notSet: + throw parseError("InboundMessage.message is not set."); + + default: + throw parseError("Unknown message type: ${message.toDebugString()}"); + } + }); + } + + Future _compile( + InboundMessage compileRequest) async { + var receivePort = ReceivePort(); + var isolate = await Isolate.spawn(_isolateMain, receivePort.sendPort); + + var channel = IsolateChannel.connectReceive(receivePort); + _allIsolates.add(channel.sink); + channel.sink.add(compileRequest); + + await for (var message in channel.stream.cast()) { + // TODO before landing: close out the process on unrecoverable errors. + if (message.whichMessage() == OutboundMessage_Message.compileResponse) { + // TODO before landing: see if re-using isolates is more efficient + channel.sink.close(); + return message.compileResponse; + } + + _send(message); + + var id = _getOutboundId(message); + if (id >= _outstandingRequests.length) { + _outstandingRequests.length = id + 1; + } + + assert(_outstandingRequests[id] == null); + _outstandingRequests[id] = channel.sink; + } + + throw StateError( + "IsolateChannel closed without sending a CompileResponse."); + } + + // TODO before landing: make this an extensio method. + int _getOutboundId(OutboundMessage message) { + switch (message.whichMessage()) { + case OutboundMessage_Message.compileResponse: + return message.compileResponse.id; + case OutboundMessage_Message.canonicalizeRequest: + return message.canonicalizeRequest.id; + case OutboundMessage_Message.importRequest: + return message.importRequest.id; + case OutboundMessage_Message.fileImportRequest: + return message.fileImportRequest.id; + case OutboundMessage_Message.functionCallRequest: + return message.functionCallRequest.id; + case OutboundMessage_Message.versionResponse: + return message.versionResponse.id; + default: + throw ArgumentError("Unknown message type: ${message.toDebugString()}"); + } + } + + /// Creates a [OutboundMessage_VersionResponse] + static OutboundMessage_VersionResponse versionResponse() { + return OutboundMessage_VersionResponse() + ..protocolVersion = const String.fromEnvironment("protocol-version") + ..compilerVersion = const String.fromEnvironment("compiler-version") + ..implementationVersion = + const String.fromEnvironment("implementation-version") + ..implementationName = "Dart Sass"; + } + + /// Dispatches [response] to the appropriate outstanding request. + /// + /// Throws an error if there's no outstanding request with the given [id]. + void _dispatchResponse(int id, T response) { + Sink? sink; + if (id < _outstandingRequests.length) { + sink = _outstandingRequests[id]; + _outstandingRequests[id] = null; + } + + if (sink == null) { + throw paramsError( + "Response ID $id doesn't match any outstanding requests."); + } + + sink.add(response); + } + + /// Sends [message] to the host. + void _send(OutboundMessage message) => + _channel.sink.add(message.writeToBuffer()); +} + +void _isolateMain(SendPort sendPort) { + var channel = IsolateChannel.connectSend(sendPort); + Dispatcher( + channel.stream.cast(), + channel.sink.transform(StreamSinkTransformer.fromHandlers( + handleData: (data, sink) => sink.add(data)))).listen(); +} diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 16cf103..d3241bf 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -27,6 +27,11 @@ ProtocolError paramsError(String message) => ProtocolError() ..type = ProtocolErrorType.PARAMS ..message = message; +/// Returns a [ProtocolError] with type `PARSE` and the given [message]. +ProtocolError parseError(String message) => ProtocolError() + ..type = ProtocolErrorType.PARSE + ..message = message; + /// Converts a Dart source span to a protocol buffer source span. proto.SourceSpan protofySpan(SourceSpan span) { var protoSpan = proto.SourceSpan()