Skip to content

Commit

Permalink
Improve sass --embedded performance (#2013)
Browse files Browse the repository at this point in the history
Co-authored-by: Natalie Weizenbaum <nweiz@google.com>
  • Loading branch information
ntkme and nex3 committed Sep 1, 2023
1 parent 58cbab4 commit af0118a
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 320 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 1.66.2

### Embedded Sass

* Substantially improve the embedded compiler's performance when compiling many
files or files that require many importer or function call round-trips with
the embedded host.

## 1.66.1

### JS API
Expand Down
221 changes: 116 additions & 105 deletions lib/src/embedded/dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';

import 'package:native_synchronization/mailbox.dart';
import 'package:path/path.dart' as p;
import 'package:protobuf/protobuf.dart';
import 'package:sass/sass.dart' as sass;
import 'package:stream_channel/stream_channel.dart';

import 'embedded_sass.pb.dart';
import 'function_registry.dart';
Expand All @@ -30,83 +30,69 @@ final _outboundRequestId = 0;
/// A class that dispatches messages to and from the host for a single
/// compilation.
final class Dispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;
/// The mailbox for receiving messages from the host.
final Mailbox _mailbox;

/// The send port for sending messages to the host.
final SendPort _sendPort;

/// The compilation ID for which this dispatcher is running.
///
/// This is added to outgoing messages but is _not_ parsed from incoming
/// messages, since that's already handled by the [IsolateDispatcher].
final int _compilationId;
/// This is used in error messages.
late int _compilationId;

/// [_compilationId], serialized as a varint.
final Uint8List _compilationIdVarint;

/// Whether this dispatcher has received its compile request.
var _compiling = false;
///
/// This is used in outgoing messages.
late Uint8List _compilationIdVarint;

/// A completer awaiting a response to an outbound request.
/// Whether we detected a [ProtocolError] while parsing an incoming response.
///
/// Since each [Dispatcher] is only running a single-threaded compilation, it
/// can only ever have one request outstanding.
Completer<GeneratedMessage>? _outstandingRequest;
/// If we have, we don't want to send the final compilation result because
/// it'll just be a wrapper around the error.
var _requestError = false;

/// Creates a [Dispatcher] that sends and receives encoded protocol buffers
/// over [channel].
Dispatcher(this._channel, this._compilationId)
: _compilationIdVarint = serializeVarint(_compilationId);
/// Creates a [Dispatcher] that receives encoded protocol buffers through
/// [_mailbox] and sends them through [_sendPort].
Dispatcher(this._mailbox, this._sendPort);

/// Listens for incoming `CompileRequests` and runs their compilations.
///
/// This may only be called once. Returns whether or not the compilation
/// succeeded.
Future<bool> listen() async {
var success = false;
await _channel.stream.listen((binaryMessage) async {
// Wait a single microtask tick so that we're running in a separate
// microtask from the initial request dispatch. Otherwise, [waitFor] will
// deadlock the event loop fiber that would otherwise be checking stdin
// for new input.
await Future<void>.value();
void listen() {
do {
var packet = _mailbox.take();
if (packet.isEmpty) break;

try {
InboundMessage? message;
var (compilationId, messageBuffer) = parsePacket(packet);

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(binaryMessage);
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

switch (message.whichMessage()) {
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.compileRequest:
if (_compiling) {
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is "
"already active.");
}
_compiling = true;

var request = message.compileRequest;
var response = await _compile(request);
_send(OutboundMessage()..compileResponse = response);
success = true;
// Each Dispatcher runs a single compilation and then closes.
_channel.sink.close();

case InboundMessage_Message.canonicalizeResponse:
_dispatchResponse(message.id, message.canonicalizeResponse);

case InboundMessage_Message.importResponse:
_dispatchResponse(message.id, message.importResponse);
var response = _compile(request);
if (!_requestError) {
_send(OutboundMessage()..compileResponse = response);
}

case InboundMessage_Message.fileImportResponse:
_dispatchResponse(message.id, message.fileImportResponse);
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.functionCallResponse:
_dispatchResponse(message.id, message.functionCallResponse);
case InboundMessage_Message.canonicalizeResponse ||
InboundMessage_Message.importResponse ||
InboundMessage_Message.fileImportResponse ||
InboundMessage_Message.functionCallResponse:
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests"
" in compilation $_compilationId.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");
Expand All @@ -115,16 +101,14 @@ final class Dispatcher {
throw parseError(
"Unknown message type: ${message.toDebugString()}");
}
} on ProtocolError catch (error, stackTrace) {
sendError(handleError(error, stackTrace));
_channel.sink.close();
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
}).asFuture<void>();
return success;
} while (!_requestError);
}

Future<OutboundMessage_CompileResponse> _compile(
InboundMessage_CompileRequest request) async {
OutboundMessage_CompileResponse _compile(
InboundMessage_CompileRequest request) {
var functions = FunctionRegistry();

var style = request.style == OutputStyle.COMPRESSED
Expand Down Expand Up @@ -159,7 +143,6 @@ final class Dispatcher {
verbose: request.verbose,
sourceMap: request.sourceMap,
charset: request.charset);
break;

case InboundMessage_CompileRequest_Input.path:
if (request.path.isEmpty) {
Expand Down Expand Up @@ -188,7 +171,6 @@ final class Dispatcher {
..end = SourceSpan_SourceLocation()
..url = p.toUri(request.path).toString()));
}
break;

case InboundMessage_CompileRequest_Input.notSet:
throw mandatoryError("CompileRequest.input");
Expand Down Expand Up @@ -245,59 +227,86 @@ final class Dispatcher {
void sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

Future<InboundMessage_CanonicalizeResponse> sendCanonicalizeRequest(
InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
OutboundMessage()..canonicalizeRequest = request);

Future<InboundMessage_ImportResponse> sendImportRequest(
InboundMessage_ImportResponse sendImportRequest(
OutboundMessage_ImportRequest request) =>
_sendRequest<InboundMessage_ImportResponse>(
OutboundMessage()..importRequest = request);

Future<InboundMessage_FileImportResponse> sendFileImportRequest(
InboundMessage_FileImportResponse sendFileImportRequest(
OutboundMessage_FileImportRequest request) =>
_sendRequest<InboundMessage_FileImportResponse>(
OutboundMessage()..fileImportRequest = request);

Future<InboundMessage_FunctionCallResponse> sendFunctionCallRequest(
InboundMessage_FunctionCallResponse sendFunctionCallRequest(
OutboundMessage_FunctionCallRequest request) =>
_sendRequest<InboundMessage_FunctionCallResponse>(
OutboundMessage()..functionCallRequest = request);

/// Sends [request] to the host and returns the message sent in response.
Future<T> _sendRequest<T extends GeneratedMessage>(
OutboundMessage request) async {
request.id = _outboundRequestId;
_send(request);

if (_outstandingRequest != null) {
throw StateError(
"Dispatcher.sendRequest() can't be called when another request is "
"active.");
}
T _sendRequest<T extends GeneratedMessage>(OutboundMessage message) {
message.id = _outboundRequestId;
_send(message);

return (_outstandingRequest = Completer<T>()).future;
}
var packet = _mailbox.take();

/// Dispatches [response] to the appropriate outstanding request.
///
/// Throws an error if there's no outstanding request with the given [id] or
/// if that request is expecting a different type of response.
void _dispatchResponse<T extends GeneratedMessage>(int? id, T response) {
var completer = _outstandingRequest;
_outstandingRequest = null;
if (completer == null || id != _outboundRequestId) {
throw paramsError(
"Response ID $id doesn't match any outstanding requests in "
"compilation $_compilationId.");
} else if (completer is! Completer<T>) {
throw paramsError(
"Request ID $id doesn't match response type ${response.runtimeType} "
"in compilation $_compilationId.");
try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

var response = switch (message.whichMessage()) {
InboundMessage_Message.canonicalizeResponse =>
message.canonicalizeResponse,
InboundMessage_Message.importResponse => message.importResponse,
InboundMessage_Message.fileImportResponse => message.fileImportResponse,
InboundMessage_Message.functionCallResponse =>
message.functionCallResponse,
InboundMessage_Message.compileRequest => throw paramsError(
"A CompileRequest with compilation ID $_compilationId is already "
"active."),
InboundMessage_Message.versionRequest =>
throw paramsError("VersionRequest must have compilation ID 0."),
InboundMessage_Message.notSet =>
throw parseError("InboundMessage.message is not set."),
_ =>
throw parseError("Unknown message type: ${message.toDebugString()}")
};

if (message.id != _outboundRequestId) {
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests "
"in compilation $_compilationId.");
} else if (response is! T) {
throw paramsError(
"Request ID $_outboundRequestId doesn't match response type "
"${response.runtimeType} in compilation $_compilationId.");
}

return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_requestError = true;
rethrow;
}
}

completer.complete(response);
/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if
/// available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
}

/// Sends [message] to the host with the given [wireId].
Expand All @@ -306,16 +315,18 @@ final class Dispatcher {
message.writeToCodedBufferWriter(protobufWriter);

// Add one additional byte to the beginning to indicate whether or not the
// compilation is finished, so the [IsolateDispatcher] knows whether to
// treat this isolate as inactive.
// compilation has finished (1) or encountered a fatal error (2), so the
// [IsolateDispatcher] knows whether to treat this isolate as inactive or
// close out entirely.
var packet = Uint8List(
1 + _compilationIdVarint.length + protobufWriter.lengthInBytes);
packet[0] =
message.whichMessage() == OutboundMessage_Message.compileResponse
? 1
: 0;
packet[0] = switch (message.whichMessage()) {
OutboundMessage_Message.compileResponse => 1,
OutboundMessage_Message.error => 2,
_ => 0
};
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_channel.sink.add(packet);
_sendPort.send(packet);
}
}
6 changes: 1 addition & 5 deletions lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';

import '../callable.dart';
import '../exception.dart';
import 'dispatcher.dart';
Expand Down Expand Up @@ -37,8 +34,7 @@ Callable hostCallable(
request.name = callable.name;
}

// ignore: deprecated_member_use
var response = waitFor(dispatcher.sendFunctionCallRequest(request));
var response = dispatcher.sendFunctionCallRequest(request);
try {
switch (response.whichResult()) {
case InboundMessage_FunctionCallResponse_Result.success:
Expand Down

0 comments on commit af0118a

Please sign in to comment.